This part covers ZIO Streams. It is based on the [ZIO stream doc] and two videos: "Scalaz Stream: Rebirth" and "Modern Data Driven Applications with ZIO Streams".

Basic ZIO Stream

A Stream[E, A] represents an effectful stream that emits values of type A and may fail with an error of type E. Use Stream() or Stream.fromIterable() to create a stream, and map or foreach to process it. A Sink[E, A0, A, B] consumes values of type A and produces either an error of type E or a value of type B together with a remainder of type A0. Call the run method with a sink to run a stream. Sinks offer operators such as foldLeft. Streams can also be merged and zipped.

Motivation

  • Composable, leak-free data processing: no need to manage resources manually.
  • Incremental computation: easy to handle events.
  • Infinite input sets: handle mass data.

There are three areas of stream processing: file and socket processing, graph processing, and reactive and dataflow processing. Akka Streams combines the first and the second. FS2 combines the second and the third. ZIO Streams focuses on file and socket processing, and it doesn't support recursive processing or cycles.

ZIO Streams is lazy, leak-free, uniform (interruptible and managed effects), type-inferred, economical (a small API), and integrated with ZIO.

API

Operations

The API has only two concepts: Stream and Sink. Both support the following operators:

  • Monoid: append, zero
  • Applicative: map, point, ap
  • Monad: bind
  • Bifunctor: leftMap, bimap
  • Alternative: alt, empty

The sink additionally supports:

  • Category: id, compose
  • Profunctor: lmap, rmap, dimap
  • Strong: first, second
  • Choice: left, right

Stream

To create a stream, use the following:

val empty: Stream[Nothing, Nothing] = Stream.empty
val singleton: Stream[Nothing, Int] = Stream.point(42)
val hundred: Stream[Nothing, Int] = Stream.fromIterable(0 to 100)
val userIn: Stream[IOException, String] = Stream.liftIO(getStrLn)
val intStream: Stream[Nothing, Int] = Stream.fromQueue(intQueue)
val integers: Stream[Nothing, Int] = Stream.unfold(0)(i => (i, i + 1))
val effectfulLongs: Stream[Nothing, Long] = Stream.unfoldM(s)(f)
val fileBytes: Stream[IOException, Byte] = Stream.bracket(open)(close)(read)

The stream operations are:

stream.map(f)
stream.flatMap(f)
stream.filter(f)
stream1 ++ stream2
stream.transduce(what)
stream.scan(z)(f)
stream.take(n)
stream.takeWhile(f)
stream.drop(n)
stream.dropWhile(f)
stream.foreach(f)
stream1.zip(stream2)
stream1.merge(stream2)
stream1.joinWith(stream2)(f)
stream.peel(what)
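
To get a feel for how a few of these operators behave on an infinite, lazily evaluated source, here is a small sketch. It deliberately uses the Scala standard library's LazyList (Scala 2.13 or later) as a pure stand-in, not the ZIO API: there are no effects, no error channel, and the standard library's unfold wraps each step in an Option, unlike the listing above. The object name StreamOpsSketch is made up for illustration.

```scala
object StreamOpsSketch {
  // An infinite sequence 0, 1, 2, ... built from a seed and a step function.
  val integers: LazyList[Int] = LazyList.unfold(0)(i => Some((i, i + 1)))

  // map and takeWhile only evaluate as many elements as are needed.
  val evens: List[Int] = integers.map(_ * 2).takeWhile(_ < 10).toList

  // scanLeft emits every intermediate accumulator, starting with the seed.
  val runningSums: List[Int] = integers.drop(1).scanLeft(0)(_ + _).take(4).toList

  // zip stops as soon as the shorter side is exhausted.
  val zipped: List[(Int, String)] = integers.zip(LazyList("a", "b")).toList
}
```

Because every step is lazy, none of these pipelines tries to materialize the infinite source; only the final toList forces the finite prefix that was asked for.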

The stream runs with a sink: def run[R](sink: Sink[E, A, A, R]): IO[E, R].
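
The relationship between a stream, a sink, and run can be illustrated with a much simpler toy model. The sketch below is an assumption-laden simplification rather than the ZIO implementation: ToySink, SinkSketch, and their members are hypothetical names, the "stream" is a plain Iterable, and the error type E and the remainder type A0 are left out entirely.

```scala
object SinkSketch {
  // A toy sink: an initial state, a step that consumes one input, and a final extraction.
  final case class ToySink[A, S, B](initial: S, step: (S, A) => S, extract: S => B) {
    // Transform the result (the "rmap" side of a profunctor).
    def map[C](f: B => C): ToySink[A, S, C] =
      ToySink[A, S, C](initial, step, s => f(extract(s)))

    // Transform the input before it reaches the sink (the "lmap" side).
    def contramap[A0](f: A0 => A): ToySink[A0, S, B] =
      ToySink[A0, S, B](initial, (s, a0) => step(s, f(a0)), extract)
  }

  // Collect every input into a list, preserving order.
  def collect[A]: ToySink[A, List[A], List[A]] =
    ToySink[A, List[A], List[A]](List.empty[A], (acc, a) => a :: acc, _.reverse)

  // Running a stream with a sink: feed every element to the sink, then extract the result.
  def run[A, S, B](stream: Iterable[A], sink: ToySink[A, S, B]): B =
    sink.extract(stream.foldLeft(sink.initial)(sink.step))
}
```

For example, SinkSketch.run(List(1, 2, 3), SinkSketch.collect[Int].map(_.sum)) folds the inputs into a list and then sums it. A real sink also has to report errors, signal that it is done early, and hand back unconsumed input, which is what the extra type parameters are for.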

Sink

The sink operations:

val answer: Sink[Nothing, Nothing, Any, Int] = Sink.point(42)
val lifted: Sink[Throwable, Nothing, Any, Array[Byte]] = Sink.liftIO(IO.syncThrowable(dangerousOp))
val squared: Sink[Unit, Nothing, Int, Int] = Sink.lift(i => i * i)
val failure: Sink[DomainError, Nothing, Any, Nothing] = Sink.fail(BadOperation)

// pass any input element to result
val json: Sink[Unit, Nothing, Json, Json] = Sink.await

// consume any input and return ()
val drain: Sink[Nothing, Nothing, Any, Unit] = Sink.drain

// collect elements into a list
val collected: Sink[Nothing, Nothing, Json, List[Json]] = Sink.collect[Json]

// apply step to each element to create state and result
val parser: Sink[Nothing, Char, Char, Either[Error, Int]] =
  Sink.fold((ParserState.initial, List[Int]())) {
    case (ParserState.Digits, c) if c.isDigit => ...
  }

// others
sink.map(f)
sink.contramap(f)
sink.mapError(f)
sink.flatMap(f)
sink ~ otherSink
sink.?
sink.raceBoth(other)
sink.orElse(other)
sink.repeat

Design

There are two different ways to design a functional library: initial encoding and final encoding.

Initial Encoding

You define an initial type and then add more types, one per operation. An interpreter then pattern matches on these types to run a stream.

// initial type
sealed trait Stream[A]

// then add more types for operations
case class Emit[A](a: A) extends Stream[A] // emission
case class Map[A0, A](s: Stream[A0], f: A0 => A) extends Stream[A] // mapping
case class Join[A](s: Stream[Stream[A]]) extends Stream[A] // joining
case class Merge[A](l: Stream[A], r: Stream[A]) extends Stream[A] // merging
case class Fold[A0, A](a: A, s: Stream[A0], f: (A, A0) => A) extends Stream[A] // folding
case class Effect[A](io: IO[A]) extends Stream[A] // effect

// run the code using an interpreter function
def drain[A](s: Stream[A]): IO[Unit] = s match {
  case Emit(a) => ...
}
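
A complete, runnable miniature of the same idea may help. This is only a sketch under simplifying assumptions: effects are modelled as an ordinary A => Unit callback instead of IO, only three operations are included, and the names InitialEncodingSketch, ToyStream, Mapped, and Concat are invented for illustration.

```scala
object InitialEncodingSketch {
  // The stream is plain data: each operation is a case class.
  sealed trait ToyStream[A]
  final case class Emit[A](a: A) extends ToyStream[A]
  final case class Mapped[A0, A](s: ToyStream[A0], f: A0 => A) extends ToyStream[A]
  final case class Concat[A](l: ToyStream[A], r: ToyStream[A]) extends ToyStream[A]

  // The interpreter gives the data meaning by pattern matching on it.
  def foreach[A](s: ToyStream[A])(use: A => Unit): Unit = s match {
    case Emit(a)       => use(a)
    case Mapped(s0, f) => foreach(s0)(a0 => use(f(a0)))
    case Concat(l, r) =>
      foreach(l)(use)
      foreach(r)(use)
  }

  // A second interpreter built on the first: collect all elements in order.
  def toList[A](s: ToyStream[A]): List[A] = {
    val buf = List.newBuilder[A]
    foreach(s) { a => buf += a; () }
    buf.result()
  }
}
```

Since the program is just a tree, an interpreter is free to inspect or rewrite it before running it, for instance fusing two nested Mapped nodes into one.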

Final Encoding

First choose the target representation. A common data structure is a tree of callback functions. Then define operations.

case class Stream[+A](run: (A => IO[Unit]) => IO[Unit]) {

  def map[B](f: A => B): Stream[B] = Stream(r => run(r.compose(f)))
  def merge[A1 >: A](that: Stream[A1]) = Stream[A1](r => run(r).par(that.run(r)).void)
  def fold[S](s: S)(f: (S, A) => S) = ...
}

object Stream {
  def emit[A](a: => A): Stream[A] = Stream[A](read => read(a))
  def join[A](s: Stream[Stream[A]]) = Stream(r => s.run(sa => sa.run(r)))
  def liftIO[A](io: IO[A]): Stream[A] = Stream[A](io.flatMap(_))
}
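
The callback representation can also be tried out without any effect library. The sketch below makes the same simplifying assumption as before, replacing IO[Unit] with a plain Unit-returning callback, so it cannot express the parallel merge from the listing above and offers a sequential ++ instead. FinalEncodingSketch, ToyStream, and fromIterable are hypothetical names.

```scala
object FinalEncodingSketch {
  // The target representation: a stream is a function that pushes each element to a callback.
  final case class ToyStream[A](run: (A => Unit) => Unit) {
    // Operations are defined directly in terms of the representation.
    def map[B](f: A => B): ToyStream[B] =
      ToyStream[B](r => run(a => r(f(a))))

    // Sequential append: run this stream to completion, then the other one.
    def ++(that: ToyStream[A]): ToyStream[A] =
      ToyStream[A] { r => run(r); that.run(r) }

    // Folding runs the stream and threads an accumulator through the callback.
    def fold[S](z: S)(f: (S, A) => S): S = {
      var acc = z
      run { a => acc = f(acc, a) }
      acc
    }
  }

  object ToyStream {
    def emit[A](a: => A): ToyStream[A] = ToyStream[A](r => r(a))
    def fromIterable[A](as: Iterable[A]): ToyStream[A] = ToyStream[A](r => as.foreach(r))
  }
}
```

There is no interpreter here: every operation immediately produces another function, so the structure cannot be inspected afterwards, but every stream is guaranteed to go through the same run path.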

Comparison

Feature                 Initial  Final
Flexible                Yes      No
Lookahead optimization  Yes      No
Structured              No       Yes
Uniform                 No       Yes

The final encoding makes it easy to enforce consistent behavior, such as cancellation and resource management.