1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
use tokio::{
task,
try_join,
};
use crate::internal::{
base::{
once_channel,
unbounded,
unsafe_create_shared_channel,
unsafe_run_session,
unsafe_run_shared_session,
Protocol,
Session,
SharedChannel,
SharedProtocol,
SharedSession,
Value,
},
protocol::{
End,
SendValue,
},
};
pub async fn run_session(session: Session<End>)
{
let (sender, receiver) = once_channel();
let child1 = task::spawn(async move {
unsafe_run_session(session, (), sender).await;
});
let child2 = task::spawn(async move {
receiver.recv().await.unwrap();
});
try_join!(child1, child2).unwrap();
}
pub async fn run_session_with_result<T>(
session: Session<SendValue<T, End>>
) -> T
where
T: Send + 'static,
{
let (provider_end, val_receiver) = <SendValue<T, End>>::create_endpoints();
let child1 = task::spawn(async move {
unsafe_run_session(session, (), provider_end).await;
});
let (Value(val), end_receiver) = val_receiver.recv().await.unwrap();
end_receiver.recv().await.unwrap();
let _ = child1.await;
val
}
pub fn run_shared_session<A>(session: SharedSession<A>) -> SharedChannel<A>
where
A: SharedProtocol,
{
let (chan, _) = run_shared_session_with_join_handle(session);
chan
}
pub fn run_shared_session_with_join_handle<A>(
session: SharedSession<A>
) -> (SharedChannel<A>, task::JoinHandle<()>)
where
A: SharedProtocol,
{
let (sender1, receiver1) = unbounded();
let (session2, receiver2) = unsafe_create_shared_channel();
task::spawn(async move {
info!("[run_shared_session] exec_shared_session");
unsafe_run_shared_session(session, receiver1).await;
info!("[run_shared_session] exec_shared_session returned");
});
let handle = task::spawn(async move {
loop {
let m_senders = receiver2.recv().await;
debug!("[run_shared_session] received sender3");
match m_senders {
Some(senders) => {
sender1.send(senders).unwrap();
}
None => {
info!("[run_shared_session] terminating shared session");
return;
}
}
}
});
(session2, handle)
}