In the last post about the MPCC I explained how the asynchronous synchronization works by leveraging the power of a TaskCompletionSource. Today I’ll walk you through the completion method. The completion method has essentially two modes. A drain mode and a non-drain mode. In the drain mode, all items that are enqueued will be asynchronously pushed until all internal queues are empty. In the non-drain mode, the queues will be emptied but nothing will be pushed if there is something left...
MultiProducerConcurrentConsumer – Asynchronous synchronisation
In the last post I explained how the push in batches up to the concurrency per slot works. This post covers the asynchronous synchronisation approach I used to unleash the push in batches when the batch size is reached. In the code shown in the Push It post I simplified the synchronisation primitive and called it syncEvent. In reality, the code uses a TaskCompletionSource for synchronisation. The MPCC declares a field of type TaskCompletionSource<bool>. For the MPCC the TResult type of...
MultiProducerConcurrentConsumer – Push in batches up to concurrency per slot
In the previous post I introduced the PushInBatches method who’s responsibility it is to start the PushInBatchesUpToConcurrencyPerQueueForAGivenSlot process. In this post I will focus on the mouthful method above. Like the name of the method already suggest its responsibility is to push items for a slot in batches to the pump delegate while respecting the concurrency settings per slot as provided by the user and explained in the introduction post. Without further ado here is the whole...
MultiProducerConcurrentConsumer – Push in batches
In the last post, I introduced the push method and the timer loop of the MultiProducerConcurrentConsumer. In this post, I’ll focus on the actual PushInBatches method that gets called by the timer loop. When the method is entered it first checks whether there is anything to push. Since numberOfPushedItems is accessed by multiple threads it uses Interlocked to read it and compare the returned value. When there is nothing to push a completed task is return. By not using the async keyword...
MultiProducerConcurrentConsumer – Push it
In the last post, I introduced the start procedure for the MultiProducerConcurrentConsumer including a rough outline of the timer loop responsible for pushing items in batches after the push interval is elapsed. To understand how the batch size affects the push let me first explain the push method. The push method’s responsibility is primarily to insert the item of type TItem into the right slot. To achieve that the passed in slot number is range checked against the available number of...
MultiProducerConcurrentConsumer – Start it
In the previous instalment, we discussed the technique of preallocating and reuse to save allocations during runtime. In this post, I will introduce the heart of the MPCC which is the start method and the timer loop outline. The MPCC can get started at any time after construction. If the component is started, it will start pushing items to the pump delegate whenever the batch size is reached or the push interval. This is how the Start method looks like The MPCC can be started once only. There...
MultiProducerConcurrentConsumer – Preallocate and reuse
In the last post, I introduced the MultiProducerConcurrentConsumer component. In this post, I’m going to walk through the constructor of the component and explain the preallocation trade-off that was made. With almost every code that is executed on the hot path of your library, framework or system you need to make tradeoffs regarding allocations. In the end, it boils down to roughly two choices: Allocate as much as you need upfront Allocate on-demand when you need it The benefit of only...
Introduction to the MultiProducerConcurrentConsumer for Azure Service Bus message completion
In the last post in the series about Azure Service Bus message completion, I briefly talked how the message batch completion could benefit from a more reactive approach. Before I dive into the inner workings of the reactive structure that I’m going to outline in the next few posts, I need to briefly recap the functional and non-functional requirements we need from the component. The component shall Make sure messages are only completed on the receiver they came from Reduce the number of...
Complete messages where they came from with Azure Service Bus
In the last post, we made a simplistic attempt to speed up batch completion of messages by having multiple dedicated background completion tasks. Unfortunately, this introduced an interesting side effect. The code above is using a single receive client to complete message lock tokens in batches. Those lock token could come from multiple receive clients, but they would end up being completed on the same instance. Depending on the transport type you use it might work or it won’t. When you...
Batch completion with multiple receivers on Azure Service Bus
In the last post, we created multiple receivers with multiple factories to speed up our message processing. Unfortunately, this has some consequences for our background completion loop. Since the message processing logic is shared between multiple receivers, all the receivers will try to push lock tokens into the concurrent stack. So the following code will be called by numberOfReceivers * concurrencyPerReceiver concurrently. So for example when we’d use 10 receivers with each a...