Participating in TransactionScopes and Async/Await Going deep into the abyss

In my last post I showed you the TransactionScope class and how you can write your own enlistments to participate in transactions. The code we wrote was all synchronous. This time we are going deep into the abyss and change our code sample to a completely asynchronous API. Let’s explore how the code could look like:

    public class Message
    {
        public Message(string value)
        {
            this.Value = value;
        }

        public string Value { get; private set; }
    }
    public interface ISendMessagesAsync
    {
        Task SendAsync(Message message);
    }
    public class AsyncTransactionalMessageSender : ISendMessagesAsync
    {
        private readonly List<Message> sentMessages = new List<Message>();

        public IReadOnlyCollection<Message> SentMessages
        {
            get { return new ReadOnlyCollection<Message>(this.sentMessages); }
        }

        public async Task SendAsync(Message message)
        {
            await this.SendInternal(message);
        }

        private async Task SendInternal(Message message)
        {
            Debug.WriteLine("Sending");
            await Task.Delay(1000);
            this.sentMessages.Add(message);
            Debug.WriteLine("Sent");
        }
    }

In the code above I replaced the transport layer with a list which collects all sent messages. When we wrap the message AsyncTransactionalMessageSender inside a TransactionScope the following test will take roughly around 3 seconds and then suddenly fail:

        [Test]
        public async Task ScopeRollbackAsync_DoesntSend()
        {
            var sender = new AsyncTransactionalMessageSender();
            using (var tx = new TransactionScope()
            {
                await sender.SendAsync(new Message("First"));
                await sender.SendAsync(new Message("Second"));
                await sender.SendAsync(new Message("Last"));

                // We do not commit the scope
            }
            sender.SentMessages.Should().BeEmpty();
        }

TransactionScopeWithoutFlowOptions

Wow! What just happened and how can we fix this? The easiest way to fix this situation is to remove the Task.Delay(1000) call in the SendInternal method. Then the InvalidOperationException will no longer be raised. But this didn’t really fix our problem. Because we removed all asynchronous calls in the SendInternal everything will be executed synchronously. As soon as we introduce an asynchronous call again in the SendInternal method the exception will be raised again. So what is the issue?

The exception actually already explains what is happening. The TransactionScope is wrapped in a using block, therefore at the end of the using statement the Transactionscope is disposed. Because the calling thread (here the unit test runner thread) is freed up when the asynchronous operation is running there is nothing which guarantees us that the same calling thread is disposing the TransactionScope. The .NET Framework 4.5.1 introduced a new option on the TransactionScope class called TransactionScopeAsyncFlowOption which has an Enabled value. Enabled instructs the framework to let the TransactionScope flow with the asynchronous continuations. After migrating our projects to .NET 4.5.1 we can choose this new option:

        [Test]
        public async Task ScopeRollbackAsync_DoesntSend()
        {
            var sender = new AsyncTransactionalMessageSender();
            using (var tx = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
            {
                await sender.SendAsync(new Message("First"));
                await sender.SendAsync(new Message("Second"));
                await sender.SendAsync(new Message("Last"));

                // We do not commit the scope
            }
            sender.SentMessages.Should().BeEmpty();
        }

We execute the test again and are excited to see the test turning green…

TransactionScopeWithFlowOptions

No actually it didn’t turn green. Of course not because we didn’t really implement the transactional behavior. So let’s do that! We need a bunch of code…

    public static class TranscationExtensions
    {
        public static Task EnlistVolatileAsync(
            this Transaction transaction,
            IEnlistmentNotification enlistmentNotification,
            EnlistmentOptions enlistmentOptions)
        {
            return Task.FromResult(transaction.EnlistVolatile(enlistmentNotification, enlistmentOptions));
        }
    }

    public class AsyncSendResourceManager : IEnlistmentNotification
    {
        private readonly Func<Task> onCommit;
        private readonly TaskCompletionSource<object> _source;

        public AsyncSendResourceManager(Func<Task> onCommit)
        {
            this.onCommit = onCommit;
        }

        public void Prepare(PreparingEnlistment preparingEnlistment)
        {
            preparingEnlistment.Prepared();
        }

        public async void Commit(Enlistment enlistment)
        {
            Debug.WriteLine("Committing");
            await this.onCommit()
                .ConfigureAwait(false);
            Debug.WriteLine("Committed");
            enlistment.Done();
        }

        public void Rollback(Enlistment enlistment)
        {
            enlistment.Done();
        }

        public void InDoubt(Enlistment enlistment)
        {
            enlistment.Done();
        }
    }

and then change the AsyncTransactionalMessageSender to

        public async Task SendAsync(Message message)
        {
            if (Transaction.Current != null)
            {
                await Transaction.Current.EnlistVolatileAsync(new AsyncSendResourceManager(() => this.SendInternal(message)), EnlistmentOptions.None);
            }
            else
            {
                await this.SendInternal(message);
            }
        }

We created an implementation of the IEnlistmentNotification interface which takes as constructor input a Func<Task>. This allows us to await the function during the commit phase. We change the Commit method to async and also add a thin tasked based wrapper as an extension method to the Transaction class which allows us to await the enlistment. We execute the test again:
PotentiallySuccessfulTest
Everything is green and fine isn’t it? Before we can be proud of our work we should introduce another test:

        [Test]
        public async Task ScopeCompleteAsync_Sends()
        {
            var sender = new AsyncTransactionalMessageSender();
            using (var tx = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
            {
                await sender.SendAsync(new Message("First"));
                await sender.SendAsync(new Message("Second"));
                await sender.SendAsync(new Message("Last"));

                tx.Complete();
            }

            sender.SentMessages.Should().HaveCount(3)
                .And.Contain(m => m.Value == "First")
                .And.Contain(m => m.Value == "Second")
                .And.Contain(m => m.Value == "Last");
        }

Ctrl+U+R later.

FailingTest
Ups! What’s going on? Try it out and ask yourself what could be wrong.

Links

Hints: Look at the debug output and the timing of the test.

About the author

Daniel Marbach

7 comments

  • Hi

    I assume you mean this code

    (new AsyncSendResourceManager(() => this.SendInternal(message))

    ?

    There it doesn’t have to. It shortcuts the async statemachine by return the task. The signature there is Func. So you can either write

    () => this.SendInternal(message)

    or

    async () => await this.SendInternal(message)

    Does that clarify?

  • I do not really get the point of all that. The `Commit` and `Rollback` are triggered by the scope disposal, aren’t they? And this disposal is still synchronous in your code. What am I missing?
    Implementing an interface method as async is not enough to cause it to be asynchronously called, is it?

  • Hi Frederic,
    The exception shown here is coming from the framework. By default, the TransactionScopeAsyncFlowOption is not enabled (backward compatibility). With an await statement inside the using block, the continuation of the await which disposes of the scope can be scheduled to be executed on a different thread than the one the created the scope.

    The second part of the post is hinting at the fact that IEnlistmentNotification is an interface defined by the framework. When we need to call an async method inside that interface we have to mark the void method as async void. The problem with that is that we now have a fire & forget approach. Meaning as soon as the first await statement is reached the call is done from the perspective of the caller (here the transaction resource manager). So any exception raised after the await statement inside the enlistment cannot be observed.

By Daniel Marbach

Recent Posts