Our journey to F#: persistency

Prepare for a long post about how we persist data with F# using Dapper into (Azure) SQL Server. I will also show how we can run our tests either with an in-memory database simulation or with a real database. As always, if you spot something that can be improved, please let me know. We are still learning…

The business logic

When the business logic needs to persist data, it uses a function that is passed as an argument:

let createActivityStructure
    (
        persistActivityStructureEvent, // ActivityStructureEvent -> Async<unit>
        // ... other dependencies ...
    )
    // ... data arguments ...
    =

    let theEventToBePersisted = ...
    persistActivityStructureEvent theEventToBePersisted

We pass dependencies as a tuple to our business logic functions. This simplifies wiring things together because IntelliSense works better with tuples than with plain arguments. It also ensures that we pass all dependencies at once, so no unintended partial application of function arguments can happen.

Other arguments are passed normally.

The persistActivityStructureEvent function takes an ActivityStructureEvent and returns Async<unit>.
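
To make the tuple convention concrete, here is a minimal, self-contained sketch. The simplified ActivityStructureEvent record, the createName dependency and the suffix argument are assumptions made up for this illustration; they are not part of our real code.

```fsharp
// Hypothetical, simplified event type for this sketch.
type ActivityStructureEvent = { Name : string }

// Dependencies are passed as one tuple, data arguments are passed normally.
let createActivityStructure
    (
        persistActivityStructureEvent : ActivityStructureEvent -> Async<unit>,
        createName : unit -> string // hypothetical second dependency
    )
    (suffix : string)
    =
    let theEventToBePersisted = { Name = createName () + suffix }
    persistActivityStructureEvent theEventToBePersisted

// Wiring: all dependencies have to be supplied in a single step.
// Leaving one out of the tuple is a compile error, not a partial application.
let persisted = ResizeArray<ActivityStructureEvent>()
let persist event = async { persisted.Add event }
let create = createActivityStructure (persist, (fun () -> "structure"))

// The wired function only expects the data arguments.
create "-1" |> Async.RunSynchronously
```

After the call, persisted contains the single event that the business logic handed to the persist function.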

The storage “interface”

We use a record to define the functions that are supported by the storage for activity structures:

type ActivityStructuresStorage = {
    PersistEvent : ActivityStructureEvent -> Async<unit>
    PersistEvents : ActivityStructureEvent list -> Async<unit>
    QueryEventsByActivityStructureId : ActivityStructureId -> Async<EventsForProjection<ActivityStructureEvent>>
    QueryEventsByEventIds : ActivityStructureEventId list -> Async<ActivityStructureEvent list>
    QueryEvents : unit -> Async<EventsForProjections<ActivityStructureEvent>>
}

We have a highly modular design and our system is split into many of these small storages. This simplifies refactoring and extensibility – just like building something new out of existing Lego bricks.

You may ask: why not use an interface, but a record? I just like it this way 🙂

The storage implementations

We have two implementations of these storage functions: in-memory and real database.

We use the in-memory database during coding and testing locally. Using a simple simulation of the database speeds up development a lot because we do not have to worry about database schemas at all. Once the functionality is implemented and we know the structure of the data, we can design the SQL database schema.

You may ask: why store data for event sourcing in SQL Server? SQL Server is very reliable, well understood and performant, and we can do with the data whatever we want. This is very important to me because code can change quickly, but data needs to stay for many years.

The in-memory implementation

Here is the complete – yes, no code left out – implementation of the in-memory store for activity structures:

module Calitime.TimeRocket.Core.ActivityTime.Storages.InMemory.ActivityStructures

open Calitime.TimeRocket.Core.ActivityTime
open Calitime.TimeRocket.Core.ActivityTime.Storages
open Calitime.TimeRocket.Streamliner
open FsToolkit.ErrorHandling

let createStorage () =
    let mutable (events : ActivityStructureEvent list) = []

    let persistEvent event =
        async {
            events <- event::events
        }

    let persistEvents e =
        async {
            events <- List.append e events
        }

    let queryEventsByActivityStructureId activityStructureId =
        events
        |> Seq.filter (fun e -> activityStructureId = e.ActivityStructureId)
        |> EventsForProjection
        |> Async.singleton

    let queryEventsByEventIds eventIds =
        events
        |> List.filter (fun e -> eventIds |> List.contains e.ActivityStructureEventId)
        |> Async.singleton

    let queryEvents () =
        events
        |> List.toSeq
        |> EventsForProjections
        |> Async.singleton

    {
        ActivityStructuresStorage.PersistEvent = persistEvent
        ActivityStructuresStorage.PersistEvents = persistEvents
        QueryEventsByActivityStructureId = queryEventsByActivityStructureId
        QueryEventsByEventIds = queryEventsByEventIds
        QueryEvents = queryEvents
    }

We keep the data in a mutable variable events that simulates the SQL database table. The rest are simple operations on this list.

We use FsToolkit.ErrorHandling’s Async.singleton to return the result as an Async. This feels more straightforward than using an async { } block in queries.
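
The following sketch compares the two styles. To keep it runnable without the package, it uses a local asyncSingleton helper as a stand-in for Async.singleton (an assumption of this sketch), and a list of integers instead of real events. It also shows one difference that is worth knowing: the pipeline reads the list when the query function is called, while the async block reads it when the Async is run.

```fsharp
// Local stand-in for FsToolkit.ErrorHandling's Async.singleton (assumption of this sketch).
let asyncSingleton value = async.Return value

// Simulated table; a ref cell so that we can replace the content later.
let events = ref [ 1; 2; 3; 4 ]

// Query written with an async block: the body runs when the Async is run.
let queryWithBlock () =
    async { return events.Value |> List.filter (fun e -> e % 2 = 0) }

// The same query as a pipeline: the list is filtered immediately,
// only the finished result is wrapped into an Async.
let queryWithSingleton () =
    events.Value
    |> List.filter (fun e -> e % 2 = 0)
    |> asyncSingleton

let deferred = queryWithBlock ()
let eager = queryWithSingleton ()

// Change the simulated table after creating, but before running the queries.
events.Value <- [ 6 ]

let fromBlock = deferred |> Async.RunSynchronously // sees the new content
let fromSingleton = eager |> Async.RunSynchronously // sees the old content
```

As long as a query is run right after it is created, as in our storages, both styles return the same result.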

The SQL implementation

The SQL implementation of the storage needs to map the events (F# records with a nested discriminated union) to simple records that can be stored in the database, and back.

We have two map functions for that:

let mapToRow (event : ActivityStructureEvent) =
    let (rowType, data) =
        match event.Data with
        | ActivityStructureCreated d ->
            (CreatedType,
             JsonConverters.encode d)
        | ActivityStructureDeleted ->
            (DeletedType,
             null)
        // ... other event types ...

    {
        ActivityStructureEventRow.Type = rowType
        EventGuid = event.ActivityStructureEventId |> unwrapActivityStructureEventId
        ActivityStructureId = event.ActivityStructureId |> unwrapActivityStructureId
        Data = data
        Application = event.Application |> unwrapApplication
    }

let mapToEvent row =
    let eventDataDecoder =
        match row.Type with
        | CreatedType ->
            JsonConverters.decodeThrowing<ActivityStructureCreated> >> ActivityStructureCreated
        | DeletedType ->
            fun _ -> ActivityStructureDeleted
        // ... other event types ...
        | _ ->
           failwithf "unknown row type %s" row.Type

    let data = row.Data |> eventDataDecoder

    {
        ActivityStructureEvent.ActivityStructureEventId = row.EventGuid |> ActivityStructureEventId
        ActivityStructureId = row.ActivityStructureId |> ActivityStructureId
        Data = data
        Application = row.Application |> wrapApplication
    }

JSON serialisation of F# records and discriminated unions is a topic for its own blog post.

The wrap and unwrap functions map an F# type to a type that can be stored directly in SQL Server, and back. For example, we unwrap our identifiers (single-case discriminated unions) into GUIDs:

let unwrapActivityStructureEventId (ActivityStructureEventId guid) = guid
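
As a small round-trip sketch: wrapping is just applying the case constructor, unwrapping is a pattern match in the function argument. The type definition below is an assumption that matches how the identifier is used in this post.

```fsharp
open System

// Assumed definition: a single-case discriminated union around a GUID.
type ActivityStructureEventId = ActivityStructureEventId of Guid

// Unwrap by pattern matching directly in the argument.
let unwrapActivityStructureEventId (ActivityStructureEventId guid) = guid

let guid = Guid.NewGuid()

// Wrap: the case constructor is an ordinary function Guid -> ActivityStructureEventId.
let eventId = ActivityStructureEventId guid

// Unwrap for storing, wrap again after reading.
let stored = unwrapActivityStructureEventId eventId
let restored = ActivityStructureEventId stored
```

Because discriminated unions have structural equality, restored is equal to eventId.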

The “row” that is stored to SQL server looks like this:

[<CLIMutable>]
type ActivityStructureEventRow = {
    EventGuid : Guid
    ActivityStructureId : Guid
    Type : string
    Data : string
    Application : DateTime
}

We chose to use mostly basic types that can be stored directly without conversion. There are some exceptions for which we use converters and let Dapper do the conversion. Since we map our data anyway, we found it simpler to do (almost) all mapping in one place.

The record is marked [<CLIMutable>] so that Dapper can write the values when reading data from the storage.

Now, we can create the storage implementation (shortened):

let createStorage (persistencyContext : Sql.PersistencyContext) =

    let persistEvents events =
        async {
            use! connection = Sql.openConnection persistencyContext
            use transaction = connection.BeginTransaction()

            let sql = """
                INSERT INTO activityTime.activityStructureEvents
                (eventGuid,activityStructureId,type,data,application)
                VALUES (@eventGuid,@activityStructureId,@type,@data,@application)"""

            let rows = events |> List.map mapToRow

            do! Sql.execute
                    persistencyContext.Logger
                    transaction
                    sql
                    rows
                    "ActivityStructure.PersistEvents"

            transaction.Commit ()
        }


    let queryEventsByActivityStructureId (ActivityStructureId guid) =
        async {
            use! connection = Sql.openConnection persistencyContext

            let sql = """
                SELECT eventGuid,activityStructureId,type,data,application
                FROM activityTime.activityStructureEvents
                WHERE activityStructureId = @activityStructureId"""

            let! events =
                Sql.query
                    persistencyContext.Logger
                    connection
                    sql
                    {| activityStructureId = guid |}
                    "ActivityStructure.QueryEventsByActivityStructureId"

            return
                events
                |> Seq.map mapToEvent
                |> EventsForProjection
        }


    let queryEventsByEventIds (ids : ActivityStructureEventId list) =
        // ... 


    let queryEvents () =
        // ...

    {
        ActivityStructuresStorage.PersistEvent = fun event -> persistEvents [ event ]
        PersistEvents = persistEvents
        QueryEventsByActivityStructureId = queryEventsByActivityStructureId
        QueryEventsByEventIds = queryEventsByEventIds
        QueryEvents = queryEvents
    }

The createStorage function creates the storage record we have seen above.

The individual query functions follow the same pattern:

  • open connection (and transaction for writes)
  • build the SQL query
  • execute the query
  • map the rows to events using the above mapToEvent function

Query parameters are passed as anonymous records. They can be passed directly to Dapper, which creates the SQL parameters from the fields.

Please be aware that you have to pass a DbString for strings in WHERE clauses to prevent unneeded conversions in SQL Server. See here.

The Sql module provides some helper functions to open connections and execute queries.

SQL Helpers for switching storages, row-level security and monitoring

We want to be able to run our code either with the in-memory database simulation or the real database. The PersistencyContext holds the data needed to access the real database:

[<RequireQualifiedAccess>]
module Calitime.TimeRocket.Core.ActivityTime.Storages.Database.Sql

open System
open System.Collections.Generic
open System.Threading.Tasks
open Calitime
open Calitime.Resilience
open Dapper
open Microsoft.Data.SqlClient
open Newtonsoft.Json
open Polly
open System.Diagnostics
open Tasks

type PersistencyContext = {
    ConnectionString : string
    TenantId : int
    Logger : ILogger
}

The PersistencyContext holds the connection string, the current tenant and a logger used for monitoring.

We use row-level security to implement a multi-tenant system. Every tenant can only read its own data. The openConnection function opens a connection and sets the tenant to be used for row-level security:

let openConnection persistencyContext =
    task {
        let connection = new SqlConnection(persistencyContext.ConnectionString)

        do! connection.OpenAsync()

        // Set the tenant in the session context so that it is used for row-level security.
        use cmd = connection.CreateCommand()
        cmd.CommandText <- "exec sp_set_session_context @key=N'TenantId', @value=@shardingKey"
        cmd.Parameters.AddWithValue("@shardingKey", persistencyContext.TenantId) |> ignore
        do! cmd.ExecuteNonQueryAsync() |> Task.Ignore

        return connection
    } |> Async.AwaitTask

We retry calls to the database when the exception is transient. The isTransientError function returns whether an SqlException represents a transient error or not:

// http://elvanydev.com/resilience-with-polly/
let private SqlTransientErrors =
    [
        40613 // Database is not currently available.
        40197 // Error processing the request.
        40501 // The service is currently busy.
        49918 // Not enough resources to process the request.
        40549 // Session is terminated because you have a long-running transaction.
        40550 // The session has been terminated because it has acquired too many locks.
    ]

let private isTransientError (ex : SqlException) =
    List.contains ex.Number SqlTransientErrors

The track function takes care of sending monitoring data to our monitoring service and retries calls to the database in case a transient error occurred:

let serializeParameters parameters =
    try
        JsonConvert.SerializeObject(parameters)
    with
    | _ -> "not serializable"

let private track
    sql
    parameters
    description
    (f : unit -> Task<'a>)
    (logger : ILogger) =

    task {
        let start = DateTimeOffset.UtcNow
        let sw = Stopwatch.StartNew()
        let! result =
            Policy
                .Handle<SqlException>(fun ex -> isTransientError ex)
                .RetryWithExponentialWait(5)
                .ExecuteAsync(fun () ->
                        task {
                            try
                                let! result = f()
                                return result |> Result.Ok
                            with
                            | :? SqlException as ex ->
                                if (isTransientError ex) then
                                    logger.LogDatabaseRetry()

                                return ex |> Result.Error
                        })

        sw.Stop()

        let success =
            match result with
            | Ok _ -> true
            | Error _ -> false

        logger.LogDatabase(
            sql,
            serializeParameters parameters,
            description,
            start,
            sw.Elapsed,
            success)

        return
            match result with
            | Error ex -> raise (Exception("Could not persist", ex))
            | Ok r -> r
    }

We use Polly for retrying failing database calls.
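
To illustrate the idea of retrying with an exponentially growing wait time, here is a minimal sketch in plain F# without Polly. It is not how Polly or our RetryWithExponentialWait extension is implemented; the TransientError exception, the function name and the wait times are assumptions for this example. Note that a retry wrapper like this can only react to exceptions that actually escape the wrapped operation.

```fsharp
// Hypothetical stand-in for a transient database failure.
exception TransientError of string

// Runs the operation and retries it on transient errors,
// doubling the wait time after every failed attempt.
let rec retryWithExponentialWait
    maxRetries
    (delayMs : int)
    (operation : unit -> Async<'a>)
    : Async<'a> =
    async {
        let! attempt = operation () |> Async.Catch

        match attempt with
        | Choice1Of2 result -> return result
        | Choice2Of2 (TransientError _) when maxRetries > 0 ->
            do! Async.Sleep delayMs
            return! retryWithExponentialWait (maxRetries - 1) (delayMs * 2) operation
        | Choice2Of2 ex ->
            // Not transient, or no retries left: give up.
            return raise ex
    }

// Usage: an operation that fails twice before it succeeds.
let attempts = ref 0

let flakyOperation () =
    async {
        attempts.Value <- attempts.Value + 1
        if attempts.Value < 3 then raise (TransientError "busy")
        return "ok"
    }

let result =
    retryWithExponentialWait 5 10 flakyOperation
    |> Async.RunSynchronously
```

In this run the operation is executed three times: two failures with waits of 10 ms and 20 ms, then the successful call.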

The logger is a C# class so we have to map the F# data structures to C# data structures (Result -> bool).

The execute function executes an SQL query that does not return data:

let execute
    logger
    (transaction : SqlTransaction)
    sql
    parameters
    caller =

    task {

        let f = fun () ->
                     transaction.Connection.ExecuteAsync(
                        sql,
                        parameters,
                        transaction) |> Task.Ignore

        return! track
                    sql
                    parameters
                    caller
                    f
                    logger
    } |> Async.AwaitTask

It is just a wrapper around the track function so that it is easier to call.

The query function executes an SQL query that returns data:

let query
    logger
    (connection : SqlConnection)
    sql
    parameters
    caller : Async<IEnumerable<'result>> =

    task {

        let f = fun () ->
                     connection.QueryAsync<'result>(
                        sql,
                        parameters)

        return! track
                    sql
                    parameters
                    caller
                    f
                    logger
    } |> Async.AwaitTask

It is just a wrapper around the track function so that it is easier to call.

Wiring everything together

Now we have all the parts needed and as a final step, we need to glue everything together.

The storage creator creates the storage access layer. It creates the real one when a PersistencyContext is available, and the in-memory simulation otherwise:

module Calitime.TimeRocket.Core.StorageCreator

open Calitime.TimeRocket.Core.ActivityTime.Storages

let createStorage persistencyContext =
        match persistencyContext with
        | Some x ->
            {
                Storage.ActivityValues = Database.ActivityValues.createStorage x
                Activities = Database.Activities.createStorage x
                ActivityStructures = Database.ActivityStructures.createStorage x
            }
        | None ->
            {
                Storage.ActivityValues = InMemory.ActivityValues.createStorage ()
                Activities = InMemory.Activities.createStorage ()
                ActivityStructures = InMemory.ActivityStructures.createStorage ()
            }

The above example also contains some storages that we have not seen yet, to give you a better view of the big picture.

Our production code calls the storage creator with a PersistencyContext that contains the connection string from a configuration file and the tenant of the user that makes the call to the WebApi. This is handled inside a WebApi Controller base class. The tenant is read from the authentication token.

Our test code decides whether the tests should be run in-memory or with a database:

open Microsoft.Extensions.Configuration

let private getPersistencyContext () =
    // Fall back to the in-memory configuration when no environment is set.
    let environment =
        match Environment.GetEnvironmentVariable("TestEnvironment") with
        | e when String.IsNullOrWhiteSpace(e) -> "inmemory"
        | e -> e

    let configuration =
        ConfigurationBuilder()
            .SetBasePath(Directory.GetCurrentDirectory())
            .AddJsonFile(sprintf "appsettings.%s.json" environment)
            .Build()

    let connectionString = configuration.GetConnectionString("testing")

    // No connection string means that the tests run in-memory.
    if String.IsNullOrWhiteSpace(connectionString) then
        None
    else
        let context : Sql.PersistencyContext =
            {
                ConnectionString = connectionString
                TenantId = (FakeStorageTenantIdLeaser.LeaseTenantId()).Value + 50000
                Logger = FakeLogger ()
            }
        Some context

Depending on an environment variable, the corresponding configuration file is read to get the connection string, or none when running in-memory. We run the tests against a real database in our continuous integration pipeline and while implementing the storage access layer, and in-memory while coding business logic.

In the tests, we generate a fake tenant id for every test run. This allows us to run tests in parallel without cross-talk, since the tenant data is isolated from other tenants.

The returned PersistencyContext is then passed to the storage creator and the test can be executed.
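
The following self-contained sketch shows the whole idea in miniature: a test is written against the storage record only and does not know which implementation it gets. All types are heavily simplified assumptions for this example, and the database implementation is only a placeholder.

```fsharp
// Simplified stand-ins for the real types (assumptions of this sketch).
type Event = { Id : int }

type Storage = {
    PersistEvent : Event -> Async<unit>
    QueryEvents : unit -> Async<Event list>
}

type PersistencyContext = { ConnectionString : string; TenantId : int }

module InMemory =
    let createStorage () =
        let events = ref []
        {
            PersistEvent = fun event -> async { events.Value <- event :: events.Value }
            QueryEvents = fun () -> async { return events.Value }
        }

module Database =
    // Placeholder: the real implementation would use Dapper as shown earlier.
    let createStorage (_ : PersistencyContext) : Storage =
        failwith "the real database storage is not part of this sketch"

// The storage creator picks the implementation based on the context.
let createStorage persistencyContext =
    match persistencyContext with
    | Some context -> Database.createStorage context
    | None -> InMemory.createStorage ()

// A test only depends on the storage record, not on an implementation.
let ``persisted events can be queried`` (storage : Storage) =
    async {
        do! storage.PersistEvent { Id = 1 }
        do! storage.PersistEvent { Id = 2 }
        let! events = storage.QueryEvents ()
        return events |> List.map (fun e -> e.Id) |> List.sort
    }

// Without a PersistencyContext the test runs against the in-memory simulation.
let ids =
    createStorage None
    |> ``persisted events can be queried``
    |> Async.RunSynchronously
```

Passing Some context instead of None would run the very same test against the database implementation.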

Conclusions

Persisting data with F# and Dapper is much the same as with C#. However, the easy composability of functions helps us implement our run-in-memory-or-on-a-real-database switch in a simpler way than it is done in our C# code.

In reality, things are a bit more complicated because we have to do the storage stuff twice. Once for F# and once for C# because we have a mix of the two languages. But this affects only the builders that build up our system for running tests. That will be part of a later blog post.

So stay tuned…

This blog post is made possible with the support of Time Rocket, the product this journey is all about. Take a look (German only).

Find all blog posts about our journey to F# here.

About the author

Urs Enzler
