The ‘queue’ problem

When using messaging, some scenarios generate many messages from a single incoming request. Normally a fan-out or competing-consumers pattern can ensure these messages are processed in line with the designated SLA, since the processing side can be scaled out, but the queuing nature of messaging is also an important factor to remember.

Imagine an incoming request to process a batch of work, like image processing, text file cleaning, or downloading web pages from the internet. The initial request could generate thousands of messages to deal with the work (e.g. a message per work item). Meanwhile a second request comes in and similarly generates thousands of messages for its own work items. The work items for the second request will queue behind the first request’s messages, and this could delay the processing of the second request beyond its SLA. This design is also wasteful when you consider cancellation scenarios: when the initial request is cancelled, you still need to churn through all of its generated messages sitting in the queue, even though there is nothing left to do. Can we do any better?

Staggered-batch processing

An alternative approach is to not generate all the messages upfront, but to generate them in batches. We’ll need some sort of persistence to remember how many messages have been generated, and to generate a new batch once the previous one is fully processed. By not generating all the messages upfront, we avoid congesting the queue, and in case of cancellation we simply stop generating more messages.

Here ‘batch’ refers to a group of messages generated to handle a portion of the work.

While you can generally do this with vanilla messaging and a database, you will need to handle consistency between the message queue and the database, and deal with concurrency issues: what happens when two messages try to update the database at the same time? Saga abstractions already take care of these concerns for you, so you can focus on the business code, in this case batching the request and processing the batches. Let’s see how this can be done using the NServiceBus saga implementation.

The saga that generates and tracks the work orders handles three main events:

  • Start a new instance when a work request comes in. At this point, we also start importing the first batch. (StartProcessing)
  • Wait for acknowledgement of a batch being imported and import the next batch (WorkOrderCompleted)
  • Finish up when all batches are imported and terminate (WorkAllDone)
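
To make this concrete, here is a minimal sketch of what such a saga could look like with NServiceBus. The type, property and member names (WorkOrderSaga, RequestId, WorkCount, Progress, BatchSize) are assumptions for illustration rather than the names used in the actual sample:

using System;
using System.Threading.Tasks;
using NServiceBus;

public class WorkOrderSagaData : ContainSagaData
{
    public Guid RequestId { get; set; }   // correlates messages to a saga instance
    public int WorkCount { get; set; }    // total number of work items requested
    public Progress Progress { get; set; } = new Progress();
}

public class WorkOrderSaga :
    Saga<WorkOrderSagaData>,
    IAmStartedByMessages<StartProcessing>,
    IHandleMessages<WorkOrderCompleted>
{
    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<WorkOrderSagaData> mapper)
    {
        // Route every message carrying the same RequestId to the same saga instance.
        mapper.ConfigureMapping<StartProcessing>(m => m.RequestId).ToSaga(s => s.RequestId);
        mapper.ConfigureMapping<WorkOrderCompleted>(m => m.RequestId).ToSaga(s => s.RequestId);
    }

    public Task Handle(StartProcessing message, IMessageHandlerContext context)
    {
        Data.WorkCount = message.WorkCount;
        return ImportNextBatch(context); // start importing the first batch
    }

    public Task Handle(WorkOrderCompleted message, IMessageHandlerContext context)
    {
        Data.Progress.MarkBatchCompleted(message.BatchSize); // hypothetical message member
        return ImportNextBatch(context);
    }

    // ImportNextBatch and MakeNextBatch are shown below; FinishWork would
    // typically call MarkAsComplete() to end the saga.
}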

When importing the next batch, either all work requests have already been generated (no more batches to create) or we need to generate the next one.

async Task ImportNextBatch(IMessageHandlerContext context)
{
    if (Data.Progress.AllWorkCompleted(Data.WorkCount))
    {
        // Every work item has been processed; wrap up and complete the saga.
        await FinishWork(context);
    }
    else if (Data.Progress.HasRemainingWork(Data.WorkCount))
    {
        // More work items remain to be generated; queue the next batch.
        await SendWorkRequest(MakeNextBatch(), context);
    }
    // Otherwise all work items are generated but some are still in flight;
    // wait for further WorkOrderCompleted messages.
}
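
ImportNextBatch leans on a couple of helpers on the saga data. A minimal sketch of the progress-tracking type, again with hypothetical counter names (the real sample may track progress differently):

public class Progress
{
    public int ImportedCount { get; set; }   // work items generated so far
    public int CompletedCount { get; set; }  // work items acknowledged so far

    public void MarkBatchImported(int batchSize) => ImportedCount += batchSize;
    public void MarkBatchCompleted(int batchSize) => CompletedCount += batchSize;

    // Finished once every work item has been acknowledged.
    public bool AllWorkCompleted(int workCount) => CompletedCount >= workCount;

    // More batches are needed while not every work item has been generated.
    public bool HasRemainingWork(int workCount) => ImportedCount < workCount;
}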

The saga’s Data property provides safe access to the progress information. The concurrency checks built into the saga mean that Data is never updated by two concurrently processed messages at the same time. This is all transparent to the user’s code.

The messages to process the work orders (ProcessWorkOrder) are sent to a different queue, so the processing side can be scaled out as needed. The processing time of the messages can be monitored so that the number of consumers can be adjusted accordingly.
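
Routing the work order messages to their own queue is plain endpoint configuration. A minimal sketch, assuming hypothetical endpoint names and the LearningTransport:

var endpointConfiguration = new EndpointConfiguration("WorkOrderSagas");
var transport = endpointConfiguration.UseTransport<LearningTransport>();

// Send ProcessWorkOrder messages to their own queue, consumed by any
// number of competing 'WorkOrderProcessing' instances.
transport.Routing().RouteToEndpoint(typeof(ProcessWorkOrder), "WorkOrderProcessing");

// On the processing endpoint, concurrency can be tuned to match the SLA:
// processingConfiguration.LimitMessageProcessingConcurrencyTo(16);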

The output of this process shows the messages being generated in batches (here a batch consists of 100 messages):

Starting the process for '987' work orders.
Queueing next batch of work orders: (1 - 100).
Importing the next batch of work.
Queueing next batch of work orders: (101 - 200).
...
Importing the next batch of work.
Queueing next batch of work orders: (901 - 987).
All done.
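
The batch boundaries in this log follow from simple arithmetic. A sketch of what MakeNextBatch could look like, assuming a hypothetical WorkOrderBatch type and a fixed batch size:

const int BatchSize = 100;

WorkOrderBatch MakeNextBatch()
{
    // The next batch starts right after the last generated work order and is
    // capped at the total work count, hence the final (901 - 987) batch.
    var start = Data.Progress.ImportedCount + 1;
    var end = Math.Min(start + BatchSize - 1, Data.WorkCount);
    Data.Progress.MarkBatchImported(end - start + 1);
    return new WorkOrderBatch(start, end);
}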

Caveats

This process halts when a message errors out on the processing side, since the subsequent batches are no longer generated and processed. This may be desirable where the work only makes sense as a whole (one failed request can fail the entire job), but if partial processing is acceptable this will need to change. There are a few strategies to handle this:

  • The generation of the messages can be driven by a timeout. This staggers the generation side so that the processing side can handle the messages as they come in (see the sketch after this list).

  • More elaborate work tracking can be designed so that errors from the processing side are also reported. The errored tasks can then be retried or discarded as appropriate.
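
For the first strategy, NServiceBus saga timeouts can drive the generation on a schedule instead of waiting for acknowledgements. A minimal sketch, assuming a hypothetical GenerateNextBatch timeout state and an example interval; the saga would additionally implement IHandleTimeouts<GenerateNextBatch> and request the first timeout from its StartProcessing handler:

public class GenerateNextBatch { }

public async Task Timeout(GenerateNextBatch state, IMessageHandlerContext context)
{
    await ImportNextBatch(context);
    if (Data.Progress.HasRemainingWork(Data.WorkCount))
    {
        // Keep staggering the generation on a fixed interval.
        await RequestTimeout<GenerateNextBatch>(context, TimeSpan.FromSeconds(30));
    }
}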

Summary

The thread-safe, concurrency-aware programming model of sagas makes it easier to redesign fan-out message processing in a way that is more efficient and results in better overall SLAs. The sample code can be found in my GitHub repository.