This part covers the concurrent parts of ZIO including STM, fiber, queue, promise, and semaphore. It is based on the video Atomically {delete your actors}.

1 STM

STM is Software Transactional Memory. It provides the ability to atomically commit a series of reads and writes to transactional memeory where a set of conditions is satisfied. STM retries when the conditions are not met and rollbacks when there is a concurrent change.

STM[E, A] is a transaction which models reads and writes and can fail, retry or suceed. TRef[A] is a transactional reference to an immutable value, which is read and written inside STM transactions. STMs are composable.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
trait TRef[A] {
  val get: STM[Nothing, A]
  def set(newValue: A): STM[Nothing, Unit]
  def update(f: A => A): STM[Nothing, A]
  def modify[B](f: A => (B, A)): STM[Nothing, B]
}

object TRef {
  def make[A](a: A): STM[Nothing, TRef[A]]
}

trait STM[+E, +A] {
  def commit: IO[E, A] = STM.atomically { this }
}

object STM {
  def atomically[E, A](stm: STM[E, A]): IO[E, A]
}

STM has map, flatMap, succeed, fail, fold, foldM, retry (asynchronously and atomatically retry), zip, check (suspend if condition is not satisfied), filter (check and return), orElse, collect (filter and map).

2 Sample Implementation of Semaphore, Promise and Queue

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// Semaphore
type Semaphore = TRef[Int]

def makeSemaphore(n: Int): UIO[Semaphore] = TRef.make(n).commit

def acquire(semaphore: Semaphore, n: Int): UIO[Unit] =
  (for {
    value <- semaphore.get
    _ <- STM.check(value >= n)
    _ <- semaphore.set(value - n)
  } yield()).commit

def release(semaphore: Semaphore, n: Int): UIO[Unit] =
  semaphore.update(_ + n).commit

// Promise is an asynchronous variable that can be set only once
type Promise[A] = TRef[Option[A]]

def makePromise[A]: UIO[Promise[A]] = TRef.make(None).commit

def complete[A](promise: Promise[A], v: A): UIO[Boolean] =
  (for {
    value <- promise.get
    change <- value match{
      case Some(_) => STM.succeed(false)
      case None => promise.set(Some(v)) *> STM.succeed(true)
    }
  } yield change).commit

def await[A](promise: Promise[A]): UIO[A] =
  promise.get.collect { case Some(a) => a }.commit


// Queue: doublely backpressured-queue
case class Queue[A](capacity: Int, tref: TRef[ScalaQueue[A]])

def makeQueue[A](capacity: Int): UIO[Queue[A]] =
  TRef.make(ScalaQueue.empty[A]).commit.map(Queue(capacity, _))

def offer[A](queue: Queue[A], a: A): UIO[Unit] =
  (for {
    q <- queue.tref.get
    _ <- STM.check(q.length < queue.capacity)
    _ <- queue.tref.update(_ enqueue a)
  } yield ()).commit

def def take[A](queue: Queue[A]): UIO[A] =
  (for {
    q <- queue.tref.get
    a <- q.dequeueOption match {
      case Some((a, as)) => queue.tref.set(as) *> STM.succeed(a)
      case _ => STM.retry
    }
  } yield a).commit

3 Concurrency Data Types

3.1 Semaphore

Use Semaphore.make(permits=count) to create a semaphore with specified permits and use Semaphore#withPermit(count)(effect) to run an effect when the requried permits are acquired.

3.2 Promise

A Promise[E, A] is a variable of IO[E, A] type that is set exactly once. It is often used by fibers to pass values to each other. Promise.make[E, A] returns UIO[Promise[E, A] that is a description of the promise creation. A promise should be created inside IO.

A promise can be completed by the following methods:

  • succeed: succeed with a value of type A
  • done: succed with Exit[E, A]
  • complete: execute an effect once and the result will be propagated to all waiting fibers
  • completeWith: execute an effect for each of the waiting fiber
  • fail: fail with an error type E
  • die: defect with Throwable
  • halt: fail with Cause[E]
  • interrupt: interrupt

The completion of a promise result in an UIO[Boolean] that represents whether the value has been set ture or was already set false.

Other methods include await to wait and get result, poll to get without wait, isDone check if it is completed.

3.3 Queue

The ZIO Queue is composable, transparent back-pressure, asynchronous (no locks or blocking), purely-functional and type-safe. It has tow basic operations: offer and take. A queue can be unbounded Queue.unbounded or bounded. For bounded, it could be blocking Queue.bounded, drop new Queue.dropping, drop old Queue.sliding.

Other operations include offerAll, takeUpTo, takeAll, poll, shutdown, awaitShutdown.

Use ZQueue to transform queues. The methods are map, mapM, contramapM, bothWith.