MultiProducerConcurrentConsumer – Push in batches

In the last post, I introduced the push method and the timer loop of the MultiProducerConcurrentConsumer. In this post, I’ll focus on the actual PushInBatches method that gets called by the timer loop.

Task PushInBatches()
{
    if (Interlocked.Read(ref numberOfPushedItems) == 0)
    {
        return TaskEx.Completed;
    }

    for (var i = 0; i < numberOfSlots; i++)
    {
        var queue = queues[i];

        PushInBatchesUpToConcurrencyPerQueueForAGivenSlot(queue, i, pushTasks);
    }

    return Task.WhenAll(pushTasks).ContinueWith((t, s) =>
    {
        var tasks = (List<Task>) s;
        tasks.Clear();
    }, pushTasks, TaskContinuationOptions.ExecuteSynchronously);
}

When the method is entered, it first checks whether there is anything to push. Since numberOfPushedItems is accessed by multiple threads, it uses Interlocked.Read to read the value and compare it. When there is nothing to push, a completed task is returned. By not using the async keyword here we avoid the state machine the compiler would generate. Bear in mind this trick should not be applied blindly.
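
For comparison, this is roughly what the async variant would look like (a sketch, not code from the component); the compiler emits a state machine for it even though the early-return path never awaits anything:

async Task PushInBatchesAsync()
{
    // the compiler generates a state machine even when we return
    // before the first await
    if (Interlocked.Read(ref numberOfPushedItems) == 0)
    {
        return; // still pays the state machine overhead
    }

    // ... same slot loop as above ...

    await Task.WhenAll(pushTasks).ConfigureAwait(false);
    pushTasks.Clear();
}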

Then it loops through all the slots. The queue for each slot is taken from the queues array, and PushInBatchesUpToConcurrencyPerQueueForAGivenSlot is called with the queue of the current slot and the pushTasks list. Remember that the push task list and the queues array were previously allocated as members of the class for efficient reuse.

At the end of the method, Task.WhenAll creates a task that completes when all of the tasks contained in pushTasks have completed. A synchronous continuation is scheduled that makes sure the pushTasks list gets cleared at the end of the execution. That continuation task is returned to the caller, which is the timer loop. This means the timer loop will only continue when the pushTasks list has been cleared. So instead of creating a list of tasks for every loop cycle, we reuse the task list and clear it after each push cycle, which reduces garbage collection pressure.

Here I used another not-so-well-known trick. Instead of accessing the pushTasks list directly and thereby creating a closure over this, I'm passing the pushTasks list as a state parameter to the ContinueWith method and extracting the state inside the body of the continuation. This saves an Action delegate allocation per push cycle.
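
For comparison, the closure-based variant would look like the following sketch. Because the lambda touches the instance field pushTasks, it captures this, so a fresh delegate has to be allocated on every push cycle; the state-based lambda above captures nothing, which lets the compiler cache a single delegate instance:

// Closure-based variant (sketch): accessing the pushTasks field captures
// `this`, so a new Action delegate is allocated per push cycle.
return Task.WhenAll(pushTasks).ContinueWith(t =>
{
    pushTasks.Clear();
}, TaskContinuationOptions.ExecuteSynchronously);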

In the next installment, we’ll look into PushInBatchesUpToConcurrencyPerQueueForAGivenSlot since it is complex enough to warrant a dedicated post.

MultiProducerConcurrentConsumer – Push it

In the last post, I introduced the start procedure for the MultiProducerConcurrentConsumer, including a rough outline of the timer loop responsible for pushing items in batches after the push interval has elapsed.

To understand how the batch size affects the push, let me first explain the push method. The push method's primary responsibility is to insert the item of type TItem into the right slot. To achieve that, the passed-in slot number is range checked against the available number of slots (of course a lower bound check would not hurt either 😉 ). The passed-in slot number is used to pick the concurrent queue that represents the slot, and the item is enqueued into that slot.

public void Push(TItem item, int slotNumber)
{
    if (slotNumber >= numberOfSlots)
    {
        throw new ArgumentOutOfRangeException(nameof(slotNumber),
          $"Slot number must be between 0 and {numberOfSlots - 1}.");
    }

    queues[slotNumber].Enqueue(item);

    var incrementedCounter = Interlocked.Increment(ref numberOfPushedItems);

    if (incrementedCounter > batchSize)
    {
        syncEvent.Set();
    }
}

The second responsibility of the push method is to count the number of items that were inserted into the MPCC. In my version of the MPCC, I made a simple trade-off. Instead of having a counter per slot, I've chosen a global counter called numberOfPushedItems which counts all pushed items. I think it is a reasonable trade-off that makes the implementation much simpler but still achieves its goal (what do you think?). Since multiple threads could insert data concurrently, I'm using Interlocked.Increment to increment the counter. Whenever the incremented counter is greater than the batch size, the push method signals a synchronization primitive* to unleash the push process. But how does the MPCC know when the batch size is reached?

It does it by observing the synchronization primitive. Let’s expand the previously shown TimerLoop slightly:

async Task TimerLoop()
{
    var token = tokenSource.Token;
    while (!tokenSource.IsCancellationRequested)
    {
        try
        {
            await Task.WhenAny(Task.Delay(pushInterval, token),
               syncEvent.WaitAsync(token)).ConfigureAwait(false);
            await PushInBatches().ConfigureAwait(false);
        }
        catch (Exception)
        {
            // intentionally ignored
        }
    }
}

So instead of only doing a Task.Delay with the push interval, we combine the Task.Delay with syncEvent.WaitAsync into a Task.WhenAny. What this means is the following: Task.WhenAny will return when any of the tasks passed into the method has completed (successfully, faulted or canceled). So the await will continue when either the push interval has elapsed (or was canceled), or the synchronization primitive was set (or the token canceled). This code has a slight deficiency: every time we reach the batch size before the push interval has elapsed, the task representing the push interval will “leak” until the interval elapses. But we will ignore this for now and potentially talk about it in another post.
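
One possible mitigation, sketched below without claiming it is the approach taken here, is to link a dedicated cancellation token source to the outer token and cancel the delay as soon as the await continues:

// Sketch of a possible mitigation, not the implementation used here:
// cancel the delay task explicitly so it doesn't linger until the
// interval elapses.
using (var delayCts = CancellationTokenSource.CreateLinkedTokenSource(token))
{
    await Task.WhenAny(Task.Delay(pushInterval, delayCts.Token),
        syncEvent.WaitAsync(token)).ConfigureAwait(false);
    delayCts.Cancel(); // stops the leaked timer task if it is still pending
}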

The code shown so far is now capable of having items pushed into the right slot, and when either the push interval elapses or the batch size is reached, the component pushes items in batches to the pump delegate. In the next post, we will cover the actual PushInBatches method.

* I’m using an AsyncAutoResetEvent which I will explain in a later post.
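
Until then, here is a minimal sketch of what such a primitive could look like, based on the well-known TaskCompletionSource pattern (the actual implementation may differ, and cancellation support is left out for brevity):

using System.Collections.Generic;
using System.Threading.Tasks;

class AsyncAutoResetEvent
{
    static readonly Task Completed = Task.FromResult(true);
    readonly Queue<TaskCompletionSource<bool>> waiters = new Queue<TaskCompletionSource<bool>>();
    bool signaled;

    public Task WaitAsync()
    {
        lock (waiters)
        {
            if (signaled)
            {
                signaled = false; // consume the signal: auto-reset semantics
                return Completed;
            }
            var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
            waiters.Enqueue(tcs);
            return tcs.Task;
        }
    }

    public void Set()
    {
        TaskCompletionSource<bool> toRelease = null;
        lock (waiters)
        {
            if (waiters.Count > 0)
            {
                toRelease = waiters.Dequeue(); // wake up one waiter
            }
            else if (!signaled)
            {
                signaled = true; // remember the signal for the next waiter
            }
        }
        toRelease?.SetResult(true);
    }
}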

MultiProducerConcurrentConsumer – Start it

In the previous installment, we discussed the technique of preallocation and reuse to save allocations at runtime. In this post, I will introduce the heart of the MPCC: the Start method and the timer loop outline.

The MPCC can be started at any time after construction. Once the component is started, it will push items to the pump delegate whenever the batch size is reached or the push interval elapses. This is how the Start method looks:

public void Start(Func<List<TItem>, int, object, CancellationToken, Task> pump)
{
    Start(pump, null);
}

public void Start(Func<List<TItem>, int, object, CancellationToken, Task> pump, object state)
{
    if (started)
    {
        throw new InvalidOperationException("Already started");
    }

    // assign pump and state before the timer loop starts so the loop
    // never observes a half-initialized component
    this.pump = pump;
    this.state = state;

    tokenSource = new CancellationTokenSource();
    timer = Task.Run(TimerLoop);
    started = true;
}

The MPCC can be started once only. There are two variations for starting: a state-based overload and one that doesn't provide a state. The state-based overload exists to enable scenarios where you want to avoid closure capturing: you can pass in the state that is required to execute the functionality inside the pump delegate implementation.
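
For illustration, a hypothetical caller could pass its receiver as state instead of capturing it in a closure (consumer, receiver, Receiver and CompleteBatchAsync are assumed names, not part of the component):

// Hypothetical usage sketch: the receiver travels as state, so the lambda
// captures nothing and the compiler can cache the delegate instance.
consumer.Start((items, slotNumber, state, token) =>
{
    var r = (Receiver)state; // unwrap the state instead of closing over it
    return r.CompleteBatchAsync(items, token);
}, receiver);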

Furthermore, the component requires a cancellation token source and a timer loop that executes the interval-based pushing of items to the pump. Let's have a quick look at the timer loop implementation (a few details are left out since we will explore them in future posts):

async Task TimerLoop()
{
    var token = tokenSource.Token;
    while (!tokenSource.IsCancellationRequested)
    {
        try
        {
            await Task.Delay(pushInterval, token).ConfigureAwait(false);
            await PushInBatches().ConfigureAwait(false);
        }
        catch (Exception)
        {
            // intentionally ignored
        }
    }
}

The timer loop is scheduled to be executed on the worker thread pool. At its heart, we have a while loop that runs until the token indicates that cancellation was requested. Inside the loop, the component uses Task.Delay to asynchronously wait until the push interval has elapsed or the token is canceled. After the push interval is over, the method PushInBatches is called. In this implementation, I chose to ignore any exceptions that might happen either in the Task.Delay (e.g. an OperationCanceledException being thrown) or exceptions bubbling up from PushInBatches.

Now we have a simple approach to pushing items in batches based on the push interval defined in the constructor. But this doesn't address pushing based on the batch size. We'll cover this in the next posts.

MultiProducerConcurrentConsumer – Preallocate and reuse

In the last post, I introduced the MultiProducerConcurrentConsumer component. In this post, I’m going to walk through the constructor of the component and explain the preallocation trade-off that was made.

With almost every piece of code executed on the hot path of your library, framework or system, you need to make trade-offs regarding allocations. In the end, it boils down to roughly two choices:

  • Allocate as much as you need upfront
  • Allocate on-demand when you need it

The benefit of allocating on demand is that memory consumption only grows when it is actually needed. The downside of this approach is that, under highly concurrent scenarios, allocating structures in a safe and lock-free way can be tricky.

The drawback of allocating upfront is that even though your application might never need it, the memory is preallocated and stays that way. On the other hand, if we allocate the potentially needed structures upfront, we never need to allocate during concurrent execution. Unfortunately, preallocating upfront only works when you know the boundary conditions of the problem at hand.

With the MPCC structure, we can make a few assumptions which allow us to pre-allocate internally needed structures.

public MultiProducerConcurrentConsumer(
    int batchSize, TimeSpan pushInterval, int maxConcurrency, int numberOfSlots)
{
    this.maxConcurrency = maxConcurrency;
    this.pushInterval = pushInterval;
    this.batchSize = batchSize;
    this.numberOfSlots = numberOfSlots;

    queues = new ConcurrentQueue<TItem>[numberOfSlots];
    for (var i = 0; i < numberOfSlots; i++)
    {
        queues[i] = new ConcurrentQueue<TItem>();
    }
    
    var maxNumberOfConcurrentOperationsPossible = numberOfSlots * maxConcurrency;
    pushTasks = new List<Task>(maxNumberOfConcurrentOperationsPossible);

    itemListBuffer = new ConcurrentQueue<List<TItem>>();
    for (var i = 0; i < maxNumberOfConcurrentOperationsPossible; i++)
    {
        itemListBuffer.Enqueue(new List<TItem>(batchSize));
    }
}

Since we know the number of slots upfront and we defined that the component needs FIFO semantics, we can use an array of ConcurrentQueue<TItem> and create all the queues for all the slots ahead of time.

The maximum concurrency is defined per slot. So we can calculate the maximum number of concurrent operations possible at any given time as the number of slots multiplied by the maximum concurrency. As soon as we have calculated maxNumberOfConcurrentOperationsPossible, we can allocate a List<Task> which is going to hold the concurrently happening operations for graceful shutdown purposes.

Last but not least, we make the following trade-off. During runtime, the component will get many items pushed to it and complete items in batches. If we allocated a List<TItem> every time the pump callback is called, we'd be creating a lot of Gen0 garbage: up to maxNumberOfConcurrentOperationsPossible List<TItem> instances per push interval. Depending on the push interval settings, this could drastically impact performance. That's why the MPCC creates a ConcurrentQueue<List<TItem>> and enqueues new List<TItem>(batchSize) instances into it. This queue serves as a List<TItem> buffer: effectively we are reusing the lists during the whole lifetime of the component. You might have noticed that we initialize each list with the batch size, which is also known upfront, to prevent unnecessary growing of the lists.
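
To make the reuse concrete, the consuming side could rent and return lists roughly like this (a sketch; GetBatchList and ReturnBatchList are hypothetical helper names, not the actual API):

List<TItem> GetBatchList()
{
    List<TItem> batch;
    if (!itemListBuffer.TryDequeue(out batch))
    {
        // buffer exhausted; fall back to allocating (should be rare)
        batch = new List<TItem>(batchSize);
    }
    return batch;
}

void ReturnBatchList(List<TItem> batch)
{
    batch.Clear();                 // reset the content but keep the capacity
    itemListBuffer.Enqueue(batch); // make the list available for reuse
}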

Hope you like this so far. Stay tuned for the next post about MPCC.

Introduction to the MultiProducerConcurrentConsumer for Azure Service Bus message completion

In the last post in the series about Azure Service Bus message completion, I briefly talked about how message batch completion could benefit from a more reactive approach. Before I dive into the inner workings of the reactive structure that I'm going to outline in the next few posts, I need to briefly recap the functional and non-functional requirements for the component.

The component shall

  • Make sure messages are only completed on the receiver they came from
  • Reduce the number of threads used when the number of clients is increased
  • Autoscale up under heavy load
  • Scale down under light load
  • Minimise the contention on the underlying collections used
  • Be completely asynchronous
  • Implement a push-based model from the producer and consumer perspective
  • Respect the maximum batch size defined by the client of the component or a predefined push interval
  • Provide FIFO semantics instead of LIFO
  • Be as lock-free as possible

From a design viewpoint, the MultiProducerConcurrentConsumer (MPCC) component looks like the following.

The MPCC internally manages N slots that can contain items of type TItem. Whenever the push interval elapses or the batch size is reached, the component fans out the completion callback, up to the provided concurrency per slot.

class MultiProducerConcurrentConsumer<TItem>
{
    public MultiProducerConcurrentConsumer(
        int batchSize, TimeSpan pushInterval, int maxConcurrency, int numberOfSlots) { }

    public void Start(Func<List<TItem>, int, object, CancellationToken, Task> pump) { }

    public void Start(Func<List<TItem>, int, object, CancellationToken, Task> pump, object state) { }

    public void Push(TItem item, int slotNumber) { }

    public async Task Complete(bool drain = true) { }
}

Shown above is the class outline that we are going to use for the component. The constructor allows specifying the batch size, the push interval, the number of slots and the concurrency per slot. The component has a Push method for pushing items to it. As soon as the component is started, the provided pump delegate will be called whenever the batch size is reached or the push interval elapses. The concurrency per slot is probably best explained with an example.

Let's assume we have a batch size of 500 and a push interval of 2 seconds. When producers produce 1500 items per second, the batch size is reached three times per second. The MPCC will auto-scale in that case up to the provided concurrency: in the above example, it will take out 500 items and invoke the callback, then concurrently try to fetch another 500 items, and again another 500, until the slot is empty or the maximum concurrency is reached, roughly as sketched below.
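
Names and details in this sketch are illustrative only; the actual method is covered in a later post:

// Illustrative sketch of the fan-out per slot, not the actual implementation.
void PushSlotUpToConcurrency(ConcurrentQueue<TItem> queue, int slotNumber, List<Task> tasks)
{
    TItem item;
    for (var i = 0; i < maxConcurrency && !queue.IsEmpty; i++)
    {
        var batch = new List<TItem>(batchSize); // the real code reuses buffered lists
        while (batch.Count < batchSize && queue.TryDequeue(out item))
        {
            batch.Add(item);
        }
        tasks.Add(pump(batch, slotNumber, state, tokenSource.Token)); // fan out the callback
    }
}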

The Complete method is available for deterministically stopping the MPCC. It allows the caller to asynchronously wait for all slots to be drained (the default) or to stop immediately and throw away all items that haven't been processed.
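
A hypothetical shutdown sequence could look like this (consumer is an assumed instance name):

// Graceful shutdown: asynchronously waits until all slots are drained.
await consumer.Complete().ConfigureAwait(false);

// Or stop immediately and discard items that haven't been processed yet:
// await consumer.Complete(drain: false).ConfigureAwait(false);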

In the next post, we are going to look at the inner workings of the MultiProducerConcurrentConsumer. I hope you'll enjoy it.

Book review: Effective Debugging – 66 Specific Ways to Debug Software and Systems by Diomidis Spinellis

Target audience: developers

Philipp's comment: Effective Debugging contains 66 recipes that show you how to track, find and fix bugs with fewer headaches. The recipes are neatly grouped into chapters. Every recipe has a Things to Remember section at the end which wraps up the described technique.
Some of the recipes are very basic and should be in every developer's arsenal, at least after having read the book. Some recipes are meant for the hard-to-crack cases, while some may seem very obvious. I like the completeness of Diomidis Spinellis' approach. It reminds us that successful debugging starts with mastering the basics. If you're already familiar with a technique, you can always jump ahead to the Things to Remember section and continue with the next technique.

If you're a .NET developer like me, you will appreciate the tooling you get out of the box even more after having read the book. Though for some of the recipes in the book, I would have liked to see a sample from the .NET universe.

To provide a more complete list of debugging tools in the .NET universe and insight into how to use them, I'm going to post more articles in the next couple of months.

The book is definitely worth a read and serves as a reference for the more advanced, rarely used techniques.
Go grab your copy today and let me know what you think about the book. Which technique improved your debugging skills the most?

ISBN-13: 978-0134394794