Scaling Workflows with NServiceBus Routing Slips

If you’ve been using NServiceBus and Sagas, you owe it to yourself to check out Routing Slips. Essentially, they’re a stateless alternative to Sagas, and they are very useful for workflow processing. In this article I’ll take you from a simple handler, through a saga, and on to a full routing slip demo.

Sagas

Sagas in NServiceBus add durability, primarily for long-running processes and temporal-logic scenarios. They do this by introducing state: data that is persisted between messages so that each step can pick up where the previous one left off. One scenario where I’ve seen Sagas used, and the one I want to focus on here, is coordinating a workflow.

For example:
Step 1 -> Starts Saga and handles message A
Step 2 -> Saga handles message B
Step 3 -> Saga handles message C and completes

Collectively, steps 1-3 may be too much to do inside a single handler. Thus you introduce durability with a Saga coordinating each step. If a failure is encountered at Step 2, you don’t need to re-process Step 1. Additionally, some steps may require more parallelism than others. Using a Saga across multiple services allows you to maintain your durability and scale individual portions of the process.

However, that durability comes at a cost: performance. Every step has to load the saga’s state from persistence and save it back again.

Routing Slips

But what if you barely use the Saga state, for example only to pass a few key values from one handler to the next?

This is where Routing Slips come in, eliminating the overhead of managing the state for each saga.

The most important benefit for your system is increased concurrency. Your maximum concurrency depends on how many stages your process has. When each stage runs in its own endpoint, every additional stage lets another message be worked on at the same time. Breaking your process into stages can therefore increase overall throughput, as long as the work each stage does outweighs the overhead of moving the message to the next stage.
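A back-of-the-envelope model makes this trade-off concrete. The sketch below is plain C# (9 or later, top-level statements), not NServiceBus code. Every number in it is an illustrative assumption, not a measurement from this article: three stages, a fixed per-hop cost for moving a message between endpoints, and one dedicated worker per stage. Once the pipeline is full, one message completes every (slowest stage + hop) milliseconds instead of every (sum of all stages) milliseconds.

```csharp
using System;
using System.Linq;

// Illustrative per-stage work in milliseconds (assumed values, not measured).
double[] stageMs = { 4.0, 3.0, 5.0 };
// Assumed cost of handing a message to the next stage's queue.
double hopMs = 1.0;
int messages = 10_000;

// One worker runs every stage back-to-back: no hops, but no overlap either.
double sequentialMs = messages * stageMs.Sum();

// One worker per stage: stages overlap, so after the first message fills the
// pipeline, one message finishes every (slowest stage + hop) milliseconds.
double[] stageWithHopMs = stageMs.Select(t => t + hopMs).ToArray();
double pipelinedMs = stageWithHopMs.Sum() + (messages - 1) * stageWithHopMs.Max();

Console.WriteLine($"Sequential: {sequentialMs / 1000:F1} s");
Console.WriteLine($"Pipelined:  {pipelinedMs / 1000:F1} s");
```

Raising hopMs above the work each stage does erases the pipelined advantage, which is the caveat in the paragraph above.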

As a result, each stage can act as if it’s part of an assembly line, performing the same task on a semi-finished assembly - which in our case is data in a database.

Setting Up Our Assembly Line

In NServiceBus terms, each step is a handler that performs one or more tasks for each message. We can improve throughput by dividing our process into one logical step for each task that’s needed.

We’ll start with a very simple handler that inserts a record. This sample is lifted from a recent article I wrote on profiling NServiceBus transactions. I’ve expanded on it to add a bit more complexity.

  • Widgets are made up of up to 2 components, which we’re calling Component A and Component B.
  • We have an inventory of Component A and Component B from which we can “claim” components for our use. Specifically, we must set the “batchid” value on a component to signify that it is in use.

I’ve simplified the code presented here to convey the purpose. Full source code is available on GitHub.

public void Handle(BuildWidgetCommand message)
{
    ...

    // Claim an unused Component A for this batch
    var componentA = GetComponentA(db, batchId);

    ...

    var widget = new Widget()
    {
        ...
        component_a_id = componentA.id,
        ...
    };

    db.Insert(widget);
}

// Marks one unclaimed Component_A row with this batch's id, then reads back the row it marked
private static Component_A GetComponentA(Database db, Guid batchId)
{
    db.Execute("UPDATE TOP(1) Component_A SET batchid = @0 WHERE batchid IS NULL", batchId);
    return db.Single<Component_A>("WHERE batchid = @0", batchId);
}
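The claim works because the UPDATE marks at most one unclaimed row with this batch's id, and the SELECT then reads back whichever row carries that id. Below is a purely illustrative in-memory analogue in plain C# (9 or later). A dictionary stands in for the Component_A table, and a lock stands in for the database's row locking. The lock is an assumption made for the sketch and is not how SQL Server implements it. Claim is a hypothetical helper.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Stand-in for the Component_A table: component id -> claiming batch id (null = free).
var componentTable = new Dictionary<Guid, Guid?>();
for (int i = 0; i < 5; i++) componentTable[Guid.NewGuid()] = null;
var tableLock = new object(); // plays the role of the database's locking

// Hypothetical helper mirroring the UPDATE TOP(1) + SELECT pair:
// mark one free row with the batch id, then return that row's id.
Guid? Claim(Guid batchId)
{
    lock (tableLock)
    {
        foreach (var id in componentTable.Keys.ToList())
        {
            if (componentTable[id] == null)
            {
                componentTable[id] = batchId; // SET batchid = @0 WHERE batchid IS NULL
                return id;                    // WHERE batchid = @0
            }
        }
        return null; // inventory exhausted
    }
}

// Six concurrent "handlers" compete for five components.
var claimed = new ConcurrentBag<Guid?>();
Parallel.For(0, 6, _ => claimed.Add(Claim(Guid.NewGuid())));

int distinctClaims = claimed.Where(c => c != null).Distinct().Count();
Console.WriteLine($"Distinct components claimed: {distinctClaims}, failed claims: {claimed.Count(c => c == null)}");
```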

Endpoint Configuration

// DTC Disabled
configuration.Transactions().DisableDistributedTransactions();
configuration.Transactions().DoNotWrapHandlersExecutionInATransactionScope();

// In-Memory Persistence
configuration.UsePersistence<InMemoryPersistence>();

We’ll be keeping an eye on throughput - how fast our system can completely assemble our data. The scenario we’ll be using for these tests is as follows:

  • 1 million records exist in all of our tables
  • Process 10K new widgets
  • A random assortment will not need Component B - only Component A. We specify this on the message.

Initial Benchmark Results

Windows 8.1 (VMware Fusion) on mid-2014 MacBook Pro
4 Virtual Processors
8GB Memory

Results

Messages: 10,000
Completion time: 100 seconds

Moving to Sagas

Moving our process into a Saga can help us in a couple of ways. First, we increase durability: a failure affects only the step in which it occurs rather than the entire process. If there is heavy contention on the Component_A table, wouldn’t it be nice to retry only that step and continue once we have our id?

Second, breaking the workflow into smaller steps can help readability, which is a major plus in larger, more complex workflows. And every step we create is also an opportunity to scale.

We start by identifying where we can break the process up into steps. This isn’t easy, and in most cases it will be a judgment call based on where your highest points of contention are. In this case we can identify the following:

  1. Obtain Component A
  2. If needed, obtain Component B
  3. Create our Widget

Our Saga can then start to take shape. The information we’ll need throughout the process can be saved to our SagaData (state).

public class BuildWidgetSagaData : IContainSagaData
{
    ...
    [Unique]
    public Guid BatchId { get; set; }
    public Guid? ComponentAId { get; set; }
    public Guid? ComponentBId { get; set; }
    public bool NeedsComponentB { get; set; }
    ...
}

We can then kick off our process (the Saga) the same way our handler did before.

public class BuildWidgetSaga : Saga<BuildWidgetSagaData>, IAmStartedByMessages<BuildWidgetCommand>

...

public void Handle(BuildWidgetCommand message)
{
    Data.BatchId = Guid.NewGuid();
    Data.NeedsComponentB = message.NeedsComponentB;
    Data.Tracer = message.Tracer;

    Bus.SendLocal(new BuildWidgetSaga_GetComponentA(Data.BatchId));
}

What we do in the first step is very simple: record what was requested, then determine our first action. In our case, that is always to obtain Component A.

public void Handle(BuildWidgetSaga_GetComponentA message)
{
    ...
    // Claim one unused Component_A row for this batch, then read it back
    db.Execute("UPDATE TOP(1) Component_A SET batchid = @0 WHERE batchid IS NULL", Data.BatchId);

    var componentA = db.Single<Component_A>("WHERE batchid = @0", Data.BatchId);

    // Keep the id in saga state for the CreateWidget step
    Data.ComponentAId = componentA.id;
    ...
}

What we do in this step ultimately has the same effect on our system as the earlier handler. The main difference is that we save the component id to our SagaData (state), and the same handler then decides which task comes next.

public void Handle(BuildWidgetSaga_GetComponentA message)
{
    ...
    if (Data.NeedsComponentB)
    {
        Bus.SendLocal(new BuildWidgetSaga_GetComponentB(Data.BatchId));
        return;
    }
    Bus.SendLocal(new BuildWidgetSaga_CreateWidget(Data.BatchId));
}
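Stripped of NServiceBus, the coordination in these handlers is a small state machine. Every step loads the shared SagaData, updates it, saves it, and decides which message comes next. The simulation below is plain C# (9 or later) and purely illustrative. The step names, the dictionary standing in for saga persistence, and the queue standing in for Bus.SendLocal are all assumptions, not NServiceBus APIs.

```csharp
using System;
using System.Collections.Generic;

// Stand-in for saga persistence: one state record per BatchId.
var sagaStore = new Dictionary<Guid, (bool NeedsB, Guid? A, Guid? B)>();
// Stand-in for Bus.SendLocal: a FIFO of (step, batch id) messages.
var queue = new Queue<(string Step, Guid BatchId)>();
var handled = new List<string>();

void Start(bool needsComponentB)
{
    var batchId = Guid.NewGuid();
    sagaStore[batchId] = (needsComponentB, null, null);
    queue.Enqueue(("GetComponentA", batchId));
}

void HandleStep(string step, Guid batchId)
{
    var data = sagaStore[batchId]; // every step starts by loading saga state
    handled.Add(step);
    switch (step)
    {
        case "GetComponentA":
            data.A = Guid.NewGuid(); // pretend a Component_A row was claimed
            queue.Enqueue((data.NeedsB ? "GetComponentB" : "CreateWidget", batchId));
            break;
        case "GetComponentB":
            data.B = Guid.NewGuid(); // pretend a Component_B row was claimed
            queue.Enqueue(("CreateWidget", batchId));
            break;
        case "CreateWidget":
            // Insert the widget using data.A and data.B, then complete the saga.
            sagaStore.Remove(batchId);
            return;
    }
    sagaStore[batchId] = data; // ...and ends by saving it again
}

Start(needsComponentB: true);
Start(needsComponentB: false);
while (queue.Count > 0)
{
    var (nextStep, id) = queue.Dequeue();
    HandleStep(nextStep, id);
}
Console.WriteLine(string.Join(", ", handled));
```

Each handled message costs a read and a write of saga state. That per-step overhead is what routing slips remove later in this article.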

Finally, when we arrive at our last step (to create the widget), we should have everything we need on our SagaData.

public void Handle(BuildWidgetSaga_CreateWidget message)
{
    ...
    var widget = new Widget()
    {
        id = Data.BatchId,
        Name = DateTime.Now.Ticks.ToString(),
        CreatedDate = DateTime.Now,
        ModifiedDate = DateTime.Now,
        component_a_id = Data.ComponentAId,
        component_b_id = Data.ComponentBId
    };
    db.Insert(widget);
    ...
}

Of course, a few details under the hood are needed for everything to wire up nicely: saga mappings, transactions, and idempotency, to name a few. The full source on GitHub shows everything.

Results

Messages: 10,000
Completion time: 750 seconds

Hmm, 7.5x slower?

So what happens if we just throw more workers at it?

<TransportConfig MaximumConcurrencyLevel="12" ... />

Results

Messages: 10,000
Completion time: 765 seconds

The problem is that we’ve only created the opportunity to scale. All 12 workers do the same group of tasks, so any number of them could be obtaining a component or creating a widget at any moment. What we need to do is assign roles to our workers, like an assembly line.

Scaling out the Saga

To scale the saga out, we move it into a shared library and change the destination endpoint for each step. I’ve broken this out into the following services:

  • Service (original, starts the process)
  • ComponentA
  • ComponentB
  • Widget

For simplicity, I opted to specify the endpoint in the code:

Bus.Send("ComponentAService", new BuildWidgetSaga_GetComponentA(Data.BatchId));
...
Bus.Send("ComponentBService", new BuildWidgetSaga_GetComponentB(Data.BatchId));
...
Bus.Send("WidgetService", new BuildWidgetSaga_CreateWidget(Data.BatchId));
...
Bus.Send("Service", new ProcessComplete());

The result is 4 services with 4 workers and 4 queues. Each service references the same Saga but is responsible for only a single part of the process. Because every service must load and save the same saga instance, in-memory persistence no longer works, and we have to move to shared persistence. In this case, I went with NHibernate.

configuration.UsePersistence<NHibernatePersistence>();

Results

Messages: 10,000
Completion time: 264 seconds

Scaling With Routing Slips

Now we’re making progress! Honestly, that’s not bad when you look at how much is being done. Still, it’s a lot of machinery just to pass 2 key values from handler to handler.

Routing Slips offer a lighter-weight option. With them, we send the same message through N handlers in turn. We do so by configuring a RoutingSlip, which travels along with the message and consists of an Itinerary of endpoints, a Log of previously visited endpoints, and Attachments (such as key values).

Setting It Up

Routing Slips are available on NuGet as part of the Message Routing project. At the time of this article, Message Routing works with NSB 5 and .NET 4.5.1.

Install-Package NServiceBus.MessageRouting

Once installed, we’ll need to enable them from our config:

configuration.RoutingSlips();

Refactoring From Our Saga

The most drastic change is where we start our process. Before, this was signified by the IAmStartedByMessages interface. Now, we move back to just a handler but with a twist.

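public class BuildWidgetHandler : IHandleMessages<BuildWidgetCommand>
{
    // Injected by NServiceBus 5 (property injection)
    public IBus Bus { get; set; }

    public void Handle(BuildWidgetCommand message)
    {
        // Build the itinerary: one destination per step, in order
        var destinations = new List<string>();
        destinations.Add("ComponentAService");

        if (message.NeedsComponentB)
            destinations.Add("ComponentBService");

        destinations.Add("WidgetService");

        // Hand the message and its itinerary to the routing framework
        Bus.Route(message, destinations.ToArray());
    }
}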

The key difference is that we no longer call Bus.Send(). Instead, we use Bus.Route(), which hands our message and itinerary to the routing framework for execution. Under the hood, the framework sends the message to the first address in the destinations list we provide. After the message is handled, that destination is recorded in the Log and the message is routed to the next destination.
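The routing behaviour just described can be modelled in a few lines. This is an illustrative simulation in plain C# (9 or later), not the library's implementation. A local loop stands in for sending the message between endpoints, the handler table is hypothetical, and the attachments dictionary mirrors the string key/value usage in the article's handlers. No shared state store is involved. Everything a later step needs travels with the slip.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical handlers, one per endpoint address; each can read and add attachments.
var handlers = new Dictionary<string, Action<Dictionary<string, string>>>
{
    ["ComponentAService"] = att => att["componentAId"] = Guid.NewGuid().ToString(),
    ["ComponentBService"] = att => att["componentBId"] = Guid.NewGuid().ToString(),
    ["WidgetService"] = att => Console.WriteLine(
        $"Widget uses A={att["componentAId"]}, " +
        $"B={(att.TryGetValue("componentBId", out var b) ? b : "none")}"),
};

// Simulated Route: deliver to each itinerary entry in order, logging every visit.
List<string> Route(params string[] destinations)
{
    var itinerary = new Queue<string>(destinations);
    var log = new List<string>();
    var attachments = new Dictionary<string, string>(); // travels with the slip

    while (itinerary.Count > 0)
    {
        var next = itinerary.Dequeue(); // "send" to the next address
        handlers[next](attachments);    // that endpoint handles the message
        log.Add(next);                  // record the visit, then move on
    }
    return log;
}

var withB = Route("ComponentAService", "ComponentBService", "WidgetService");
var withoutB = Route("ComponentAService", "WidgetService");
Console.WriteLine(string.Join(" -> ", withB));
Console.WriteLine(string.Join(" -> ", withoutB));
```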

In our case, destinations represent steps.

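// Lives in ComponentAService: the same BuildWidgetCommand arrives here as the first stop on the itinerary
public class BuildWidgetHandler : IHandleMessages<BuildWidgetCommand>
{
    private readonly RoutingSlip _routingSlip;

    // The routing slip for the message currently being handled is injected here
    public BuildWidgetHandler(RoutingSlip routingSlip)
    {
        _routingSlip = routingSlip;
    }

    public void Handle(BuildWidgetCommand message)
    {
        ...
        // Same claim as before: mark one free Component_A row, then read it back
        db.Execute("UPDATE TOP(1) Component_A SET batchid = @0 WHERE batchid IS NULL", message.BatchId);
        var componentA = db.Single<Component_A>("WHERE batchid = @0", message.BatchId);

        // Attach the id to the slip instead of saving it to SagaData;
        // the framework forwards the message to the next destination afterwards
        _routingSlip.Attachments["componentAId"] = componentA.id.ToString();
    }
}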

The jobs our steps do remain the same. The ceremony of how they do it is simplified.

  • We no longer need to specify the next step or route the message ourselves.
  • Instead of SagaData, we now use Attachments, which are available on the RoutingSlip.
  • To get the current RoutingSlip, we simply add it as a parameter to our handler’s constructor.

Once we arrive at the final assembly station, we simply pull the values we need out of the Attachments.

var componentAId = Guid.Parse(_routingSlip.Attachments["componentAId"]);
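One detail to watch is that componentBId is only attached when ComponentBService was on the itinerary. This sketch assumes Attachments behaves like an IDictionary<string, string>; the handlers above store strings by key, but check the library for the exact type. Under that assumption, indexing a missing key would throw, so a TryGetValue-based read is safer for optional values. ReadGuid is a hypothetical helper, and the sketch is plain C# 9 or later.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: parse an optional Guid attachment, returning null when it is absent.
Guid? ReadGuid(IDictionary<string, string> values, string key) =>
    values.TryGetValue(key, out var raw) ? Guid.Parse(raw) : (Guid?)null;

// A slip that skipped ComponentBService carries only componentAId.
var slipAttachments = new Dictionary<string, string>
{
    ["componentAId"] = Guid.NewGuid().ToString(),
};

Guid? componentAId = ReadGuid(slipAttachments, "componentAId");
Guid? componentBId = ReadGuid(slipAttachments, "componentBId");
Console.WriteLine($"A: {componentAId}, B: {componentBId?.ToString() ?? "not needed"}");
```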

Results

Messages: 10,000
Completion time: 159 seconds
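Converting the completion times reported in this article into throughput makes the runs easier to compare. This is plain arithmetic on the article's own numbers (10,000 messages per run), written as a small C# 9 program.

```csharp
using System;

const int messages = 10_000;

// Completion times reported in this article, in seconds.
var runs = new (string Name, double Seconds)[]
{
    ("Single handler", 100),
    ("Saga", 750),
    ("Saga, concurrency 12", 765),
    ("Saga, scaled out", 264),
    ("Routing slip", 159),
};

foreach (var (name, seconds) in runs)
{
    double perSecond = messages / seconds;
    double vsHandler = seconds / runs[0].Seconds; // above 1 means slower than the single handler
    Console.WriteLine($"{name,-22}{perSecond,7:F1} msg/s  {vsHandler,5:F2}x the handler's time");
}
```

By these numbers, the routing slip version handles roughly 63 messages per second, compared with roughly 38 for the scaled-out saga.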

Conclusion

Routing Slips are pretty awesome and work great in workflow scenarios. They won’t, however, replace Sagas for long-running processes where having state is a must.

If you want the full source for each example, I’ve posted it on GitHub.