This is a study note on Akka Streams. It is based on the official Akka Streams documentation and covers only the basic concepts of stream creation, execution and management.

1 Introduction

1.1 Design Principles

Data often comes as a stream. Sometimes it is too big to be treated as a whole. All function calls, and even general data communication, can also be modeled as streams of data. The stream model brings in the time dimension and, in addition to data processing facilities, provides flow control (back pressure, buffering, throttling, delay, etc.), error handling (retry, resume, replace, etc.), and stream topology construction (fan-in, fan-out, balance, etc.).

The design principles of Akka Streams are:

  • APIs are minimal and consistent, even at the cost of being less intuitive
  • All features are explicit in the API, no magic
  • All components are composable
  • The model of distributed stream processing is exhaustive

An Akka Streams element can be dropped for a number of reasons:

  • mapping an element to a different one
  • drop operations such as take, drop, filter, etc.
  • stream failure

If a JVM object that requires cleanup is sent into a stream, the cleanup should happen outside the stream, for example via a timeout or a finalizer.

The compositionality principle requires the separation of the immutable blueprint from its execution (materialization). Therefore blueprints cannot be bound to live resources, and dynamic stream networks must be defined explicitly.

The materialization process often needs to interact with the running processing engine, for example to shut it down or to extract metrics. Each stream execution therefore yields a materialized value.

Use of Akka Streams can be divided into two steps: defining reusable pieces that return composable operators, and then providing the facilities that consume and materialize those operators.

Akka Streams provides the following building blocks (a minimal composition sketch follows the list):

  • Source: one output stream
  • Sink: one input stream
  • Flow: one input and one output stream
  • BidiFlow: two input and two output streams
  • Graph: a packaged stream processing topology that exposes a certain set of input and output ports. The input and output ports are characterized by a Shape object.
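
As a minimal sketch of how these blocks compose (assuming Akka 2.6+ with akka-stream on the classpath; the system name and values are arbitrary):

    import akka.actor.ActorSystem
    import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

    object BuildingBlocks extends App {
      // An implicit ActorSystem provides the system-wide materializer.
      implicit val system: ActorSystem = ActorSystem("blocks")

      val source = Source(1 to 5)             // one output stream
      val flow   = Flow[Int].map(_ * 2)       // one input and one output stream
      val sink   = Sink.foreach[Int](println) // one input stream

      // Composing the three pieces yields a RunnableGraph; run() materializes it.
      source.via(flow).toMat(sink)(Keep.right).run()
    }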

The Reactive Streams interfaces (Publisher/Subscription/Subscriber) model the low-level infrastructure for passing streams between execution units, and form a Service Provider Interface (SPI). Akka Streams hides these implementation details and provides user-level APIs.

Compared to the operators available for transforming data elements, there is only limited support for handling onError signals (called failures in Akka Streams). A recovery element (i.e. any transformation that absorbs an onError signal and turns it into possibly more data elements followed by normal stream completion) acts as a bulkhead that confines a stream collapse to a given region of the stream topology. Within the collapsed region buffered elements may be lost, but the outside is not affected by the failure.

1.2 Core Concepts

Akka Streams is a library that processes sequences of elements using chains of processing entities (graphs) that have bounded buffer space. The core concepts are:

  • Stream: an active process that moves and transforms data.
  • Element: the unit of processing that a stream transforms and transfers from upstream to downstream. Buffer sizes are defined as numbers of elements. null is not allowed as an element; use Option or Either for missing values.
  • Graph: a stream processing topology that defines the pathways through which elements flow.
  • Operator: a building block of a graph. Operators include processing (map, filter), GraphStage instances, and graph junctions (Merge or Broadcast).
  • Source: an operator with exactly one input.
  • Sink: an operator with exactly one output.
  • Flow: an operator with exactly one input and one output.
  • RunnableGraph: a Flow that has both ends attached to a Source and a Sink respectively, ready to run. A Flow attached to a Source yields a new composite Source; a Flow attached to a Sink yields a new composite Sink. All of these blueprints are immutable and thread-safe.
  • Materializing: the running of a RunnableGraph. Akka Streams starts Actors to execute the processing, along with other required resources such as file handles or socket connections. Materialization is performed synchronously on the materializing thread, by default using the global Materializer of the ActorSystem. A stream can be materialized multiple times, and each materialization returns a new value.
  • Materialized value: the result of a materialization. It is produced by a terminal operation such as run(), runWith() or runForeach(). Every operator can produce a materialized value, and the composition of the operators decides which materialized value is returned as the final result.
  • Back-pressure: a means of flow control by which a consumer tells its producer how many elements it can receive. Operators use asynchronous, non-blocking message passing to communicate with each other.
  • Operator Fusion: executing operators within the same Actor, which means sequential single-threaded processing without inter-Actor message passing.
  • Async Boundary: using async or Attributes.asyncBoundary to make an operator communicate with its downstream asynchronously. This enables parallel processing (see the sketch after this list).
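
The following sketch contrasts fused execution with an explicit async boundary (a minimal example; the system name and values are arbitrary):

    import akka.actor.ActorSystem
    import akka.stream.scaladsl.{Sink, Source}

    object AsyncBoundaryExample extends App {
      implicit val system: ActorSystem = ActorSystem("fusion")

      Source(1 to 100)
        .map(_ + 1)   // fused: runs in the same Actor as the surrounding operators...
        .async        // ...until an explicit asynchronous boundary is inserted here
        .map(_ * 2)   // this operator runs in a separate Actor, in parallel with the one above
        .runWith(Sink.ignore)
    }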

The materialized value can be anything and is used for many purposes, such as the promise that feeds a Source.maybe[T] or a cancellable throttler. By default, via() and to() keep the left value and runWith() keeps the right value. Use viaMat or toMat to select Keep.left, Keep.right or Keep.both. If materialized values are nested, use mapMaterializedValue to access the nested value.
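
A small sketch of materialized-value selection with viaMat/toMat and Keep (Source.maybe materializes to a Promise, Sink.foreach to a Future[Done]; the names are illustrative):

    import scala.concurrent.{Future, Promise}
    import akka.{Done, NotUsed}
    import akka.actor.ActorSystem
    import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

    object MatValueSelection extends App {
      implicit val system: ActorSystem = ActorSystem("mat")

      val source: Source[Int, Promise[Option[Int]]] = Source.maybe[Int]
      val flow:   Flow[Int, Int, NotUsed]           = Flow[Int].map(_ + 1)
      val sink:   Sink[Int, Future[Done]]           = Sink.foreach[Int](println)

      // via/to keep the left value; viaMat/toMat let you pick left, right or both.
      val (promise, done) =
        source.viaMat(flow)(Keep.left).toMat(sink)(Keep.both).run()

      promise.success(Some(41)) // feeds one element into the stream and completes it
    }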

You can use the Source.preMaterialize() method to obtain the materialized value together with a new Source.

Akka Streams operators preserve the input order of elements, even mapAsync. Use mapAsyncUnordered when order is not required. However, junctions such as Merge do not define an order for elements arriving from different input ports; MergePreferred, MergePrioritized or a custom GraphStage can be used to define an ordering.

The Materializer is the component used to run a stream. The SystemMaterializer Akka extension provides an ActorSystem-wide Materializer whenever there is an implicit ActorSystem in scope.

The Materializer’s life cycle is bound to the life cycle of the ActorRefFactory it was created from: either an ActorSystem (the system materializer) or an ActorContext (when the materializer is created within an Actor). You can shut down a materializer by calling its shutdown() method.

When a materializer is terminated, it abruptly terminates all streams it is running.

2 Graph

Graphs are stream processing topologies. They are used to define junctions that model fan-in or fan-out topologies; some common operators such as concat are themselves defined as graph junctions. Akka Streams provides the following junctions:

  • Fan-out
    • Broadcast[T]: 1 input emits to each of the multiple outputs.
    • Balance[T]: 1 input emits to one of its multiple outputs.
    • UnzipWith[In, A, B, ...]: takes a function of 1 input element that produces up to 20 output elements, one for each output port.
    • Unzip[A, B]: splits a stream of (A, B) tuples into two streams, one of type A and one of type B.
  • Fan-in
    • Merge[In]: picks elements randomly from its multiple inputs and emits them to its output.
    • MergePreferred[In]: emits from the preferred port if it has elements, otherwise randomly from the other ports.
    • MergePrioritized[In]: emits randomly from all inputs, weighted by their priorities.
    • MergeLatest[In]: emits a List[In]; when the i-th input stream emits an element, the i-th entry of the emitted list is updated.
    • ZipWith[A, B, ..., Out]: takes a function of N inputs and emits one output element.
    • Zip[A, B]: zips two inputs into an (A, B) tuple stream.
    • Concat[A]: consumes the first input completely, then the second.

2.1 Graph DSL

Graphs are defined using the Graph DSL. Junctions must be created with explicit type parameters, as otherwise the Nothing type will be inferred.

The import GraphDSL.Implicits._ brings in the ~> operator, read as “edge”, “via”, or “to”; <~ defines an edge in the opposite direction. A graph is immutable: builder.add() makes a copy of an imported graph’s blueprint for use in the new graph, ignoring its materialized value. The alternative is to pass existing graphs into the factory method GraphDSL.create(graph1, graph2, ...)((_, _, ...) => {...}), which makes their materialized values available. It is also possible to construct a graph from a list of graphs and collect a list of materialized values.
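
A minimal Graph DSL sketch, closely following the pattern in the Akka documentation (broadcast to two branches and merge them back):

    import akka.NotUsed
    import akka.actor.ActorSystem
    import akka.stream.ClosedShape
    import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, RunnableGraph, Sink, Source}

    object GraphDslExample extends App {
      implicit val system: ActorSystem = ActorSystem("graph")

      val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
        import GraphDSL.Implicits._

        val in  = Source(1 to 10)
        val out = Sink.foreach[Int](println)

        // Junctions are added to the builder with explicit type parameters.
        val bcast = builder.add(Broadcast[Int](2))
        val merge = builder.add(Merge[Int](2))

        val f1, f2, f3, f4 = Flow[Int].map(_ + 10)

        in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out
                    bcast ~> f4 ~> merge
        ClosedShape
      })

      g.run()
    }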

Use a Shape other than ClosedShape to create a partial graph with any combination of input and output ports. The Source.combine and Sink.combine methods simplify junction creation with Broadcast, Balance, Merge and Concat.
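
For example, two sources can be merged without any Graph DSL (a sketch; the sources are arbitrary):

    import akka.actor.ActorSystem
    import akka.stream.scaladsl.{Merge, Sink, Source}

    object CombineExample extends App {
      implicit val system: ActorSystem = ActorSystem("combine")

      val odds  = Source(List(1, 3, 5))
      val evens = Source(List(2, 4, 6))

      // Routes both sources through a Merge junction and runs the result.
      Source.combine(odds, evens)(Merge(_)).runWith(Sink.foreach(println))
    }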

In general a custom Shape needs to provide 1) all its input and output ports, 2) a constructor and 3) a deep-copy method. Commonly predefined shapes are SourceShape, SinkShape, FlowShape, UniformFanInShape and UniformFanOutShape. Other shapes such as FanInShape1, FanInShape2, …, FanOutShape1, FanOutShape2, … are for junctions with multiple input/output ports of different types.

The BidiShape has two inputs and two outputs. A BidiFlow can be built from two flows with BidiFlow.fromFlows(flow1, flow2), or with the shortcut BidiFlow.fromFunctions(f1, f2) when both directions are plain functions.

For complex data processing, create a GraphStage to represent a reusable graph stream processing operator.

Inside a graph, builder.materializedValue gives an Outlet that can be used in the graph as an ordinary source or outlet and that will eventually emit the materialized value. An Outlet is a typed output port of a shape.

To avoid deadlocks in a graph cycle, you can drop elements or define a buffer with OverflowStrategy.fail to fail the stream.

2.2 Modularity, Composition and Hierarchy

Akka Streams composes linear chains using the methods of the Source, Flow and Sink classes. A nested Source, Flow or Sink can be given different attributes using the withAttributes() or named() methods.

GraphDSL composes general complex graphs that contain fan-in, fan-out, and directed or non-directed cycles. A RunnableGraph is a general, closed and runnable graph. It is a factory that creates

  • a network of running processing entities, inaccessible from the outside.
  • a materialized value, optionally providing a controlled interaction capability with the network.

For linear chains, viaMat and toMat can select the materialized value using Keep.left, Keep.right, and Keep.both. GraphDSL.create() and builder.materializedValue access materialized values at a low level.

Attributes are used to fine-tune running entities, for example buffer sizes and names. In hierarchical compositions, attributes are inherited by nested modules unless the nested modules define their own.
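
A sketch of setting attributes on a nested module (the flow name and buffer sizes are arbitrary):

    import akka.actor.ActorSystem
    import akka.stream.Attributes
    import akka.stream.scaladsl.{Flow, Sink, Source}

    object AttributesExample extends App {
      implicit val system: ActorSystem = ActorSystem("attrs")

      // The nested flow gets its own name and a one-element input buffer;
      // operators that set nothing themselves inherit attributes from the enclosing module.
      val doubler = Flow[Int]
        .map(_ * 2)
        .withAttributes(Attributes.inputBuffer(initial = 1, max = 1))
        .named("doubler")

      Source(1 to 10).via(doubler).runWith(Sink.ignore)
    }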

3 Manage Streams

3.1 Buffers and Rate

Use .async to run an operator asynchronously. The operator hands an element over to its downstream, which runs in a different actor, and immediately processes the next message. Akka Streams uses a windowed, batching back-pressure strategy to improve asynchronous processing performance, and a buffer with a default size of 16 to optimize async operators.

When time- or rate-driven operators behave unexpectedly, setting their input buffer size to 1 is often the first fix to try.

When the buffer is full, you can choose among different strategies to handle the overflow (a sketch follows the list):

  • OverflowStrategy.dropTail: drop from the tail of the buffer.
  • OverflowStrategy.dropNew: drop the new elements.
  • OverflowStrategy.dropHead: drop the head of the buffer.
  • OverflowStrategy.dropBuffer: drop the whole buffer when the buffer is full.
  • OverflowStrategy.fail: fail the stream when the buffer is full.
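
An explicit buffer with an overflow strategy can be added anywhere in a chain, as in this sketch (sizes are arbitrary):

    import akka.actor.ActorSystem
    import akka.stream.OverflowStrategy
    import akka.stream.scaladsl.{Sink, Source}

    object BufferExample extends App {
      implicit val system: ActorSystem = ActorSystem("buffer")

      Source(1 to 1000)
        // keep at most 100 elements; when full, discard the oldest buffered element
        .buffer(100, OverflowStrategy.dropHead)
        .runWith(Sink.foreach(println))
    }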

For a fast producer, use batch to buffer and aggregate elements, or conflate to summarize upstream elements.

For a slow producer, use extrapolate or expand to generate additional elements, as sketched below.
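
A sketch of both directions of rate adaptation (conflate for a fast producer, expand for a slow one; sources and functions are arbitrary):

    import akka.actor.ActorSystem
    import akka.stream.scaladsl.{Sink, Source}

    object RateAdaptation extends App {
      implicit val system: ActorSystem = ActorSystem("rate")

      // Fast producer: sum elements together while the downstream is busy.
      Source(1 to 1000)
        .conflate(_ + _)
        .runWith(Sink.ignore)

      // Slow producer: repeat the last seen element while the downstream is starving.
      Source(1 to 10)
        .expand(i => Iterator.continually(i))
        .take(100) // expand is unbounded, so limit the demo
        .runWith(Sink.ignore)
    }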

3.2 Dynamic Stream Handling

The KillSwitch trait defines two methods that let you complete or fail a stream from the outside. A stream is killed by cancelling its upstream and either completing (shutdown) or failing (abort) its downstream. The KillSwitches object defines three factories, single, shared and singleBidi, that can be used to kill a single flow, multiple flows, or a single BidiFlow graph.
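
A sketch of completing a stream from the outside with KillSwitches.single (the tick interval and sleep are arbitrary):

    import scala.concurrent.duration._
    import akka.actor.ActorSystem
    import akka.stream.{KillSwitches, UniqueKillSwitch}
    import akka.stream.scaladsl.{Keep, Sink, Source}

    object KillSwitchExample extends App {
      implicit val system: ActorSystem = ActorSystem("kill")

      val switch: UniqueKillSwitch =
        Source.tick(0.seconds, 100.millis, "tick")
          .viaMat(KillSwitches.single)(Keep.right) // materializes the switch
          .toMat(Sink.foreach(println))(Keep.left)
          .run()

      Thread.sleep(1000)
      switch.shutdown() // completes the stream; switch.abort(ex) would fail it instead
    }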

The Graph DSL does not allow dynamic Sinks, Sources or Flows. Hubs provide a means to construct Sink and Source pairs that are attached to each other, allowing one of them to be materialized multiple times to implement dynamic fan-in or fan-out (see the sketch after this list).

  • MergeHub implements a dynamic fan-in junction to which multiple producers can emit in a first-come, first-served fashion, but only after MergeHub.source(...).to(sink) has been materialized.
  • BroadcastHub implements a dynamic fan-out junction from which multiple consumers can pull the same elements, but only after source.toMat(BroadcastHub.sink(...))(Keep.right) has been materialized.
  • PartitionHub is a special streaming hub that is able to route streamed elements to a dynamic set of consumers. The route can be stateless or stateful.
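
A sketch of both hubs (buffer sizes and element values are arbitrary; with BroadcastHub, consumers attaching late may miss earlier elements):

    import akka.NotUsed
    import akka.actor.ActorSystem
    import akka.stream.scaladsl.{BroadcastHub, Keep, MergeHub, Sink, Source}

    object HubExample extends App {
      implicit val system: ActorSystem = ActorSystem("hubs")

      // MergeHub: materializing the graph yields a Sink that many producers can attach to.
      val dynamicSink: Sink[String, NotUsed] =
        MergeHub.source[String](perProducerBufferSize = 16)
          .to(Sink.foreach(println))
          .run()

      Source.single("hello").runWith(dynamicSink)
      Source.single("world").runWith(dynamicSink)

      // BroadcastHub: materializing the graph yields a Source that many consumers can attach to.
      val dynamicSource: Source[Int, NotUsed] =
        Source(1 to 100).toMat(BroadcastHub.sink(bufferSize = 256))(Keep.right).run()

      dynamicSource.runWith(Sink.foreach(i => println(s"consumer A: $i")))
      dynamicSource.runWith(Sink.foreach(i => println(s"consumer B: $i")))
    }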

3.3 Stream References

Stream references are references to Sources and Sinks. They allow running an Akka Stream across multiple nodes of an Akka Cluster. Akka Streams manages flow control over the network and fails fast on node failures within the cluster.

Stream references use actor messaging to transport elements and signals, and messages can be lost. A dropped demand signal is re-sent automatically; a dropped element fails the stream.

3.4 Error Handling

Akka Streams provides the following error handling methods (a recover sketch follows the list):

  • recover to emit a final element on upstream failure
  • recoverWithRetries to create a new upstream on upstream failure
  • RestartSource, RestartFlow, and RestartSink to restart an enclosed part of the stream after a backoff
  • Some operators support supervision strategies: restart, resume, and stop
  • Wrap a stream in an Akka Actor and use Akka supervision strategy
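
A sketch of recover absorbing a failure and emitting a final element (the failing map is artificial):

    import akka.actor.ActorSystem
    import akka.stream.scaladsl.{Sink, Source}

    object RecoverExample extends App {
      implicit val system: ActorSystem = ActorSystem("recover")

      Source(1 to 10)
        .map(n => if (n == 5) throw new RuntimeException("boom") else n)
        // absorb the failure and emit one final element before completing;
        // recoverWithRetries would switch to a replacement Source instead
        .recover { case _: RuntimeException => -1 }
        .runWith(Sink.foreach(println)) // prints 1, 2, 3, 4, -1
    }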

3.6 Substreams

Substreams are instances of SubSource or SubFlow, created when a Source or Flow is multiplexed into a stream of streams (see the sketch after the list below). SubFlows cannot contribute to the super-flow’s materialized value.

  • Multiplexing operators: groupBy, prefixAndTail, splitAfter, splitWhen.
  • Demultiplexing operators: mergeSubstreams, concatSubstreams, and mergeSubstreamsWithParallelism.
  • Multiplexing and demultiplexing: flatMapConcat, flatMapMerge.
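
A sketch of multiplexing with groupBy and demultiplexing with mergeSubstreams (the grouping function is arbitrary):

    import akka.actor.ActorSystem
    import akka.stream.scaladsl.{Sink, Source}

    object SubstreamExample extends App {
      implicit val system: ActorSystem = ActorSystem("substreams")

      Source(1 to 10)
        .groupBy(2, _ % 2)    // multiplex into an "even" and an "odd" substream
        .map(_ * 10)          // operators here apply to each substream separately
        .mergeSubstreams      // demultiplex back into a single stream
        .runWith(Sink.foreach(println))
    }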

4 Custom Stream Processing

Defining operators using flows and the Graph DSL is in general easier and safer. However, to create arbitrary operators with any number of input or output ports, a custom GraphStage is necessary. A GraphStage is an indivisible operator that maintains its own state internally, whereas the Graph DSL creates new stream processing operators by composing existing ones. GraphStage is a subclass of Graph[S, M], where S is the shape type and M is the materialized value type.

To define a new operator, subclass the GraphStage abstract class or, to produce a materialized value, subclass GraphStageWithMaterializedValue. The class must define a shape that contains the input and output ports, and a createLogic method (or createLogicAndMaterializedValue if there is a materialized value) that returns a GraphStageLogic. Operators are immutable; each materialization creates a new GraphStageLogic instance to process elements, so internal state should be kept inside the GraphStageLogic.

For each output port in a GraphStageLogic, define setHandler(out, new OutHandler { override def onPull(): Unit = {...}}). The onPull() callback is invoked when the downstream is ready to receive an element; push(out, elem), complete(out) and fail(out, exception) are the operations available on the output port. The default implementation of the onDownstreamFinish() (or onDownstreamFinish(cause: Throwable)) callback stops the operator; override it if necessary.
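
A sketch of a custom source that only needs an output handler, closely following the pattern in the Akka documentation (the class name NumbersSource is illustrative):

    import akka.stream.{Attributes, Outlet, SourceShape}
    import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}

    // Emits 1, 2, 3, ... one element per downstream pull.
    class NumbersSource extends GraphStage[SourceShape[Int]] {
      val out: Outlet[Int] = Outlet("NumbersSource.out")
      override val shape: SourceShape[Int] = SourceShape(out)

      override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
        new GraphStageLogic(shape) {
          // Mutable state lives inside the GraphStageLogic, so each materialization gets its own.
          private var counter = 1

          setHandler(out, new OutHandler {
            override def onPull(): Unit = {
              push(out, counter)
              counter += 1
            }
          })
        }
    }

    // Usage: Source.fromGraph(new NumbersSource).take(5).runWith(Sink.foreach(println))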

For each input port in a GraphStageLogic, define setHandler(in, new InHandler { override def onPush(): Unit = {...}}). The onPush() callback is called when there is a new element from the upstream. Use grab(in) to get the element, pull(in) to request a new element, and cancel(in) to close the input port. Override the onUpstreamFinish and onUpstreamFailure callbacks if necessary.

For a new operator that is a sink, define override def preStart(): Unit = pull(in) to request the first element at startup.
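
A sketch of a custom sink that pulls its first element in preStart (the class name StdoutSink is illustrative):

    import akka.stream.{Attributes, Inlet, SinkShape}
    import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler}

    // Prints every element it receives.
    class StdoutSink extends GraphStage[SinkShape[Int]] {
      val in: Inlet[Int] = Inlet("StdoutSink.in")
      override val shape: SinkShape[Int] = SinkShape(in)

      override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
        new GraphStageLogic(shape) {
          // A sink must kick off demand itself, otherwise no element is ever requested.
          override def preStart(): Unit = pull(in)

          setHandler(in, new InHandler {
            override def onPush(): Unit = {
              println(grab(in)) // take the element that just arrived
              pull(in)          // and request the next one
            }
          })
        }
    }

    // Usage: Source(1 to 3).runWith(Sink.fromGraph(new StdoutSink))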

The completeStage() and failStage(exception) methods complete or fail the operator together with all its ports. Completing ports manually one by one carries the risk of resource leaks.

The above are signal-driven APIs. There is another set of APIs, such as emit, read, abortEmitting and abortReading, that handles emitting and reading in an active fashion.

Create a TimerGraphStageLogic to use timer functions. For asynchronous side channels, use getAsyncCallback() to acquire an AsyncCallback and call its invoke method from outside the stream.

Because an operator runs inside an Akka Actor, the callbacks defined in a GraphStageLogic are executed on a single thread, so no synchronization is needed. Mutable state should only be accessed from within the callbacks. Clean up resources in the GraphStageLogic.postStop method.

5 Integration

5.1 Integrating with Actors

Use the ask method of a Source or Flow to send a request-reply message to a target actor. There are several ways to connect a stream to an ActorRef: Sink.actorRefWithBackpressure, Source.actorRef, ActorSource.actorRef, ActorSource.actorRefWithBackpressure, ActorSink.actorRef and ActorSink.actorRefWithBackpressure.

Source.queue can be used for emitting elements into a stream from an actor or from anything else running outside the stream.
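
A sketch of feeding a stream from the outside with Source.queue (the buffer size and overflow strategy are arbitrary):

    import akka.actor.ActorSystem
    import akka.stream.OverflowStrategy
    import akka.stream.scaladsl.{Sink, Source}

    object QueueExample extends App {
      implicit val system: ActorSystem = ActorSystem("queue")
      import system.dispatcher

      val queue =
        Source.queue[String](bufferSize = 100, OverflowStrategy.backpressure)
          .to(Sink.foreach(println))
          .run()

      // offer() can be called from an actor or any non-stream code; the returned Future
      // reports whether the element was enqueued, dropped, or the stream has terminated.
      queue.offer("from outside the stream").foreach(result => println(s"offer result: $result"))
    }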

5.2 Integrating with External Services

Use mapAsync or mapAsyncUnordered to transform elements or perform side effects with non-stream-based external services.
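
A sketch of calling a Future-based service with bounded parallelism (lookup is a hypothetical stand-in for an external call):

    import scala.concurrent.Future
    import akka.actor.ActorSystem
    import akka.stream.scaladsl.{Sink, Source}

    object MapAsyncExample extends App {
      implicit val system: ActorSystem = ActorSystem("external")
      import system.dispatcher

      // Stand-in for a non-stream, Future-based external service.
      def lookup(id: Int): Future[String] = Future(s"record-$id")

      Source(1 to 100)
        .mapAsync(parallelism = 4)(lookup) // at most 4 outstanding calls, output order preserved
        .runWith(Sink.foreach(println))
    }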

5.3 Integrating with Reactive Streams

Use Source.fromPublisher and Sink.fromSubscriber to integrate with Reactive Streams Publishers and Subscribers. A Flow can be materialized into a Processor by calling .toProcessor.run().

Use runWith(Sink.asPublisher(fanout)) on a Source to obtain a Publisher, and runWith(Source.asSubscriber) on a Sink to obtain a Subscriber.

5.4 Streaming IO

Akka Streams provides several streaming IO facilities, including streaming TCP and streaming file IO.