MultiProducerConcurrentConsumer – Asynchronous synchronisation

In the last post I explained how pushing in batches, up to the concurrency per slot, works. This post covers the asynchronous synchronisation approach I used to unleash the push in batches as soon as the batch size is reached.

In the code shown in the Push It post I simplified the synchronisation primitive and called it syncEvent. In reality, the code uses a TaskCompletionSource for synchronisation.

The MPCC declares a field of type TaskCompletionSource<bool>. For the MPCC the TResult type of the TaskCompletionSource doesn’t matter. It could be anything.

TaskCompletionSource<bool> batchSizeReached = new TaskCompletionSource<bool>();

The Push method looks like this:

public void Push(TItem item, int slotNumber) {
    // as before

    if (incrementedCounter > batchSize)
    {
        // signal the timer loop; returns false instead of throwing if already completed
        batchSizeReached.TrySetResult(true);
    }
}

Whenever the batch size is reached, the component tries to transition the task completion source into the completed state. It is crucial to use the TrySetResult method here, since the timer loop might try to complete the same source at the same time because the push interval has elapsed. SetResult throws an InvalidOperationException when the task is already completed, so using it in Push would let that exception bubble up to the caller of Push.
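The difference between the two methods can be seen in isolation. The following is a minimal sketch, not code from the MPCC; the class and method names (CompletionRaceDemo, TrySetTwice, SetTwiceThrows) are made up for illustration.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the MPCC.
public static class CompletionRaceDemo
{
    // Calls TrySetResult twice on the same source and returns both results.
    public static (bool first, bool second) TrySetTwice()
    {
        var tcs = new TaskCompletionSource<bool>();
        bool first = tcs.TrySetResult(true);   // transitions the task to completed
        bool second = tcs.TrySetResult(true);  // already completed: returns false, no exception
        return (first, second);
    }

    // Returns true when the second SetResult call throws InvalidOperationException.
    public static bool SetTwiceThrows()
    {
        var tcs = new TaskCompletionSource<bool>();
        tcs.SetResult(true);
        try
        {
            tcs.SetResult(true);               // the task is already completed
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }
}
```

TrySetTwice reports that only the first call won, while SetTwiceThrows shows the exception a caller of Push would otherwise have to deal with.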

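async Task TimerLoop() {
    var token = tokenSource.Token;
    while (!tokenSource.IsCancellationRequested) {
        try
        {
            // wait until the push interval elapses or Push signals that the batch size is reached
            await Task.WhenAny(Task.Delay(pushInterval, token), batchSizeReached.Task).ConfigureAwait(false);

            // complete the source in case the delay won; harmless if Push already completed it
            batchSizeReached.TrySetResult(true);

            // a completed TaskCompletionSource cannot be reused, so swap in a fresh one
            batchSizeReached = new TaskCompletionSource<bool>();
            await PushInBatches().ConfigureAwait(false);
        }
        catch (Exception)
        {
            // intentionally ignored
        }
    }
}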
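To see which of the two tasks ended the wait at the top of the loop, the Task.WhenAny call can be pulled out into a small helper. This is only a sketch for experimenting; the Trigger enum, the TriggerDemo class and the WaitForTrigger method are assumptions of mine and do not exist in the MPCC.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical names for illustration only.
public enum Trigger { Interval, BatchSizeReached }

public static class TriggerDemo
{
    public static async Task<Trigger> WaitForTrigger(
        TaskCompletionSource<bool> batchSizeReached,
        TimeSpan pushInterval,
        CancellationToken token)
    {
        var delay = Task.Delay(pushInterval, token);

        // WhenAny returns the task that finished first. It does not throw when
        // the delay is cancelled; the cancelled delay task is simply returned.
        var winner = await Task.WhenAny(delay, batchSizeReached.Task).ConfigureAwait(false);

        return winner == batchSizeReached.Task ? Trigger.BatchSizeReached : Trigger.Interval;
    }
}
```

Passing a source that is already completed together with a long interval yields Trigger.BatchSizeReached, while a pending source with a short interval yields Trigger.Interval.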

The timer loop uses Task.WhenAny to await either the push interval delay task or the batchSizeReached task that is represented by the TaskCompletionSource. When the WhenAny task completes, the loop calls TrySetResult on the batchSizeReached task completion source, because it might have been the push interval task that completed, in which case the source is still pending. Then the field is reset to a new TaskCompletionSource, because a TaskCompletionSource cannot be reused once it has completed. The code doesn’t use locking around setting the field. The assumption is that assigning a reference to the field is atomic and that there is only one timer loop that resets it. What could happen though is the following:

A concurrent Push might call TrySetResult on the previous task completion source, just after the timer loop has replaced the field. In that case, the next push will automatically lead to unleashing the new task completion source since the incrementedCounter hasn’t been reset. In the worst case, if no further push happens, the MPCC will only push when the next push interval is due. In my opinion, this is a reasonable tradeoff compared to the performance overhead that locking around the task completion source would incur.
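The interleaving described above can be replayed step by step on a single thread. The sketch below is a simplified model and not the MPCC code: a local variable stands in for the batchSizeReached field, and the StaleSourceDemo class and its Replay method are hypothetical.

```csharp
using System.Threading.Tasks;

// Hypothetical demo class, not part of the MPCC.
public static class StaleSourceDemo
{
    public static (bool signalMissed, bool recoveredByNextPush) Replay()
    {
        // Stands in for the batchSizeReached field.
        var field = new TaskCompletionSource<bool>();

        var seenByPush = field;                    // 1. Push reads the field
        field = new TaskCompletionSource<bool>();  // 2. the timer loop swaps in a fresh source
        seenByPush.TrySetResult(true);             // 3. Push completes the old source

        // The fresh source is still pending, so the timer loop does not see the signal.
        bool signalMissed = !field.Task.IsCompleted;

        // 4. The next Push reads the field again and completes the fresh source.
        field.TrySetResult(true);
        bool recoveredByNextPush = field.Task.IsCompleted;

        return (signalMissed, recoveredByNextPush);
    }
}
```

Both values come back as true: the signal on the old source goes unnoticed, and the following push reaches the new one.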

In the next installment, I’m going to talk about the completion method of the MPCC.

About the author

Daniel Marbach
