Problems around modelling an asynchronous API for database transactions in Rust

This blog post is motivated by the recent Asynchronous clean-up blog post written by withoutboats. In this post I will present how the described problem affects the design of a sound API for handling SQL database transactions.

Basic problem

A SQL database transaction is used to encapsulate several queries to be handled as a single atomic instruction block by the underlying database system. That usually means that if you issue several queries in a transaction, you won't see concurrent changes by other connections to the affected data. Transactions are one of the basic building blocks for relational database systems to provide their consistency guarantees.

Running a transaction requires the following conceptual steps at protocol level.

  1. The client application issues a BEGIN command.
  2. The client application issues one or more queries.
  3. The client application issues either a COMMIT command to apply all changes in the currently running transaction to the database, or it issues a ROLLBACK command to reset the database state to the same state as without the queries in step 2 applied.

Each of the steps requires sending a command to the database server and receiving a response. As this might involve network based I/O we can run into various problems, such as the network going down or receiving a corrupted response. Therefore, each of these steps must be assumed to be fallible.

There is another problem with this multi-step procedure that may arise if you reuse a database connection for several operations. Typically, a web application uses a connection pool with a conceptually fixed number of database connections which are reused for processing several requests. This removes the cost of establishing a new connection to the database for each request. If we now look back at our transaction steps, it becomes problematic if we stop after step 1 or 2 for some reason. In that case, we might return a connection with an open transaction back to the connection pool, and subsequent requests might use that connection again. If these requests attempt to change the database, no changes will be committed since there is still an open transaction.

This results in the following basic assumptions about modelling a transaction API:

  1. We must assume that each step involves I/O
  2. We must assume that each step might fail
  3. We must ensure that we always execute the third step to close the transaction
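To make the third requirement more concrete, here is a minimal sketch of the failure mode described above. Everything in it is an assumption made for illustration: `MockConnection` is a hypothetical in-memory stand-in for a real connection, the "pool" is just a `Vec`, and a query starting with `FAIL` simulates a query that is rejected.

```rust
// Hypothetical in-memory stand-in for a database connection.
// It only tracks whether a transaction is open and which writes are visible.
#[derive(Debug, Default)]
struct MockConnection {
    in_transaction: bool,
    committed: Vec<String>,
    pending: Vec<String>,
}

impl MockConnection {
    fn execute(&mut self, sql: &str) -> Result<(), String> {
        match sql {
            "BEGIN" => self.in_transaction = true,
            "COMMIT" => {
                self.committed.append(&mut self.pending);
                self.in_transaction = false;
            }
            "ROLLBACK" => {
                self.pending.clear();
                self.in_transaction = false;
            }
            // Simulate a query that fails.
            q if q.starts_with("FAIL") => return Err(format!("query failed: {q}")),
            // Inside a transaction, writes stay pending until COMMIT.
            q if self.in_transaction => self.pending.push(q.to_owned()),
            q => self.committed.push(q.to_owned()),
        }
        Ok(())
    }
}

// A handler that forgets to close the transaction on the error path:
// the `?` returns early after BEGIN, so neither COMMIT nor ROLLBACK is sent.
fn careless_handler(conn: &mut MockConnection) -> Result<(), String> {
    conn.execute("BEGIN")?;
    conn.execute("FAIL: INSERT INTO users (name) VALUES ('ferris')")?;
    conn.execute("COMMIT")
}

// Simulates two requests that share one pooled connection.
// Returns whether a transaction is still open and how many writes are committed.
fn leaked_transaction_demo() -> (bool, usize) {
    let mut pool = vec![MockConnection::default()];

    // Request 1 fails halfway and hands the connection back to the pool.
    let mut conn = pool.pop().unwrap();
    let _ = careless_handler(&mut conn);
    pool.push(conn);

    // Request 2 reuses the connection. Its write ends up in the stale transaction.
    let mut conn = pool.pop().unwrap();
    conn.execute("UPDATE accounts SET balance = 0").unwrap();
    (conn.in_transaction, conn.committed.len())
}
```

In this sketch `leaked_transaction_demo()` returns `(true, 0)`: the second request reports success, but its write is never committed because the transaction from the first request is still open.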

Synchronous solution

There are two commonly used approaches to model a Rust API for database transactions in synchronous code. The first approach uses a callback for the queries inside of the transaction, while the second approach relies on a Drop impl to roll back uncommitted transactions.

Diesel uses the callback-based approach for modelling database transactions. Conceptually this gives us the following code:

impl Connection {
    fn transaction<T, E>(&mut self, f: impl FnOnce(&mut Self) -> Result<T, E>) -> Result<T, E> 
    where 
        E: From<crate::result::Error>
    {
        // start the transaction 
        // (this corresponds to step 1 in our step list)
        self.batch_execute("BEGIN")?;
        // execute the inner queries by 
        // calling the callback
        // (this corresponds to step 2 in our step list)
        match f(self) {
            Ok(o) => {
                // commit the transaction
                // if the inner function returned Ok
                // (this corresponds to step 3 in our list)
                self.batch_execute("COMMIT")?;
                return Ok(o);
            }
            Err(e) => {
                // rollback the transaction
                // if the inner function returned
                // an error
                // (this corresponds to step 3 in our list)
                self.batch_execute("ROLLBACK")?;
                return Err(e);
            }
        }
    
    }
}

The actual Diesel code is more complicated as it supports nesting transactions via SAVEPOINTS and it handles various edge cases, such as attempting to roll back if committing the transaction fails and returning all encountered errors (like for the case where our callback returns an error and the corresponding rollback fails).
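To give an idea of what such edge case handling can look like, here is a small sketch. It is not Diesel's actual implementation: the `TxError` type, the `MockConnection` with its `fail_on` field and the exact error handling policy are all assumptions made up for this example.

```rust
// Hypothetical error type that can carry both the callback error and a rollback error.
#[derive(Debug, PartialEq)]
enum TxError<E> {
    // BEGIN or COMMIT failed.
    Database(String),
    // The callback failed and the rollback succeeded.
    Callback(E),
    // The callback failed and the rollback failed as well.
    RollbackFailed { callback: E, rollback: String },
}

// Hypothetical connection that records all commands and can be told to fail one of them.
struct MockConnection {
    log: Vec<String>,
    fail_on: Option<&'static str>,
}

impl MockConnection {
    fn batch_execute(&mut self, sql: &str) -> Result<(), String> {
        self.log.push(sql.to_owned());
        if self.fail_on == Some(sql) {
            Err(format!("{sql} failed"))
        } else {
            Ok(())
        }
    }

    fn transaction<T, E>(
        &mut self,
        f: impl FnOnce(&mut Self) -> Result<T, E>,
    ) -> Result<T, TxError<E>> {
        if let Err(e) = self.batch_execute("BEGIN") {
            return Err(TxError::Database(e));
        }
        match f(self) {
            Ok(value) => match self.batch_execute("COMMIT") {
                Ok(()) => Ok(value),
                Err(commit_err) => {
                    // Best effort: try not to leave an open transaction behind.
                    let _ = self.batch_execute("ROLLBACK");
                    Err(TxError::Database(commit_err))
                }
            },
            Err(callback) => match self.batch_execute("ROLLBACK") {
                Ok(()) => Err(TxError::Callback(callback)),
                // Report both errors instead of dropping one of them.
                Err(rollback) => Err(TxError::RollbackFailed { callback, rollback }),
            },
        }
    }
}

// The callback fails and the ROLLBACK fails as well.
fn rollback_failure_demo() -> (Result<(), TxError<&'static str>>, Vec<String>) {
    let mut conn = MockConnection { log: Vec::new(), fail_on: Some("ROLLBACK") };
    let result = conn.transaction(|_conn| Err::<(), _>("constraint violated"));
    (result, conn.log)
}
```

The point of the sketch is that the caller gets to see both failures: the result is a `TxError::RollbackFailed` that carries the callback error and the rollback error, while the log contains `BEGIN` followed by `ROLLBACK`.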

The second approach relies on building a transaction object and having an implicit rollback in the Drop implementation of that object. This approach closely mirrors, for example, closing files as implemented by std::fs::File. The postgres crate is an example of a popular Rust crate implementing this approach. Conceptually, this results in the following code:

impl Connection {
    fn transaction(&mut self) -> Result<Transaction<'_>> {
        // start the transaction
        // (This corresponds to step 1 in our list)
        self.batch_execute("BEGIN")?;
        Ok(Transaction {
            connection: self,
            closed: false
        })
    }
}

struct Transaction<'a> {
    // borrow the connection mutably
    // to enforce that all commands go 
    // through the connection wrapper
    connection: &'a mut Connection,
    closed: bool,
}

impl Transaction<'_> {
    // Mirror the API of `Connection` for 
    // executing queries
    // (This corresponds to step 2 in our list)
    fn execute_query(&mut self, query: &str) -> Result<()> {
        self.connection.execute_query(query)
    }

    // This function explicitly commits
    // the transaction
    // (This corresponds to step 3 in our list)
    fn commit(mut self) -> Result<()> {
        self.connection.batch_execute("COMMIT")?;
        // disable the destructor
        self.closed = true;
        Ok(())
    }

    // This function explicitly rolls back
    // the transaction
    // (This corresponds to step 3 in our list)
    fn rollback(mut self) -> Result<()> {
        self.connection.batch_execute("ROLLBACK")?;
        // disable the destructor
        self.closed = true;
        Ok(())
    }
}

impl Drop for Transaction<'_> {
    fn drop(&mut self) {
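        if !self.closed {
            // rollback an open transaction
            // (`rollback()` takes `self` by value, so we
            // issue the command on the connection directly)
            // It's not clear how to handle potential
            // errors here
            self.connection
                .batch_execute("ROLLBACK")
                .expect("Rollback failed");
        }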
    }
}

Again, the real code is more complicated as it might want to handle failures while committing or rolling back the transaction differently.

I personally prefer the callback-based approach implemented by Diesel, as it allows for explicitly communicating all errors to the user. On the other hand, the transaction-object-based approach makes it easy to pass the transaction object between different functions, allowing for a more straightforward division of functionality. Overall, both approaches guarantee that we execute all three conceptual steps. However, the transaction-object-based solution might not propagate all failures and therefore contradicts our second requirement.
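The following sketch shows the transaction object approach in use. The `Connection` here is a hypothetical mock that only records the commands it receives, and `transfer` is a made-up function. Unlike the conceptual code above, this `Drop` implementation ignores a rollback error instead of panicking. Either way, the caller never sees it.

```rust
// Hypothetical mock connection that only records the commands it receives.
#[derive(Default)]
struct Connection {
    log: Vec<String>,
}

impl Connection {
    fn batch_execute(&mut self, sql: &str) -> Result<(), String> {
        self.log.push(sql.to_owned());
        Ok(())
    }

    fn transaction(&mut self) -> Result<Transaction<'_>, String> {
        self.batch_execute("BEGIN")?;
        Ok(Transaction { connection: self, closed: false })
    }
}

struct Transaction<'a> {
    connection: &'a mut Connection,
    closed: bool,
}

impl Transaction<'_> {
    fn execute_query(&mut self, query: &str) -> Result<(), String> {
        self.connection.batch_execute(query)
    }

    fn commit(mut self) -> Result<(), String> {
        self.connection.batch_execute("COMMIT")?;
        // Disable the rollback in `drop`.
        self.closed = true;
        Ok(())
    }
}

impl Drop for Transaction<'_> {
    fn drop(&mut self) {
        if !self.closed {
            // An error cannot be returned from `drop`, so it is lost here.
            let _ = self.connection.batch_execute("ROLLBACK");
        }
    }
}

// Made-up example function that bails out early for an invalid amount.
fn transfer(conn: &mut Connection, amount: i64) -> Result<(), String> {
    let mut tx = conn.transaction()?;
    tx.execute_query("UPDATE accounts SET balance = balance - 1 WHERE id = 1")?;
    if amount <= 0 {
        // `tx` is dropped on this early return, which triggers the implicit ROLLBACK.
        return Err("invalid amount".to_owned());
    }
    tx.execute_query("UPDATE accounts SET balance = balance + 1 WHERE id = 2")?;
    tx.commit()
}

// Runs `transfer` against a fresh mock connection and returns the recorded commands.
fn transfer_log(amount: i64) -> Vec<String> {
    let mut conn = Connection::default();
    let _ = transfer(&mut conn, amount);
    conn.log
}
```

With this mock, `transfer_log(0)` ends with `ROLLBACK` even though `transfer` never calls a rollback function, while `transfer_log(5)` ends with `COMMIT`.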

Asynchronous approaches for providing a transaction API

Translating both API variants to async Rust code looks simple at first glance: just add a .await to all function calls that are now async and mark the functions themselves as async. That would include the functions that execute the BEGIN/COMMIT/ROLLBACK queries and the callback.

Practically, that would lead to the following code for the callback-based approach:

impl Connection {
    async fn transaction<T, E, F>(&mut self, f: impl FnOnce(&mut Self) -> F) -> Result<T, E> 
    where 
        F: Future<Output = Result<T, E>>,
        E: From<crate::result::Error>
    {
        // start the transaction 
        // (this corresponds to step 1 in our step list)
        self.batch_execute("BEGIN").await?;
        // execute the inner queries by 
        // calling the callback
        // (this corresponds to step 2 in our step list)
        match f(self).await {
            Ok(o) => {
                // commit the transaction
                // if the inner function returned Ok
                // (this corresponds to step 3 in our list)
                self.batch_execute("COMMIT").await?;
                return Ok(o);
            }
            Err(e) => {
                // rollback the transaction
                // if the inner function returned
                // an error
                // (this corresponds to step 3 in our list)
                self.batch_execute("ROLLBACK").await?;
                return Err(e);
            }
        }
    
    }
}

Unfortunately, this implementation will not be usable, as the compiler complains that the future returned by the callback contains the borrowed connection and that it might return that borrowed connection. In the synchronous case rustc is able to implicitly infer the correct lifetime bounds here; in the asynchronous case we cannot express the correct lifetime bounds, as this involves a higher-ranked lifetime that needs to restrict another bound (specifically, we cannot write for<'a> FnOnce(&'a mut Self) -> (impl Future<Output = Result<T, E>> + 'a)). Solving that would require async Fn* bounds, which are unstable at the time of writing. The good thing is that we can work around the lack of that feature by boxing the future returned by the callback. That allows us to express the correct lifetime bound by using impl for<'a> FnOnce(&'a mut Self) -> BoxFuture<'a, Result<T, E>> as the type of our closure.
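Here is a minimal sketch of that boxing workaround which only depends on the standard library. The assumptions: `BoxFuture` is a local alias standing in for the one from the futures crate (which additionally requires `Send`), `Connection` is a mock that records commands, the error type is a plain `String`, and `block_on` is a tiny busy-polling executor that only exists to run the example.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Local stand-in for `futures::future::BoxFuture` (without the `Send` bound).
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;

// Hypothetical mock connection that only records the commands it receives.
#[derive(Default)]
struct Connection {
    log: Vec<String>,
}

impl Connection {
    async fn batch_execute(&mut self, sql: &str) -> Result<(), String> {
        self.log.push(sql.to_owned());
        Ok(())
    }

    async fn transaction<T, E, F>(&mut self, f: F) -> Result<T, E>
    where
        // The boxed future may borrow the connection for exactly `'a`.
        F: for<'a> FnOnce(&'a mut Connection) -> BoxFuture<'a, Result<T, E>>,
        E: From<String>,
    {
        self.batch_execute("BEGIN").await?;
        let result = f(&mut *self).await;
        match result {
            Ok(value) => {
                self.batch_execute("COMMIT").await?;
                Ok(value)
            }
            Err(e) => {
                self.batch_execute("ROLLBACK").await?;
                Err(e)
            }
        }
    }
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Tiny executor for this sketch: polls in a loop until the future is ready.
fn block_on<F: Future>(future: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut future = Box::pin(future);
    loop {
        if let Poll::Ready(output) = future.as_mut().poll(&mut cx) {
            return output;
        }
    }
}

fn boxed_callback_demo() -> (Result<u32, String>, Vec<String>) {
    let mut conn = Connection::default();
    let result = block_on(conn.transaction(|tx_conn| {
        // Boxing the async block gives the closure a nameable return type.
        Box::pin(async move {
            tx_conn
                .batch_execute("INSERT INTO users (name) VALUES ('ferris')")
                .await?;
            Ok::<u32, String>(1)
        })
    }));
    (result, conn.log)
}
```

The price of this workaround is one heap allocation per transaction and the slightly noisy `Box::pin(async move { ... })` at every call site.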

Now you could say: Ok, that wasn't that hard and now it works. Unfortunately it doesn't as soon as you consider cancellation. Cancellation in the context of Rust futures refers to the fact that you can start polling a future and at some point before completion stop polling and drop the future. This commonly happens when you apply timeouts or similar mechanisms to running futures. As futures are always stopped at await points, each single await point is a location where the underlying runtime would be able to stop polling our future. By looking at the code we quickly notice that this might result in situations where we already started the transaction and then just stopped processing the future. This in turn results in a still open transaction and violates our third requirement.
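The effect of cancellation can be reproduced without any runtime by polling a future by hand and then dropping it. The sketch below does that under a few assumptions: `Connection` is a mock that records commands, `YieldOnce` is a made-up future that simulates waiting for the server by returning `Pending` once, and `transfer` is a simplified transaction without a callback.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Made-up future that returns `Pending` exactly once, simulating a network round trip.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

// Hypothetical mock connection that only records the commands it receives.
#[derive(Default)]
struct Connection {
    log: Vec<String>,
}

impl Connection {
    async fn batch_execute(&mut self, sql: &str) -> Result<(), String> {
        self.log.push(sql.to_owned());
        // Every command is an await point where the future can be cancelled.
        YieldOnce(false).await;
        Ok(())
    }
}

async fn transfer(conn: &mut Connection) -> Result<(), String> {
    conn.batch_execute("BEGIN").await?;
    conn.batch_execute("UPDATE accounts SET balance = 0").await?;
    conn.batch_execute("COMMIT").await
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Polls `transfer` at most `polls` times and then drops the future.
// Returns the commands that reached the connection.
fn poll_then_cancel(polls: usize) -> Vec<String> {
    let mut conn = Connection::default();
    {
        let waker = Waker::from(Arc::new(NoopWaker));
        let mut cx = Context::from_waker(&waker);
        let mut future = Box::pin(transfer(&mut conn));
        for _ in 0..polls {
            if future.as_mut().poll(&mut cx).is_ready() {
                break;
            }
        }
        // `future` is dropped here. If it was still pending, this is a cancellation.
    }
    conn.log
}
```

With this mock, `poll_then_cancel(2)` records `BEGIN` and the `UPDATE` but neither `COMMIT` nor `ROLLBACK`, which is exactly the open transaction we want to avoid. Polling to completion (four polls are enough here) records all three commands. A timeout wrapper such as `tokio::time::timeout` has the same effect as the early drop when the timeout expires.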

Now, you could say: Just use the other approach based on transaction objects. So let's have a look at whether that one works better. This time we can easily make all functions async and insert some .await calls, with one notable exception: our Drop implementation must be sync. There are different ways to work around this problem:

  • We could use a function like tokio::runtime::Runtime::block_on to poll our future to completion.
  • We could use a function like tokio::task::spawn to run our future in the background after exiting from drop.
  • We could mark the connection as having an open transaction and execute a rollback as soon as the next action is performed with this connection.

The first solution introduces implicit blocking code in an asynchronous environment. This is not obvious to the user, as drop calls are usually hidden. It might also panic, as is the case with tokio, which detects if the block_on function is called in the scope of a running runtime.

In addition, the first two solutions would couple the implementation to a specific runtime, as there is currently no runtime-agnostic version of this functionality. We could likely write such an abstraction on our own, but that wouldn't cover all the runtimes out there.

The third solution has the major drawback that it assumes that we execute the next action on the connection shortly afterwards, as otherwise the transaction potentially stays open for a long time.

Finally, none of the solutions allows us to communicate a potential rollback error back to the user, which violates our second requirement.

Currently, both tokio-postgres and sqlx feature a transaction-object-based approach. They both use the third variant for their Drop implementation, which schedules the rollback to run before the next action is executed on the connection.
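The third variant can be sketched roughly as follows. This is not the code of tokio-postgres or sqlx. The `pending_rollback` flag, the mock `Connection` and the busy-polling `block_on` helper are assumptions made up to illustrate the idea with the standard library only.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical mock connection that records commands.
#[derive(Default)]
struct Connection {
    log: Vec<String>,
    // Set by `Transaction::drop` when a transaction was abandoned.
    pending_rollback: bool,
}

impl Connection {
    // Stand-in for the actual network I/O.
    async fn send(&mut self, sql: &str) -> Result<(), String> {
        self.log.push(sql.to_owned());
        Ok(())
    }

    async fn execute(&mut self, sql: &str) -> Result<(), String> {
        // Clean up an abandoned transaction before doing anything else.
        if self.pending_rollback {
            self.send("ROLLBACK").await?;
            self.pending_rollback = false;
        }
        self.send(sql).await
    }

    async fn begin(&mut self) -> Result<Transaction<'_>, String> {
        self.execute("BEGIN").await?;
        Ok(Transaction { connection: self, closed: false })
    }
}

struct Transaction<'a> {
    connection: &'a mut Connection,
    closed: bool,
}

impl Transaction<'_> {
    async fn execute(&mut self, sql: &str) -> Result<(), String> {
        self.connection.execute(sql).await
    }

    async fn commit(mut self) -> Result<(), String> {
        self.connection.execute("COMMIT").await?;
        self.closed = true;
        Ok(())
    }
}

impl Drop for Transaction<'_> {
    fn drop(&mut self) {
        if !self.closed {
            // `drop` is synchronous, so we only record that a rollback is due.
            self.connection.pending_rollback = true;
        }
    }
}

async fn scenario(conn: &mut Connection) -> Result<(), String> {
    {
        let mut tx = conn.begin().await?;
        tx.execute("INSERT INTO users (name) VALUES ('ferris')").await?;
        // `tx` goes out of scope without `commit`: only the flag is set.
    }
    // The next command on the connection triggers the deferred ROLLBACK.
    conn.execute("SELECT 1").await
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Tiny executor for this sketch: polls in a loop until the future is ready.
fn block_on<F: Future>(future: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut future = Box::pin(future);
    loop {
        if let Poll::Ready(output) = future.as_mut().poll(&mut cx) {
            return output;
        }
    }
}

fn deferred_rollback_demo() -> Vec<String> {
    let mut conn = Connection::default();
    block_on(scenario(&mut conn)).unwrap();
    conn.log
}
```

In this sketch the `ROLLBACK` only shows up in the log directly before `SELECT 1`. If nobody uses the connection again, the transaction simply stays open, and an error from the deferred rollback surfaces in an unrelated later call.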

diesel-async and sqlx provide a callback-based solution. The sqlx implementation internally relies on their transaction object implementation, so it schedules a cleanup at a later point in time. diesel-async documents that the returned future is not cancellation safe. In addition, it implements checks that mark connections with open transactions as broken before returning them to the connection pool. That prevents such a connection from being reused, as the connection pool will close it. This comes with a performance cost, as we potentially need to establish a new connection instead.
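Conceptually, such a check at the pool boundary might look like the sketch below. It does not use the API of diesel-async or of any real pool crate: `Pool`, `Connection`, `is_broken` and the `created` counter are hypothetical names chosen for this illustration.

```rust
// Hypothetical connection that knows whether a transaction is still open.
struct Connection {
    open_transaction: bool,
}

impl Connection {
    // A connection with an open transaction must not be reused.
    fn is_broken(&self) -> bool {
        self.open_transaction
    }
}

// Hypothetical pool that counts how many connections it had to establish.
#[derive(Default)]
struct Pool {
    idle: Vec<Connection>,
    created: usize,
}

impl Pool {
    fn get(&mut self) -> Connection {
        match self.idle.pop() {
            Some(conn) => conn,
            None => {
                // Establishing a new connection is the expensive part.
                self.created += 1;
                Connection { open_transaction: false }
            }
        }
    }

    fn release(&mut self, conn: Connection) {
        if conn.is_broken() {
            // Close the connection instead of handing it to the next request.
            drop(conn);
        } else {
            self.idle.push(conn);
        }
    }
}

// Returns how many connections had to be established for two requests.
fn connections_created(first_request_cancelled: bool) -> usize {
    let mut pool = Pool::default();

    let mut conn = pool.get();
    // A cancelled transaction future leaves the transaction open.
    conn.open_transaction = first_request_cancelled;
    pool.release(conn);

    let conn = pool.get();
    pool.release(conn);
    pool.created
}
```

Here `connections_created(false)` is 1, while `connections_created(true)` is 2. The second connection is the performance cost mentioned above.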

There are a few hypothetical language features that would solve this problem:

  • async Drop, as it would allow performing the cleanup directly in the Drop implementation
  • Future::poll_cancel, which would allow expressing the transaction logic as a custom future
  • do {} final {} constructs, as we could then express the rollback as a final block
  • Undroppable types, which would allow us to enforce that either commit() or rollback() is called

There are differences in how well each of these variants might work. All of them would allow us to ensure that either ROLLBACK or COMMIT is emitted, which satisfies our third requirement. The latter two variants would also allow us to return appropriate errors to the user, while the former two variants do not allow that.