This is a study note on the Lagom framework, based on the Lagom documentation.

1 Introduction

Lagom is an opinionated framework that provides tools and APIs acting like guardrails for developing reactive microservice applications. Lagom handles service location via a service registry and service gateway, communication protocols via the service descriptor API, data persistence (Event Sourcing and CQRS) via persistent entities, the publish-subscribe model via the message broker API, and service client creation for you.

Building and Running in Dev Mode

Every service has an API project and an implementation project. The API project defines the service interface as well as the request and response data models. The implementation project depends on the API project and provides API implementation.

All projects should add the Lagom sbt plugin, addSbtPlugin("com.lightbend.lagom" % "lagom-sbt-plugin" % "X.Y.Z"), which provides functions to build, run and deploy an application. The API project needs the Lagom API library: libraryDependencies ++= Seq(lagomScaladslApi). The implementation project needs to enable the LagomScala plugin, which adds the settings and dependencies necessary to run the project in development mode.
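A minimal sketch of this setup (the project names are hypothetical; X.Y.Z stands for whatever Lagom version you use):

// project/plugins.sbt
addSbtPlugin("com.lightbend.lagom" % "lagom-sbt-plugin" % "X.Y.Z")

// build.sbt
lazy val `hello-api` = (project in file("hello-api"))
  .settings(libraryDependencies ++= Seq(lagomScaladslApi))

lazy val `hello-impl` = (project in file("hello-impl"))
  .enablePlugins(LagomScala)
  .dependsOn(`hello-api`)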

Behind runAll, several services start running: an embedded service locator, an embedded service gateway, a Cassandra server, a Kafka server and the services of your application.

2 Writing Lagom Services

2.1 Service API Definition

A service API definition defines a service descriptor that includes the following members:

  • a name of the service
  • the calls the service provides
  • the topics and topic properties the service provides
  • a list of ServiceAcls (each a combination of a REST method and a route path string)
  • a service locator flag: defaults to true, making the service locatable
  • a service gateway flag: defaults to false, so the routes are not published
  • a circuit breaker
  • a header filter
  • an exception serializer.

A service method returns a ServiceCall that has the following definition:

trait ServiceCall[Request, Response] {
  def invoke(request: Request): Future[Response]
}

Therefore calling a service method returns a call handler that can be invoked.

Each service call needs an identifier that specifies its routing information. The identifier is one of three types: a static name, a static path, or a REST path. A name identifier is either the method name, call(methodName), or namedCall("newName", methodName). A path identifier is configured by pathCall("path", methodName). If there is no input type, as in ServiceCall[NotUsed, _], the HTTP method will be GET; otherwise, it will be POST. A REST path uses restCall(Method.OP, "path", methodName) to define a service call.
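For illustration, a minimal descriptor sketch tying these identifier styles together (the service and call names are hypothetical; Lagom provides the NotUsed, Done and String serializers out of the box):

import akka.{Done, NotUsed}
import com.lightbend.lagom.scaladsl.api.{Descriptor, Service, ServiceCall}
import com.lightbend.lagom.scaladsl.api.transport.Method

trait HelloService extends Service {
  def hello(id: String): ServiceCall[NotUsed, String]        // no request body, so GET
  def updateGreeting(id: String): ServiceCall[String, Done]  // explicit POST via restCall

  override final def descriptor: Descriptor = {
    import Service._
    named("hello")
      .withCalls(
        pathCall("/api/hello/:id", hello _),
        restCall(Method.POST, "/api/hello/:id", updateGreeting _)
      )
      .withAutoAcl(true)
  }
}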

Every service call has a request type and a response type. akka.NotUsed can be used in their places. A data type can be strict or streamed. Strict data is represented by a plain Scala object. When both request and response are strict, the call is a synchronous request-response call.

Streamed data has the type Source. Lagom uses WebSocket as the streaming protocol. The WebSocket is closed when either direction closes.
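A hypothetical pair of streamed-call declarations for a service trait; each Source maps to one direction of the WebSocket:

import akka.NotUsed
import akka.stream.scaladsl.Source
import com.lightbend.lagom.scaladsl.api.ServiceCall

// strict request, streamed response
def tick(intervalMs: Int): ServiceCall[NotUsed, Source[String, NotUsed]]

// streamed in both directions (a bidirectional WebSocket)
def echo: ServiceCall[Source[String, NotUsed], Source[String, NotUsed]]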

2.2 Serialization

The call, namedCall, pathCall and restCall methods take an implicit MessageSerializer for each of the request and response data types. Lagom provides serializers for basic Scala types such as String. Additionally, Lagom provides an implicit converter that turns a Play JSON Format type class into a message serializer.

Define a request/response data type as a case class and define an implicit JSON Format in its companion object, as sketched below.
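Spelled out with imports, a minimal sketch (the fields are hypothetical):

import play.api.libs.json.{Format, Json}

case class MyMessage(name: String, quantity: Int)

object MyMessage {
  // macro-generated JSON Format, picked up implicitly as a message serializer
  implicit val format: Format[MyMessage] = Json.format[MyMessage]
}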

2.3 Service Call Implementation

The ServiceCall.apply() factory method takes a function of Request => Future[Response] and returns a ServiceCall instance whose invoke method delegates to that function. Calling a service method therefore returns a ServiceCall instance; the reason behind this design is function composition.
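A minimal implementation sketch, assuming the hypothetical HelloService above:

import akka.{Done, NotUsed}
import scala.concurrent.Future
import com.lightbend.lagom.scaladsl.api.ServiceCall

class HelloServiceImpl extends HelloService {
  // ServiceCall.apply wraps the function; invoke delegates to it
  override def hello(id: String): ServiceCall[NotUsed, String] = ServiceCall { _ =>
    Future.successful(s"Hello, $id!")
  }

  override def updateGreeting(id: String): ServiceCall[String, Done] = ServiceCall { newGreeting =>
    Future.successful(Done) // persistence omitted in this sketch
  }
}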

Though ServiceCall provides the handleRequestHeader and handleResponseHeader methods to access the request and response headers, it is recommended to use the ServerServiceCall.invokeWithHeaders() method, which takes a requestHeader parameter and returns a responseHeader in addition to the request parameter and the response result.

To compose service calls, use ServerServiceCall.compose(...) or ServerServiceCall.composeAsync(...). The compose method takes a function from a request header to a service call.
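A composition sketch, assuming a hypothetical scheme that derives the caller from the request header:

import com.lightbend.lagom.scaladsl.server.ServerServiceCall

def authenticated[Request, Response](
    serviceCall: String => ServerServiceCall[Request, Response]
): ServerServiceCall[Request, Response] =
  ServerServiceCall.compose { requestHeader =>
    // extract a user from the header; the fallback here is illustrative only
    val user = requestHeader.principal.map(_.getName).getOrElse("anonymous")
    serviceCall(user)
  }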

2.4 Application Loader

Lagom uses compile-time DI. The application loader needs to provide all dependencies for loading an application. Lagom provides two helper classes: LagomApplicationLoader and LagomApplication.

LagomApplication has two required abstract members: a service binding and a service locator. Additionally, you can mix in the web service client (AhcWSComponents), persistence components (SlickPersistenceComponents and HikariCPComponents, or other persistence components), message broker components (LagomKafkaComponents, LagomKafkaClientComponents) and other objects used in your application, such as the service binding, entity serializer registry, read-side repository, and cluster sharding initialization.

LagomApplicationLoader is an abstract class that requires definitions for load, loadDevMode, and describeService. In the load method, you create an application instance by providing the service locator that is left abstract in LagomApplication. For Akka services, use new MyApplication(context) with AkkaDiscoveryComponents. If you don’t consume other services, set override def serviceLocator = ServiceLocator.NoServiceLocator, as sketched below.
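A wiring sketch under these rules, reusing the hypothetical HelloService (the class names are illustrative):

import com.lightbend.lagom.scaladsl.api.ServiceLocator
import com.lightbend.lagom.scaladsl.devmode.LagomDevModeComponents
import com.lightbend.lagom.scaladsl.server._
import play.api.libs.ws.ahc.AhcWSComponents

abstract class HelloApplication(context: LagomApplicationContext)
    extends LagomApplication(context)
    with AhcWSComponents {
  // the required service binding
  override lazy val lagomServer: LagomServer =
    serverFor[HelloService](new HelloServiceImpl)
}

class HelloApplicationLoader extends LagomApplicationLoader {
  override def load(context: LagomApplicationContext): LagomApplication =
    new HelloApplication(context) {
      // this service consumes no other services
      override def serviceLocator: ServiceLocator = ServiceLocator.NoServiceLocator
    }

  override def loadDevMode(context: LagomApplicationContext): LagomApplication =
    new HelloApplication(context) with LagomDevModeComponents

  override def describeService = Some(readDescriptor[HelloService])
}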

2.5 Consuming Service

LagomServiceClientComponents, included in LagomApplication, provides a serviceClient that can be used to bind a service client: serviceClient.implement[HelloService].

All service calls made with Lagom service clients use circuit breakers by default. Circuit breakers are used and configured on the client side, but the granularity and configuration identifiers are defined by the service provider.

3 Persistence and Cluster

3.1 Domain Model

A domain model is defined in terms of commands, events and state. The unit of a domain model is an entity. An entity is an aggregate root in DDD. An entity may have multiple states, and each state may process a different set of commands and events. In Akka Cluster, a sharded actor is called an entity.

Akka Persistence uses states to model basic Akka actor behavior. An event handler may change the state to change the actor's behavior. This is different from a plain Akka actor, where behavior is often defined as a method whose parameters (the state) call Behaviors.receive depending on the current state.

The key concept is that the state encodes the commands it can handle, the events it can persist, and the states it can transition to. The recommended style for an actor is to define a trait as the state base, then define a case class extending the trait for each state. These case classes are called entities or DDD aggregates. Each entity defines command handlers and event handlers for its states.

Commands define what an entity can receive. Each command usually defines a reply of type replyTo: ActorRef[R]. The reply tells the sender whether the command succeeded or was rejected, or returns the queried data. The only communication between the actor and the caller must be done via this reply parameter.
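A sketch of a command protocol for a hypothetical ShoppingCart entity (the same name used in the code templates below):

import akka.actor.typed.ActorRef

sealed trait Command

final case class AddItem(itemId: String, quantity: Int, replyTo: ActorRef[Confirmation]) extends Command
final case class Get(replyTo: ActorRef[Summary]) extends Command

// replies sent back through replyTo
sealed trait Confirmation
final case class Accepted(summary: Summary) extends Confirmation
final case class Rejected(reason: String) extends Confirmation
final case class Summary(items: Map[String, Int])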

A command handler usually 1) validates a command, 2) persists one or more events that express the mutation, and 3) after the events are persisted, replies with a message to the sender. Successful event persistence triggers the corresponding event handlers. Because an aggregate is intended to model a consistency boundary, validation should only use the command data and local state data. Any external call should be considered a smell, because it means the aggregate is not in full control of the invariants it is supposed to be protecting.

Applying a command returns a ReplyEffect[Event, MyEntity]. There are two ways to create this return value: Effect.reply(replyTo)(replyMessage) when there is no event to persist, and Effect.persist(event).thenReply(replyTo)(entity => process(entity)) when there is an event. process(entity) uses the updated entity to generate a reply. You may run side effects inside the command handler.
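A command handler sketch for a ShoppingCart state, assuming the commands above and a hypothetical ItemAdded event (AggregateEvent tagging is covered in section 3.2):

import akka.persistence.typed.scaladsl.{Effect, ReplyEffect}

sealed trait Event
final case class ItemAdded(itemId: String, quantity: Int) extends Event

final case class ShoppingCart(items: Map[String, Int]) {
  def applyCommand(cmd: Command): ReplyEffect[Event, ShoppingCart] = cmd match {
    case AddItem(itemId, quantity, replyTo) =>
      if (quantity <= 0)
        // validation failure: reply without persisting an event
        Effect.reply(replyTo)(Rejected("Quantity must be positive"))
      else
        // persist the event, then reply using the updated state
        Effect
          .persist(ItemAdded(itemId, quantity))
          .thenReply(replyTo)(updated => Accepted(Summary(updated.items)))
    case Get(replyTo) =>
      Effect.reply(replyTo)(Summary(items))
  }
}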

Event handlers are pure functions that mutate the state by applying events to it. An event handler has to be pure because it is used both when instantiating the state and when replaying the event journal. It has the signature Event => State: it takes an event and returns a new entity state, which may change the actor behavior. All Lagom events to be persisted must extend AggregateEvent for tagging purposes.

Because there is only one state class per persistent actor, use an immutable case class and return a new copy as the new state.
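Continuing the sketch, the matching event handler inside the same ShoppingCart case class:

def applyEvent(evt: Event): ShoppingCart = evt match {
  case ItemAdded(itemId, quantity) =>
    // pure function: return a new immutable copy instead of mutating in place
    copy(items = items + (itemId -> quantity))
}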

By convention, a command handler method is named onCommandName; similarly, an event handler method is named onEventName.

3.2 Use Persistent Entity

Persisted events are used 1) to replay the state of the aggregate when it needs to be instantiated; 2) to generate read-side views; and 3) to publish events to a message broker for external consumption. Lagom provides an AggregateEventTag trait and its companion object to create event tags. The read-side processors and topic producers use these tags. Define an event tag in the state’s companion object:

object Event {
  val Tag: AggregateEventShards[Event] = AggregateEventTag.sharded[Event](numShards = 10)
}

Akka Persistence Typed requires a tagger function of type Event => Set[String]. It is provided by AkkaTaggerAdapter.fromLagom(entityContext, Event.Tag).

Each entity needs a persistence id. In an Akka Cluster, the id should be the business id combined with the entity type. An EntityTypeKey has a name that uniquely identifies the type of an entity in the cluster. It is defined in the entity’s companion object as val typeKey: EntityTypeKey[MyCommand] = EntityTypeKey[MyCommand]("MyModelName"). The persistence id is created by PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId).

To run the model as an actor with a snapshot definition, use the following code template in the state’s companion object:

def apply(entityContext: EntityContext[Command]): Behavior[Command] = {
  EventSourcedBehavior
    .withEnforcedReplies[Command, Event, ShoppingCart](
      persistenceId = PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId),
      emptyState = ShoppingCart.empty,
      commandHandler = (cart, cmd) => cart.applyCommand(cmd),
      eventHandler = (cart, evt) => cart.applyEvent(evt)
    )
    .withTagger(AkkaTaggerAdapter.fromLagom(entityContext, Event.Tag))
    .withRetention(
        RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2))
}

During application loading, initialize Akka Cluster Sharding with the entity type key:

clusterSharding.init(
  Entity(MyEntity.typeKey) { entityContext =>
    MyEntity(entityContext)
  }
)

3.3 Use and Manage Entities

To access an entity instance in a service implementation, use clusterSharding.entityRefFor(MyEntity.typeKey, id). Akka Cluster will create a new instance or use an existing one for the specified entity type and id. An EntityRef is similar to an ActorRef, but it denotes a sharded actor.

You should use the ask pattern to access an aggregate entity. The ask method takes a function of ActorRef[Response] => MyCommand and an implicit timeout, as sketched below.
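A sketch inside a service implementation, assuming the ShoppingCart commands above, a typeKey defined as in section 3.2, and an injected ClusterSharding instance:

import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.util.Timeout
import scala.concurrent.Future
import scala.concurrent.duration._

class ShoppingCartServiceImpl(clusterSharding: ClusterSharding) {
  implicit val timeout: Timeout = Timeout(5.seconds)

  def addItem(cartId: String, itemId: String, quantity: Int): Future[Confirmation] = {
    // creates the sharded entity on demand, or routes to the existing instance
    val entityRef = clusterSharding.entityRefFor(ShoppingCart.typeKey, cartId)
    entityRef.ask[Confirmation](replyTo => AddItem(itemId, quantity, replyTo))
  }
}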

Akka supports programmatic passivation and automatic passivation. The default automatic passivation is generally good enough.

3.4 Serialization

All commands, replies, events, state snapshots and ActorRef[T] need to be serializable. Akka recommends the Jackson-based JSON serializer. Each data type can use a different serializer, except that ActorRef[T] has to use the Akka Jackson serializer.

4 Read Side

4.1 Read Side

The read side is responsible for data queries in CQRS. A read-side processor (ReadSideProcessor) handles the event stream produced by the persistent entities and tracks which events it has handled using offsets. Lagom’s built-in Cassandra and relational database read-side support handles the offset tracking automatically to make sure that an event is processed at least once. All events with a particular tag can be consumed as a sequential, ordered stream of events. To get exactly-once semantics, you should check whether an event has already been processed before processing it.

To implement a custom read-side processor, extend ReadSideProcessor[Event] and override its buildHandler() method. Lagom provides a helper class, ReadSideHandler[Event], to define the processing. You need to register each read-side processor. Lagom will create many read-side processor instances, one for each shard.

Lagom provides SlickReadSide to build a read-side processor for a relational database. It creates an offset table and loads offsets when needed. lagom.persistence.jdbc.create-tables.auto=false disables the auto-creation of the offset table. In addition to the global prepare, use builder.setEventHandler[EventType](envelope => dbIOAction) to process each event type, as sketched below.
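A SlickReadSide processor sketch; the Slick actions are hypothetical placeholders, and the Event type is assumed to extend AggregateEvent with the sharded Event.Tag from section 3.2:

import akka.Done
import com.lightbend.lagom.scaladsl.persistence.slick.SlickReadSide
import com.lightbend.lagom.scaladsl.persistence.{AggregateEventTag, ReadSideProcessor}
import slick.dbio.DBIO

class ShoppingCartReportProcessor(readSide: SlickReadSide) extends ReadSideProcessor[Event] {

  override def buildHandler(): ReadSideProcessor.ReadSideHandler[Event] =
    readSide
      .builder[Event]("shopping-cart-report")  // identifier used in the offset table
      .setGlobalPrepare(createReportTable)     // runs once, before any event is processed
      .setEventHandler[ItemAdded] { envelope =>
        updateReport(envelope.event)           // one DBIO action per event
      }
      .build()

  override def aggregateTags: Set[AggregateEventTag[Event]] = Event.Tag.allTags

  // hypothetical Slick actions; real ones would create and update report tables
  private def createReportTable: DBIO[Done] = DBIO.successful(Done)
  private def updateReport(event: ItemAdded): DBIO[Done] = DBIO.successful(Done)
}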

Another way to get a stream of persistent events is directly from the PersistentEntityRegistry.eventStream(tag, fromOffset) method. The stream never completes; it queries the persistence database to fetch new events.

The read-side connection pool should start and stop when Lagom starts and stops. Blocking operations should run in a separate thread pool.

4.2 Publish-Subscribe

Known as a messaging pattern, publish-subscribe publishes messages to topics without knowledge of the subscribers. Similarly, a subscriber receives messages published to a topic without knowledge of the publishers.

To publish and subscribe between services, you should use Lagom’s message broker support that provides at-least-once delivery.

5 Message Broker

Lagom provides a message broker API that allows services to communicate asynchronously.

To publish data to a topic, you need to declare the topic in the service descriptor. The Descriptor.withTopics method accepts a sequence of topic calls; each topic call can be defined via the topic method on the Service object, which takes a topic name (i.e., the topic identifier) and a method reference that returns a Topic instance.
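A descriptor sketch declaring a topic (the names are illustrative; the message type is assumed to have an implicit JSON Format):

import com.lightbend.lagom.scaladsl.api.broker.Topic
import com.lightbend.lagom.scaladsl.api.{Descriptor, Service}

trait ShoppingCartService extends Service {
  def shoppingCartTopic: Topic[ShoppingCartView]  // hypothetical public message type

  override final def descriptor: Descriptor = {
    import Service._
    named("shopping-cart")
      .withTopics(
        topic("shopping-cart-events", shoppingCartTopic)
      )
  }
}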

It is better to take the stream of events from your persistent entities and adapt it into a stream of messages sent to the message broker. This way you can ensure at-least-once processing of events by both publishers and consumers, which allows you to guarantee a very strong level of consistency throughout your system. In the publishing method, you can convert or filter events.

Use TopicProducer.taggedStreamWithOffset(Event.Tag)(eventStream => Source) to create a topic. The source is created by persistentEntityRegistry.eventStream(tag, fromOffset). To subscribe to a topic, a service just needs to call Topic.subscribe on the topic of interest after injecting a service client. Calling Topic.subscribe gives you back a Subscriber instance. The atLeastOnce method ensures each message published to the topic is received at least once, but possibly more than once. The subscriber also offers an atMostOnceSource that gives you at-most-once delivery semantics. If in doubt, prefer at-least-once delivery semantics.
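A producer and subscriber sketch under the assumptions above (convertToView and updateLocalView are hypothetical helpers):

import akka.Done
import akka.stream.scaladsl.Flow
import com.lightbend.lagom.scaladsl.broker.TopicProducer

// publishing side: adapt the persistent event stream into topic messages
override def shoppingCartTopic: Topic[ShoppingCartView] =
  TopicProducer.taggedStreamWithOffset(Event.Tag) { (tag, fromOffset) =>
    persistentEntityRegistry
      .eventStream(tag, fromOffset)
      .map(envelope => (convertToView(envelope.event), envelope.offset))
  }

// consuming side: subscribe with at-least-once semantics in a consumer group
shoppingCartService.shoppingCartTopic.subscribe
  .withGroupId("report-service")
  .atLeastOnce(
    Flow[ShoppingCartView].map { view =>
      updateLocalView(view)
      Done
    }
  )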

Subscribers are grouped together via Subscriber.withGroupId. A subscriber group allows many nodes in your cluster to consume a message stream while ensuring that each message is handled only once within the group.