1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
use std::time::Duration;
use ferrite_session::prelude::*;
use tokio::time::sleep;
type IntStream = Rec<SendValue<u64, Z>>;
fn producer(count: u64) -> Session<IntStream>
{
fix_session(step(async move {
println!("[producer] Producing value: {}", count);
sleep(Duration::from_secs(1)).await;
send_value(count, producer(count + 1))
}))
}
fn consumer<A: Protocol>() -> Session<ReceiveChannel<IntStream, A>>
{
receive_channel(move |stream| {
unfix_session(
stream,
receive_value_from(stream, move |count| {
println!("[consumer] Received value: {}", count);
include_session(consumer(), |next| {
send_channel_to(next, stream, forward(next))
})
}),
)
})
}
pub fn stream_session() -> Session<End>
{
let p1 = producer(0);
let p2 = consumer();
apply_channel(p2, p1)
}
#[tokio::main]
pub async fn main()
{
run_session(stream_session()).await;
}