What you need to know before moving to the Akka toolkit to implement Event Sourcing and CQRS



Hello, dear readers of Habr. My name is Rustem and I am the lead developer at the Kazakhstani IT company DAR. In this article, I will tell you what you need to know before switching to Event Sourcing and CQRS using the Akka toolkit.


Around 2015, we began designing our ecosystem. After some analysis, and relying on our experience with Scala and Akka, we decided to build on the Akka toolkit. We have had both successful and less successful implementations of the Event Sourcing pattern with CQRS, and we have accumulated expertise in this area that I want to share with readers. We will look at how Akka implements these patterns, what tools are available, and we will talk about Akka's pitfalls. I hope that after reading this article you will have a better understanding of the risks of switching to the Akka toolkit.


Many articles have been written on Habr and other resources about CQRS and Event Sourcing. This article is intended for readers who already understand what CQRS and Event Sourcing are; here I want to concentrate on Akka.


Domain-Driven Design


A lot of material has been written about Domain-Driven Design (DDD). There are both opponents and supporters of this approach. For my part, I will add that if you decide to switch to Event Sourcing and CQRS, it would be good to learn DDD. Besides, the DDD philosophy can be felt throughout the Akka tooling.


In fact, Event Sourcing and CQRS are just a small part of the big picture called Domain-Driven Design. While designing and developing, you will have many questions about how to implement these patterns correctly and integrate them into your ecosystem, and knowing DDD will make your life easier.


In this article, the term entity (an entity in the DDD sense) will denote a Persistent Actor that has a unique identifier.
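
For illustration, here is a minimal sketch of such an entity (the class name and identifier scheme are hypothetical):

  import akka.persistence.PersistentActor

  // A minimal sketch: a DDD entity as a persistent actor whose
  // persistenceId is derived from its unique identifier.
  class AccountEntity(accountId: String) extends PersistentActor {
    override def persistenceId: String = s"account-$accountId"

    // Command and recovery handling are elided in this sketch.
    override def receiveCommand: Receive = { case _ => }
    override def receiveRecover: Receive = { case _ => }
  }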

Why Scala?


People often ask us why Scala and not Java. One of the reasons is Akka: the framework itself is written in Scala, with support for Java. (There is also a .NET implementation, but that is another topic.) In order not to provoke a flame war, I will not argue whether Scala is better or worse than Java. I will just give a couple of examples where, in my opinion, Scala has an advantage over Java when working with Akka:


  • Immutable objects. In Java, you have to write immutable objects yourself. Believe me, it is neither easy nor convenient to keep writing final fields everywhere. In Scala, a case class is immutable out of the box and comes with a built-in copy method (see the sketch after this list).
  • Coding style. When working with Akka in Java, you will end up writing in the Scala style anyway, that is, functionally.
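
A quick illustration of what a case class gives you for free:

  // Case classes are immutable; copy produces a modified clone.
  case class Account(id: String, balance: BigDecimal)

  val a = Account("acc-1", BigDecimal(100))
  val b = a.copy(balance = a.balance + 10) // `a` itself is unchanged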

Here is an example of actor implementation on Scala and Java:


Scala:


  object DemoActor {
    def props(magicNumber: Int): Props = Props(new DemoActor(magicNumber))
  }

  class DemoActor(magicNumber: Int) extends Actor {
    def receive = {
      case x: Int => sender() ! (x + magicNumber)
    }
  }

  class SomeOtherActor extends Actor {
    context.actorOf(DemoActor.props(42), "demo")
    // ...
  }

Java:


  static class DemoActor extends AbstractActor {
    static Props props(Integer magicNumber) {
      return Props.create(DemoActor.class, () -> new DemoActor(magicNumber));
    }

    private final Integer magicNumber;

    public DemoActor(Integer magicNumber) {
      this.magicNumber = magicNumber;
    }

    @Override
    public Receive createReceive() {
      return receiveBuilder()
          .match(
              Integer.class,
              i -> {
                getSender().tell(i + magicNumber, getSelf());
              })
          .build();
    }
  }

  static class SomeOtherActor extends AbstractActor {
    ActorRef demoActor = getContext().actorOf(DemoActor.props(42), "demo");
    // ...
  }

(The example is taken from the official Akka documentation.)


Note the implementation of the createReceive() method in the Java example: inside, pattern matching is implemented through the ReceiveBuilder factory. receiveBuilder() is an Akka method that brings lambda expressions, and with them pattern matching, to Java; in Scala this is supported natively. Agree, the Scala code is shorter and easier to read.


  • Documentation and examples. Although the official documentation has examples in Java, almost all examples on the Internet are in Scala. It will also be easier for you to navigate the source code of the Akka library.

In terms of performance there will be no real difference between Scala and Java, since everything runs on the JVM.

Storage


Before implementing Event Sourcing with Akka Persistence, I recommend choosing a database for persistent storage in advance. The choice depends on the system requirements and on your own preferences. Data can be stored in NoSQL stores, in RDBMSs, or in the file system, for example with LevelDB from Google.


It is important to note that Akka Persistence is not itself responsible for writing and reading data from the database; it does this through a plugin that implements the Akka Persistence API.


After choosing a storage tool, you need to pick a plugin from the list, or write one yourself. I do not recommend the second option: why reinvent the wheel?


For persistent storage, we settled on Cassandra. We needed a reliable, fast, and distributed database. In addition, Typesafe itself maintains the plugin, which fully implements the Akka Persistence API. It is constantly updated and, compared with the others, the Cassandra plugin has more complete documentation.
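
Wiring the plugin in is then a matter of configuration; a minimal application.conf sketch for the 0.x plugin looks roughly like this:

  # application.conf: point Akka Persistence at the Cassandra plugin
  akka.persistence.journal.plugin = "cassandra-journal"
  akka.persistence.snapshot-store.plugin = "cassandra-snapshot-store"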


It is worth mentioning that the plugin has several problems of its own. For example, there is still no stable version (at the time of writing, the latest version is 0.97). For us, the biggest problem we encountered when using this plugin was the loss of events when reading a Persistence Query for some entities. For the full picture, the CQRS flow looks as follows:



The Persistent Entity distributes entity events across tags using a consistent hashing algorithm (for example, into 10 shards):
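
A minimal sketch of how such tagging can be implemented with a WriteEventAdapter (the EntityEvent trait and the tag names are illustrative; the adapter must also be registered under the journal's event-adapters settings):

  import akka.persistence.journal.{Tagged, WriteEventAdapter}

  // Hypothetical domain event that carries its entity's identifier.
  trait EntityEvent { def entityId: String }

  class TaggingEventAdapter extends WriteEventAdapter {
    private val numberOfShards = 10 // e.g. 10 shards, as above

    override def manifest(event: Any): String = ""

    override def toJournal(event: Any): Any = event match {
      // Consistent hashing: the same entity always lands in the same tag,
      // so all of its events are read back from a single tagged stream.
      case evt: EntityEvent =>
        val shard = math.abs(evt.entityId.hashCode % numberOfShards)
        Tagged(evt, Set(s"tag-$shard"))
      case other => other
    }
  }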



Then, Persistence Query subscribes to these tags and starts a stream that feeds the data into Elastic Search. Since Cassandra runs in a cluster, events are scattered across the nodes. Some nodes may lag and respond more slowly than others, so there is no guarantee that you will receive events in strict order. To work around this, the plugin is implemented so that if it receives an out-of-order event, for example entity-A event NR 2, it waits a certain time for the missing event, and if that event never arrives, it simply ignores all further events of that entity. There were discussions about this on Gitter; if anyone is interested, you can read the exchange between @kotdv and the plugin developers: Gitter
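
For illustration, a minimal sketch of such a subscription with Persistence Query (the Elastic Search sink is hypothetical):

  import akka.actor.ActorSystem
  import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
  import akka.persistence.query.{NoOffset, PersistenceQuery}
  import akka.stream.ActorMaterializer

  object TagProjection extends App {
    implicit val system: ActorSystem = ActorSystem("projections")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    val readJournal = PersistenceQuery(system)
      .readJournal[CassandraReadJournal](CassandraReadJournal.Identifier)

    // Hypothetical sink that writes the projection to Elastic Search.
    def indexIntoElasticSearch(event: Any): Unit = ???

    // One stream per tag: replay everything with this tag, then follow live.
    readJournal
      .eventsByTag("tag-0", NoOffset)
      .runForeach(envelope => indexIntoElasticSearch(envelope.event))
  }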


How can this problem be dealt with:


  • Update the plugin to the latest version. In recent versions the Typesafe developers have solved many problems related to eventual consistency, but we are still waiting for a stable release.
  • More precise settings have been added to the component responsible for reading events. You can try increasing the wait time for out-of-order events to make the plugin more reliable: cassandra-query-journal.events-by-tag.eventual-consistency.delay = 10s (see the snippet after this list).
  • Configure Cassandra as recommended by DataStax: use the G1 garbage collector and allocate as much memory as possible to Cassandra.
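
The same setting in application.conf form (the 10-second value is simply what we ended up with, not a universal recommendation):

  # application.conf: wait longer for out-of-order events before giving up
  cassandra-query-journal {
    events-by-tag {
      eventual-consistency {
        delay = 10s
      }
    }
  }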

In the end, we solved the problem of missing events, but now there is a constant data delay on the Persistence Query side (five to ten seconds). We decided to keep this approach for data used in analytics; where speed matters, we publish events to the bus manually. The main thing is to choose the appropriate mechanism for processing or publishing data: at-least-once or at-most-once. A good description from Akka can be found here. It was important for us to maintain data consistency, so after successfully writing data to the database we enter a transitional state that controls the successful publication of data to the bus. Below is sample code:


 
  object SomeEntity {

    sealed trait Event {
      def uuid: String
    }

    /**
     * The event that is sent to be persisted.
     */
    case class DidSomething(uuid: String) extends Event

    /**
     * Indicator that the last event has been published.
     */
    case class LastEventPublished(uuid: String) extends Event

    /**
     * A container that stores the current state of the entity.
     * @param unpublishedEvents - events that have not yet been published.
     */
    case class State(unpublishedEvents: Seq[Event]) {
      def updated(event: Event): State = event match {
        case evt: DidSomething =>
          copy(
            unpublishedEvents = unpublishedEvents :+ evt
          )
        case evt: LastEventPublished =>
          copy(
            unpublishedEvents = unpublishedEvents.filter(_.uuid != evt.uuid)
          )
      }
    }
  }

  class SomeEntity extends PersistentActor {
    ...
    persist(newEvent) { evt =>
      updateState(evt)
      publishToEventBus(evt)
    }
    ...
  }

If for some reason the event could not be published, then the next time SomeEntity starts, it will see that the DidSomething event did not reach the bus and will try to republish it.
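
Here is a minimal sketch of the recovery side, as it could look inside SomeEntity (assuming the State and publishToEventBus from the sample above):

  import akka.persistence.RecoveryCompleted

  // Rebuild state from the journal, then republish anything still pending.
  var state = SomeEntity.State(unpublishedEvents = Seq.empty)

  override def receiveRecover: Receive = {
    case evt: SomeEntity.Event =>
      state = state.updated(evt)
    case RecoveryCompleted =>
      // Whatever is still unpublished never reached the bus:
      // publish it again (at-least-once semantics).
      state.unpublishedEvents.foreach(publishToEventBus)
  }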


Serializer


Serialization is an equally important aspect of using Akka. Akka has an internal module, Akka Serialization, which is used to serialize messages exchanged between actors and stored through the Persistence API. The default is the Java serializer, but using another one is recommended: the Java serializer is slow and takes up a lot of space. There are two popular alternatives: JSON and Protobuf. JSON, though slow, is easier to implement and maintain. If you need to minimize serialization and storage costs, you can settle on Protobuf, but then development will go more slowly: in addition to the Domain Model, you will have to write a separate Data Model, and do not forget about data versioning. Be prepared to constantly write mappings between the Domain Model and the Data Model.



Added a new event: write a mapping. Changed the data structure: write a new version of the Data Model and change the mapping function. Do not forget tests for the serializers. There will be plenty of work, but in the end you will get loosely coupled components.
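
To make this concrete, here is a minimal sketch of a custom serializer with a versioned manifest, assuming the DidSomething event from the sample above (the identifier and the byte format are purely illustrative):

  import akka.serialization.SerializerWithStringManifest

  class EventSerializer extends SerializerWithStringManifest {
    // Must be unique among the serializers registered in the actor system.
    override def identifier: Int = 9001

    // The manifest names the event type and its schema version.
    private val DidSomethingV1 = "DidSomethingV1"

    override def manifest(o: AnyRef): String = o match {
      case _: SomeEntity.DidSomething => DidSomethingV1
    }

    override def toBinary(o: AnyRef): Array[Byte] = o match {
      // Domain Model -> Data Model; here the "Data Model" is just the
      // uuid as UTF-8 bytes, to keep the sketch self-contained.
      case evt: SomeEntity.DidSomething => evt.uuid.getBytes("UTF-8")
    }

    override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef =
      manifest match {
        // A new schema version would get its own manifest and its own branch.
        case DidSomethingV1 => SomeEntity.DidSomething(new String(bytes, "UTF-8"))
      }
  }

The serializer then has to be bound to the event types in the akka.actor.serializers and serialization-bindings sections of the configuration.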


Conclusions


  • Carefully examine and select the appropriate database and plugin. I recommend choosing a plugin that is well maintained and whose development will not stall. The area is relatively new, and there are still many flaws that have yet to be fixed.
  • If you choose distributed storage, you will either have to solve the delay problem (up to 10 seconds) yourself or accept it.
  • Serialization is complex. You can sacrifice speed and settle on JSON, or choose Protobuf and write and maintain a lot of adapters.
  • This pattern also has pluses: loosely coupled components, and independent development teams building one large system.
