Monday, August 30, 2010

Akka's grown-up hump

It's quite some time ago when I last wrote about Akka's Camel integration. An initial version of the akka-camel module was released with Akka 0.7. Meanwhile Akka 0.10 is out with an akka-camel module containing numerous new features and enhancements. Some of them will be briefly described in this blog post.

Java API

The akka-camel module now offers a Java API in addition to the Scala API. Both APIs are fully covered in the online documentation.

Support for typed consumer actors

Methods of typed actors can be published at Camel endpoints by annotating them with @consume. The annotation value defines the endpoint URI. Here's an example of a typed consumer actor in Java.
import org.apache.camel.Body;
import org.apache.camel.Header;
import se.scalablesolutions.akka.actor.TypedActor;
import se.scalablesolutions.akka.camel.consume;

public interface MyTypedConsumer {
@consume("file:data/foo")
public void foo(String body);

@consume("jetty:http://localhost:8877/camel/bar")
public String bar(@Body String body, @Header("Content-Type") String contentType);
}

public class MyTypedConsumerImpl extends TypedActor implements MyTypedConsumer {
public void foo(String body) {
System.out.println(String.format("Received message: ", body));
}

public String bar(String body, String contentType) {
return String.format("body=%s Content-Type header=%s", body, conctentType);
}
}
When creating an instance of the typed actor with
import se.scalablesolutions.akka.actor.TypedActor;

// Create typed actor and activate endpoints
MyTypedConsumer consumer = TypedActor.newInstance(
MyTypedConsumer.class, MyTypedConumerImpl.class);
then the actor's foo method can be invoked by dropping a file into the data/foo directory. The file content is passed via the body parameter. The bar method can be invoked by POSTing a message to http://localhost:8877/camel/bar. The HTTP message body is passed via the body parameter and the Content-Type header via the contentType parameter. For parameter binding, Camel's parameter binding annotations are used.

Endpoint lifecycle

Consumer actor endpoints are activated when the actor is started and de-activated when the actor is stopped. This is the case for both typed and untyped actors. An actor can either be stopped explicitly by an application or by a supervisor.

Fault tolerance

When a consumer actor isn't stopped but restarted by a supervisor, the actor's endpoint remains active. Communication partners can continue to exchange messages with the endpoint during the restart phase but message processing will occur only after restart completes. For in-out message exchanges, response times may therefore increase. Communication partners that initiate in-only message exchanges with the endpoint won't see any difference.

Producer actors

Actors that want to produce messages to endpoints either need to mixin the Producer trait (Scala API) or extend the abstract UntypedProducerActor class (Java API). Although the Producer trait was already available in the initial version of akka-camel, many enhancements have been made since then. Most of them are internal enhancements such as performance improvements and support for asynchronous routing. Also, extensions to the API have been made to support
  • pre-processing of messages before they are sent to an endpoint and
  • post-processing of messages after they have been received as response from an endpoint.
For example, instead of replying to the original sender (default behavior) a producer actor could do a custom post-processing e.g. by forwarding the response to another actor (together with the initial sender reference)
import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.camel.Producer

class MyProducer(target: ActorRef) extends Actor with Producer {
def endpointUri = "http://example.org/some/external/service"

override protected def receiveAfterProduce = {
// do not reply to initial sender but
// forward result to a target actor
case msg => target forward msg
}
}
Forwarding results to other actors makes it easier to create actor-based message processing pipelines that make use of external services. Examples are given in the akka-camel documentation.

Typed actors need to use Camel's ProducerTemplate directly to produce messages to Camel endpoints. A managed instance of a ProducerTemplate can be obtained via CamelContextManager.template.

Asynchronous routing

Since Akka 0.10, Camel's asynchronous routing engine is fully supported: in-out and in-only messages exchanges between endpoints and actors are designed to be asynchronous. This is the case for both, consumer and producer actors.

This is especially important for actors that participate in long-running request-reply interactions with external services. Threads are no longer blocked for the full duration of an in-out message exchange and are available for doing other work. There's also an asynchronous routing example described in the online documentation.

Routes to actors

Typed an untyped actors can also be accessed from Camel routes directly, using Akka's TypedActorComponent and ActorComponent, respectively. These are Camel components supporting typed-actor and and actor endpoint URIs in route definitions. For example,
from("seda:test").to("actor:uuid:12345678");
routes a message from a SEDA queue to an untyped actor with uuid 12345678. The actor endpoint looks up the actor in Akka's actor registry.

The TypedActorComponent is an extension of Camel's bean component where method invocations follow the semantics of the actor model. Here is an example route from a direct endpoint to the foo method of a typed actor.
from("direct:test").to("typed-actor:sample?method=foo");
The typed actor is registered under the name sample in the Camel registry. For more details how to add typed actors to the Camel registry, follow this link.

CamelService

Prerequisite for endpoints being activated when starting consumer actors is a running CamelService. When starting Akka in Kernel mode or using the Akka Initializer in a web application, a CamelService is started automatically. In all other cases a CamelService must be started by the application itself. This can be done either programmatically with
import se.scalablesolutions.akka.camel.CamelServiceManager._

startCamelService
or declaratively in a Spring XML configuration file.
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:akka="http://www.akkasource.org/schema/akka"
xmlns:camel="http://camel.apache.org/schema/spring"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.akkasource.org/schema/akka
http://scalablesolutions.se/akka/akka-0.10.xsd
http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd">

<!– A custom CamelContext (SpringCamelContext) –>
<camel:camelContext id="camelContext">
<!– … –>
</camel:camelContext>

<!– Create a CamelService using a custom CamelContext –>
<akka:camel-service>
<akka:camel-context ref="camelContext" />
</akka:camel-service>

</beans>
Usage of the element requires the akka-spring jar on the classpath. This example also shows how the Spring-managed CamelService is configured with a custom CamelContext.

A running CamelService can be stopped either by closing the application context or by calling the CamelServiceManager.stopCamelService method.

Outlook

The next Akka release will be Akka 1.0 (targeted for late fall) and akka-camel development will mainly focus on API stabilization. If you'd like to have some additional features in the next Akka release, want to give feedback or ask some questions, please contact the Akka community at the akka-user mailing list.