Prepare for a long post about how we persist data with F# using Dapper into (Azure) SQL Server. I will also show how we can run our tests either with an in-memory database simulation or against a real database. As always, if you spot something that can be improved, please let me know. We are still learning…
The business logic
When the business logic needs to persist data, it uses a function that is passed as an argument:
let createActivityStructure
(
persistActivityStructureEvent, // ActivityStructureEvent -> Async<unit>
// ... other dependencies ...
)
// ... data arguments ...
=
let theEventToBePersisted = ...
persistActivityStructureEvent theEventToBePersisted
We pass dependencies as a tuple to our business logic functions. This simplifies wiring things together because IntelliSense works better with tuples than with plain curried arguments. It also makes sure that we pass all dependencies and that no unintended partial application of function arguments happens.
Other arguments are passed normally.
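A small sketch of the difference (the curried variant is hypothetical and only illustrates the risk):
// with curried dependencies, forgetting one still compiles and silently
// yields a partially applied function instead of a compile error
let handler = createActivityStructureCurried persistActivityStructureEvent

// with the tuple, the compiler forces us to pass all dependencies at once
let handler' = createActivityStructure (persistActivityStructureEvent, otherDependencies)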
The persistActivityStructureEvent function takes an ActivityStructureEvent and returns Async<unit>.
The storage “interface”
We use a record to define the functions that are supported by the storage for activity structures:
type ActivityStructuresStorage = {
PersistEvent : ActivityStructureEvent -> Async<unit>
PersistEvents : ActivityStructureEvent list -> Async<unit>
QueryEventsByActivityStructureId : ActivityStructureId -> Async<EventsForProjection<ActivityStructureEvent>>
QueryEventsByEventIds : ActivityStructureEventId list -> Async<ActivityStructureEvent list>
QueryEvents : unit -> Async<EventsForProjections<ActivityStructureEvent>>
}
We have a highly modular design and our system is split into many of these small storages. This simplifies refactoring and extensibility – just like building something new out of existing Lego bricks.
You may ask: why a record and not an interface? I just like it this way 🙂
The storage implementations
We have two implementations of these storage functions: in-memory and real database.
We use the in-memory database during local coding and testing. Using a simple simulation of the database speeds up development a lot because we do not have to worry about database schemas at all. Once the functionality is implemented and we know the structure of the data, we can design the SQL database schema.
You may ask: why store data for event sourcing in SQL Server? SQL Server is very reliable, well understood and performant, and we can do with the data whatever we want. This is very important to me because code can change quickly, but data needs to stay around for many years.
The in-memory implementation
Here is the complete – yes, no code left out – implementation of the in-memory store for activity structures:
module Calitime.TimeRocket.Core.ActivityTime.Storages.InMemory.ActivityStructures
open Calitime.TimeRocket.Core.ActivityTime
open Calitime.TimeRocket.Core.ActivityTime.Storages
open Calitime.TimeRocket.Streamliner
open FsToolkit.ErrorHandling
let createStorage () =
let mutable (events : ActivityStructureEvent list) = []
let persistEvent event =
async {
events <- event::events
}
let persistEvents e =
async {
events <- List.append e events
}
let queryEventsByActivityStructureId activityStructureId =
events
|> Seq.filter (fun e -> activityStructureId = e.ActivityStructureId)
|> EventsForProjection
|> Async.singleton
let queryEventsByEventIds eventIds =
events
|> List.filter (fun e -> eventIds |> List.contains e.ActivityStructureEventId)
|> Async.singleton
let queryEvents () =
events
|> List.toSeq
|> EventsForProjections
|> Async.singleton
{
ActivityStructuresStorage.PersistEvent = persistEvent
ActivityStructuresStorage.PersistEvents = persistEvents
QueryEventsByActivityStructureId = queryEventsByActivityStructureId
QueryEventsByEventIds = queryEventsByEventIds
QueryEvents = queryEvents
}
We keep the data in a mutable variable events that simulates the SQL database table. The rest are some simple operations on this list.
We use FsToolkit.ErrorHandling's Async.singleton to return the result as an Async. This feels more straightforward than using an async { } block in queries.
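The two are equivalent; here is a small comparison (the function names are only for illustration):
// Async.singleton wraps an already computed value into an Async
let queryAll () = events |> Async.singleton

// the same result, written with the more verbose async block
let queryAll' () = async { return events }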
The SQL implementation
The SQL implementation of the storage needs to map the events (F# records with a nested discriminated union) into simple records that can be stored in the database, and back. Therefore, we have two map functions for that:
let mapToRow (event : ActivityStructureEvent) =
let (rowType, data) =
match event.Data with
| ActivityStructureCreated d ->
(CreatedType,
JsonConverters.encode d)
| ActivityStructureDeleted ->
(DeletedType,
null)
// ... other event types ...
{
ActivityStructureEventRow.Type = rowType
EventGuid = event.ActivityStructureEventId |> unwrapActivityStructureEventId
ActivityStructureId = event.ActivityStructureId |> unwrapActivityStructureId
Data = data
Application = event.Application |> unwrapApplication
}
let mapToEvent row =
let eventDataDecoder =
match row.Type with
| CreatedType ->
JsonConverters.decodeThrowing<ActivityStructureCreated> >> ActivityStructureCreated
| DeletedType ->
fun _ -> ActivityStructureDeleted
// ... other event types ...
| _ ->
failwithf "unknown row type %s" row.Type
let data = row.Data |> eventDataDecoder
{
ActivityStructureEvent.ActivityStructureEventId = row.EventGuid |> ActivityStructureEventId
ActivityStructureId = row.ActivityStructureId |> ActivityStructureId
Data = data
Application = row.Application |> wrapApplication
}
JSON serialisation of F# records and discriminated unions is a topic for its own blog post.
The wrap and unwrap functions map an F# type into a type that can be stored directly in SQL Server, and back. For example, we unwrap our identifiers (single-case discriminated unions) into GUIDs:
let unwrapActivityStructureEventId (ActivityStructureEventId guid) = guid
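Wrapping back is simply the single-case constructor itself, as used in mapToEvent above; written out as a function it would be:
let wrapActivityStructureEventId guid = ActivityStructureEventId guid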
The “row” that is stored in SQL Server looks like this:
[<CLIMutable>]
type ActivityStructureEventRow = {
EventGuid : Guid
ActivityStructureId : Guid
Type : string
Data : string
Application : DateTime
}
We chose to use mostly basic types that can be stored directly without conversion. There are some exceptions for which we use converters and let Dapper do the conversion. Since we map our data anyway, we found it simpler to do (almost) all mapping in one place.
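As an illustration of such a converter (TenantId here is a hypothetical single-case union, not a type from this post), a Dapper type handler could look like this:
open System.Data
open Dapper

type TenantId = TenantId of int

// a type handler tells Dapper how to convert between the F# type and the
// value that is written to / read from the database
type TenantIdHandler () =
    inherit SqlMapper.TypeHandler<TenantId> ()
    override _.SetValue (parameter : IDbDataParameter, value : TenantId) =
        let (TenantId id) = value
        parameter.Value <- id
    override _.Parse (value : obj) =
        TenantId (value :?> int)

// registered once at startup
SqlMapper.AddTypeHandler (TenantIdHandler ())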
The record is marked [<CLIMutable>] so that Dapper can write the values when reading data from the storage.
Now, we can create the storage implementation (shortened):
let createStorage (persistencyContext : Sql.PersistencyContext) =
let persistEvents events =
async {
use! connection = Sql.openConnection persistencyContext
use transaction = connection.BeginTransaction()
let sql = """
INSERT INTO activityTime.activityStructureEvents
(eventGuid,activityStructureId,type,data,application)
VALUES (@eventGuid,@activityStructureId,@type,@data,@application)"""
let rows = events |> List.map mapToRow
do! Sql.execute
persistencyContext.Logger
transaction
sql
rows
"ActivityStructure.PersistEvents"
transaction.Commit ()
}
let queryEventsByActivityStructureId (ActivityStructureId guid) =
async {
use! connection = Sql.openConnection persistencyContext
let sql = """
SELECT eventGuid,activityStructureId,type,data,application
FROM activityTime.activityStructureEvents
WHERE activityStructureId = @activityStructureId"""
let! events =
Sql.query
persistencyContext.Logger
connection
sql
{| activityStructureId = guid |}
"ActivityStructure.QueryEventsByActivityStructureId"
return
events
|> Seq.map mapToEvent
|> EventsForProjection
}
let queryEventsByEventIds (ids : ActivityStructureEventId list) =
// ...
let queryEvents () =
// ...
{
ActivityStructuresStorage.PersistEvent = fun event -> persistEvents [ event ]
PersistEvents = persistEvents
QueryEventsByActivityStructureId = queryEventsByActivityStructureId
QueryEventsByEventIds = queryEventsByEventIds
QueryEvents = queryEvents
}
The createStorage function creates the storage record we have seen above.
The individual query functions follow the same pattern:
- open connection (and transaction for writes)
- build the SQL query
- execute the query
- map the rows to events using the above mapToEvent function
Query parameters are passed as anonymous records. They can be passed directly to Dapper, which creates SqlParameters from their fields.
Please be aware that you have to pass a DbString for strings in WHERE clauses to prevent unneeded type conversions in SQL Server. See here.
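A sketch of what that looks like, assuming a hypothetical varchar(50) column called name:
// IsAnsi = true sends the parameter as varchar, so SQL Server does not have
// to convert the column to nvarchar, which would prevent index seeks
let parameters =
    {| name = DbString (Value = "some name", IsAnsi = true, Length = 50) |}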
The Sql module provides some helper functions to open connections and execute queries.
SQL Helpers for switching storages, row-level security and monitoring
We want to be able to run our code either with the in-memory database simulation or with the real database. The PersistencyContext holds the data needed to access the real database:
[<RequireQualifiedAccess>]
module Calitime.TimeRocket.Core.ActivityTime.Storages.Database.Sql
open System
open System.Collections.Generic
open System.Threading.Tasks
open Calitime
open Calitime.Resilience
open Dapper
open Microsoft.Data.SqlClient
open Newtonsoft.Json
open Polly
open System.Diagnostics
open Tasks
type PersistencyContext = {
ConnectionString : string
TenantId : int
Logger : ILogger
}
The PersistencyContext holds the connection string, the current tenant and a logger used for monitoring.
We use row-level security to implement a multi-tenant system: every tenant can only read its own data. The openConnection function opens a connection and sets the tenant to be used for row-level security:
let openConnection persistencyContext =
    task {
        let connection = new SqlConnection(persistencyContext.ConnectionString)
        do! connection.OpenAsync()
        let cmd = connection.CreateCommand()
        cmd.CommandText <- @"exec sp_set_session_context @key=N'TenantId', @value=@shardingKey"
        cmd.Parameters.AddWithValue("@shardingKey", persistencyContext.TenantId) |> ignore
        do! cmd.ExecuteNonQueryAsync() |> Task.Ignore
        return connection
    } |> Async.AwaitTask
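For completeness, here is a sketch of what the matching server-side setup could look like. This is not our actual schema: it assumes the events table has a tenantId column (filled by a default constraint) and shows a filter predicate bound to the TenantId session context value set above. Each statement has to run in its own batch:
let createTenantPredicateSql = """
CREATE FUNCTION activityTime.fn_tenantAccessPredicate (@tenantId int)
RETURNS TABLE
WITH SCHEMABINDING
AS
RETURN
    SELECT 1 AS fn_accessResult
    WHERE @tenantId = CAST(SESSION_CONTEXT(N'TenantId') AS int)"""

let createSecurityPolicySql = """
CREATE SECURITY POLICY activityTime.tenantSecurityPolicy
    ADD FILTER PREDICATE activityTime.fn_tenantAccessPredicate(tenantId)
    ON activityTime.activityStructureEvents"""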
We retry calls to the database when the exception is transient. The isTransientError function returns whether an SqlException represents a transient error:
// http://elvanydev.com/resilience-with-polly/
let private sqlTransientErrors =
[|
40613 // Database is not currently available.
40197 // Error processing the request.
40501 // The service is currently busy.
49918 // Not enough resources to process the request.
40549 // Session is terminated because you have a long-running transaction.
40550 // The session has been terminated because it has acquired too many locks.
|]
let private isTransientError (ex : SqlException) =
    Array.contains ex.Number sqlTransientErrors
The track function takes care of sending monitoring data to our monitoring service and retries calls to the database in case a transient error occurred:
let serializeParameters parameters =
try
JsonConvert.SerializeObject(parameters)
with
| _ -> "not serializable"
let private track
sql
parameters
description
(f : unit -> Task<'a>)
(logger : ILogger) =
task {
let start = DateTimeOffset.UtcNow;
let sw = Stopwatch();
sw.Start();
let! result =
    task {
        try
            let! r =
                Policy
                    .Handle<SqlException>(fun ex -> isTransientError ex)
                    .RetryWithExponentialWait(5)
                    .ExecuteAsync(fun () ->
                        task {
                            try
                                return! f()
                            with
                            | :? SqlException as ex when isTransientError ex ->
                                // log and rethrow so that Polly sees the
                                // transient error and actually retries
                                logger.LogDatabaseRetry()
                                return raise ex
                        })
            return r |> Result.Ok
        with
        | :? SqlException as ex ->
            // non-transient errors and exhausted retries end up here
            return ex |> Result.Error
    }
sw.Stop()
let success =
match result with
| Ok _ -> true
| Error _ -> false
logger.LogDatabase(
sql,
serializeParameters parameters,
description,
start,
sw.Elapsed,
success)
return
match result with
| Error ex -> raise (Exception("Could not persist", ex))
| Ok r -> r
}
We use Polly for retrying failing database calls. Transient errors are logged and rethrown inside the policy so that Polly sees them and retries; errors that escape the policy (non-transient ones, or transient ones after all retries) are mapped to an Error.
The logger is a C# class, so we have to map the F# data structures to C# data structures (Result -> bool).
The execute function executes an SQL statement that does not return data:
let execute
logger
(transaction : SqlTransaction)
sql
parameters
caller =
task {
let f = fun () ->
transaction.Connection.ExecuteAsync(
sql,
parameters,
transaction) |> Task.Ignore
return! track
sql
parameters
caller
f
logger
} |> Async.AwaitTask
It is just a wrapper around the track function so that it is easier to call.
The query function executes an SQL query that returns data:
let query
logger
(connection : SqlConnection)
sql
parameters
caller : Async<IEnumerable<'result>> =
task {
let f = fun () ->
connection.QueryAsync<'result>(
sql,
parameters)
return! track
sql
parameters
caller
f
logger
} |> Async.AwaitTask
Again, it is just a wrapper around the track function so that it is easier to call.
Wiring everything together
Now we have all the parts needed and as a final step, we need to glue everything together.
The storage creator creates the storage access layer: either a real one when there is a PersistencyContext available, or otherwise the in-memory simulation:
module Calitime.TimeRocket.Core.StorageCreator
open Calitime.TimeRocket.Core.ActivityTime.Storages
let createStorage persistencyContext =
match persistencyContext with
| Some x ->
{
Storage.ActivityValues = Database.ActivityValues.createStorage x
Activities = Database.Activities.createStorage x
ActivityStructures = Database.ActivityStructures.createStorage x
}
| None ->
{
Storage.ActivityValues = InMemory.ActivityValues.createStorage ()
Activities = InMemory.Activities.createStorage ()
ActivityStructures = InMemory.ActivityStructures.createStorage ()
}
The above example contains some storages that we have not seen yet, to give you a better picture of the whole system.
Our production code calls the storage creator with a PersistencyContext that contains the connection string from a configuration file and the tenant of the user making the call to the WebApi. This is handled inside a WebApi controller base class. The tenant is read from the authentication token.
Our test code decides whether the tests should be run in-memory or with a database:
let private getPersistencyContext () =
let environment = Environment.GetEnvironmentVariable("TestEnvironment")
let environment = if String.IsNullOrWhiteSpace(environment) then "inmemory" else environment
let builder = Microsoft.Extensions.Configuration.ConfigurationBuilder()
let builder = Microsoft.Extensions.Configuration.FileConfigurationExtensions.SetBasePath(builder, Directory.GetCurrentDirectory())
let builder = Microsoft.Extensions.Configuration.JsonConfigurationExtensions.AddJsonFile(builder, sprintf "appsettings.%s.json" environment)
let configuration = builder.Build();
let connectionString = Microsoft.Extensions.Configuration.ConfigurationExtensions.GetConnectionString(configuration, "testing")
if (String.IsNullOrWhiteSpace(connectionString)) then
None
else
let x : Sql.PersistencyContext =
{
ConnectionString = connectionString
TenantId = (FakeStorageTenantIdLeaser.LeaseTenantId()).Value + 50000
Logger = FakeLogger ()
}
x |> Option.Some
Depending on an environment variable, the corresponding configuration file is read to get the connection string, or none when running in-memory. The tests run against a real database in our continuous integration pipeline and while we implement the storage access layer, and in-memory while we code business logic.
In the tests, we generate a fake tenant id for every test run. This allows us to run tests in parallel without cross-talk because every run only sees the data of its own tenant.
The returned PersistencyContext is then passed to the storage creator and the test can be executed.
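To illustrate the wiring, here is a sketch of such a test. The event construction, wrapApplication and the assertion details are assumptions based on the types shown above, not our real test code:
open System
open Calitime.TimeRocket.Core
open Xunit

[<Fact>]
let ``a persisted event can be queried back by its id`` () =
    async {
        // runs in-memory or against SQL, depending on getPersistencyContext
        let storage = StorageCreator.createStorage (getPersistencyContext ())
        let eventId = ActivityStructureEventId (Guid.NewGuid ())
        let event =
            { ActivityStructureEvent.ActivityStructureEventId = eventId
              ActivityStructureId = ActivityStructureId (Guid.NewGuid ())
              Data = ActivityStructureDeleted
              Application = DateTime.UtcNow |> wrapApplication }
        do! storage.ActivityStructures.PersistEvent event
        let! events = storage.ActivityStructures.QueryEventsByEventIds [ eventId ]
        Assert.Equal (event, List.exactlyOne events)
    } |> Async.StartAsTask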
Conclusions
Persisting data with F# and Dapper is much the same as with C#. However, the easy composability of functions lets us implement our run-in-memory-or-on-a-real-database switch in a simpler way than in our C# code.
In reality, things are a bit more complicated because we have to implement the storage layer twice: once for F# and once for C#, since we have a mix of the two languages. But this affects only the builders that build up our system for running tests. That will be part of a later blog post.
So stay tuned…
This blog post is made possible with the support of Time Rocket, the product this journey is all about. Take a look (German only).
Find all blog posts about our journey to F# here.
A very minor suggestion: the SqlTransientErrors list could be an array. In general, use an array for a fixed-length collection. Also, it’s not a type, so it should start with a lowercase s
Thanks for the suggestion. That was copied from our C# code.