This part covers the ZIO tests. It is based on the ZIO document of Data Types. A small number of data types help you develop applications in a functional approach. All the data types are just values that

  • describe effectful or asynchronous actions
  • manage resources
  • are used to make concurrent programming safe and simple.

1 Data Types for Side Effects

IO and newwork code interacte with outside thus they are impure, partial and having side effects. Most these calls can be encapsulated by the IO[E, A], Task[A] and UIO[A] data types.

  • Pure values: All the apply and succeed meethods of UIO, IO and ZIO can be used. UIO represents code that can not fail.
  • Effectful synchronous code:
    • code can fail: the effect method of ZIO and IO.
    • code cannot fail: effectTotal method of UIO ,IO and ZIO.
    • handle failures: refineOrDie keeps some errors and fails with the rest.
  • Effectful asynchronous code:
    • code can fail: effectAsync, effectAsyncMaybe, effectAsyncM of IO and ZIO.
    • code cannot fail: effectAsync, effectAsyncMaybe, effectAsyncM of UIO

All the effectful async method take a register parameter that has a type of (ZIO[R, E, A] => Unit) => Any. The first part is a callback function of type ZIO[R, E, A] => Unit. The callback function will be used to wrap the async results that first converted into an effect.

The following code is an example of file processing. It may die with an exception because of the ZIO.orDie call.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import zio.{ App, Task, UIO }
import java.io.{ File, FileInputStream }
import java.nio.charset.StandardCharsets

object Main extends App {

  // run my bracket
  def run(args: List[String]) =
    mybracket.orDie.as(0)

  def closeStream(is: FileInputStream) =
    UIO(is.close())

  // helper method to work around in Java 8
  def readAll(fis: FileInputStream, len: Long): Array[Byte] = {
    val content: Array[Byte] = Array.ofDim(len.toInt)
    fis.read(content)
    content
  }

  def convertBytes(is: FileInputStream, len: Long) =
    Task.effect(println(new String(readAll(is, len), StandardCharsets.UTF_8))) // Java 8
  //Task.effect(println(new String(is.readAllBytes(), StandardCharsets.UTF_8))) // Java 11+

  // mybracket is just a value. Won't execute anything here until interpreted
  val mybracket: Task[Unit] = for {
    file   <- Task(new File("/tmp/hello"))
    len    = file.length
    string <- Task(new FileInputStream(file)).bracket(closeStream)(convertBytes(_, len))
  } yield string
}

2 Fiber

A fiber is a lightweight thread of exeuction that are good for computation-intensive operation or async operations. Fibers are created by forking ZIO effects. Fibers can be joined or interrupted. Interruption terminates a fiber and safely releasing all resources.

2.1 Creating a Fiber

The fork method of an effect generates an URIO[R, Fiber[E, A]], An IO effect generates an UIO[Fiber[E, A]]. A Fiber[E, A] fails with an erorr of type E or succeeds with a value of type A.

2.2 Terminating a Fiber

A fiber may be terminated for the following reasons:

  • A fiber is slef-terminated or interrupted. The “main” fiber cannot be interrupted.
  • A fiber fails to handle some error of type E. This can happen only when an IO.fail is not handled.
  • A fiber has a defect that leads to a non-recoverable error. There are two possible ways this can happen:
    • A parital function (a function that may throw an exception) is passed to a high order function. Don’t pass inpure function to pure funciton.
    • Error-throwing code was embedded into some value vai IO.effectTotal. Use IO.effect.

2.3 Parallelism

Parallelism is achieved by using the Par suffix methods such as zipPar, zipWithPar, collectAllPar, foreachPar, reduceAllPar, and mergeAllPar.

Two IO actions can race.

2.4 Scheduling and Shifting Thread

Fibers may shift thread. Fibers attempt to execute on the same thread for a configurable minimum period of time before yielding to other fibers. Fibers that resume from async callbacks will resume on the intiating thread and continue some time before yielding and resuming on the runtime thread pool. The configuration can be changed in Runtime.

2.5 FiberRef

It’s the fiber’s counterpart for Java’s ThreadLocal. Value is automatically propagated to child on fork and merged back in after joining child.

The FiberRef.make[A] method will create an UIO[FiberRef[A]]

3 Managed Resources

ZIO use ZManaged[R, E, A] for a resource of type A that require release after use. It automatically acquire resources before the use and release resources after use. The ZManaged value wraps a value of type ZIO[R, E, Reservation[R, E, A]] where the result has a type of final case class Reservation[-R, +E, +A](acquire: ZIO[R, E, A], release: Exit[Any, Any] => ZIO[R, Nothing, Any]).

It comes with 5 related types:

1
2
3
4
5
type RManaged[-R, +A]  = ZManaged[R, Throwable, A]
type URManaged[-R, +A] = ZManaged[R, Nothing, A]
type Managed[+E, +A]   = ZManaged[Any, E, A]
type UManaged[+A]      = ZManaged[Any, Nothing, A]
type TaskManaged[+A]   = ZManaged[Any, Throwable, A]

The most used type is Managed[E, A] that created by Magaged object methods. For example:

1
2
3
4
def doSomething(queue: Queue[Int]): UIO[Unit] = IO.unit

val managedResource = Managed.make(Queue.unbounded[Int])(_.shutdown)
val usedResource: UIO[Unit] = managedResource.use { queue => doSomething(queue) }

A queue is created when use is called and shutdown will be call when doSomthing completes.

A managed can be created from an effect using Managed.fromEffect or a pure value by Managed.succeed, both don’t need a release function.

Use ZManaged if the resource needs an environment of type R.

Values of Managed can be combined using flatMap.

4 Promise

Use Promise to communicate between two or more fibers.

A Promise[E, A] is a variable of type IO[E, A] that can be set excactly once. Creation of promise involves mutable memory allocation that should be described by an IO. Use Promise.make to create a value of type UIO[Promise[E, A]]. There are many ways to set its value:

  • succeed(a): succeed with a value of type A
  • complete(effect) or completeWith(effect): complete with an effect of type IO[E, A]
  • fail(e): fail
  • die(new Throwable(...)): die with a value of Throwable type
  • halt(e): fail with an error of type Cause[E]
  • interrupt: interrupt

All set methods returen a value of UIO[Boolean] type that shows whether it is set (true) or was already set(false).

Use await method to get a value from a promise. The awaiting fiber is suspended until the promise is set. Use poll or isDone method to check without blocking.

5 Queue

A Queue[A] contatins values of type A and has two basic operations: offer and take. Queue types include bounded (back-pressured), dropping, sliding and unbounded.

The general type of ZQueue[RA, RB, EA, EB, A, B] means that the queue can be offered A, depends on RA and may fail with EA. It can yield value of type B, depends on RB and may fail with EB.

A Queue[A] is defined as type Queue[A] = ZQueue[Any, Nothing, Any, Nothing, A, A] that means it cannot fail, doesn’t depend on any resource and provide the same type of value as its offered value type.

Queue operations include map, mapM, contramapM, both and bothWith.

6 Ref

The ZRef type is used for concurrently read and write a single immutable-value. The Ref object exposes two methods of ZRef:

1
2
3
4
5
6
object Ref extends Serializable {

  def make[A](a: A): UIO[Ref[A]] = ZRef.make(a)

  def makeManaged[A](a: A): UManaged[Ref[A]] = ZRef.makeManaged(a)
}

A ZRef[EA, EB, A, B] has two fundamental operations: set and get. set sets a value of type A and may fail with an error of type EA. get gets a value of B and may fail with an error of type EB. When the error and value types of the ZRef are unified as ZRef[E, E, A, A], the ZRef also supports atomic modify and update operations. All operations are guaranteed to be safe for concurrent access. The value of ZRef should be immutable to keep this gurantee because it is implemented using campare and swap operatons in a loop rather than synchronization for performance reason.

7 Semaphore

A Semaphore data type synchronize fibers by safely acquiring and releasing a permit. It is based on Ref[A].

  • Semaphore.make(permits): to create a semaphore of type UIO[Semaphore] with the specified number of permits.
  • available: the nubmer of available permits
  • withPermit(task): acquire a permit for a task

When task is completed, whether succeeds or fails, the acquired permits are released.

8 Schedule

Schedules are composable recurrence used for repetition and retries.

  • Repetition
    • IO#repeat
    • IO#repeateOrElse
    • IO#repeatOrElse0
  • Retires
    • IO#retry
    • IO#retryOrElse
    • IO#retryOrElse0

A Schedule[R, A, B] consumes A values, and based on the inputs and the internal state, decides whether to continue or halt. Every decsioin has a (possibly zero) delay, and an output value of type B. Schedules can be composed by && (use the longer one), || (use the shorter one), <||>/andThen(run in sequence, one after another one).

Basic schedules:

  • Schedule.forever
  • Schedule.never
  • Schedule.recurs(10)
  • Schedule.spaced(10.milliseconds)
  • Schedule.exponential(10.milliseconds)
  • Schedule.fibonacci(10.milliseconds)

Customization: delayed, jittered.

9 Chunk

A Chunk[A] represents a chunk of values of type A. It is backed by arrays but expose a purely functional, safe interface and is lazy on operations.

10 STM Data Types

  • TArray
  • TMap
  • TPromise
  • TQueue
  • TRef
  • TSet