Introduction to the MultiProducerConcurrentConsumer for Azure Service Bus message completion

In the last post in the series about Azure Service Bus message completion, I briefly talked about how message batch completion could benefit from a more reactive approach. Before I dive into the inner workings of the reactive structure that I’m going to outline in the next few posts, I need to briefly recap the functional and non-functional requirements of the component.

The component shall

  • Make sure messages are only completed on the receiver they came from
  • Reduce the number of threads used when the number of clients is increased
  • Autoscale up under heavy load
  • Scale down under light load
  • Minimise the contention on the underlying collections used
  • Be completely asynchronous
  • Implement a push-based model from the producer and consumer perspective
  • Respect the maximum batch size defined by the client of the component or a predefined push interval
  • Provide FIFO semantics instead of LIFO
  • Be as lock-free as possible

From a design viewpoint, the MultiProducerConcurrentConsumer (MPCC) component will look like this:

Internally, the MPCC manages N slots that can contain items of type TItem. Whenever the push interval elapses or the batch size is reached, the component fans out the completion callback, with up to the provided concurrency per slot.

class MultiProducerConcurrentConsumer<TItem> {
    // batchSize: maximum number of items handed to the pump per invocation
    // pushInterval: maximum time items wait before being pushed regardless of batch size
    // maxConcurrency: maximum number of concurrent pump invocations per slot
    // numberOfSlots: number of independent slots items can be pushed to
    public MultiProducerConcurrentConsumer(int batchSize, TimeSpan pushInterval, int maxConcurrency, int numberOfSlots) { }

    // Starts the component; the pump delegate receives the batched items, the slot number,
    // an optional state object and a cancellation token.
    public void Start(Func<List<TItem>, int, object, CancellationToken, Task> pump) { }

    public void Start(Func<List<TItem>, int, object, CancellationToken, Task> pump, object state) { }

    // Pushes a single item into the given slot.
    public void Push(TItem item, int slotNumber) { }

    // Deterministically stops the component, draining all slots by default.
    public async Task Complete(bool drain = true) { }
}

Shown above is the class outline that we are going to use for the component. The constructor allows specifying the batch size, the push interval, the number of slots, and the concurrency per slot. Items are handed to the component with the Push method. As soon as the component is started, the provided pump delegate will be called whenever the batch size is reached or the push interval elapses. The concurrency per slot is probably best explained with an example.

Let’s assume we have a batch size of 500 and a push interval of 2 seconds. When producers produce 1500 items per second, the batch size is reached three times per second. In that case, MPCC will auto-scale up to the provided concurrency: it will take out 500 items and invoke the callback, concurrently try to fetch another 500 items, and then another 500, until the slot is empty or the maximum concurrency is reached.

The Complete method is available for deterministically stopping the MPCC. It allows the caller to asynchronously wait for all slots to be drained (the default) or to stop immediately and throw away all items that haven’t been processed yet.
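
To make the API concrete, here is a minimal usage sketch that wires the MPCC to lock token completion. The receivers array, numberOfReceivers, and message are illustrative names borrowed from the receiver setup in the earlier posts, and the pump body stands in for whatever completion logic is needed:

// Illustrative sketch: one slot per receiver, so lock tokens are always
// completed on the receiver they came from.
var completer = new MultiProducerConcurrentConsumer<Guid>(
    batchSize: 500,
    pushInterval: TimeSpan.FromSeconds(2),
    maxConcurrency: 3,
    numberOfSlots: numberOfReceivers);

// The pump delegate receives the batched items, the slot number they were
// pushed to, the optional state object and a cancellation token.
completer.Start((lockTokens, slotNumber, state, token) =>
    receivers[slotNumber].CompleteBatchAsync(lockTokens));

// Producers push items and return immediately; batching happens inside.
completer.Push(message.LockToken, slotNumber: 0);

// Deterministic shutdown, draining all slots by default.
await completer.Complete();

Mapping each receiver to its own slot is what would satisfy the first requirement from the list above.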

In the next post, we are going to dive into the inner workings of the MultiProducerConcurrentConsumer. I hope you’ll enjoy it.

Book review: Effective Debugging – 66 Specific Ways to Debug Software and Systems by Diomidis Spinellis

Target audience: developers

Philipp’s comment: Effective Debugging contains 66 recipes that show you how to track, find, and fix bugs with fewer headaches. The recipes are neatly grouped into chapters, and every recipe ends with a Things to Remember section that wraps up the described technique.
Some of the recipes are very basic and should be in every developer’s arsenal, at least after having read the book. Some recipes are meant for the hard-to-crack cases, while others may seem very obvious. I like the completeness of Diomidis Spinellis’ approach: it reminds us that successful debugging starts with mastering the basics. If you’re already familiar with a technique, you can always jump ahead to the Things to Remember section and continue with the next one.

If you’re a .NET developer like me, you will appreciate the tooling you get out of the box even more after having read the book. Though for some of the recipes, I would have liked to see a sample from the .NET universe.

To provide a more complete list of debugging tools in the .NET universe, along with insight into how to use them, I’m going to post more articles over the next couple of months.

The book is definitely worth a read and serves as a reference for the more advanced, rarely used techniques.
Go grab your copy today and let me know what you think about the book. Which technique boosted your debugging skills the most?

ISBN-13: 978-0134394794

Complete messages where they came from with Azure Service Bus

In the last post, we made a simplistic attempt to speed up batch completion of messages by having multiple dedicated background completion tasks. Unfortunately, this introduced an interesting side effect.


static async Task BatchCompletionLoop() {
   while(!token.IsCancellationRequested) {
      ...
      await receiveClient.CompleteBatchAsync(lockTokens).ConfigureAwait(false);
      ...
      await Task.Delay(TimeSpan.FromSeconds(5), token).ConfigureAwait(false);
   }
}

The code above uses a single receive client to complete message lock tokens in batches. Those lock tokens could come from multiple receive clients, yet they would all end up being completed on the same instance. Depending on the transport type you use, this might work, or it might not. With the NetMessaging (also known as SBMP) transport type, the above code will just work. If you switch to the AMQP transport type, the code will blow up with the following exception:

Microsoft.ServiceBus.Messaging.MessageLockLostException: The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue.
   at Microsoft.ServiceBus.Common.AsyncResult.End[TAsyncResult](IAsyncResult result)
   at Microsoft.ServiceBus.Messaging.MessageReceiver.EndCompleteBatch(IAsyncResult result)

For Azure Service Bus v3 and higher, AMQP is the new standard protocol and SBMP is an opt-in choice.

The proprietary SBMP protocol that is also supported is being phased out in favor of AMQP. (See the documentation.)
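
As a side note, the transport type is typically selected through the connection string. The following is a configuration sketch with a placeholder namespace and key, not code from the original sample:

// NetMessaging (SBMP) is the classic default transport.
var sbmpFactory = MessagingFactory.CreateFromConnectionString(
   "Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=...;TransportType=NetMessaging");

// With AMQP, a lock token is only valid on the receiver it was issued to,
// which is why completing it elsewhere throws MessageLockLostException.
var amqpFactory = MessagingFactory.CreateFromConnectionString(
   "Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=...;TransportType=Amqp");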

What does that mean for our message completion? To have robust completion code that works with both transport types, we have to make sure we only complete lock tokens on the receivers the messages came from.

In its simplest form, we need the following moving pieces: a concurrent stack or queue per message receiver, a generic message handling body that takes the matching concurrent stack as an input parameter, and a similarly parameterized completion loop.

var lockTokensToComplete = new ConcurrentStack<Guid>[numberOfReceivers];
// initialize the concurrent stacks
for(int i = 0; i < numberOfReceivers; i++) {
   lockTokensToComplete[i] = new ConcurrentStack<Guid>();
}

receiveClient1.OnMessageAsync(message => ReceiveMessage(message, lockTokensToComplete[0]));
...
receiveClientN.OnMessageAsync(message => ReceiveMessage(message, lockTokensToComplete[N-1]));

static async Task ReceiveMessage(BrokeredMessage message, ConcurrentStack<Guid> lockTokensToComplete) {
   await DoSomethingWithTheMessageAsync().ConfigureAwait(false);
   lockTokensToComplete.Push(message.LockToken);
}

and the completion loop:

for(int i = 0; i < numberOfReceivers; i++) {
   var index = i; // capture a copy, the for loop variable is shared across closures
   completionTasks[index] = Task.Run(() => BatchCompletionLoop(receivers[index], lockTokensToComplete[index]));
}

static async Task BatchCompletionLoop(MessageReceiver receiver, ConcurrentStack<Guid> lockTokensToComplete) {
   while(!token.IsCancellationRequested) {
      var lockTokens = new Guid[100];
      int numberOfItems = lockTokensToComplete.TryPopRange(lockTokens);
      if(numberOfItems > 0) {
         // hand over only the tokens that were actually popped
         await receiver.CompleteBatchAsync(lockTokens.Take(numberOfItems)).ConfigureAwait(false);
      }
      await Task.Delay(TimeSpan.FromSeconds(5), token).ConfigureAwait(false);
   }
}

We have reduced the contention, since each receiver now has a dedicated concurrent collection of lock tokens to operate on, paired with a self-contained completion loop that only touches the matching receiver and lock token collection. So far so good. Despite that, we are still wasting a lot of resources: we spin up as many dedicated completion background tasks as there are receivers, and those tasks might sit idle. And we still don’t know whether this code construct scales elastically up and down with the number of messages received and the number of clients.

Ideally, we would have a reactive approach which automatically adapts based on parameters we give it. Building such a reactive approach will be the focus of the next few posts. Stay reactive!

Batch completion with multiple receivers on Azure Service Bus

In the last post, we created multiple receivers with multiple factories to speed up our message processing. Unfortunately, this has consequences for our background completion loop. Since the message processing logic is shared between multiple receivers, all the receivers will try to push lock tokens into the same concurrent stack. So the following code will be called by up to numberOfReceivers * concurrencyPerReceiver operations concurrently.

// same as before
await DoSomethingWithTheMessageAsync().ConfigureAwait(false);
lockTokensToComplete.Push(message.LockToken);
// same as before

So, for example, when we use 10 receivers, each with a concurrency setting of 32, we end up pushing lock tokens to the concurrent stack from up to 320 simultaneous operations. Not a big deal, you could say, since the ConcurrentStack implementation is lock-free. Unfortunately, lock-free doesn’t necessarily mean contention-free.

ConcurrentQueue<T> and ConcurrentStack<T> are completely lock-free in this way. They will never take a lock, but they may end up spinning and retrying an operation when faced with contention (when the CAS operations fail). (Read more)

The concurrent stack implementation internally uses a technique called spinning and retrying (for more information, see the excellent Blocking versus Spinning section by Joe Albahari). Under contention, with a large number of concurrent operations, these compare-and-swap operations can fail multiple times and become less efficient than we assumed them to be.
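
To illustrate what spinning and retrying looks like, here is a deliberately naive sketch of a lock-free push in the style ConcurrentStack<T> uses internally. It is not the actual framework source, just the shape of the pattern:

using System.Threading;

// Naive sketch of the compare-and-swap (CAS) retry pattern used by
// lock-free collections; for illustration only.
class NaiveLockFreeStack<T> {
    class Node { public T Value; public Node Next; }
    Node head;

    public void Push(T value) {
        var node = new Node { Value = value };
        var spinWait = new SpinWait();
        while (true) {
            node.Next = Volatile.Read(ref head);
            // the CAS only succeeds if no other thread swapped the head
            // between our read and this exchange
            if (Interlocked.CompareExchange(ref head, node, node.Next) == node.Next) {
                return;
            }
            // under contention the CAS fails and we spin and retry;
            // with hundreds of concurrent pushers these retries add up
            spinWait.SpinOnce();
        }
    }
}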

In the previous post I made the following statement:

When we receive several hundred messages per second, our randomly chosen “complete every one-hundredth message” and then “sleep for five seconds” might turn out to be a suboptimal choice.

With multiple receivers pushing lock tokens into the concurrent stack, we might have made our problem even worse. It is entirely possible that our concurrent receivers fill the concurrent stack with lock tokens faster than our completion loop manages to complete them: at several hundred messages per second, a five-second pause lets thousands of lock tokens pile up, while each loop iteration pops at most one hundred. Our previously chosen five-second sleep duration might turn out to be way too long under heavy load.

How about we just spin up multiple batch completion tasks like the following?

var completionTasks = new Task[numberOfReceivers];

for(int i = 0; i < numberOfReceivers; i++) { 
   completionTasks[i] = Task.Run(() => BatchCompletionLoop());
}

static async Task BatchCompletionLoop() {
   while(!token.IsCancellationRequested) {
      var lockTokens = new Guid[100];
      int numberOfItems = lockTokensToComplete.TryPopRange(lockTokens);
      if(numberOfItems > 0) {
         // hand over only the tokens that were actually popped
         await receiveClient.CompleteBatchAsync(lockTokens.Take(numberOfItems)).ConfigureAwait(false);
      }
      await Task.Delay(TimeSpan.FromSeconds(5), token).ConfigureAwait(false);
   }
}

Now we’ve just made the contention problem on the concurrent stack even worse: multiple background completion operations are competing on the concurrent stack as well. And because all those completion tasks share the same Task.Delay value, there is a high chance they get dispatched onto worker threads at roughly the same time, yet only a few find lock tokens to complete before going idle again, effectively wasting a lot of resources in production.

Leaving contention and resource waste aside, we’ve now introduced another major flaw in our completion logic. Can you spot it? If not, don’t worry. I’ll pick it up in the next post.