1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
use std::time::Duration;

use ferrite_session::prelude::*;
use tokio::time::sleep;

struct WrapIntStream;

impl Wrapper for WrapIntStream
{
  type Unwrap = IntStream;
}

type IntStream = SendValue<u64, Wrap<WrapIntStream>>;

fn producer(count: u64) -> Session<IntStream>
{
  step(async move {
    println!("Producing value: {}", count);
    sleep(Duration::from_secs(1)).await;

    send_value(count, wrap_session(producer(count + 1)))
  })
}

fn consumer() -> Session<ReceiveChannel<IntStream, End>>
{
  receive_channel(|stream| {
    receive_value_from(stream, move |count| {
      println!("Received value: {}", count);
      unwrap_session(
        stream,
        include_session(consumer(), |next| {
          send_channel_to(next, stream, forward(next))
        }),
      )
    })
  })
}

pub fn stream_session() -> Session<End>
{
  let p1 = producer(0);

  let p2 = consumer();

  apply_channel(p2, p1)
}

#[tokio::main]

pub async fn main()
{
  run_session(stream_session()).await
}