Event Sourcing

Systems using Event Sourcing store their persistent state as a sequence of events instead of updating a single model. A particular state can be recreated by replaying all events. It’s basically a persistent transaction log file for your application.
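To make the idea concrete, here is a minimal Scala sketch (the order domain and all names are purely illustrative, not taken from any particular framework): the current state is nothing more than a fold over the recorded events.

sealed trait OrderEvent
case class OrderPlacedEvent(orderId: String) extends OrderEvent
case class OrderShippedEvent(orderId: String) extends OrderEvent

case class OrderState(placed: Boolean = false, shipped: Boolean = false)

// Replaying the event history from the beginning recreates the state.
def replay(events: Seq[OrderEvent]): OrderState =
  events.foldLeft(OrderState()) {
    case (state, _: OrderPlacedEvent)  => state.copy(placed = true)
    case (state, _: OrderShippedEvent) => state.copy(shipped = true)
  }

Because the events are never thrown away, replaying only a prefix of the history yields the state at that earlier point in time.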

Why should I care?

You’re able to restore the system state to any point in time. More importantly, you have all the information available to debug how a certain state came about. Depending on your business domain, this might even be a business or legal requirement.

On the source code level, I see a huge advantage in the maintainability and readability of the code. Every change to the system’s state goes through a command whose result is a set of events. This makes it easy for other developers to understand your system.

On the other hand, using event sourcing comes with a cost. Your event store will grow, and that is additional data that needs to be maintained. For performance reasons you may have to maintain additional snapshots; if you avoid snapshots, your application might spend more time recreating a particular state than you can afford. You might also need to maintain a separate read model that has to be kept in sync.

How can I build such a system?

In an event-sourced system all intents are expressed as commands. Commands usually start with a verb (e.g. ShipOrderCommand). A command should clearly express a single intent; generic commands like “Update” or “Insert” are not useful.

Each command has a CommandHandler whose first responsibility is to check, based on the current state, whether the command may be performed by the caller, and to reject its execution if not. A CommandHandler never modifies any data directly, but it returns a sequence of events that are the result of the command. For example, the ShipOrderCommand may return an OrderShippedEvent.
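Continuing the illustrative sketch from above (ShipOrderCommand and OrderShippedEvent are the names used in this post, everything else is an assumption), a CommandHandler might look like this:

case class ShipOrderCommand(orderId: String)

object ShipOrderCommandHandler {
  // First check the preconditions against the current state and possibly reject the command.
  // The handler never modifies any data itself; it only returns the resulting events.
  def handle(cmd: ShipOrderCommand, state: OrderState): Either[String, Seq[OrderEvent]] =
    if (!state.placed)      Left("order has not been placed yet")
    else if (state.shipped) Left("order has already been shipped")
    else                    Right(Seq(OrderShippedEvent(cmd.orderId)))
}

A Left carries the rejection reason, a Right the resulting events; storing and publishing them is left to the surrounding infrastructure.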

The events returned by the CommandHandlers are then handled by an EventBus that stores them in the EventStore. The EventStore also knows about EventHandlers, which register for certain event types, and notifies them about new events. A typical job of an EventHandler is updating the domain model.

The EventStore holds all events and provides methods to get all events for a particular entity. The EventStore is the transaction log of your application, from which you can rebuild the state of the system.
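As a toy illustration (again not from the original post), here is an in-memory EventStore that, to keep the sketch short, also takes over the EventBus’s job of notifying registered EventHandlers:

class InMemoryEventStore {
  private var log      = Vector.empty[OrderEvent]
  private var handlers = Vector.empty[OrderEvent => Unit]

  // EventHandlers register for new events, e.g. to keep the domain model up to date.
  def subscribe(handler: OrderEvent => Unit): Unit =
    handlers :+= handler

  // Append the events returned by a CommandHandler and notify all registered handlers.
  def append(events: Seq[OrderEvent]): Unit = {
    log ++= events
    for (e <- events; h <- handlers) h(e)
  }

  // All events for a particular order: enough to rebuild its state via replay.
  def eventsFor(orderId: String): Seq[OrderEvent] =
    log.filter {
      case OrderPlacedEvent(id)  => id == orderId
      case OrderShippedEvent(id) => id == orderId
    }
}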

The following sequence diagram shows this interaction between those components:

[Image: Sequence diagram of event sourcing components]

This is just a brief overview of the most important components of an event-sourced system.


Exploring Streams in Scala

In this blog post, I’m going to explore Streams in Scala’s collection API. So first of all, what is a Stream? A Stream is a lazily evaluated list. This means that elements of a Stream only get evaluated when they are needed. Therefore Streams can be infinite while strict collections cannot. A Stream can also be seen as an immutable Iterator.

Creating a Stream

To create a Stream at least two values must be given:

  • an initial value
  • a function to compute the next value

To demonstrate an infinite Stream I’ll choose the Stream of natural numbers (0, 1, 2, …). The set of natural numbers starts with zero, which is the initial value of our Stream. The successor of the initial value can easily be computed as n+1. Then the successor of n+1 is n+1+1, and so forth. To build such a Stream we could write the following function:

def from(start: Int): Stream[Int] =
  Stream.cons(start, from(start + 1))

The Stream is created by calling the cons function. It takes two parameters. The first parameter is the initial value. The second parameter is an expression that produces the rest of the Stream; it is passed by name, so it will only be evaluated when needed. The cons call returns a new Stream consisting of the initial value and that lazily evaluated “rest”.
Ok, I lied a bit. cons feels like a function, but in reality it is an object with an apply method. When we call Stream.cons(...) we actually call the apply method. Written out in full it would be Stream.cons.apply(...), but we don’t really want to write it like that.
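Just for illustration, the two lines below are equivalent (Stream.empty merely keeps the example short):

val s1 = Stream.cons(1, Stream.empty)        // the usual notation
val s2 = Stream.cons.apply(1, Stream.empty)  // what the compiler actually invokes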

The same function can also be written in a shorter notation using the #:: operator:

def from(start: Int): Stream[Int] =
  start #:: from(start + 1)

Common builder functions for the creation of Streams, such as from, range and continually, can be found in the Stream object.
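For illustration (these calls are not from the original post), here are a few of those builders in action:

Stream.from(5).take(3).toList              // List(5, 6, 7)
Stream.range(0, 10, 2).toList              // List(0, 2, 4, 6, 8) - finite, but still lazy
Stream.continually("ping").take(2).toList  // List(ping, ping) - repeats the element forever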

Now consider the following example:

val nn = from(0)
println(nn.take(10).mkString(","))

A variable nn is created and an infinite Stream of natural numbers is assigned to it. At this point the from function is only called once; the recursive call from(start + 1) doesn’t happen yet. The next line takes the first 10 elements of the Stream and prints them to the console. Now guess how many times the from function will be called? Right, 9 times, because the initial value (zero) has already been initialised.

0|1|2|3|4|5|6|7|8|9|...

(Just call nn.toString in the REPL to verify.)

Filtering/Mapping Streams

A Stream can be used like every other collection because it is of type LinearSeq. Operations like filter and map are lazily evaluated: they simply return another Stream and perform the operation when required. In general, all operations that return another Stream can be considered lazy (like filter and map). Of course, operations like foldLeft, foldRight, find and exists are not lazy.

For example to filter even values:

val even = nn filter (_ % 2 == 0)
println(even.take(10).mkString(","))

The first line calls filter on the Stream of natural numbers. Calling the filter method returns a new Stream. The new Stream uses the original Stream as its input and only returns elements for which the filter condition holds true. In short: filter is lazily evaluated. The second line prints the first 10 even elements to the console. This should be: 0,2,4,6,8,10,12,14,16,18

Now the first 20 elements of the natural numbers Stream have been initialized:

0|1|2|3|4|5|6|7|8|9|10|11|12|13|14|15|16|17|18|19|...

Streams can be used in pattern matching

A Stream can also be used in pattern matching. To do so you can use the #:: extractor. The following example prints the string “matched” on the console if the first two elements of our nn Stream are 0 and 1. And yes, it does 🙂

nn match {
  case 0 #:: 1 #:: _ => println("matched")
}

Conclusion

Stream is the immutable equivalent of Iterator. While an Iterator doesn’t keep the values it has computed, a Stream does. So, as with any immutable data structure, you don’t have to care about state when working with Streams. This is useful when you pass a Stream around to other functions. If you do so with an Iterator, you have to care about its state, which requires much more caution and can lead to logic errors. The immutability of Streams also makes it quite easy to compose them. Keep in mind that a Stream has a higher memory footprint than an Iterator because it keeps the computed elements.
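As a small illustration of that difference (not from the original post): an Iterator is consumed as you read from it, while a Stream can be traversed again and again.

val it = Iterator.from(0)
it.next()                  // 0
it.next()                  // 1 - the iterator has moved on, the 0 is gone

val s = Stream.from(0)
s.take(2).toList           // List(0, 1)
s.take(2).toList           // List(0, 1) - the Stream still starts at 0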


Helper for streaming MongoDB GridFS files in lift web applications

Lift is a web application framework written in Scala that comes with native integration for MongoDB. The module is called “lift-mongodb” and integrates MongoDB as the persistence layer for Lift’s Record and Mapper frameworks.

GridFS is a specification for storing large files in MongoDB. Most drivers support it directly.

In this post, I’m going to develop a helper that makes GridFS files accessible via HTTP. Furthermore, the helper should support HTTP caching so the files can be cached by clients.

Let’s get started.

Basic setup

I assume you have a plain Lift project. I’m using sbt for building the Lift application. If you (for whatever reason) prefer Maven, you can certainly use that instead.

First of all, we need to tell Lift that we want to use MongoDB. Therefore we’ll add the lift-mongodb module as a dependency. See http://www.assembla.com/wiki/show/liftweb/lift mongodb to find out more.

val lift_mongo = "net.liftweb" % "lift-mongodb" % "2.2"

First shot

To start simple, I wrote an object named GridFSHelper with a get function. The get function takes a file name as its only argument and returns a value of type Box[LiftResponse]. Like a real-world box, a Lift Box can be empty or full, and so Box has two subtypes called Empty and Full.

The behaviour of the get function is as follows: it uses the default Mongo connection to query for a file with the given filename. If no file was found, it returns Empty to signal that it has nothing to respond with. This results in a 404 (Not Found) HTTP response.

If the file was found, it returns a “Full Box” containing a StreamingResponse. A StreamingResponse takes six arguments: first of all the InputStream that should be sent to the client. The second argument is a function which is called when the stream is done or aborted; this is perfect for cleaning up resources. The third argument is the length of the stream. The last three arguments are a list of HTTP header fields, a list of cookies and the HTTP status code.

import com.mongodb.gridfs.{GridFS, GridFSDBFile}
import net.liftweb.common.{Box, Empty, Full}
import net.liftweb.http.{LiftResponse, StreamingResponse}
import net.liftweb.mongodb.{DefaultMongoIdentifier, MongoDB}

object GridFSHelper {

  def get(filename: String): Box[LiftResponse] = {
    MongoDB.use(DefaultMongoIdentifier) (db => {
      val fs = new GridFS(db)

      fs.findOne(filename) match {
        case file: GridFSDBFile =>
          val headers = ("Content-Type" -> "application/octet-stream") :: Nil
          val stream = file.getInputStream

          Full(StreamingResponse(
            stream,
            () => stream.close,
            file.getLength,
            headers, Nil, 200))

        case _ => Empty
      }
    })
  }
}

You can use the GridFSHelper by binding it to a URI.

Add the following code to the Boot.scala file.

LiftRules.dispatch.append {
  case req @ Req(List("files", filename), _, _) => {
    () => GridFSHelper.get(req, filename + "." + req.path.suffix)
  }
}

However this implementation has some restrictions:

  • The content type is not set properly.
  • It doesn’t support HTTP caching (no 304 responses).

Know the content type

Currently we have the following line which sets a fixed content type.

val headers = ("Content-Type" <del>> "application/octet</del>stream") :: Nil

This means all responses have the same content type, regardless of whether it’s an image, an HTML page or a PDF. This is far from perfect.

To determine the file’s type we can look at the file extension. Fortunately, web containers do this already, so we don’t have to implement it ourselves.

To get the content type evaluated just replace the previous line with the following:

def get(filename: String): Box[LiftResponse] = {
  // some code ...
  val headers = ("Content-Type" -> contentType(filename)) :: Nil
  // more code ...
}

private def contentType(filename: String) =
  LiftRules.context.mimeType(filename) openOr "application/octet-stream"

Now the HTTP response should come with the right content type. If not, the content type for the given file extension is not known by the web container. In that case you can add the content type to the web.xml:

For example:

<mime-mapping>
  <extension>svg</extension>
  <mime-type>image/svg+xml</mime-type>
</mime-mapping>

Handle HTTP caching

In the default configuration Lift sets a bunch of HTTP header fields to tell the client that nothing should be cached. This rule applies to our GridFS response as well. To allow clients to cache our response we have to reset some HTTP header fields:

val headers =
  ("Content-Type" -> contentType(filename)) ::
  ("Pragma" -> "") ::
  ("Cache-Control" -> "") :: Nil

Namely, we have to reset the Pragma and the Cache-Control fields.

Next, we have to set the Date, Last-Modified and Expires headers. The header list will now look like this:

val headers =
  ("Content-Type" -> contentType(filename)) ::
  ("Pragma" -> "") ::
  ("Cache-Control" -> "") ::
  ("Last-Modified" -> toInternetDate(lastModified)) ::
  ("Expires" -> toInternetDate(millis + 10.days)) ::
  ("Date" -> nowAsInternetDate) :: Nil

Great, our HTTP headers are set properly. Now we need to check the request to see if we can return a 304 (Not Modified) response. This tells the client that there is no need to download the whole file again; the client can use the cached file.

Fortunately, there is a testFor304 function on Req which we can use.

def get(req: Req, filename: String): Box[LiftResponse] = {
  // some code ...
  req.testFor304(lastModified, "Expires" -> toInternetDate(millis + 10.days)) openOr {
    // create and return the StreamingResponse
  }
  // more code ...
}
As you can see, I introduced a new parameter req to pass the current request to the function.

All the magic is done by the testFor304 function. If its return value is Empty, we have to build our response; otherwise we can simply return the 304 response it has already prepared.

This simple helper allows us to stream files from GridFS to the client. It sets the proper content type and supports HTTP caching.

The complete code can be found on GitHub: http://gist.github.com/653101

Comments and improvements welcome!


Parsing chunks of XML documents with JAXB

JAXB is a great Java library for mapping XML documents to Java objects and vice versa. But how can JAXB be used to parse large XML documents?
The Unofficial JAXB Guide contains a small section that provides some useful information about this topic.

Assume we have an XML document similar to the following:

<Example id="10" date="1970-01-01" version="1.0">
   <Properties>...</Properties>
   <Summary>...</Summary>
   <Document id="1">...</Document>
   <Document id="2">...</Document>
   <Document id="3">...</Document>
</Example>

Now I want to unmarshal the Example element into the corresponding Example object. If I simply do that, the whole XML document gets unmarshalled. If the XML document contains hundreds of thousands of Document elements, this will consume a huge amount of memory. But at this point I’m only interested in the Example element with its Properties and Summary elements; the Document elements can be parsed in chunks.

To reach that goal I use virtual infosets, as described in the JAXB Guide. I created a simple class named PartialXmlEventReader which is of type XMLEventReader and delegates all method calls to a parent reader. As a constructor argument it takes the QName of an element. When the reader finds the first start element of that type, it closes the parent element by returning an EndElement event. So the XML document above will look like this to the caller of the reader:

<Example id="10" date="1970-01-01" version="1.0">
  <Properties>...</Properties>
  <Summary>...</Summary>
</Example>

As the parent reader is still positioned at the first Document start element, we can use the same reader to parse the Document elements.

The following code demonstrates the use of the PartialXmlEventReader:

@Test
public void testChunks() throws JAXBException, XMLStreamException {

  final QName qName = new QName("Document");

  InputStream in = getClass().getResourceAsStream("example.xml");
  if (in == null)
    throw new NullPointerException();

  // create an XML event reader for the input stream
  XMLInputFactory xif = XMLInputFactory.newInstance();
  XMLEventReader reader = xif.createXMLEventReader(in);

  // initialize JAXB
  JAXBContext jaxbCtx = JAXBContext.newInstance(Example.class, Document.class);
  Unmarshaller um = jaxbCtx.createUnmarshaller();

  // unmarshal the Example element without parsing the Document elements
  Example example = um.unmarshal(new PartialXmlEventReader(reader, qName),
      Example.class).getValue();

  assertNotNull(example);
  assertEquals("My Properties", example.getProperties());
  assertEquals("My Summary", example.getSummary());
  assertNull(example.getDocument());

  Long docId = 0L;
  XMLEvent e = null;

  // loop through the XML stream
  while ((e = reader.peek()) != null) {

    // check whether the event is a Document start element
    if (e.isStartElement() && ((StartElement) e).getName().equals(qName)) {

      // unmarshal the next Document element
      Document document = um.unmarshal(reader, Document.class).getValue();

      assertNotNull(document);
      assertEquals(++docId, document.getId());

    } else {
      reader.next();
    }
  }
  assertEquals(new Long(10), docId);
}

You can find the source code of the PartialXmlEventReader here.


Generating large PDF documents with Apache FOP

Some days ago I had trouble generating large PDF documents (> 2000 pages) with Apache FOP. The problem was the memory consumption while rendering the document. In my opinion, increasing the JVM memory beyond 2 GB was not an acceptable solution, so I had to find a way to optimize my templates.

Fortunately, the Apache FOP FAQ contains a section about memory usage which gives some very useful hints for optimizing the template.

In my case, I had defined one page-sequence element for the whole document, but logically the PDF document contained multiple documents with several pages each. So it was quite easy to define a separate page sequence for each logical document. With this little modification to my template, the memory usage shrank from about 2 GB to under 60 MB, and the rendering process finished noticeably faster.

The initial template looked something like this:

<?xml version="1.0" encoding="utf-8"?>
<xsl:stylesheet version="1.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
xmlns:fo="http://www.w3.org/1999/XSL/Format">

<xsl:template match="/">
  <fo:root>
    <fo:page-sequence master-reference="A4">
      <xsl:apply-templates select="Document" />
    </fo:page-sequence>
  </fo:root>
</xsl:template>

<xsl:template match="Document">
  <!-- Page content goes here -->
</xsl:template>

</xsl:stylesheet>

After optimizing the template it looked like this:

<?xml version="1.0" encoding="utf-8"?>
<xsl:stylesheet version="1.0" xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
xmlns:fo="http://www.w3.org/1999/XSL/Format">

<xsl:template match="/">
  <fo:root>
    <xsl:apply-templates select="Document" />
  </fo:root>
</xsl:template>

<xsl:template match="Document">
  <fo:page-sequence master-reference="A4">
    <!-- Page content goes here -->
  </fo:page-sequence>
</xsl:template>

</xsl:stylesheet>
