This is a note on learning Akka Actors and Streams, based on the official documentation.

Actors

An actor extends the AbstractActor class and defines how it handles messages. The actor should handle every message it can receive, usually with a default matchAny case, to avoid UnhandledMessage events.
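A minimal sketch of an actor with a catch-all matchAny case, assuming classic Akka 2.6 with akka-actor on the classpath. EchoActor and its reply strings are illustrative names, not taken from the official documentation; ask is used only so the demo can print the replies.

```java
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.pattern.Patterns;
import java.time.Duration;

public class EchoActor extends AbstractActor {
  @Override
  public Receive createReceive() {
    return receiveBuilder()
        // Handle String messages by replying with an upper-case copy.
        .match(String.class, s -> getSender().tell(s.toUpperCase(), getSelf()))
        // Default case: every other message lands here instead of
        // being published as an UnhandledMessage event.
        .matchAny(o -> getSender().tell("unknown: " + o, getSelf()))
        .build();
  }

  public static void main(String[] args) throws Exception {
    ActorSystem system = ActorSystem.create("echo-demo");
    ActorRef echo = system.actorOf(Props.create(EchoActor.class), "echo");
    Duration timeout = Duration.ofSeconds(3);
    System.out.println(Patterns.ask(echo, "hello", timeout).toCompletableFuture().get()); // HELLO
    System.out.println(Patterns.ask(echo, 42, timeout).toCompletableFuture().get());      // unknown: 42
    system.terminate();
  }
}
```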

Creation

Use ReceiveBuilder (via receiveBuilder()) to create an AbstractActor.Receive, which is the actor's initial behavior. Akka supports hotswapping the Receive at runtime with getContext().become() and getContext().unbecome(), which lets an actor act as a finite state machine (FSM).
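A sketch of become/unbecome as a two-state machine, assuming classic Akka 2.6. The Toggle actor and its "flip" message are hypothetical; become(..., false) pushes the new behavior on top of the old one, so unbecome() pops back to it.

```java
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.pattern.Patterns;
import java.time.Duration;

public class Toggle extends AbstractActor {
  // "off" behavior: a flip pushes the "on" behavior onto the behavior stack.
  private Receive off() {
    return receiveBuilder()
        .matchEquals("flip", m -> {
          getContext().become(on(), false); // false = keep "off" underneath
          getSender().tell("on", getSelf());
        })
        .build();
  }

  // "on" behavior: a flip pops the stack, restoring "off".
  private Receive on() {
    return receiveBuilder()
        .matchEquals("flip", m -> {
          getContext().unbecome();
          getSender().tell("off", getSelf());
        })
        .build();
  }

  @Override
  public Receive createReceive() {
    return off(); // initial behavior
  }

  public static void main(String[] args) throws Exception {
    ActorSystem system = ActorSystem.create("toggle-demo");
    ActorRef toggle = system.actorOf(Props.create(Toggle.class));
    Duration timeout = Duration.ofSeconds(3);
    System.out.println(Patterns.ask(toggle, "flip", timeout).toCompletableFuture().get()); // on
    System.out.println(Patterns.ask(toggle, "flip", timeout).toCompletableFuture().get()); // off
    system.terminate();
  }
}
```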

Actors are created by passing a Props instance to the actorOf method of ActorSystem or ActorContext. Props is an immutable configuration class that specifies how to create an actor. It can be created in one of three ways:

import akka.actor.Props;
Props props1 = Props.create(MyActor.class);
Props props2 = Props.create(ActorWithArgs.class, "arg");
Props props3 = Props.create(ActorWithArgs.class, () -> new ActorWithArgs("arg"));

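The lambda variant results in a non-serializable Props and should only be used outside of actor instances (for example, in a static method), so that the lambda cannot close over an enclosing actor.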

Actors are automatically started asynchronously when created.

A good practice is to provide a static factory method in each actor that creates its Props. Another good practice is to declare the messages an actor handles as static classes within that actor.
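A sketch of both practices, assuming classic Akka 2.6. Greeter, Greet, and props(...) are illustrative names; the lambda lives in a static method, so it does not capture any actor instance.

```java
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.pattern.Patterns;
import java.time.Duration;

public class Greeter extends AbstractActor {
  // Message declared inside the actor that handles it.
  public static final class Greet {
    public final String name;
    public Greet(String name) { this.name = name; }
  }

  // Static factory: keeps Props creation next to the actor definition.
  public static Props props(String greeting) {
    return Props.create(Greeter.class, () -> new Greeter(greeting));
  }

  private final String greeting;

  public Greeter(String greeting) { this.greeting = greeting; }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(Greet.class, g -> getSender().tell(greeting + ", " + g.name, getSelf()))
        .build();
  }

  public static void main(String[] args) throws Exception {
    ActorSystem system = ActorSystem.create("greeter-demo");
    ActorRef greeter = system.actorOf(Greeter.props("Hello"), "greeter");
    System.out.println(Patterns.ask(greeter, new Greet("Akka"), Duration.ofSeconds(3))
        .toCompletableFuture().get()); // Hello, Akka
    system.terminate();
  }
}
```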

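Implement IndirectActorProducer when an actor must be created by an external factory, for example a dependency injection framework. Its produce method must return a fresh actor instance on every call; never return a singleton actor.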
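A sketch of an IndirectActorProducer, assuming classic Akka 2.6. GreetingService stands in for a hypothetical dependency that a DI container might supply; the producer passes it to a new actor instance every time produce() is called.

```java
import akka.actor.AbstractActor;
import akka.actor.Actor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.IndirectActorProducer;
import akka.actor.Props;
import akka.pattern.Patterns;
import java.time.Duration;

public class ProducerDemo {
  // Hypothetical injected dependency.
  public static final class GreetingService {
    String greet(String name) { return "Hi, " + name; }
  }

  public static final class ServiceActor extends AbstractActor {
    private final GreetingService service;
    public ServiceActor(GreetingService service) { this.service = service; }

    @Override
    public Receive createReceive() {
      return receiveBuilder()
          .match(String.class, n -> getSender().tell(service.greet(n), getSelf()))
          .build();
    }
  }

  public static final class ServiceActorProducer implements IndirectActorProducer {
    private final GreetingService service;
    public ServiceActorProducer(GreetingService service) { this.service = service; }

    @Override
    public Class<? extends Actor> actorClass() { return ServiceActor.class; }

    // Must create a new instance on each call, never a shared singleton.
    @Override
    public Actor produce() { return new ServiceActor(service); }
  }

  public static void main(String[] args) throws Exception {
    ActorSystem system = ActorSystem.create("producer-demo");
    // Props.create(producerClass, args...) passes args to the producer's constructor.
    ActorRef ref = system.actorOf(Props.create(ServiceActorProducer.class, new GreetingService()));
    System.out.println(Patterns.ask(ref, "Ada", Duration.ofSeconds(3)).toCompletableFuture().get()); // Hi, Ada
    system.terminate();
  }
}
```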

Actor API

  • getSelf: a reference to itself.
  • getSender: a reference to the sender of the last received message.
  • supervisorStrategy: the strategy to supervise child actors.
  • getContext: the contextual information, including factory methods such as actorOf, the actor system, the parent supervisor, lifecycle monitoring, and the hotswap behavior stack.
  • preStart, preRestart, postRestart, postStop: lifecycle hooks.

Actor Path, UID and ActorRef

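When an actor is created, it gets a path and a random UID that together identify it. A restart keeps the same path and UID, because only the actor instance is replaced. An ActorRef represents an incarnation (path plus UID), so if an actor is stopped and a new one is created with the same name, the old ActorRef does not point to the new actor.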

An ActorSelection points to a path, not to a specific ActorRef, so it cannot be watched. To get an ActorRef, send an Identify message to the ActorSelection (the reply is an ActorIdentity) or call its resolveOne method.
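A sketch of resolving a selection with Identify, assuming classic Akka 2.6. Worker, the path "/user/worker", and the correlation id "lookup-1" are illustrative. Identify is handled by Akka itself, so the worker's own behavior does not need to match it; the returned ActorRef can then be watched.

```java
import akka.actor.AbstractActor;
import akka.actor.ActorIdentity;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Identify;
import akka.actor.Props;
import akka.pattern.Patterns;
import java.time.Duration;
import java.util.Optional;

public class IdentifyDemo {
  public static final class Worker extends AbstractActor {
    @Override
    public Receive createReceive() {
      return receiveBuilder().matchAny(m -> { }).build();
    }
  }

  public static Optional<ActorRef> resolve(ActorSystem system, String path) throws Exception {
    ActorSelection selection = system.actorSelection(path);
    // The reply is an ActorIdentity carrying the same correlation id.
    ActorIdentity identity = (ActorIdentity) Patterns
        .ask(selection, new Identify("lookup-1"), Duration.ofSeconds(3))
        .toCompletableFuture().get();
    return identity.getActorRef(); // empty if no actor lives at the path
  }

  public static void main(String[] args) throws Exception {
    ActorSystem system = ActorSystem.create("identify-demo");
    ActorRef worker = system.actorOf(Props.create(Worker.class), "worker");
    Optional<ActorRef> ref = resolve(system, "/user/worker");
    System.out.println(ref.isPresent() && ref.get().equals(worker)); // true
    system.terminate();
  }
}
```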

Communication

tell means “fire-and-forget”: it sends a message asynchronously and returns immediately. ask also sends a message asynchronously, but returns a CompletionStage representing a possible reply.

Use target.forward(result, getContext()) to forward a message while keeping the original sender reference.

When using ask inside an actor, a good practice is not to call methods or access mutable state of the enclosing actor from within the CompletionStage callbacks, because those callbacks run concurrently with the enclosing actor.
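A sketch of forward, ask, and pipe together, assuming classic Akka 2.6. Doubler, Forwarder, and Asker are hypothetical actors. Asker captures the sender before asking, only transforms the reply inside the callback, and pipes the result to the captured reference instead of touching its own state.

```java
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.pattern.Patterns;
import java.time.Duration;
import java.util.concurrent.CompletionStage;

public class AskPipeDemo {
  // Backend: replies with the doubled integer.
  public static final class Doubler extends AbstractActor {
    @Override
    public Receive createReceive() {
      return receiveBuilder()
          .match(Integer.class, n -> getSender().tell(n * 2, getSelf()))
          .build();
    }
  }

  // forward keeps the original sender, so Doubler replies directly to the caller.
  public static final class Forwarder extends AbstractActor {
    private final ActorRef target;
    public Forwarder(ActorRef target) { this.target = target; }

    @Override
    public Receive createReceive() {
      return receiveBuilder().matchAny(m -> target.forward(m, getContext())).build();
    }
  }

  public static final class Asker extends AbstractActor {
    private final ActorRef backend;
    public Asker(ActorRef backend) { this.backend = backend; }

    @Override
    public Receive createReceive() {
      return receiveBuilder()
          .match(Integer.class, n -> {
            ActorRef replyTo = getSender(); // capture now; getSender() changes later
            CompletionStage<Integer> reply = Patterns.ask(backend, n, Duration.ofSeconds(3))
                .thenApply(r -> (Integer) r + 1); // pure transformation, no actor state
            Patterns.pipe(reply, getContext().dispatcher()).to(replyTo);
          })
          .build();
    }
  }

  public static void main(String[] args) throws Exception {
    ActorSystem system = ActorSystem.create("ask-pipe-demo");
    ActorRef doubler = system.actorOf(Props.create(Doubler.class));
    ActorRef forwarder = system.actorOf(Props.create(Forwarder.class, doubler));
    ActorRef asker = system.actorOf(Props.create(Asker.class, doubler));
    Duration t = Duration.ofSeconds(3);
    System.out.println(Patterns.ask(forwarder, 5, t).toCompletableFuture().get()); // 10
    System.out.println(Patterns.ask(asker, 5, t).toCompletableFuture().get());     // 11
    system.terminate();
  }
}
```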

Code outside of actors can use an Inbox to receive multiple replies (for example, after subscribing to a notification service) and to watch another actor's lifecycle, two things that ask cannot do.

Exception

An exception may be thrown while an actor is processing a message. When that happens, the message is lost, but the mailbox is not affected. The actor is suspended and the supervision process is started.

Persistence

The Akka persistence extension comes with a few built-in persistence plugins, including an in-memory heap-based journal, a local file-system-based snapshot store, and a LevelDB-based journal. Persistence enables stateful actors to persist their state so that it can be recovered when the actor is restarted (after a failure or a migration). Only the events persisted by the actor are stored, not its actual state (snapshots of the state are supported as an optimization). Events are only ever appended to the journal.

One type of persistence is “Event Sourcing”. An actor receives a command, validates it, and generates events that represent the effect of the command. Each event is then persisted and afterwards applied to the actor's state by an event handler; no new commands are processed between the persist call and the execution of the associated event handler. When the actor needs to be recovered, it replays the persisted events to restore its latest state. Query commands do not change the actor's state, so they generate no events.
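A sketch of the command/event cycle, assuming Akka 2.6 with akka-persistence on the classpath and the in-memory journal configured below (it loses everything when the JVM exits). CounterActor, Add, Added, and the persistence id "counter-1" are illustrative names. The same updateState method is used both after persist and during recovery.

```java
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.pattern.Patterns;
import akka.persistence.AbstractPersistentActor;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.time.Duration;

public class CounterActor extends AbstractPersistentActor {
  // Command: a request that may be rejected.
  public static final class Add {
    public final int amount;
    public Add(int amount) { this.amount = amount; }
  }

  // Event: a fact that happened; only events are journaled.
  public static final class Added {
    public final int amount;
    public Added(int amount) { this.amount = amount; }
  }

  private int total = 0; // state rebuilt from events, never stored directly

  @Override
  public String persistenceId() { return "counter-1"; }

  // Recovery: replay journaled events through the same state change.
  @Override
  public Receive createReceiveRecover() {
    return receiveBuilder().match(Added.class, this::updateState).build();
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(Add.class, cmd -> {
          if (cmd.amount <= 0) { // validate the command
            getSender().tell("rejected", getSelf());
            return;
          }
          // Persist the event, then apply it in the handler.
          persist(new Added(cmd.amount), evt -> {
            updateState(evt);
            getSender().tell(total, getSelf());
          });
        })
        // Query command: reads state, persists nothing.
        .matchEquals("get", q -> getSender().tell(total, getSelf()))
        .build();
  }

  private void updateState(Added evt) { total += evt.amount; }

  public static void main(String[] args) throws Exception {
    // Demo-only configuration: the in-memory journal shipped with akka-persistence.
    Config config = ConfigFactory
        .parseString("akka.persistence.journal.plugin = \"akka.persistence.journal.inmem\"")
        .withFallback(ConfigFactory.load());
    ActorSystem system = ActorSystem.create("persistence-demo", config);
    ActorRef counter = system.actorOf(Props.create(CounterActor.class), "counter");
    Duration t = Duration.ofSeconds(3);
    System.out.println(Patterns.ask(counter, new Add(5), t).toCompletableFuture().get());  // 5
    System.out.println(Patterns.ask(counter, new Add(-1), t).toCompletableFuture().get()); // rejected
    System.out.println(Patterns.ask(counter, "get", t).toCompletableFuture().get());       // 5
    system.terminate();
  }
}
```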

Streams

To stream data between actors by hand, you need to take care not to overflow any buffers or mailboxes, and messages can be lost too. The Akka Streams API was created to provide back-pressure and bounded resource usage. The Akka Streams API is decoupled from the Reactive Streams interfaces.

The official documentation gives a good description of the relationship between the Akka Streams API (an end-user API) and Reactive Streams:

While Akka Streams focus on the formulation of transformations on data streams the scope of Reactive Streams is to define a common mechanism of how to move data across an asynchronous boundary without losses, buffering or resource exhaustion. The relationship between these two is that the Akka Streams API is geared towards end-users while the Akka Streams implementation uses the Reactive Streams interfaces internally to pass data between the different operators

This SO thread gives a good introduction to Akka Streams. The concept is also explained in this SO thread.

The Idea

Streams always start flowing from a Source<Out, M1>, can then continue through Flow<In, Out, M2> elements or more advanced operators, and are finally consumed by a Sink<In, M3>. The first type parameter designates the kind of elements produced by the source, while the M type parameters describe the object that is created during materialization. NotUsed (akka.NotUsed) means that no value is produced; it is the generic equivalent of void.
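A sketch of a linear Source → Flow → Sink stream with the type parameters spelled out, assuming Akka 2.6 with akka-stream on the classpath (on 2.5, ActorMaterializer.create(system) plays the role of Materializer.createMaterializer(system)). The element values are arbitrary.

```java
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.Materializer;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

public class LinearStream {
  public static void main(String[] args) throws Exception {
    ActorSystem system = ActorSystem.create("linear-demo");
    Materializer mat = Materializer.createMaterializer(system);

    // Source<Out, M1>: emits 1..10; NotUsed = no useful materialized value.
    Source<Integer, NotUsed> numbers = Source.range(1, 10);
    // Flow<In, Out, M2>: doubles each element.
    Flow<Integer, Integer, NotUsed> doubler = Flow.of(Integer.class).map(x -> x * 2);
    // Sink<In, M3>: folds the elements; materializes the final sum.
    Sink<Integer, CompletionStage<Integer>> sum = Sink.fold(0, (acc, x) -> acc + x);

    CompletionStage<Integer> result = numbers.via(doubler).runWith(sum, mat);
    System.out.println(result.toCompletableFuture().get(3, TimeUnit.SECONDS)); // 110
    system.terminate();
  }
}
```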

Sometimes we want to split a stream into two streams, or merge two streams into one. The elements that do this form “fan-out” or “fan-in” structures, referred to as “junctions” in Akka Streams. Akka separates linear stream structures (Flows) from non-linear, branching ones (Graphs). Graphs have a different set of APIs that can express arbitrarily complex stream setups.

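Akka Streams can propagate back-pressure to slow down upstream, or control the amount of incoming data with a bounded buffer and an overflow strategy. For example, buffer(10, OverflowStrategy.dropHead()) keeps up to 10 elements and drops the oldest one when the buffer is full.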
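A sketch contrasting two overflow strategies, assuming Akka 2.6. The throttle after the dropHead buffer is only there to make the consumer slow enough for drops to happen; how many elements survive depends on timing, so no exact count is given for that case.

```java
import akka.actor.ActorSystem;
import akka.stream.Materializer;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class BufferDemo {
  public static void main(String[] args) throws Exception {
    ActorSystem system = ActorSystem.create("buffer-demo");
    Materializer mat = Materializer.createMaterializer(system);

    // backpressure(): when the 10-slot buffer is full, stop pulling upstream.
    // Nothing is lost, so all 100 elements arrive.
    List<Integer> lossless = Source.range(1, 100)
        .buffer(10, OverflowStrategy.backpressure())
        .runWith(Sink.seq(), mat)
        .toCompletableFuture().get(5, TimeUnit.SECONDS);
    System.out.println(lossless.size()); // 100

    // dropHead(): when the buffer is full, discard its oldest element.
    // With a slow (throttled) consumer, typically fewer than 100 arrive.
    List<Integer> lossy = Source.range(1, 100)
        .buffer(10, OverflowStrategy.dropHead())
        .throttle(1, Duration.ofMillis(10))
        .runWith(Sink.seq(), mat)
        .toCompletableFuture().get(5, TimeUnit.SECONDS);
    System.out.println(lossy.size());
    system.terminate();
  }
}
```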

Design Principles

Akka Streams is designed so that:

  • all features are explicit in the API, no magic
  • supreme compositionality: combined pieces retain the function of each part
  • exhaustive model of the domain of distributed bounded stream processing

“Exhaustive” means that all the tools are provided: back-pressure, buffering, transformation, failure recovery, etc. Everything that is built is reusable in a larger context.

Dropped stream elements are not sent to the dead letter office. An error is a normal data element within the stream, while a failure means that the stream itself is collapsing. At the Reactive Streams level, an error is received via onNext while a failure is signaled via onError. A stream failure tears down the stream without waiting for processing to finish; all in-flight elements are discarded. Stream cancellation propagates upstream, so upstream processing steps are terminated without having processed all of their inputs. Users need to handle such cases outside of the Akka Streams facilities, for example by cleaning up after a timeout.

A materialization function produces the materialized value of a graph, a specific object useful for interacting with the running stream, such as shutting it down or extracting metrics. The ActorMaterializer is bound to the lifecycle of the ActorRefFactory it is created from, either an ActorSystem or an ActorContext. Avoid creating a new actor materializer inside an actor by passing context.system: that materializer is bound to the ActorSystem rather than to the actor, so a new one may be created and leaked for each such actor. Instead, pass a materializer in or create it from the actor's context.

Akka treats the Reactive Streams specification as an SPI that is hidden behind Source and Sink. The Publishers produced by materialization are restricted to a single Subscriber; all fan-out must be done explicitly with Broadcast<T>.

Terms

  • Graph: a stream processing topology that defines the pathways through which elements flow when the stream is running. A graph consists of one or more operators. A runnable graph starts from a source, optionally passes data through one or more flows, and ends in a sink. A graph is an immutable process definition that runs only when it is materialized by a materializer.
  • Operator: an immutable building block used to build up a graph. In Java, an operator is a Graph<S extends Shape, M>, and each operator has a Shape. For example, the bidirectional flow BidiFlow<I1, O1, I2, O2, Mat> is a Graph<BidiShape<I1, O1, I2, O2>, Mat>. A RunnableGraph is a flow that has both ends attached to a Source and a Sink respectively, and is ready to run.
  • Shape: a shape describes the input ports (Inlet) and output ports (Outlet) of an operator. Common shapes include BidiShape (two inputs and two outputs, one input-output pair for each direction), ClosedShape (no inputs and no outputs; a graph with this shape can be materialized, as in RunnableGraph), FanInShape (multiple inputs), FanOutShape (multiple outputs), FlowShape (one input and one output), SinkShape (only one input), and SourceShape (only one output).
  • Materialization: stream materialization takes a graph/flow and runs it with the required resources. It is triggered by “terminal operations”, which include the various forms of the run, runWith, and runForeach methods. Materialization itself runs synchronously on the materializing thread; the actual stream processing is done by actors started during materialization, which run on the thread pools they are configured with.
  • Operator Fusion: by default, Akka fuses stream operators, i.e., all processing steps of a flow are executed within the same actor. As a result, elements are passed between the fused steps on a single thread and the steps run sequentially. To change this, insert an async() call in the middle of a flow to add an asynchronous boundary.
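A sketch of breaking fusion with async(), assuming Akka 2.6. The two map steps are arbitrary; async() marks everything built so far as one asynchronous island, so the stages after it run in a separate actor. Element order is still preserved across the boundary.

```java
import akka.actor.ActorSystem;
import akka.stream.Materializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class AsyncBoundaryDemo {
  public static void main(String[] args) throws Exception {
    ActorSystem system = ActorSystem.create("async-demo");
    Materializer mat = Materializer.createMaterializer(system);

    List<Integer> out = Source.range(1, 5)
        .map(x -> x + 1)
        // Asynchronous boundary: the source and first map run in one island,
        // the second map and the sink in another.
        .async()
        .map(x -> x * 2)
        .runWith(Sink.seq(), mat)
        .toCompletableFuture().get(3, TimeUnit.SECONDS);

    System.out.println(out); // the elements 4, 6, 8, 10, 12 in order
    system.terminate();
  }
}
```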

Combining Materialized Values

Every operator can provide a materialized value, so when operators are composed it is helpful to choose or combine their materialized values. Some examples:

  • Source.<Integer>maybe(): a source that emits at most one element; it materializes a CompletableFuture<Optional<Integer>> that can be completed from outside to emit an element or to complete (shut down) the stream.
  • throttler: a flow that throttles elements to 1/second and materializes a Cancellable that can shut the stream down.
  • Sink.head(): materializes a CompletionStage holding the first element.
  • source.via(flow).to(sink): a common pattern that keeps the materialized value of the leftmost stage (the source).
  • source.viaMat(flow, Keep.right()).to(sink): keeps the materialized value of the flow.
  • source.via(flow).toMat(sink, Keep.right()): keeps the materialized value of the sink.
  • source.via(flow).runWith(sink, mat): returns the materialized value of the stage added by runWith, here the sink.
  • flow.to(sink).runWith(source, mat): returns the materialized value of the source.
  • flow.runWith(source, sink, mat): returns a pair of the materialized values of the source and the sink.
  • source.viaMat(flow, Keep.both()).to(sink): returns a pair of the materialized values of the source and the flow.
  • source.via(flow).toMat(sink, Keep.both()): returns a pair of the materialized values of the source and the sink.
  • source.viaMat(flow, Keep.both()).toMat(sink, Keep.both()): returns a pair whose first value is another pair holding the materialized values of the source and the flow, and whose second value is the materialized value of the sink.
  • source.viaMat(flow, Keep.right()).toMat(sink, Keep.both()): returns a pair of the materialized values of the flow and the sink.
  • Use mapMaterializedValue to transform a materialized value into another one.
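A sketch of the source.via(flow).toMat(sink, Keep.both()) case from the list above, assuming Akka 2.6. The pair gives access to both ends of the running stream: the source's CompletableFuture is used to feed it, and the sink's CompletionStage delivers the result. The multiply-by-10 flow is arbitrary.

```java
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.Materializer;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.RunnableGraph;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

public class KeepBothDemo {
  public static void main(String[] args) throws Exception {
    ActorSystem system = ActorSystem.create("keep-demo");
    Materializer mat = Materializer.createMaterializer(system);

    Source<Integer, CompletableFuture<Optional<Integer>>> source = Source.<Integer>maybe();
    Flow<Integer, Integer, NotUsed> flow = Flow.of(Integer.class).map(x -> x * 10);
    Sink<Integer, CompletionStage<Integer>> sink = Sink.head();

    // via() keeps the source's value; Keep.both() pairs it with the sink's value.
    RunnableGraph<Pair<CompletableFuture<Optional<Integer>>, CompletionStage<Integer>>> graph =
        source.via(flow).toMat(sink, Keep.both());

    Pair<CompletableFuture<Optional<Integer>>, CompletionStage<Integer>> values = graph.run(mat);
    values.first().complete(Optional.of(4)); // the source emits 4, then completes
    System.out.println(values.second().toCompletableFuture().get(3, TimeUnit.SECONDS)); // 40
    system.terminate();
  }
}
```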

For sources that are driven through their materialized value, like Source.queue, Source.actorRef, or Source.maybe, you can call preMaterialize(mat) to get a pair of the materialized value and another Source that can be run later.
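A sketch of preMaterialize with Source.maybe, assuming Akka 2.6. The returned Source is attached to a sink first, then the already-materialized CompletableFuture is completed to feed it; the string "hello" is arbitrary.

```java
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.Materializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

public class PreMaterializeDemo {
  public static void main(String[] args) throws Exception {
    ActorSystem system = ActorSystem.create("premat-demo");
    Materializer mat = Materializer.createMaterializer(system);

    // Materialize the source now: get its materialized value plus a
    // Source<String, NotUsed> that can be connected later.
    Pair<CompletableFuture<Optional<String>>, Source<String, NotUsed>> pre =
        Source.<String>maybe().preMaterialize(mat);

    CompletionStage<List<String>> done = pre.second().runWith(Sink.seq(), mat);
    pre.first().complete(Optional.of("hello")); // feed the pre-materialized source
    System.out.println(done.toCompletableFuture().get(3, TimeUnit.SECONDS)); // a list holding "hello"
    system.terminate();
  }
}
```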

Working Graphs

Akka uses a graph-resembling DSL to make building graphs easy. Graphs are built from simple flows and junctions. Akka provides the following junctions:

  • Fan-out: Broadcast<T>, Balance<T>, UnzipWith<In, A, B, ...>, Unzip<A, B>.
  • Fan-in: Merge<In>, MergePreferred<In>, MergePrioritized<In>, MergeLatest<In>, ZipWith<A, B, ..., Out>, Zip<A, B>, Concat<A>.

The graph builder object is mutable to enable simpler creation of complex graphs. The builder.add() method makes a copy of the blueprint passed to it and returns the inlets and outlets of the resulting copy so that they can be wired up. An alternative is to pass existing graphs into the factory method (GraphDSL.create) to produce a new graph. The difference is that builder.add() ignores the materialized value of the imported graph, while importing via the factory method allows its inclusion.
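A sketch of a closed graph with Broadcast and Merge, assuming Akka 2.6. The sink is imported through GraphDSL.create, so its materialized value (the collected list) becomes the graph's value, while the source, flows, and junctions are added with builder.add(). The two multiplier branches are arbitrary.

```java
import akka.actor.ActorSystem;
import akka.stream.ClosedShape;
import akka.stream.FlowShape;
import akka.stream.Materializer;
import akka.stream.Outlet;
import akka.stream.UniformFanInShape;
import akka.stream.UniformFanOutShape;
import akka.stream.javadsl.Broadcast;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.GraphDSL;
import akka.stream.javadsl.Merge;
import akka.stream.javadsl.RunnableGraph;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

public class BroadcastMergeDemo {
  public static void main(String[] args) throws Exception {
    ActorSystem system = ActorSystem.create("graph-demo");
    Materializer mat = Materializer.createMaterializer(system);

    RunnableGraph<CompletionStage<List<Integer>>> graph = RunnableGraph.fromGraph(
        GraphDSL.create(Sink.<Integer>seq(), (builder, out) -> {
          // builder.add copies each blueprint and returns its ports.
          Outlet<Integer> src = builder.add(Source.range(1, 3)).out();
          UniformFanOutShape<Integer, Integer> bcast = builder.add(Broadcast.create(2));
          UniformFanInShape<Integer, Integer> merge = builder.add(Merge.create(2));
          FlowShape<Integer, Integer> times10 = builder.add(Flow.of(Integer.class).map(x -> x * 10));
          FlowShape<Integer, Integer> times100 = builder.add(Flow.of(Integer.class).map(x -> x * 100));

          // src -> bcast -> times10 -> merge -> out
          builder.from(src).viaFanOut(bcast).via(times10).viaFanIn(merge).to(out);
          //        bcast -> times100 -> merge
          builder.from(bcast).via(times100).toFanIn(merge);
          return ClosedShape.getInstance();
        }));

    // Merge interleaves the branches nondeterministically, so sort before printing.
    List<Integer> result = new ArrayList<>(graph.run(mat).toCompletableFuture().get(3, TimeUnit.SECONDS));
    Collections.sort(result);
    System.out.println(result); // [10, 20, 30, 100, 200, 300]
    system.terminate();
  }
}
```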

You can use GraphDSL.create to create partial graphs and then connect them together. For a partial graph, Akka verifies that all ports are either connected or part of the returned shape. It is sometimes useful to expose a complex graph as a simpler structure such as a Source, Sink, or Flow.

To create a Source, you need a graph with a SourceShape: Source.fromGraph(GraphDSL.create(builder -> { ... return SourceShape.of(outlet); })). To create a Sink, use Sink.fromGraph(GraphDSL.create(builder -> { ... return SinkShape.of(inlet); })). Similarly, to create a Flow, use Flow.fromGraph(GraphDSL.create(builder -> { ... return FlowShape.of(inlet, outlet); })).
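A sketch of the Flow.fromGraph case, assuming Akka 2.6. The partial graph broadcasts each element to two branches, turns one branch into a label string, and zips them back together; the two ports left open (bcast.in() and zip.out()) become the Flow's inlet and outlet. The "#" label format is arbitrary.

```java
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.FanInShape2;
import akka.stream.FlowShape;
import akka.stream.Materializer;
import akka.stream.UniformFanOutShape;
import akka.stream.javadsl.Broadcast;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.GraphDSL;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.Zip;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class PartialFlowDemo {
  public static void main(String[] args) throws Exception {
    ActorSystem system = ActorSystem.create("partial-demo");
    Materializer mat = Materializer.createMaterializer(system);

    Flow<Integer, Pair<Integer, String>, NotUsed> pairUp = Flow.fromGraph(
        GraphDSL.create(builder -> {
          UniformFanOutShape<Integer, Integer> bcast = builder.add(Broadcast.create(2));
          FanInShape2<Integer, String, Pair<Integer, String>> zip = builder.add(Zip.create());
          FlowShape<Integer, String> label = builder.add(Flow.of(Integer.class).map(i -> "#" + i));

          builder.from(bcast).toInlet(zip.in0());
          builder.from(bcast).via(label).toInlet(zip.in1());
          // Unconnected ports become the Flow's inlet and outlet.
          return FlowShape.of(bcast.in(), zip.out());
        }));

    List<Pair<Integer, String>> pairs = Source.range(1, 3).via(pairUp)
        .runWith(Sink.seq(), mat).toCompletableFuture().get(3, TimeUnit.SECONDS);
    for (Pair<Integer, String> p : pairs) {
      System.out.println(p.first() + " -> " + p.second()); // 1 -> #1, 2 -> #2, 3 -> #3
    }
    system.terminate();
  }
}
```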

There is also a simplified API to combine sources and sinks (Source.combine and Sink.combine). The graph DSL makes it easy to build reusable, encapsulated components with an arbitrary number of input and output ports.

Akka has a set of predefined shapes:

  • SourceShape, SinkShape, FlowShape for simple shapes.
  • UniformFanInShape, UniformFanOutShape for junctions with multiple input/output ports of the same type.
  • FanInShape1, FanInShape2, …, FanOutShape1, FanOutShape2, … for junctions with multiple input/output ports of different types.