MultiProducerConcurrentConsumer – Completion

In the last post about the MPCC I explained how the asynchronous synchronization works by leveraging the power of a TaskCompletionSource. Today I’ll walk you through the completion method.

The completion method has essentially two modes. A drain mode and a non-drain mode. In the drain mode, all items that are enqueued will be asynchronously pushed until all internal queues are empty. In the non-drain mode, the queues will be emptied but nothing will be pushed if there is something left. The component is discarding all pending items. Let’s look at the completion method as a whole below.

public async Task Complete(bool drain = true) {
    if (started) {
        tokenSource.Cancel();
        await timer.ConfigureAwait(false);

        if (drain) {
            do {
                await PushInBatches().ConfigureAwait(false);
            } while (Interlocked.Read(ref numberOfPushedItems) > 0);
        }

        tokenSource.Dispose();
    }

    foreach (var queue in queues) {
        if (queue.IsEmpty) {
            continue;
        }

        TItem item;
        while (queue.TryDequeue(out item)) { }
    }

    numberOfPushedItems = 0;
    started = false;
    pump = null;
    state = null;
    tokenSource = null;
}

When the component was previously started the token source that is observed by the timer loop is canceled and the completion of the timer loop is awaited. When the timer loop is done and the drain mode is active the PushInBatches method is asynchronously awaited on the caller of the complete method. This happens in a while loop until the there is nothing more to push.

At the end of the method, the queues are checked. If the drain mode was selected the assumption is that the queues will be empty. When the drain mode was previously not selected all the queues are emptied by using the TryDequeue method until there is nothing left. Finally, all the internal state is reset so that the MPCC component could be started again.

Now we’ve covered over multiple posts how the MPCC component works that is responsible to concurrently complete messages for Azure Service Bus message receive loops. In the next installments, I’m going to have a closer look into how we could test such a concurrent component with a component test and the challenges we’ll face in doing so. Happy easter to you all!

About the author

Daniel Marbach

3 comments

By Daniel Marbach

Recent Posts