Categories
Projects and Code

Learn event store with file-based example

Creating an event-based application backed by an event store persistence seems to be most reasonable and predictable way to have flexible software. These less than 200 lines present a simple, training implementation of a a file-based single-threaded event store through which I’d like to present this idea.

Prequisites

  • Create .NET console app
  • Paste all the code below to a single file
  • Install Newtonsoft.JSON nuget package
  • Run

Scenario

From the highest abstraction level our application will have three steps: first of all, a new aggregate will be created (and thus stored in event store), secondly this aggregate will receive an event (which is going to be stored as well), and finally we’re going to retrieve created aggregate from the event store by its persistence ID.

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using Newtonsoft.Json;

class Program
{
    static void Main(string[] args)
    {
        var aggregate = new ConcreteAggregate();
        aggregate.Handle(new NameChanged() { Name = "dupa1" });

        var retrievedAggregate = new ConcreteAggregate(1);
    }
}

The event store implementation is actually a list of entries, each entry consists of persisted message as well as reference to its owner. It can be serialized to file and deserialized through a static factory method. Thanks to Newtonsoft.JSON serializer, types are being serialized along with content, so we do not have to worry about it.

public class EventStore : List<EventStore.Entry> 
{
    public class Entry
    {
        public object Message { get; set; }
        public int OwnerId { get; set; }
    }

    private const string PeristenceFilePath = "path";

    public static EventStore Deserialize()
    {
        var content = File.Exists(PeristenceFilePath) 
            ? File.ReadAllText(PeristenceFilePath)
            : string.Empty;
        var store = JsonConvert.DeserializeObject<EventStore>(
            content, 
            new JsonSerializerSettings() 
            { 
                TypeNameHandling = TypeNameHandling.All
            }) ?? new EventStore();

        return store;
    }

    internal void Serialize()
    {
        File.WriteAllText(
            PeristenceFilePath,
            JsonConvert.SerializeObject(
                this, 
                new JsonSerializerSettings() 
                { 
                    TypeNameHandling = TypeNameHandling.All
                }));
    }
}

To actually make any use of it, I have created an abstract aggregate class. Just to remind, an aggregate in the ddd-way represents an object that keeps itself internally consistent, being a boundary of transaction. In our case, since the aggregate will itself be responsible for its event store, it will be the one and only entity which will be able to make any transactions.

I have mimicked this approach after several actor implementations like Akka.NET, nact.js and so on.

First of all, we need an AggregateCreated event that is going to be the first one in any stream. The abstract class implements a basic interface which depicts its abilities.

As you can see, it can handle its creation and virtually any object. It keeps track of its events, can persist itself (notice “state design pattern” / “become/unbecome design pattern”) and has an identity, which is one of characteristic traits of an aggregate from DDD point of view.

It can be constructed in two ways: retrieved from event store by id and built from scratch. Note that in the first example we filter events from stream by id and use reflection to provide them to the handlers.

In fact, this is the root of an event store pattern – every retrieval of the aggregate from the store effectively makes it handle events as a stream.

When a new aggregate is being created (with the second constructor), it receives first empty ID.

public class AggregateCreated
{
    public int Id { get; set; }
}

public interface ICanHandle<T>
{
    void Handle(T message);
}

public abstract class Aggregate : 
    ICanHandle<AggregateCreated>, 
    ICanHandle<object>
{
    private List<object> Events { get; }
    private Action<EventStore.Entry> PersistBehavior { get; set; }
    private int Id { get; set; }

    public Aggregate(int id)
    {
        this.Events = new List<object>();
        this.Id = id;
        this.PersistBehavior = _ => {};

        var type = this.GetType();
        var stream = EventStore.Deserialize()
            .Where(item => item.OwnerId == this.Id)
            .Select(item => item.Message);

        foreach (var serializedEvent in stream)
        {
            type.GetMethod(nameof(this.Handle), 
                    new[] { serializedEvent.GetType() })
                .Invoke(this, new object[] { serializedEvent });
        }

        this.PersistBehavior = this.DoPersist;
    }

    public Aggregate()
    {
        this.PersistBehavior = this.DoPersist;
        this.Events = new List<object>();
        this.Handle(new AggregateCreated() 
        { 
            Id = EventStore.Deserialize()
                .Select(e => e.OwnerId)
                .DefaultIfEmpty()
                .Max() + 1
        });
    }

    public void Handle(AggregateCreated message)
    {
        this.Id = message.Id;
        this.Persist(message);
    }

    public void Handle(object message)
    {
        this.Persist(message);
    }

    private void Persist<T>(T message)
    {
        this.PersistBehavior(
            new EventStore.Entry() 
            { 
                Message = message, 
                OwnerId = this.Id 
            });
    }

    private void DoPersist(EventStore.Entry message)
    {
        var state = EventStore.Deserialize();
        state.Add(message);
        state.Serialize();
    }
}

All that has been presented above is in fact some infrastructure for further implementations. An example below depicts a specific aggregate which, besides it inherits from base aggregate class, can handle a specific event.

public class NameChanged
{
    public string Name { get; set; }
}

public class ConcreteAggregate : Aggregate, 
    ICanHandle<NameChanged>
{
    public string Name { get; private set; }

    public ConcreteAggregate(int id) : base(id) { }

    public ConcreteAggregate() : base() { }

    public void Handle(NameChanged message)
    {
        if (!string.IsNullOrEmpty(message.Name))
        {
            this.Name = message.Name;
            base.Handle(message);
        }
    }
}

Summary

In these less than 200 lines of code we have created a basic event store backed by a text file. Of course, only for educational purposes but I hope that if one debug it step by step, the idea of event store will become clear and reasonable to use in your next project.

Leave a Reply

Your email address will not be published. Required fields are marked *