Home / .NET / Another attempt to batch complete with Azure Service Bus

Another attempt to batch complete with Azure Service Bus

In the last post, we tried to be smart with batch completion and completed every one-hundredth message. Unfortunately, this way of doing completion might not work under with few messages in the queue. We decided to go back to the drawing board and see if we can come up with a better approach which can cope with high but also small loads of messages. The heart of the code we wrote in the last post (omitting the exception handling part) looks like

var lockTokensToComplete = new ConcurrentStack<Guid>();
 
receiveClient.OnMessageAsync(async message =>
{
   // same as before
   await DoSomethingWithTheMessageAsync().ConfigureAwait(false);
   lockTokensToComplete.Push(message.LockToken);
   // same as before
},..)

Instead of completing messages in batches of hundred lock tokens we can try to move out the batch completion into a dedicated batch completion task. Hopefully that is not too hard. Let’s see

var tokenSource = new CancellationTokenSource();
var token = tokenSource.Token;

var batchCompletionTask = Task.Run(async () => {
   while(!token.IsCancellationRequested) {
      var lockTokens = new Guid[100];
      int numberOfItems = lockTokensToComplete.TryPopRange(lockTokens)
      if(numberOfItems > 0) {
         await receiveClient.CompleteBatchAsync(lockTokens).ConfigureAwait(false);
      }
      await Task.Delay(TimeSpan.FromSeconds(5), token).ConfigureAwait(false);
   }
});

We schedule a task to be run on the worker thread pool to offload the actual completion loop away from the thread that is starting the task. We loop until the cancellation is requested on the cancellation token source which is linked to the token passed as closure into the completion loop. We then try top pop a range of lock tokens from the ConcurrentStack. If we received more than zero items, we would complete the lock tokens we popped on the receiveClient. At the end of the loop, we asynchronously sleep until either we shut down or five seconds are over.

This is beautiful and straightforward. We have a dedicated completion circuit, the contention on the concurrent stack is in a controllable range. Under small load, we complete lock tokens in in batches of one to maximum one hundred tokens. If we receive only a limited number of messages, the loop might complete messages with their tokens one by one (for example when we receive a message every 6 seconds). But what happens when the load increases?

When we’d received several hundred messages per seconds our randomly chosen “complete every one-hundredth messages” and then “sleep for five seconds” might turn out to be a suboptimal choice. We will dive more into this topic in upcoming posts. Could we try to optimize a few things in this simple loop? Feel free to exercise your brain or just wait for the next post 😉