Friday, January 20, 2012

Building an Event-Sourced Web Application - Part 2: Projections, Persistence, Consumers and Web Interface

UPDATE: A successor of the example application described in this blog post is available here. It is based on the eventsourced library, a library for building reliable, scalable and distributed event-sourced applications in Scala.

A few weeks ago I started a blog post series to summarize my experiences in building an event-sourced web application using Scala and Akka. This was done based on an example application (see branch part-1). There, I gave an overview of the application architecture and presented some details of the immutable domain model and the service layer. Since then, the example application has been extended (see branch part-2) with a number of new features and enhancements. Here's an overview:

Enhancements are:
  • The STM-based state management was completely revised and generalized into the traits UpdateProjection and EventProjection. An UpdateProjection is similar to an Akka Agent: it applies state transition functions asynchronously and can participate in STM transactions. The major difference is that an UpdateProjection is specialized for domain object updates and can log captured events to a persistent event log. By default, update events are logged before state changes become visible via STM references (an important difference from the service layer of part 1). UpdateProjection implementors (such as the example application's InvoiceService) are domain event producers. EventProjection implementors, on the other hand, are domain event consumers. They internally use plain Akka Agents to manage state and derive new state values from received domain events (using an application-defined event projection function). Typical EventProjection implementors are components that manage read models or coordinate business processes in event-driven architectures.
  • The domain model was enhanced by introducing the domain classes DraftInvoice, SentInvoice and PaidInvoice to represent the states an invoice can have. Valid state transitions are defined by the methods on these domain classes and can therefore be checked by the compiler. This approach is explained in more detail here, although the implementation used in our example application differs slightly. In part 1, we only had a single Invoice class and valid state transitions had to be checked at runtime.
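
To illustrate how state-specific classes let the compiler check transitions, here is a minimal sketch. It is not the example application's code: the fields, the validation rules and the use of Either (as a stand-in for the application's validation type) are assumptions made for illustration.

```scala
object InvoiceModel {
  final case class InvoiceItem(description: String, count: Int, amount: BigDecimal)
  final case class InvoiceAddress(street: String, city: String, country: String)

  // Common supertype, so that invoices in any state can share one collection.
  sealed trait Invoice {
    def id: String
    def items: List[InvoiceItem]
    def total: BigDecimal = items.map(item => item.amount * item.count).sum
  }

  // A draft is the only state that accepts new items or can be sent.
  final case class DraftInvoice(id: String, items: List[InvoiceItem] = Nil) extends Invoice {
    def addItem(item: InvoiceItem): DraftInvoice = copy(items = items :+ item)

    def sendTo(address: InvoiceAddress): Either[String, SentInvoice] =
      if (items.isEmpty) Left(s"invoice $id: cannot send an invoice without items")
      else Right(SentInvoice(id, items, address))
  }

  // A sent invoice can only be paid.
  final case class SentInvoice(id: String, items: List[InvoiceItem], address: InvoiceAddress) extends Invoice {
    def pay(amount: BigDecimal): Either[String, PaidInvoice] =
      if (amount < total) Left(s"invoice $id: paid amount is less than the total")
      else Right(PaidInvoice(id, items, address))
  }

  // A paid invoice offers no further transitions.
  final case class PaidInvoice(id: String, items: List[InvoiceItem], address: InvoiceAddress) extends Invoice
}
```

With these types, calling addItem on a SentInvoice or pay on a DraftInvoice does not compile, because the methods only exist on the class that represents the matching state.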

New features include:
  • A revised EventLog trait together with two implementations: JournalioEventLog is based on Journal.IO and BookkeeperEventLog on Apache BookKeeper. An EventLog supports synchronous and asynchronous appending of events as well as iterating over stored events, either from the beginning or from an application-defined position in the event history. Event log entries are also assigned sequence numbers so that event consumers can detect gaps in event streams or re-order (resequence) them, if needed.
  • Event consumers. One example is InvoiceStatistics, an EventProjection that derives invoice update statistics from domain events. Here, a separate read model is used (following the CQRS pattern) to serve invoice statistics queries. Another example is InvoiceReplicator, an EventProjection that simply reconstructs the invoice map (as maintained by the InvoiceService) from invoice events at a different location. It can be used to replicate application state across different nodes and to serve (eventually consistent) reads. The replicated state could also be used by a snapshot service to take snapshots of application state. InvoiceReplicator needs to receive events in the correct order and is therefore configured to resequence the received event stream. A third example is the PaymentProcess. It coordinates the activities of InvoiceService and PaymentService. Instead of having these services send commands to each other, it is the PaymentProcess that sends commands to (i.e. calls methods on) these services in reaction to domain events. This event-driven approach to implementing business processes not only decouples the services from each other but also lets other components extend (or monitor) the business process by subscribing and reacting to the relevant domain events. The PaymentProcess is currently stateless. Processes that need to maintain state should implement EventProjection and recover the process state during application start (or failover) from the event history.
  • A RESTful web interface for invoices and invoice statistics with support for HTML, XML and JSON representations. These can be negotiated with the HTTP Accept header. The web layer is based on the Jersey web framework (the JAX-RS reference implementation). HTML representations are rendered with the Scalate template engine. The mapping between XML and JSON representations and immutable domain classes is based on JAXB annotations. A JAXB-based XML provider must be supported by any JAX-RS implementation but Jersey additionally comes with a JAXB-based JSON provider so that the same metadata (JAXB annotations) can be used to generate both XML and JSON representations. Following some simple rules, it is possible to JAXB-annotate Scala case classes without polluting them with Java collection types or getters and setters. One major drawback of the current JAX-RS specification is that it doesn't support asynchronous responses yet. This will change with JAX-RS 2.0 and then we can make full use of the asynchronous InvoiceService responses.
  • A communication Channel for connecting domain event producers to consumers. The example application provides a SimpleChannel implementation for local communication. Alternative implementations, for example, could connect to a distributed event bus to communicate events across components of a distributed application. 
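
The resequencing of event streams by sequence number mentioned above can be sketched as follows. This is an illustration only, not the example application's implementation: Entry, Resequencer and their members are hypothetical names, and the sketch is meant to be used by a single consumer thread.

```scala
object Resequencing {
  // Hypothetical envelope; the post only states that log entries carry sequence numbers.
  final case class Entry[A](sequenceNr: Long, event: A)

  // Buffers out-of-order entries and releases them in sequence-number order.
  final class Resequencer[A](firstSequenceNr: Long) {
    private var expected = firstSequenceNr
    private var pending = Map.empty[Long, Entry[A]]

    // True while at least one entry is waiting for a missing predecessor.
    def hasGap: Boolean = pending.nonEmpty

    // Returns the entries that become deliverable after receiving `entry` (possibly none).
    def receive(entry: Entry[A]): List[Entry[A]] =
      if (entry.sequenceNr < expected) Nil // already delivered: drop the duplicate
      else {
        pending += (entry.sequenceNr -> entry)
        val ready = List.newBuilder[Entry[A]]
        while (pending.contains(expected)) {
          ready += pending(expected)
          pending -= expected
          expected += 1
        }
        ready.result()
      }
  }

  // Entries arrive in the order 2, 3, 1 and are delivered in the order 1, 2, 3.
  def demo(): List[String] = {
    val resequencer = new Resequencer[String](firstSequenceNr = 1L)
    List(Entry(2L, "b"), Entry(3L, "c"), Entry(1L, "a"))
      .flatMap(entry => resequencer.receive(entry))
      .map(_.event)
  }
}
```

A consumer such as a replicator would apply its projection function only to the entries returned by receive, so its state is always derived from a gap-free prefix of the event history.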

Running the example application

The example application can be started with

sbt run-main dev.example.eventsourcing.server.Webserver

Two classes relevant for starting the application are:
  • Appserver: configures the event log, services, read models and processes and connects them via an event channel. It also recovers application state from the event history.
  • Webserver: configures the web service layer and starts an embedded web server (Jetty).
To experiment with the BookKeeper-based event log, you need to replace JournalioEventLog with BookkeeperEventLog in Appserver and additionally start a test BookKeeper instance with

sbt run-main dev.example.eventsourcing.server.Zookeeper
sbt run-main dev.example.eventsourcing.server.Bookkeeper

Examples of how to interact with the RESTful web interface can be found here.


Service Layer Enhancements

In the service layer implementation from part 1 we've seen how to keep the order of logged events in correspondence with the order of updates to the application state. We used a transient event log that could participate in STM transactions. After the transaction committed, the events from the transient event log were transferred to a persistent event log. A drawback of this approach is that one can lose updates if the application crashes after an STM reference has been updated but before the changes have been written to the persistent event log. This can lead to situations where clients can already see application state that cannot be recovered from the event log. While some applications may tolerate this, others may require that any visible application state must be fully recoverable from the event log. Therefore, an alternative approach must be chosen.

We need a way to write events, captured during domain object updates, to a persistent event log before the STM reference is updated. But writing to the persistent event log must be done outside an STM transaction for reasons explained in part 1. Updates must also be based on the current (i.e. latest) application state. We therefore need to
  1. Get the current state value from a transactional reference (STM transaction 1)
  2. Update domain object(s) obtained from the current state (no STM transaction)
  3. Write the captured update event(s) to a persistent event log (no STM transaction)
  4. Compute a new state value from the domain object update and write it to the transactional reference (STM transaction 2)
Since steps 1-4 are not a single transaction we must prevent their concurrent execution. This can be achieved using a single actor, for example. This actor is the single writer to the transactional reference. This is actually very similar to how Akka Agents work internally. The main difference in our case is that the computation of a new state value occurs outside an STM transaction, whereas Akka Agents apply update functions within STM transactions. This difference allows us to have side effects, such as writing to a persistent event log. The example application provides the above functionality in the UpdateProjection trait:
  • Instances of UpdateProjection manage (part of) application state with a transactional ref of type Ref[S] where S is the state value type. Clients concurrently read application state via currentState. Sequential writes to the transactional ref are done exclusively by the updater actor (more on write concurrency below).
  • UpdateProjection implementors change application state with the transacted method. The update parameter is a function that computes a domain object Update[Event, B] from the current state S, where B is a domain object type. The update result (either Success[B] or Failure[DomainError]) is returned as a future value from the transacted method.
  • The update function and the underlying Future implementation object (promise) are sent to the updater actor with an ApplyUpdate message. The updater then reads the current state and applies the update function to it. If the update succeeds, it writes the captured events to an EventLog and projects the update result onto the current state. The projection is done with the project function. It creates a new state value from the current state and the update result. The new state value is then finally set on the transactional ref and the promise is completed with the update result.
  • Furthermore, the transacted method can participate in STM transactions. If there's an enclosing transaction, the updater actor will only be triggered if the enclosing transaction successfully commits. If there's no enclosing transaction the updater actor will always be triggered.
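
The four steps and the single-writer rule described above can be sketched with the standard library alone. This is not the UpdateProjection trait of the example application. As assumptions for illustration, an AtomicReference stands in for the STM Ref[S], a single-threaded executor stands in for the updater actor, Either stands in for the validation type, the event log is kept in memory, and participation in enclosing STM transactions is left out.

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

object UpdateProjectionSketch {
  // Simplified stand-in for Update[Event, B]: captured events plus the update result.
  final case class Update[E, B](events: List[E], result: Either[String, B])

  // In-memory stand-in for a persistent event log.
  final class InMemoryEventLog[E] {
    @volatile private var entries = Vector.empty[E]
    // Only ever called by the single updater thread.
    def append(event: E): Unit = { entries = entries :+ event }
    def stored: Vector[E] = entries
  }

  abstract class UpdateProjection[S, E, B](initialState: S, log: InMemoryEventLog[E]) {
    private val ref = new AtomicReference[S](initialState)    // stand-in for the STM Ref[S]
    private val updater = Executors.newSingleThreadExecutor() // stand-in for the updater actor

    // Creates a new state value from the current state and a successful update result.
    def project(state: S, updated: B): S

    // Concurrent readers only ever see state whose events have already been logged.
    def currentState: S = ref.get

    def transacted(update: S => Update[E, B]): Future[Either[String, B]] = {
      val promise = Promise[Either[String, B]]()
      updater.execute(new Runnable {
        def run(): Unit = {
          promise.complete(Try {
            val state = ref.get                       // 1. read the current state
            val u = update(state)                     // 2. update domain object(s)
            u.result.foreach { updated =>
              u.events.foreach(e => log.append(e))    // 3. log the captured events first
              ref.set(project(state, updated))        // 4. then make the new state visible
            }
            u.result
          })
          ()
        }
      })
      promise.future
    }

    def shutdown(): Unit = updater.shutdown()
  }

  // A counter whose new value becomes visible only after its event has been logged.
  def demo(): (Int, Vector[String]) = {
    val log = new InMemoryEventLog[String]
    val counter = new UpdateProjection[Int, String, Int](0, log) {
      def project(state: Int, updated: Int): Int = updated
    }
    def increment = counter.transacted(s => Update(List(s"incremented to ${s + 1}"), Right(s + 1)))
    try {
      Await.result(increment, 5.seconds)
      Await.result(increment, 5.seconds)
      // A failed update logs nothing and leaves the state untouched.
      Await.result(counter.transacted(_ => Update(Nil, Left("rejected"))), 5.seconds)
      (counter.currentState, log.stored)
    } finally counter.shutdown()
  }
}
```

Because all updates run on one thread, each update function sees the state produced by the previous one, and the log order matches the order of state changes.
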
The example application uses UpdateProjection to implement the stateful InvoiceService. The state is of type Map[String, Invoice] i.e. a single map containing draft, sent and paid invoices. Here's a simplified version of InvoiceService:
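
The following self-contained sketch approximates the shape of such a service. It is not the original listing: the domain classes are reduced to a minimum, Either stands in for the validation type behind DomainValidation, and a single-threaded executor with an in-memory event vector replaces UpdateProjection and EventLog. The event classes and the error messages are hypothetical.

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object InvoiceServiceSketch {
  type DomainValidation[A] = Either[String, A] // stand-in for the application's validation type

  final case class InvoiceItem(description: String, amount: BigDecimal)
  sealed trait Invoice { def id: String }
  final case class DraftInvoice(id: String, items: List[InvoiceItem] = Nil) extends Invoice
  final case class SentInvoice(id: String, items: List[InvoiceItem], address: String) extends Invoice

  sealed trait InvoiceEvent
  final case class InvoiceItemAdded(invoiceId: String, item: InvoiceItem) extends InvoiceEvent
  final case class InvoiceSent(invoiceId: String, address: String) extends InvoiceEvent

  // Captured events plus the result of a domain object update.
  final case class Update[B](events: List[InvoiceEvent], result: DomainValidation[B])

  class InvoiceService(initialState: Map[String, Invoice] = Map.empty) {
    private val ref = new AtomicReference[Map[String, Invoice]](initialState)
    private val executor = Executors.newSingleThreadExecutor() // single writer
    private val updater = ExecutionContext.fromExecutor(executor)
    @volatile private var log = Vector.empty[InvoiceEvent]     // stand-in for an EventLog

    def currentState: Map[String, Invoice] = ref.get
    def loggedEvents: Vector[InvoiceEvent] = log
    def shutdown(): Unit = executor.shutdown()

    // Adds the updated invoice to the map, replacing an older version if present.
    private def project(state: Map[String, Invoice], updated: Invoice): Map[String, Invoice] =
      state + (updated.id -> updated)

    private def transacted[B <: Invoice](update: Map[String, Invoice] => Update[B]): Future[DomainValidation[B]] =
      Future {
        val state = ref.get
        val u = update(state)
        u.result.foreach { updated =>
          log = log ++ u.events            // log first ...
          ref.set(project(state, updated)) // ... then publish the new state
        }
        u.result
      }(updater)

    def updateInvoice[B <: Invoice](invoiceId: String)(f: Invoice => Update[B]): Future[DomainValidation[B]] =
      transacted[B] { state =>
        state.get(invoiceId) match {
          case Some(invoice) => f(invoice)
          case None          => Update[B](Nil, Left(s"invoice $invoiceId: does not exist"))
        }
      }

    def updateDraftInvoice[B <: Invoice](invoiceId: String)(f: DraftInvoice => Update[B]): Future[DomainValidation[B]] =
      updateInvoice[B](invoiceId) {
        case draft: DraftInvoice => f(draft)
        case _                   => Update[B](Nil, Left(s"invoice $invoiceId: not a draft invoice"))
      }

    def addInvoiceItem(invoiceId: String, item: InvoiceItem): Future[DomainValidation[DraftInvoice]] =
      updateDraftInvoice[DraftInvoice](invoiceId) { draft =>
        Update(List(InvoiceItemAdded(invoiceId, item)), Right(draft.copy(items = draft.items :+ item)))
      }

    def sendInvoiceTo(invoiceId: String, address: String): Future[DomainValidation[SentInvoice]] =
      updateDraftInvoice[SentInvoice](invoiceId) { draft =>
        if (draft.items.isEmpty) Update(Nil, Left(s"invoice $invoiceId: no items"))
        else Update(List(InvoiceSent(invoiceId, address)), Right(SentInvoice(invoiceId, draft.items, address)))
      }
  }

  // Returns: result of sending, result of a late item addition, number of logged events.
  def demo(): (DomainValidation[SentInvoice], DomainValidation[DraftInvoice], Int) = {
    val service = new InvoiceService(Map("inv-1" -> DraftInvoice("inv-1")))
    try {
      Await.result(service.addInvoiceItem("inv-1", InvoiceItem("book", BigDecimal(30))), 5.seconds)
      val sent = Await.result(service.sendInvoiceTo("inv-1", "Some Street 1"), 5.seconds)
      // The invoice is no longer a draft, so this update is rejected and nothing is logged.
      val late = Await.result(service.addInvoiceItem("inv-1", InvoiceItem("pen", BigDecimal(2))), 5.seconds)
      (sent, late, service.loggedEvents.size)
    } finally service.shutdown()
  }
}
```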

The updateInvoice method uses the transacted method of the UpdateProjection trait. It tries to get an invoice with the given invoiceId from the current state and applies the supplied update function f to it. The updateInvoice method is used by updateDraftInvoice for updating draft invoices in the invoice map. The updateDraftInvoice method is used by the service methods addInvoiceItem and sendInvoiceTo. Adding an item to an existing draft invoice yields a future value of an updated draft invoice (return type Future[DomainValidation[DraftInvoice]]). Sending an existing draft invoice, on the other hand, causes a state transition of that invoice to a sent invoice (return type Future[DomainValidation[SentInvoice]]), i.e. the service methods make use of the newly introduced domain object types. The InvoiceService must also implement the abstract members project, initialState and eventLog (declared by Projection and UpdateProjection).
  • The project implementation projects an updated invoice onto the current state by simply adding it to the map of invoices (replacing an old invoice if present).
  • An initialState (empty map by default) and an EventLog instance are provided during construction of an InvoiceService.
The above InvoiceService implementation supports concurrent reads but only sequential writes to the whole invoice map. This may be sufficient for many applications, but if a higher degree of write concurrency is needed, one could choose to have a separate UpdateProjection instance per invoice object (which is comparable to having a separate Akka Agent for each invoice object). This allows both concurrent reads and writes to the invoices of an application. The following snippet shows the general idea (not part of the example project).

Here, an InvoiceService maintains a map of PersistentInvoice instances where a PersistentInvoice is an UpdateProjection that contains a reference to a single (draft, sent or paid) invoice. Consequently, updates to different invoices can now be made concurrently. The projection function degenerates to a function that simply returns the updated invoice.
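
A rough, self-contained sketch of this per-invoice idea follows. It is an illustration under assumptions, not the original snippet: the Invoice class is reduced to a single type, event logging is omitted, and each PersistentInvoice gets its own single-threaded executor as a stand-in for a lightweight updater actor (one thread per invoice would be too costly in a real application).

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object PerInvoiceSketch {
  final case class Invoice(id: String, items: List[String] = Nil)

  // One projection per invoice: its own state reference and its own single writer.
  final class PersistentInvoice(initial: Invoice) {
    private val ref = new AtomicReference[Invoice](initial)
    private val executor = Executors.newSingleThreadExecutor()
    private val updater = ExecutionContext.fromExecutor(executor)

    def currentState: Invoice = ref.get

    def transacted(update: Invoice => Either[String, Invoice]): Future[Either[String, Invoice]] =
      Future {
        val result = update(ref.get)
        // Event logging is omitted here. The projection degenerates to
        // "the updated invoice is the new state".
        result.foreach(updated => ref.set(updated))
        result
      }(updater)

    def shutdown(): Unit = executor.shutdown()
  }

  final class InvoiceService(invoices: Map[String, PersistentInvoice]) {
    def addInvoiceItem(invoiceId: String, item: String): Future[Either[String, Invoice]] =
      invoices.get(invoiceId) match {
        case Some(persistent) => persistent.transacted(inv => Right(inv.copy(items = inv.items :+ item)))
        case None             => Future.successful(Left(s"invoice $invoiceId: does not exist"))
      }

    def shutdown(): Unit = invoices.values.foreach(_.shutdown())
  }

  def demo(): Map[String, List[String]] = {
    val persistent = Map(
      "inv-1" -> new PersistentInvoice(Invoice("inv-1")),
      "inv-2" -> new PersistentInvoice(Invoice("inv-2")))
    val service = new InvoiceService(persistent)
    try {
      // These two writes target different invoices and may run in parallel.
      val f1 = service.addInvoiceItem("inv-1", "book")
      val f2 = service.addInvoiceItem("inv-2", "pen")
      Await.result(f1, 5.seconds)
      Await.result(f2, 5.seconds)
      persistent.map { case (id, p) => id -> p.currentState.items }
    } finally service.shutdown()
  }
}
```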

Event Logs

TODO

CQRS and Consistency

TODO

Business Processes

TODO

Web Interface

TODO (see also this blog post)

19 comments:

  1. I cannot find the right words to thank you. Keep up the good work!

  2. Martin you made awesome work!
    Keep going!

    I'll try to refactor my OLTP app using your implementation.
    The problem with CQRS is when you do a coordinated update with a guard condition but have to avoid throwing exceptions [in case the guard condition is not met]. For example: check an account balance before updating other accounts [in parallel], but if the balance check fails, exit gracefully. One possible solution is multi-staged coordinated transactions - do you think the same?

    Replies
    1. Hi Pavel, glad that you find the implementation useful. Regarding your example, what speaks against checking one account balance and updating other accounts in a single STM transaction (without using CQRS)? If you implement an AccountService in the same way as the InvoiceService of the example application (by extending UpdateProjection) you can enclose all relevant read and write operations of that service within a single STM transaction using an atomic { ... } block. Does that help?

    2. Thank you Martin! I've read your comments, still can't understand how I can notify [without throwing exceptions] others accountActors participating in the coordinated akka transaction, about failed validation in one of them. I see simple but not elegant way is to wrap coordinated atomic blocks with another "general[not coordinated]" atomic, where I will validate operations against accounts by asking them about their balance and if validation fails just do not send them coordinated updates at all.
      I like the way you deal with the abstraction of business logic, but the dependency on the scalaz artifact keeps me from implementing it right now - I don't like scalaz, don't ask - without any reason :).

    3. Are you using transactors? It would be best if you shared some code snippets (e.g. on github) showing the relevant parts of your app. I have difficulty following your descriptions. Regarding scalaz: I'd really like to ask ;)

    4. No transactors - I use Coordinated. Shall I use them just for spreading the message processing logic around several methods? I don't find this doubtful feature useful )
      Sorry - right now I can't be as prompt as you Martin ) - currently too many different projects/activities ). Everything is urgent ).
      Regarding Scalaz - Generally, for everyone: - it is good if you
      1. have time (and brain) to learn it and
      2. spare team of clever guys who will participate and use it,
      else better avoid it.
      For me, Scalaz brings another degree of complexity and requires more time to learn than just scala/akka/camel etc (no time).
      But even scala/akka/camel brings a lot of changes: rethink and redesign of architecture, test and integrate with already working infrastructure etc... I prefer doing such things step-by-step with small increments, evolutionary moving to future but not ruin the present ). And in parallel teach developers team for using new architecture and frameworks, migrating network infrastructure to support both: old (webservices/RPC) and new (event oriented/CQS) technologies.
      Indeed, all this stuff is very, very interesting!
      Well, anyway I will extract and share the code, as soon as I can.
      And you, Martin, keep going! To improve the world! ))

  3. so cool and how timely for me! still have to digest everything and thinking of ways to save the current application state to a RDBMS for legacy reasons. But this should be not too complicated. All the events are there! Its way better than JPA-Session + reattaching entities in DAO + saveOrUpdate.

  4. This comment has been removed by the author.

  5. After thinking more about this, I actually don't understand at all how you maintain consistency with other aggregates. e.g. a User with his invoices.

    Replies
    1. I'm using 'processes' (see PaymentProcess example at 'Event Consumers'). You can achieve eventual consistency using this approach. To achieve strong consistency, manage the aggregates with the same transactional reference.

  6. What happens when you need to persist a reference to a different aggregate in an event. E.g. authorizing doctor in the invoice?
    Is it correct to persist the id (string or long +/- version) there? Otherwise one would somehow persists the whole object graph with each event.
    But this leads to a rather complicated machinery for persisting and replaying.

    Replies
    1. Yes, referencing by id is a reasonable approach. This makes things much easier if different aggregate types are maintained by different services (of a distributed application), for example.

  7. First of all thanks for this awesome approach!

    I have a question about the UpdateProjection trait in the main branch on your github.
    In the transacted function you use Txn.findCurrent and check if it exists or not and use the afterCommit method.
    What is the purpose of these two lines of code (The documentation is rather scarce)?
    Im not using these two lines except for the dispatch call and it seems to run okay.

    Roel

    link to the code:
    https://github.com/krasserm/eventsourcing-example/blob/master/src/main/scala/dev/example/eventsourcing/state/Projection.scala#L36

    Replies
    1. If the transacted method is called within an enclosing (i.e. existing) STM transaction, and the enclosing transaction rolls back, the updater won't be sent a request message to update the current state. BTW, the very same mechanism is used by (transactional) agents in Akka.

    2. Thanks for your answer!

      If I understand it correctly it will have use when for example the transaction method is called from within the transaction method. And the outer transaction has to be successful before dispatching the update.

      I also don't quite understand why Akka agents can not be used. Why is it not possible to first apply the update with the state from the agents get method and then use the agents send method after events are persisted to the log?

    3. Regarding your first question: the typical use case is when another (completely different) service starts a transaction and within that transaction makes one or more calls to the 'transacted' method (typically indirectly via methods of services that extend UpdateProjection). The calling service only wants to dispatch all the updates if the overall transaction (it started) succeeds or none of the updates if there's a rollback.

      Regarding your second question: you need to ensure that the state change done by a certain update is immediately visible by the next update. Using an agent you cannot guarantee that because updates to agents are done asynchronously, so you'd have to wait for the agent to process an update and this is probably not the way you want to go. On the other hand, an update to a plain STM reference becomes visible immediately.

  8. What time to update the version with play2.1?Thanks!

  9. The Eventsourced library looks great, Thanks !

    Something I'm wondering about: how would you implement the concept of Aggregates if there are too many of them to keep them in memory ?

    What I'm understanding from your example (PaymentProcess, which seems to play the role of the Aggregate) is that Eventsourced does not provide any facility for recovering the state of a particular aggregate and replaying its events. Am I right ?

    Is it then possible to somehow query the journal to retrieve only events of a given aggregate type + id ? For instance, if I have to accept/reject commands based on the state of an aggregate, this is something I need to do.

    Thanks

    Replies
    1. Removing aggregates from memory can be done by stopping the corresponding actor (which must be done by the application). How to recover that actor later is explained in section Recovery.

      PaymentProcess is a stateful, eventsourced business process (often referred to as Saga) and not an aggregate.

      If you want to accept/reject commands based on the state of an aggregate you should query the state stored in memory. Querying the journal should be done only for recovery - an application during "normal" operation only writes to the journal.

      A high-level overview of the concepts of Eventsourced is given in this blog post. See also the Eventsourced reference application for an example.
