
So here’s a common scenario when building a system using Event Sourcing with Marten:
- Some of the data in your system is just reference data stored as plain old Marten documents. Something like user data (like I’ll use in just a bit), company data, or some other kind of static reference data that doesn’t justify using Event Sourcing. Or maybe you have some data that is event sourced, but it’s otherwise very static and you can essentially treat the projected documents as just documents.
- You have workflows modeled with event sourcing, and you want some of the projections from those events to also include information from the reference data documents.
As an example, let’s say that your application has some reference information about system users saved in this document type (from the Marten testing suite):
```csharp
public class User
{
    public User()
    {
        Id = Guid.NewGuid();
    }

    public List<Friend> Friends { get; set; }
    public string[] Roles { get; set; }
    public Guid Id { get; set; }
    public string UserName { get; set; }
    public string FirstName { get; set; }
    public string LastName { get; set; }
    public string FullName => $"{FirstName} {LastName}";
}
```
And you also have events for a UserTask aggregate that manages some kind of work tracking workflow. You might have some events like this:
```csharp
public record TaskLogged(string Name);
public record TaskStarted;
public record TaskFinished;

public class UserAssigned
{
    public Guid UserId { get; set; }

    // You don't *have* to do this with a mutable
    // property, but it is *an* easy way to pull this off
    public User? User { get; set; }
}
```
In a “query model” view of the event data, you’d love to write the assigned User’s full, human readable name right into the projected document:
```csharp
public class UserTask
{
    public Guid Id { get; set; }
    public bool HasStarted { get; set; }
    public bool HasCompleted { get; set; }
    public Guid? UserId { get; set; }

    // This would be sourced from the User
    // documents
    public string UserFullName { get; set; }
}
```
In the projection for UserTask, you can always reach out to Marten in an ad hoc way to grab the right User documents, with code like this in the UserTask projection definition:
```csharp
// We're just gonna go look up the user we need right here and now!
public async Task Apply(UserAssigned assigned, IQuerySession session, UserTask snapshot)
{
    var user = await session.LoadAsync<User>(assigned.UserId);

    // LoadAsync() returns null if there's no User document with that id
    snapshot.UserFullName = user?.FullName;
}
```
The ability to just pull in IQuerySession and go look up whatever data you need, as you need it, is certainly powerful, but hold on a bit, because what if:
- You’re running the UserTask projection asynchronously with Marten’s async daemon, where it potentially updates hundreds of UserTask documents at the same time?
- You expect UserAssigned events to be quite common, so there are a lot of potential User lookups to process for the projection?
- You are quite aware that the code above could easily turn into an N+1 Query Problem that won’t be helpful at all for your system’s performance. And if you weren’t aware of that before, please be so now!
Instead of the N+1 Query Problem you could easily get from doing the User lookup one single event at a time, what if we could batch up the calls to look up all the necessary User information for a whole batch of UserTask data being updated by the async daemon?
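To make the N+1 cost concrete, here’s a minimal sketch that involves no Marten at all and simply counts database round trips. FakeUserStore, NPlusOneDemo, and their methods are hypothetical stand-ins: Load() plays the role of a single-document LoadAsync<User>() call, LoadMany() plays the role of one batched LoadManyAsync<User>() call, and every call is counted as one round trip.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for a document store: every method call counts as one round trip
public class FakeUserStore
{
    private readonly Dictionary<Guid, string> _fullNames;

    public FakeUserStore(Dictionary<Guid, string> fullNames) => _fullNames = fullNames;

    public int RoundTrips { get; private set; }

    // Plays the role of a single-document lookup such as LoadAsync<User>(id)
    public string? Load(Guid id)
    {
        RoundTrips++;
        return _fullNames.TryGetValue(id, out var name) ? name : null;
    }

    // Plays the role of one batched lookup such as LoadManyAsync<User>(ids)
    public Dictionary<Guid, string> LoadMany(IEnumerable<Guid> ids)
    {
        RoundTrips++;
        return ids.Distinct()
                  .Where(id => _fullNames.ContainsKey(id))
                  .ToDictionary(id => id, id => _fullNames[id]);
    }
}

public static class NPlusOneDemo
{
    // N+1 style: one lookup per event, so N events cost N round trips
    public static int PerEvent(FakeUserStore store, IEnumerable<Guid> assignedUserIds)
    {
        foreach (var id in assignedUserIds)
        {
            store.Load(id);
        }
        return store.RoundTrips;
    }

    // Batched: collect the distinct ids first, then make a single lookup
    public static int Batched(FakeUserStore store, IEnumerable<Guid> assignedUserIds)
    {
        store.LoadMany(assignedUserIds.Distinct());
        return store.RoundTrips;
    }
}
```

With five UserAssigned-style events that reference only two distinct users, PerEvent() costs five round trips while Batched() costs one, and the gap grows with every event in the batch the async daemon is chewing through.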
Enter Marten 8.11 (hopefully released by the time you read this!) and our newly introduced hook for “event enrichment”, which lets you do exactly that as a way of wringing more performance and scalability out of your Marten usage! Let’s build a single stream projection for the UserTask aggregate type shown above that batches the User lookup:
```csharp
public class UserTaskProjection: SingleStreamProjection<UserTask, Guid>
{
    // This is where you have a hook to "enrich" event data *after* slicing,
    // but before processing
    public override async Task EnrichEventsAsync(
        SliceGroup<UserTask, Guid> group,
        IQuerySession querySession,
        CancellationToken cancellation)
    {
        // First, let's find all the events that need a little bit of data lookup
        var assigned = group
            .Slices
            .SelectMany(x => x.Events().OfType<IEvent<UserAssigned>>())
            .ToArray();

        // Don't bother doing anything else if there are no matching events
        if (!assigned.Any()) return;

        var userIds = assigned.Select(x => x.Data.UserId)
            // Hey, watch this. Marten is going to helpfully sort this out for you anyway
            // but we're still going to make it a touch easier on PostgreSQL by
            // weeding out multiple ids
            .Distinct().ToArray();

        // One single batched query for every User referenced in this batch of events
        var users = await querySession.LoadManyAsync<User>(cancellation, userIds);

        // Just a convenience
        var lookups = users.ToDictionary(x => x.Id);

        foreach (var e in assigned)
        {
            if (lookups.TryGetValue(e.Data.UserId, out var user))
            {
                e.Data.User = user;
            }
        }
    }

    // This is the Marten 8 way of just writing explicit code in your projection
    public override UserTask Evolve(UserTask snapshot, Guid id, IEvent e)
    {
        snapshot ??= new UserTask { Id = id };

        switch (e.Data)
        {
            case UserAssigned assigned:
                snapshot.UserId = assigned.UserId;
                // User stays null if the enrichment step found no matching User document
                snapshot.UserFullName = assigned.User?.FullName;
                break;
            case TaskStarted:
                snapshot.HasStarted = true;
                break;
            case TaskFinished:
                snapshot.HasCompleted = true;
                break;
        }

        return snapshot;
    }
}
```
Focus please on the EnrichEventsAsync() method above. That’s a new hook in Marten 8.11 that lets you add a step to asynchronous projection processing to do batched data lookups immediately after Marten has “sliced” and grouped a batch of events by the identity of each aggregate that is about to be updated, but before any of the UserTask snapshot documents are actually updated.
In the code above, we’re finding all the unique user ids referenced by any UserAssigned events in this batch, and making one single call to Marten to fetch the matching User documents. Lastly, we loop over the UserAssigned events and actually “enrich” them by setting the User property on each one with the data we just looked up.
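If it helps to picture where the hook sits, here’s a rough mental model of the slice, enrich, evolve ordering in plain C#. This is an assumption-laden sketch, not Marten’s actual implementation: AssignedEvent, StartedEvent, TaskView, SliceEnrichEvolve, and the loadNames delegate are all hypothetical, and the User lookup is reduced to a function from user ids to full names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical types for this sketch only; they loosely mirror UserAssigned/UserTask from the post
public class AssignedEvent
{
    public Guid UserId { get; set; }
    public string? UserFullName { get; set; } // filled in by the enrichment step
}

public class StartedEvent { }

public class TaskView
{
    public Guid Id { get; set; }
    public bool HasStarted { get; set; }
    public string? UserFullName { get; set; }
}

public static class SliceEnrichEvolve
{
    // A mental model of the ordering only, NOT Marten's internals:
    // 1. slice the batch by stream id, 2. enrich the whole batch once, 3. evolve each stream
    public static Dictionary<Guid, TaskView> Apply(
        IEnumerable<(Guid StreamId, object Data)> batch,
        Func<IReadOnlyList<Guid>, IReadOnlyDictionary<Guid, string>> loadNames)
    {
        // 1. Slice: group events by stream id, keeping their order within each stream
        var slices = batch.GroupBy(x => x.StreamId)
                          .ToDictionary(g => g.Key, g => g.Select(x => x.Data).ToList());

        // 2. Enrich: find every AssignedEvent across ALL slices and do one batched lookup
        var assigned = slices.Values.SelectMany(s => s).OfType<AssignedEvent>().ToList();
        if (assigned.Count > 0)
        {
            var names = loadNames(assigned.Select(x => x.UserId).Distinct().ToList());
            foreach (var a in assigned)
            {
                if (names.TryGetValue(a.UserId, out var name)) a.UserFullName = name;
            }
        }

        // 3. Evolve: fold each slice into its snapshot; no database access happens here
        var views = new Dictionary<Guid, TaskView>();
        foreach (var (id, events) in slices)
        {
            var view = new TaskView { Id = id };
            foreach (var e in events)
            {
                switch (e)
                {
                    case AssignedEvent a: view.UserFullName = a.UserFullName; break;
                    case StartedEvent: view.HasStarted = true; break;
                }
            }
            views[id] = view;
        }
        return views;
    }
}
```

Because the enrichment step sees every slice in the batch at once, two different streams assigned to the same user still cost a single lookup, and the evolve step stays purely in memory.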
A couple other things:
- It might not be terribly obvious, but you could still use immutable types for your event data and “just” quietly swap out single event objects within the EventSlice groupings instead.
- You can also do “event enrichment” in any kind of custom grouping within MultiStreamProjection types without this new hook method, but I felt we needed an easy recipe, at least for SingleStreamProjection classes. You might find this hook easier to use than doing database lookups in custom grouping anyway.
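For the immutable option in the first point, here’s a minimal sketch of the copy-and-swap idea, assuming a hypothetical UserAssignedImmutable record and a plain List<object> standing in for one slice’s events. In real Marten code the events are wrapped in IEvent<T> inside an EventSlice, and exactly how you replace one there isn’t shown here, so treat this only as an illustration of the technique.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical immutable event for this sketch; the post's UserAssigned is a mutable class instead
public record UserAssignedImmutable(Guid UserId, string? UserFullName = null);

public static class ImmutableEnrichment
{
    // Replace each matching event in a slice's event list with an enriched copy.
    // The original event objects are never mutated; only the list slot is swapped.
    public static void Enrich(List<object> sliceEvents, IReadOnlyDictionary<Guid, string> fullNames)
    {
        for (var i = 0; i < sliceEvents.Count; i++)
        {
            if (sliceEvents[i] is UserAssignedImmutable e && fullNames.TryGetValue(e.UserId, out var fullName))
            {
                // 'with' creates a copy with UserFullName set; the original record is unchanged
                sliceEvents[i] = e with { UserFullName = fullName };
            }
        }
    }
}
```

The design trade-off is that anything else still holding a reference to the original event keeps seeing the unenriched version, which is exactly what you want if you’d rather never mutate event objects.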
Summary
That EnrichEventsAsync() code is admittedly busy and isn’t the most obvious thing in the world to write, but when you need better throughput, batching up queries to the database can be a hugely effective way to improve your system’s performance, and we think this will be a very worthy addition to the Marten projection model. I cannot possibly stress enough how insidious N+1 Query issues can be in enterprise systems.
This work was more or less spawned by conversations with a JasperFx Software client and some of their upcoming development needs. Just saying, if you want any help being more successful with any part of the Critter Stack, drop us a line at sales@jasperfx.net.