A few weeks ago I started a blog post series to summarize my experiences with building an event-sourced web application using Scala and Akka. The first part was based on an example application (see branch part-1): I gave an overview of the application architecture and presented some details of the immutable domain model and the service layer. Since then, the example application has been extended (see branch part-2) with a number of new features and enhancements. Here's an overview:
Enhancements are:
- The STM-based state management was completely revised and generalized into the traits UpdateProjection and EventProjection. An UpdateProjection is similar to an Akka Agent: it applies state transition functions asynchronously and can participate in STM transactions. The major difference is that an UpdateProjection is specialized for domain object updates and can log captured events to a persistent event log. By default, update events are logged before state changes become visible via STM references (an important difference from the service layer of part 1). UpdateProjection implementors (such as the example application's InvoiceService) are domain event producers. EventProjection implementors, on the other hand, are domain event consumers. They internally use plain Akka Agents to manage state and derive new state values from received domain events (using an application-defined event projection function). Typical EventProjection implementors are components that manage read models or coordinate business processes in event-driven architectures.
- The domain model was enhanced by introducing the domain classes DraftInvoice, SentInvoice and PaidInvoice to represent the states an invoice can have. Valid state transitions are defined by the methods on these domain classes and can therefore be checked by the compiler. This approach is explained in more detail here, although the implementation used in our example application differs slightly. In part 1, we only had a single Invoice class, and valid state transitions had to be checked at runtime.
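To make the idea of compiler-checked state transitions concrete, here is a minimal sketch. It assumes simplified invoice fields (id, items, address) and uses Either[String, _] in place of the scalaz-based DomainValidation of the example application, so it illustrates the technique rather than reproducing the actual domain classes. Each state is its own class, and a transition that is invalid for a state simply has no method on that class, so calling it is a compile error instead of a runtime check.

```scala
// Sketch only: field names and the Either-based validation are assumptions.
final case class InvoiceItem(description: String, count: Int, amount: BigDecimal)

final case class DraftInvoice(id: String, items: List[InvoiceItem] = Nil) {
  // A draft can be extended with items ...
  def addItem(item: InvoiceItem): DraftInvoice = copy(items = items :+ item)
  // ... and sent, which yields a value of a different type.
  def sendTo(address: String): SentInvoice = SentInvoice(id, items, address)
}

final case class SentInvoice(id: String, items: List[InvoiceItem], address: String) {
  def total: BigDecimal = items.map(i => i.amount * i.count).sum
  // Only a sent invoice can be paid; a wrong amount is a domain error.
  def pay(amount: BigDecimal): Either[String, PaidInvoice] =
    if (amount == total) Right(PaidInvoice(id, items, address))
    else Left(s"payment of $amount does not match total $total")
}

final case class PaidInvoice(id: String, items: List[InvoiceItem], address: String)

object InvoiceStatesDemo {
  def main(args: Array[String]): Unit = {
    val sent = DraftInvoice("inv-1")
      .addItem(InvoiceItem("widget", 2, BigDecimal(5)))
      .sendTo("Mr. Foo")
    println(sent.pay(BigDecimal(10)).isRight) // true: 2 x 5 matches the total
    // sent.addItem(...) would not compile: SentInvoice has no addItem method.
  }
}
```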
New features include:
- A revised EventLog trait together with two implementations: JournalioEventLog is based on Journal.IO and BookkeeperEventLog on Apache BookKeeper. An EventLog supports synchronous and asynchronous appending of events as well as iterating over stored events, either from the beginning or from an application-defined position in the event history. Event log entries are also assigned sequence numbers so that event consumers can detect gaps in event streams or re-order (resequence) them, if needed.
- Event consumers. One example is InvoiceStatistics, an EventProjection that derives invoice update statistics from domain events. Here, a separate read model is used (following the CQRS pattern) to serve invoice statistics queries. Another example is InvoiceReplicator, an EventProjection that simply reconstructs the invoice map (as maintained by the InvoiceService) from invoice events at a different location. It can be used to replicate application state across different nodes and to serve (eventually consistent) reads. The replicated state could also be used by a snapshot service to take snapshots of application state. InvoiceReplicator needs to receive events in the correct order and is therefore configured to resequence the received event stream. A third example is the PaymentProcess. It coordinates the activities of InvoiceService and PaymentService. Instead of having these services send commands to each other, the PaymentProcess sends commands to (i.e. calls methods on) these services in reaction to domain events. This event-driven approach to implementing business processes not only decouples the services from each other but also lets other components extend (or monitor) the business process by subscribing and reacting to the relevant domain events. The PaymentProcess is currently stateless. Processes that need to maintain state should implement EventProjection and recover the process state from the event history during application start (or failover).
- A RESTful web interface for invoices and invoice statistics with support for HTML, XML and JSON representations, which can be negotiated with the HTTP Accept header. The web layer is based on the Jersey web framework (the JAX-RS reference implementation). HTML representations are rendered with the Scalate template engine. The mapping between XML and JSON representations and immutable domain classes is based on JAXB annotations. Every JAX-RS implementation must support a JAXB-based XML provider, but Jersey additionally comes with a JAXB-based JSON provider, so the same metadata (JAXB annotations) can be used to generate both XML and JSON representations. Following some simple rules, it is possible to JAXB-annotate Scala case classes without polluting them with Java collection types or getters and setters. One major drawback of the current JAX-RS specification is that it doesn't support asynchronous responses yet. This will change with JAX-RS 2.0, and then we can make full use of the asynchronous InvoiceService responses.
- A communication Channel for connecting domain event producers to consumers. The example application provides a SimpleChannel implementation for local communication. Alternative implementations could, for example, connect to a distributed event bus to communicate events across the components of a distributed application.
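The EventLog contract described above can be sketched in memory as follows. This is a hypothetical stand-in, not the JournalioEventLog or BookkeeperEventLog code: the method names appendSync and iteratorFrom are assumptions, asynchronous appending is omitted, and sequence numbers simply start at 1. The isGap helper shows how a consumer can use sequence numbers to notice that an entry is missing.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical log entry: each event is stored with its sequence number.
final case class LogEntry[E](seqnr: Long, event: E)

// Assumed shape of an event log; method names are illustrative only.
trait EventLog[E] {
  def appendSync(event: E): LogEntry[E]
  def iteratorFrom(fromSeqnr: Long): Iterator[LogEntry[E]]
  def iterator: Iterator[LogEntry[E]] = iteratorFrom(1L)
}

final class InMemoryEventLog[E] extends EventLog[E] {
  private val entries = ArrayBuffer.empty[LogEntry[E]]

  // Each appended event gets the next sequence number, starting at 1.
  def appendSync(event: E): LogEntry[E] = synchronized {
    val entry = LogEntry(entries.size + 1L, event)
    entries += entry
    entry
  }

  // Iterate from an application-defined position; copies under the lock
  // so the caller sees a consistent snapshot.
  def iteratorFrom(fromSeqnr: Long): Iterator[LogEntry[E]] = synchronized {
    entries.filter(_.seqnr >= fromSeqnr).toList.iterator
  }
}

object Gaps {
  // A consumer that has processed up to `lastSeqnr` detects a gap when the
  // next entry does not carry lastSeqnr + 1.
  def isGap(lastSeqnr: Long, next: LogEntry[_]): Boolean = next.seqnr != lastSeqnr + 1
}
```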
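A consumer such as InvoiceReplicator needs events in order. One common way to resequence a stream is to buffer out-of-order entries until the next expected sequence number arrives. The sketch below assumes sequence numbers start at 1 and that a hypothetical deliver callback applies each event; it illustrates the technique and is not the example application's resequencer.

```scala
import scala.collection.mutable

// Hypothetical resequencer: delivers events strictly in sequence-number order.
final class Resequencer[E](deliver: E => Unit, startAt: Long = 1L) {
  private var expected = startAt
  private val pending = mutable.Map.empty[Long, E]

  def receive(seqnr: Long, event: E): Unit = {
    // Buffer the event unless it was already delivered (duplicate).
    if (seqnr >= expected) pending(seqnr) = event
    // Deliver every buffered event that is now contiguous.
    while (pending.contains(expected)) {
      deliver(pending.remove(expected).get)
      expected += 1
    }
  }
}
```

Events that arrive early wait in the buffer, so a single missing entry holds back everything after it; a production version would also need a policy for gaps that never close (for example, re-reading the missing range from the event log).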
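The following sketch connects a producer to an event-projection consumer through a local channel, loosely modeled on SimpleChannel and InvoiceStatistics. It makes several assumptions: delivery is synchronous, an AtomicReference stands in for the Akka Agent that EventProjection uses internally, and the event types and the per-invoice update counter are hypothetical.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical channel: subscribers are called synchronously, in order.
trait Channel[E] {
  def subscribe(consumer: E => Unit): Unit
  def publish(event: E): Unit
}

final class LocalChannel[E] extends Channel[E] {
  @volatile private var consumers = List.empty[E => Unit]
  def subscribe(consumer: E => Unit): Unit = synchronized { consumers = consumers :+ consumer }
  def publish(event: E): Unit = consumers.foreach(consumer => consumer(event))
}

// Event projection: new state = project(old state, event). `project` must be
// pure because updateAndGet may re-run it under contention.
final class AtomicEventProjection[S, E](initial: S, project: (S, E) => S) {
  private val state = new AtomicReference[S](initial)
  def currentState: S = state.get()
  def receive(event: E): Unit = state.updateAndGet(s => project(s, event))
}

// Hypothetical invoice events and a statistics read model (updates per invoice).
sealed trait InvoiceEvent { def invoiceId: String }
final case class InvoiceCreated(invoiceId: String) extends InvoiceEvent
final case class InvoiceItemAdded(invoiceId: String) extends InvoiceEvent

object InvoiceStatisticsSketch {
  def project(stats: Map[String, Int], event: InvoiceEvent): Map[String, Int] =
    stats.updated(event.invoiceId, stats.getOrElse(event.invoiceId, 0) + 1)
}
```

A PaymentProcess-style consumer would subscribe in the same way but call service methods in its handler instead of updating a local read model.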
Running the example application
The example application can be started with
sbt run-main dev.example.eventsourcing.server.Webserver
Two classes relevant for starting the application are:
- Appserver: configures the event log, services, read models and processes and connects them via an event channel. It also recovers application state from the event history.
- Webserver: configures the web service layer and starts an embedded web server (Jetty).
To use Apache BookKeeper instead, replace JournalioEventLog with BookkeeperEventLog in Appserver and additionally start a test BookKeeper instance with
sbt run-main dev.example.eventsourcing.server.Zookeeper
sbt run-main dev.example.eventsourcing.server.Bookkeeper
Examples of how to interact with the RESTful web interface can be found here.
Service Layer Enhancements
In the service layer implementation from part 1, we've seen how to keep the order of logged events consistent with the order of updates to the application state. We used a transient event log that could participate in STM transactions. After the transaction committed, the events from the transient event log were transferred to a persistent event log. A drawback of this approach is that updates can be lost if the application crashes after an STM reference has been updated but before the changes have been written to the persistent event log. Clients may then already see application state that cannot be recovered from the event log. While some applications may tolerate this, others may require that any visible application state be fully recoverable from the event log. Therefore, an alternative approach must be chosen.
We need a way to write events, captured during domain object updates, to a persistent event log before the STM reference is updated. But writing to the persistent event log must be done outside an STM transaction for reasons explained in part 1. Updates must also be based on the current (i.e. latest) application state. We therefore need to
- Get the current state value from a transactional reference (STM transaction 1)
- Update domain object(s) obtained from the current state (no STM transaction)
- Write the captured update event(s) to a persistent event log (no STM transaction)
- Compute a new state value from the domain object update and write it to the transactional reference (STM transaction 2)
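The four steps can be sketched in a few lines under simplifying assumptions: a single writer performs all updates (the role the updater actor plays), an AtomicReference stands in for the STM reference, and a ListBuffer stands in for the persistent event log. Update and FourStepUpdater are hypothetical names. The point is the ordering: events reach the log in step 3 before the new state becomes visible in step 4, and a failed update changes neither.

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.collection.mutable.ListBuffer

// Hypothetical names throughout. `Update` bundles the captured events with
// the update result (Left = domain error, Right = updated domain object).
final case class Update[Ev, B](events: List[Ev], result: Either[String, B])

// Assumes a single writer calls applyUpdate, as the updater actor does, so
// the state read in step 1 is still current in step 4.
final class FourStepUpdater[S, Ev, B](initial: S, project: (S, B) => S) {
  val ref = new AtomicReference[S](initial)   // stands in for the STM Ref
  val persistentLog = ListBuffer.empty[Ev]    // stands in for the event log

  def applyUpdate(update: S => Update[Ev, B]): Either[String, B] = {
    val current = ref.get()          // 1. read the current state
    val u = update(current)          // 2. update domain objects, capturing events
    u.result.foreach { b =>
      persistentLog ++= u.events     // 3. log events before the state changes
      ref.set(project(current, b))   // 4. project the result and publish it
    }
    u.result                         // a failed update leaves log and state untouched
  }
}
```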
These steps are implemented by the UpdateProjection trait:
- Instances of UpdateProjection manage (part of) the application state with a transactional ref of type Ref[S], where S is the state value type. Clients concurrently read application state via currentState. Sequential writes to the transactional ref are done exclusively by the updater actor (more on write concurrency below).
- UpdateProjection implementors change application state with the transacted method. The update parameter is a function that computes a domain object Update[Event, B] from the current state S, where B is a domain object type. The update result (either Success[B] or Failure[DomainError]) is returned as a future value from the transacted method.
- The update function and the underlying Future implementation object (promise) are sent to the updater actor with an ApplyUpdate message. The updater then reads the current state and applies the update function to it. If the update succeeds, it writes the captured events to an EventLog and projects the update result onto the current state. The projection is done with the project function, which creates a new state value from the current state and the update result. Finally, the new state value is set on the transactional ref and the promise is completed with the update result.
- Furthermore, the transacted method can participate in STM transactions. If there's an enclosing transaction, the updater actor is only triggered if the enclosing transaction successfully commits. If there's no enclosing transaction, the updater actor is always triggered.
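Here is a rough sketch of the UpdateProjection shape described above, not the example application's code. Assumptions: a single-threaded ExecutionContext replaces the updater actor (so writes are applied one at a time in submission order), an AtomicReference replaces the STM Ref, participation in enclosing STM transactions is left out, project is fixed to one result type B, and log is a hypothetical synchronous append. As in the description, the caller gets a Future that the updater completes through a Promise.

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.Try

// Sketch only: a single-threaded executor plays the `updater` actor and an
// AtomicReference plays the STM Ref. Enclosing-transaction support is omitted.
trait UpdateProjectionSketch[S, Ev, B] {
  def initialState: S
  def project(state: S, result: B): S     // new state from current state + update result
  def log(events: List[Ev]): Unit         // hypothetical synchronous event-log append

  private lazy val ref = new AtomicReference[S](initialState)
  private val updater =
    ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())

  // Concurrent readers only see states whose events have already been logged.
  def currentState: S = ref.get()

  // `update` computes captured events and a result (Left = domain error).
  def transacted(update: S => (List[Ev], Either[String, B])): Future[Either[String, B]] = {
    val promise = Promise[Either[String, B]]()
    updater.execute(new Runnable {
      def run(): Unit = promise.complete(Try {
        val current = ref.get()
        val (events, result) = update(current)
        result.foreach { b =>
          log(events)                  // persist captured events first ...
          ref.set(project(current, b)) // ... then make the new state visible
        }
        result
      })
    })
    promise.future
  }

  def shutdown(): Unit = updater.shutdown()
}
```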
The example application uses UpdateProjection to implement the stateful InvoiceService. The state is of type Map[String, Invoice], i.e. a single map containing draft, sent and paid invoices. In a simplified version of InvoiceService, the updateInvoice method uses the transacted method of the UpdateProjection trait. It tries to get an invoice with the given invoiceId from the current state and applies the supplied update function f to it. The updateInvoice method is used by updateDraftInvoice for updating draft invoices in the invoice map. The updateDraftInvoice method, in turn, is used by the service methods addInvoiceItem and sendInvoiceTo. Adding an item to an existing draft invoice yields a future value of an updated draft invoice (return type Future[DomainValidation[DraftInvoice]]). Sending an existing draft invoice, on the other hand, causes a state transition of that invoice to a sent invoice (return type Future[DomainValidation[SentInvoice]]), i.e. the service methods make use of the newly introduced domain object types. The InvoiceService must also implement the abstract members project, initialState and eventLog (declared by Projection and UpdateProjection).
- The project implementation projects an updated invoice onto the current state by simply adding it to the map of invoices (replacing an old invoice if present).
- An initialState (empty map by default) and an EventLog instance are provided during construction of an InvoiceService.
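A synchronous sketch of the InvoiceService helpers may help to see how the typed invoice states and the project function fit together. It assumes Either[String, _] instead of DomainValidation, omits the UpdateProjection machinery (updater, event log, futures), and uses a simplified invoice model; all names inside InvoiceServiceSketch are illustrative and do not reproduce the example project's code.

```scala
// Hypothetical, synchronous sketch; not the example application's InvoiceService.
object InvoiceServiceSketch {
  sealed trait Invoice { def id: String }
  final case class DraftInvoice(id: String, items: List[String] = Nil) extends Invoice {
    def addItem(item: String): DraftInvoice = copy(items = items :+ item)
    def sendTo(address: String): SentInvoice = SentInvoice(id, items, address)
  }
  final case class SentInvoice(id: String, items: List[String], address: String) extends Invoice

  type State = Map[String, Invoice]

  // project: add the updated invoice, replacing an older version with the same id.
  def project(state: State, invoice: Invoice): State = state + (invoice.id -> invoice)

  // updateInvoice: look up the invoice by id and apply f to it.
  def updateInvoice[B <: Invoice](state: State, id: String)(f: Invoice => Either[String, B]): Either[String, B] =
    state.get(id).toRight(s"invoice $id does not exist").flatMap(f)

  // updateDraftInvoice: f only ever sees draft invoices.
  def updateDraftInvoice[B <: Invoice](state: State, id: String)(f: DraftInvoice => B): Either[String, B] =
    updateInvoice[B](state, id) {
      case d: DraftInvoice => Right(f(d))
      case other           => Left(s"invoice ${other.id} is not a draft")
    }

  def addInvoiceItem(state: State, id: String, item: String): Either[String, DraftInvoice] =
    updateDraftInvoice[DraftInvoice](state, id)(_.addItem(item))

  // The return type documents the state transition from draft to sent.
  def sendInvoiceTo(state: State, id: String, to: String): Either[String, SentInvoice] =
    updateDraftInvoice[SentInvoice](state, id)(_.sendTo(to))
}
```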
This InvoiceService implementation supports concurrent reads but only sequential writes to the whole invoice map. This may be sufficient for many applications, but if a higher degree of write concurrency is needed, one could choose to have a separate UpdateProjection instance per invoice object (comparable to having a separate Akka Agent for each invoice object). This allows both concurrent reads and concurrent writes to the invoices of an application. The general idea (not part of the example project) is that an InvoiceService maintains a map of PersistentInvoice instances, where a PersistentInvoice is an UpdateProjection that contains a reference to a single (draft, sent or paid) invoice. Consequently, updates to different invoices can now be made concurrently. The projection function degenerates to a function that simply returns the updated invoice.
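The per-invoice variant can be sketched as below. It is hypothetical throughout (as noted, the idea is not part of the example project): a per-instance lock stands in for a per-invoice updater, a concurrent TrieMap holds the PersistentInvoice instances, and logging is reduced to a log callback. Because each PersistentInvoice serializes only its own writes, updates to different invoices do not contend, and the projection just returns the updated invoice.

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical sketch of one state holder per invoice.
object PerInvoiceSketch {
  final case class Invoice(id: String, items: List[String])

  final class PersistentInvoice(initial: Invoice, log: String => Unit) {
    @volatile private var current = initial
    def currentState: Invoice = current      // lock-free reads

    // Writes to this invoice are serialized; the "projection" is just the update result.
    def update(f: Invoice => Invoice, event: String): Invoice = synchronized {
      val updated = f(current)
      log(event)          // log before the new state becomes visible
      current = updated
      updated
    }
  }

  final class InvoiceService(log: String => Unit) {
    private val invoices = TrieMap.empty[String, PersistentInvoice]

    def create(id: String): PersistentInvoice =
      invoices.getOrElseUpdate(id, new PersistentInvoice(Invoice(id, Nil), log))

    // Updates to different ids go through different PersistentInvoice locks.
    def addItem(id: String, item: String): Option[Invoice] =
      invoices.get(id).map(_.update(inv => inv.copy(items = inv.items :+ item), s"item added to $id"))
  }
}
```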
I cannot find the right words to thank you. Keep up the good work!
Martin, you did awesome work!
Keep going!
I'll try to refactor my OLTP app using your implementation.
The problem with CQRS is when you do a coordinated update with a guard condition but have to avoid throwing exceptions [in case the guard condition is not met]. For example: check an account balance before updating other accounts [in parallel], but if the balance check fails, exit gracefully. One possible solution is multi-stage coordinated transactions. Do you think the same?
Hi Pavel, glad that you find the implementation useful. Regarding your example, what speaks against checking one account balance and updating other accounts in a single STM transaction (without using CQRS). If you implement an AccountService in the same way as the InvoiceService of the example application (by extending UpdateProjection) you can enclose all relevant read and write operations of that service within a single STM transaction using an atomic { ... } block. Does that help?
Thank you, Martin! I've read your comments, but I still can't understand how I can notify [without throwing exceptions] the other accountActors participating in the coordinated Akka transaction about a failed validation in one of them. A simple but not elegant way I see is to wrap the coordinated atomic blocks in another "general" [not coordinated] atomic block, where I validate operations against the accounts by asking them for their balance, and if validation fails, simply don't send them coordinated updates at all.
I like the way you deal with the abstraction of business logic, but the dependency on the scalaz artifact keeps me from implementing it right now. I don't like scalaz (don't ask), without any reason :).
Are you using transactors? It would be best if you shared some code snippets (e.g. on GitHub) showing the relevant parts of your app. I have difficulty following your descriptions. Regarding scalaz: I'd really like to ask ;)
No transactors, I use Coordinated. Should I use them just to spread the message processing logic across several methods? I don't find this dubious feature useful )
Sorry, right now I can't be as prompt as you, Martin ). I currently have too many different projects/activities ). Everything is urgent ).
Regarding Scalaz, generally, for everyone: it is good if you
1. have the time (and brains) to learn it and
2. have a team of clever people who will take part and use it;
otherwise, better avoid it.
For me, Scalaz brings another degree of complexity and requires more time to learn than just scala/akka/camel etc (no time).
But even scala/akka/camel bring a lot of changes: rethinking and redesigning the architecture, testing and integrating with already working infrastructure, etc. I prefer doing such things step by step in small increments, moving toward the future in an evolutionary way without ruining the present ). And, in parallel, teaching the development team to use the new architecture and frameworks and migrating the network infrastructure to support both old (web services/RPC) and new (event-oriented/CQS) technologies.
Indeed, all this stuff is very, very interesting!
Well, anyway I will extract and share the code, as soon as I can.
And you, Martin, keep going! To improve the world! ))
So cool, and how timely for me! I still have to digest everything and am thinking of ways to save the current application state to an RDBMS for legacy reasons. But this should not be too complicated. All the events are there! It's way better than JPA session + reattaching entities in DAO + saveOrUpdate.
After thinking more about this, I actually don't understand at all how you maintain consistency with other aggregates, e.g. a User with his invoices.
I'm using 'processes' (see the PaymentProcess example under 'Event consumers'). You can achieve eventual consistency with this approach. To achieve strong consistency, manage the aggregates with the same transactional reference.
What happens when you need to persist a reference to a different aggregate in an event, e.g. the authorizing doctor in an invoice?
Is it correct to persist the id (string or long +/- version) there? Otherwise one would somehow have to persist the whole object graph with each event.
But this leads to a rather complicated machinery for persisting and replaying.
Yes, referencing by id is a reasonable approach. This makes things much easier if different aggregate types are maintained by different services (of a distributed application), for example.
First of all, thanks for this awesome approach!
I have a question about the UpdateProjection trait in the main branch on your GitHub.
In the transacted function you use Txn.findCurrent and check if it exists or not and use the afterCommit method.
What is the purpose of these two lines of code (The documentation is rather scarce)?
I'm not using these two lines except for the dispatch call, and it seems to run okay.
Roel
link to the code:
https://github.com/krasserm/eventsourcing-example/blob/master/src/main/scala/dev/example/eventsourcing/state/Projection.scala#L36
If the transacted method is called within an enclosing (i.e. existing) STM transaction, and the enclosing transaction rolls back, the updater won't be sent a request message to update the current state. BTW, the very same mechanism is used by (transactional) agents in Akka.
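A toy model of this mechanism may make it easier to follow. MiniTxn below is hypothetical and much simpler than scala-stm (no nesting, no retries, no real isolation); it only mimics the pattern of looking up the current transaction and registering an after-commit hook. Dispatching therefore happens immediately outside a transaction, after a successful commit inside one, and not at all when the enclosing transaction rolls back.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical toy transaction: a thread-local list of after-commit hooks.
object MiniTxn {
  private val current = new ThreadLocal[Option[ListBuffer[() => Unit]]] {
    override def initialValue(): Option[ListBuffer[() => Unit]] = None
  }

  def findCurrent: Option[ListBuffer[() => Unit]] = current.get

  // Runs `body`; on success runs the registered hooks, on exception drops them.
  // Nested calls are not supported in this toy model.
  def atomic[A](body: => A): A = {
    val hooks = ListBuffer.empty[() => Unit]
    current.set(Some(hooks))
    val result = try body finally current.set(None)
    hooks.foreach(hook => hook())
    result
  }
}

object DeferredDispatch {
  // Mirrors the described logic: defer inside a transaction, else dispatch now.
  def transacted(dispatch: () => Unit): Unit = MiniTxn.findCurrent match {
    case Some(afterCommit) => afterCommit += dispatch
    case None              => dispatch()
  }
}
```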
Thanks for your answer!
If I understand it correctly, it is useful when, for example, the transacted method is called from within another transacted method, and the outer transaction has to succeed before the update is dispatched.
I also don't quite understand why Akka agents cannot be used. Why is it not possible to first apply the update with the state from the agent's get method and then use the agent's send method after the events are persisted to the log?
Regarding your first question: the typical use case is when another (completely different) service starts a transaction and within that transaction makes one or more calls to the 'transacted' method (typically indirectly via methods of services that extend UpdateProjection). The calling service only wants to dispatch all the updates if the overall transaction (it started) succeeds or none of the updates if there's a rollback.
Regarding your second question: you need to ensure that the state change done by a certain update is immediately visible to the next update. Using an agent, you cannot guarantee that because updates to agents are done asynchronously, so you'd have to wait for the agent to process an update, and this is probably not the way you want to go. On the other hand, an update to a plain STM reference becomes visible immediately.
When will there be a version updated for Play 2.1? Thanks!
The Eventsourced library looks great, thanks!
Something I'm wondering about: how would you implement the concept of aggregates if there are too many of them to keep in memory?
What I'm understanding from your example (PaymentProcess, which seems to play the role of the Aggregate) is that Eventsourced does not provide any facility for recovering the state of a particular aggregate and replaying its events. Am I right ?
Is it then possible to somehow query the journal to retrieve only the events of a given aggregate type + id? For instance, if I have to accept/reject commands based on the state of an aggregate, this is something I need to do.
Thanks
Removing aggregates from memory can be done by stopping the corresponding actor (which must be done by the application). How to recover that actor later is explained in section Recovery.
PaymentProcess is a stateful, eventsourced business process (often referred to as a Saga) and not an aggregate.
If you want to accept/reject commands based on the state of an aggregate, you should query the state stored in memory. Querying the journal should be done only for recovery; during "normal" operation, an application only writes to the journal.
A high-level overview of the concepts of Eventsourced is given in this blog post. See also the Eventsourced reference application for an example.