Participating in TransactionScopes and Async/Await Introduction

I would consider this blogpost as unnecessary knowledge for most programming tasks you do during your lifetime as a developer. But if you are involved in building libraries or tools used in integration scenarios you might consider this helpful. The .NET platform has under System.Transactions a very nifty class called TransactionScope. The TransactionScope allows you to wrap your database code, your messaging code and sometimes even third-party code (if supported by the third-party library) inside a transaction and only perform the actions when you actually want to commit (or complete) the transaction. As long as all the code inside the TransactionScope is executed on the same thread, all the code on the callstack can participate with the TransactionScope defined in your code. You can nest scopes, create new independent scopes inside a parent transaction scope or even creates clones of a TransactionScope and pass the clone to another thread and join back onto the calling thread but all this is not part of this blogpost. In this blogpost I will cover how to write so called enlistments which can participate with TransactionScope for your own code and and in the next post how to overcome the challenges you’ll face in asynchronous scenarios.

So let’s assume you are writing a message sender which can send messages onto a transport layer of your choice (i.ex. MSMQ, RabbitMQ…). The synchronous variant of that sender could look like the following pseudo-code (drastically simplified for demo purposes!):

    public class Message
    {
        public Message(string value)
        {
            this.Value = value;
        }

        public string Value { get; private set; }
    }
    public interface ISendMessages
    {
        void Send(Message message);
    }

    public class NonTransactionalMessageSender: ISendMessages
    {
        private readonly List<Message> sentMessages = new List<Message>();

        public IReadOnlyCollection<Message> SentMessages
        {
            get { return new ReadOnlyCollection<Message>(this.sentMessages); }
        }

        public void Send(Message message)
        {
            this.SendInternal(message);
        }

        private void SendInternal(Message message)
        {
            this.sentMessages.Add(message);
        }
    }

In the code above I replaced the transport layer with a list which collects all sent messages. When we wrap the message NonTransactionalMessageSender inside a TransactionScope the following test will fail:

        [Test]
        public void ScopeRollback_DoesntSend()
        {
            var sender = new NonTransactionalMessageSender();
            using (var tx = new TransactionScope())
            {
                sender.Send(new Message("First"));
                sender.Send(new Message("Second"));
                sender.Send(new Message("Last"));

                // We do not commit the scope
            }

            sender.SentMessages.Should().BeEmpty();
        }

NonTransactionalMessageSenderFailsTest

But how can we make our MessageSender implementation aware of the surrounding TransactionScope? Pretty simple! We have to write a so called enlistment. We will use a volatile enlistment because we don’t want to participate in two-phase commit transactions. You can read more about enlistments like two-phase, single-phase, durable vs. non-durable and more in the links provided in the reading list at the end of this post.

    public class SendResourceManager : IEnlistmentNotification
    {
        private readonly Action onCommit;

        public SendResourceManager(Action onCommit)
        {
            this.onCommit = onCommit;
        }

        public void Prepare(PreparingEnlistment preparingEnlistment)
        {
            preparingEnlistment.Prepared();
        }

        public void Commit(Enlistment enlistment)
        {
            this.onCommit();
            enlistment.Done();
        }

        public void Rollback(Enlistment enlistment)
        {
            enlistment.Done();
        }

        public void InDoubt(Enlistment enlistment)
        {
            enlistment.Done();
        }
    }

The SendRessourceManager above implements the IEnlistmentNotification interface. This interface is used by the transaction manager and called during the different transaction phases. The implementation does nothing more than accepting an action delegate in the constructor and calling the provided action delegate when the transaction manager calls commit on the enlistment. We now need to plug in the SendRessourceManager into the current transaction. This looks like the following:

    public class TransactionalMessageSender : IsendMessages
    {
        private readonly List<Message> sentMessages = new List<Message>();

        public IreadOnlyCollection<Message> SentMessages
        {
            get { return new ReadOnlyCollection<Message>(this.sentMessages); }
        }

        public void Send(Message message)
        {
            if (Transaction.Current != null)
            {
                Transaction.Current.EnlistVolatile(
                    new SendResourceManager(() => this.SendInternal(message)),
                    EnlistmentOptions.None);
            }
            else
            {
                this.SendInternal(message);
            }
        }

        private void SendInternal(Message message)
        {
            this.sentMessages.Add(message);
        }
    }

The code above simply checks whether a transaction is currently available by checking the Transaction.Current property and if a transaction is available it enlists the SendRessourceManager including an anonymous delegate which calls SendInternal on the current transaction. In the case that there is no transaction available it simply calls SendInternal. So what happens when we use the new message sender in the test defined above?

TransactionalMessageSenderTestSucceded

And what about this test?

        [Test]
        public void ScopeComplete_Sends()
        {
            var sender = new TransactionalMessageSender();
            using (var tx = new System.Transactions.TransactionScope())
            {
                sender.Send(new Message("First"));
                sender.Send(new Message("Second"));
                sender.Send(new Message("Last"));

                tx.Complete();
            }

            sender.SentMessages.Should().HaveCount(3)
                .And.Contain(m => m.Value == "First")
                .And.Contain(m => m.Value == "Second")
                .And.Contain(m => m.Value == "Last");
        }

TransactionalMessageSenderTestScopeCompleteSucceded

Of course it’s green too ;). But what if the sender has to support async and therefore the interface of the sender has to be changed to this?

    public interface ISendMessagesAsync
    {
        Task SendAsync(Message message);
    }

We cannot leave the code as is because we have no way to await the sending of the message. In the next post I’ll look deeper into the issues that arise and how we can overcome those. Stay asynchronously tuned!

Readings

About the author

Daniel Marbach

3 comments

By Daniel Marbach

Recent Posts