MultiProducerConcurrentConsumer – Push in batches up to concurrency per slot

In the previous post I introduced the PushInBatches method, whose responsibility is to start the PushInBatchesUpToConcurrencyPerQueueForAGivenSlot process. In this post I will focus on that mouthful of a method.

As the name of the method already suggests, its responsibility is to push items for a slot in batches to the pump delegate while respecting the per-slot concurrency settings provided by the user, as explained in the introduction post.

Without further ado, here is the whole code.

void PushInBatchesUpToConcurrencyPerQueueForAGivenSlot(ConcurrentQueue<TItem> queue, 
    int currentSlotNumber, List<Task> tasks)  {

    int numberOfItems;
    var concurrency = 1;
    do {
        numberOfItems = 0;
        List<TItem> items = null;
        for (var i = 0; i < batchSize; i++) {
            TItem item;
            if (!queue.TryDequeue(out item)) {
                break;
            }

            if (items == null && !itemListBuffer.TryDequeue(out items)) {
                items = new List<TItem>(batchSize);
            }

            items.Add(item);
            numberOfItems++;
        }

        if (numberOfItems <= 0) {
            return;
        }

        Interlocked.Add(ref numberOfPushedItems, -numberOfItems);
        concurrency++;
        // Start the pump without awaiting it so that batches run concurrently.
        var task = pump(items, currentSlotNumber, state, tokenSource.Token)
            .ContinueWith((t, taskState) => {
                // The list and the pool arrive via the state object, so the
                // lambda captures nothing and no closure is allocated.
                var itemListAndListBuffer =
                    (Tuple<List<TItem>, ConcurrentQueue<List<TItem>>>) taskState;
                itemListAndListBuffer.Item1.Clear();
                itemListAndListBuffer.Item2.Enqueue(itemListAndListBuffer.Item1);
            }, Tuple.Create(items, itemListBuffer), TaskContinuationOptions.ExecuteSynchronously);
        tasks.Add(task);
    } while (numberOfItems == batchSize && concurrency <= maxConcurrency);
}

The method sets the initial concurrency to one and starts a do-while loop. The loop continues as long as the numberOfItems picked from the slot's queue equals the batch size and the concurrency is less than or equal to the maximum concurrency allowed per slot. Inside the loop, there are essentially two parts. The first is a for loop that gets items from the queue. The second dispatches the pump task without awaiting it and attaches a cleanup continuation, which clears the borrowed item buffer and returns it to the pool so that it can be reused.
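To make the loop condition concrete, here is a small sketch that reproduces only the counting logic of the do-while loop, without any queues or tasks. The class LoopBoundSketch and the method BatchSizesForOnePass are hypothetical names invented for this illustration; they are not part of the MPCC component.

```csharp
using System;
using System.Collections.Generic;

static class LoopBoundSketch
{
    // Hypothetical helper: returns the batch sizes that a single call would
    // dispatch for a slot holding `queued` items. Only the counting logic of
    // the do-while loop is mirrored here.
    public static List<int> BatchSizesForOnePass(int queued, int batchSize, int maxConcurrency)
    {
        var sizes = new List<int>();
        int numberOfItems;
        var concurrency = 1;
        do
        {
            // Stands in for the for loop that dequeues up to batchSize items.
            numberOfItems = Math.Min(batchSize, queued);
            if (numberOfItems <= 0)
            {
                return sizes; // nothing left to push
            }

            queued -= numberOfItems;
            concurrency++;
            sizes.Add(numberOfItems);
        } while (numberOfItems == batchSize && concurrency <= maxConcurrency);

        return sizes;
    }

    static void Main()
    {
        // Ten items, batch size 3, at most 2 concurrent pumps: two full
        // batches are dispatched and four items stay queued for a later pass.
        Console.WriteLine(string.Join(",", BatchSizesForOnePass(10, 3, 2))); // 3,3

        // Five items, batch size 3: the partial second batch ends the loop.
        Console.WriteLine(string.Join(",", BatchSizesForOnePass(5, 3, 4)));  // 3,2
    }
}
```

A partial batch ends the loop because it signals that the queue was drained, while the concurrency check ends it even when more items are still waiting.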

The for loop runs from zero up to the batch size and tries to dequeue items from the queue of the slot. When there is nothing to dequeue, the for loop exits. When there is something to dequeue and the items buffer has not yet been acquired, it tries to borrow a buffer from the buffer pool and allocates a new list only if the pool is empty. After that, the item is added to the items list and numberOfItems is increased by one. This process continues until the queue has no more items or the batch size is reached. When numberOfItems is zero, we return because there is nothing ready to be pushed to the pump delegate.
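The dequeue-and-borrow step can be isolated into a small, self-contained sketch. The class BatchDrainSketch and the method TryTakeBatch are hypothetical names used only for this example; the real method keeps this logic inline and tracks numberOfItems separately.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

static class BatchDrainSketch
{
    // Hypothetical helper: dequeues up to batchSize items. The list is
    // borrowed from listPool only after the first item has been dequeued.
    public static List<T> TryTakeBatch<T>(
        ConcurrentQueue<T> queue,
        ConcurrentQueue<List<T>> listPool,
        int batchSize)
    {
        List<T> items = null;
        for (var i = 0; i < batchSize; i++)
        {
            if (!queue.TryDequeue(out var item))
            {
                break; // queue is empty, stop early
            }

            // Borrow lazily so that an empty queue costs no allocation.
            if (items == null && !listPool.TryDequeue(out items))
            {
                items = new List<T>(batchSize);
            }

            items.Add(item);
        }

        return items; // null means nothing was dequeued
    }

    static void Main()
    {
        var queue = new ConcurrentQueue<int>(new[] { 1, 2, 3, 4, 5 });
        var pool = new ConcurrentQueue<List<int>>();

        var first = TryTakeBatch(queue, pool, 3);  // full batch
        var second = TryTakeBatch(queue, pool, 3); // partial batch
        var third = TryTakeBatch(queue, pool, 3);  // queue already empty

        Console.WriteLine(string.Join(",", first));  // 1,2,3
        Console.WriteLine(string.Join(",", second)); // 4,5
        Console.WriteLine(third == null);            // True
    }
}
```

Borrowing the list only after a successful dequeue means a call against an empty queue neither touches the pool nor allocates.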

When there are items to be pushed, we first decrease numberOfPushedItems by numberOfItems in a thread-safe manner (using Interlocked.Add with a negative value). Then we increase the current concurrency. After that, the pump delegate is executed without awaiting the task. Since we want to execute the pump delegate concurrently, the task cannot be awaited. If the task were awaited, the code would execute sequentially instead of concurrently.

A continuation is attached to the pump task. It avoids closure allocations by passing everything the lambda body needs into the state parameter of the ContinueWith method. When the lambda is executed, it extracts the state, clears the buffer list and returns it to the buffer pool for further reuse. The continuation uses TaskContinuationOptions.ExecuteSynchronously to hint to the TPL runtime that the lambda doesn’t need to be rescheduled (it might still be, depending on runtime circumstances, but I’m getting off topic here). The continuation task is then added to the task list so that the PushInBatches method can await all those tasks concurrently.
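The following is a minimal sketch of the state-passing continuation on its own. FakePump is a hypothetical stand-in for the user-provided pump delegate, and the class and method names are invented for this illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

static class ContinuationStateSketch
{
    // Hypothetical stand-in for the pump delegate.
    static Task FakePump(List<int> items) => Task.Delay(10);

    public static Task PumpAndReturnBuffer(List<int> items, ConcurrentQueue<List<int>> pool)
    {
        // The lambda uses only its parameters. Everything it needs arrives
        // through the state object, so it does not capture items or pool.
        return FakePump(items).ContinueWith((t, state) =>
        {
            var listAndPool = (Tuple<List<int>, ConcurrentQueue<List<int>>>) state;
            listAndPool.Item1.Clear();
            listAndPool.Item2.Enqueue(listAndPool.Item1);
        }, Tuple.Create(items, pool), TaskContinuationOptions.ExecuteSynchronously);
    }

    static async Task Main()
    {
        var pool = new ConcurrentQueue<List<int>>();
        var tasks = new List<Task>();

        // Both pumps are started without awaiting, so they overlap.
        tasks.Add(PumpAndReturnBuffer(new List<int> { 1, 2 }, pool));
        tasks.Add(PumpAndReturnBuffer(new List<int> { 3 }, pool));

        // Awaiting the continuation tasks guarantees the lists are back.
        await Task.WhenAll(tasks);

        Console.WriteLine(pool.Count); // 2
    }
}
```

Note that the Tuple itself is still one allocation per batch. What the pattern avoids is the compiler-generated closure object and the delegate allocation that a capturing lambda would need.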

In the next instalment, I’m going to shed some light on the asynchronous synchronisation primitive I quickly described in the Push It post. I hope you are enjoying this deep-dive series into the MPCC component so far. Feel free to leave comments, likes, criticism and more over the usual channels.