Distributed Locks with Postgres (Advisory Locks) and Knex.js

June 6, 2023 ☼ NestJS ☼ PostgreSQL ☼ Node.js ☼ Knex

šŸ“ Context: Node.js

For many production use cases, on top of scaling, you might want to have multiple instances of your service running at the same time.

Suddenly you find yourself in a distributed environment, and this requires different handling in some situations. For example:

What if we want to prevent users from making more than one simultaneous request to a shared resource? Essentially, all our instances must operate on that shared resource in a mutually exclusive way.

Life is easy when there's only 1 instance of your service :)

In this blog post I'll walk you through the way I approached the problem given the following requirements:

  1. Mutual exclusion. At any given moment, only one client can access (read/write) a resource.
  2. Deadlock free. Eventually the lock on the resource will be released.
  3. The approach must be effective across multiple instances.
  4. Simplest possible solution with the lowest possible effort (deadlines…).

Tech Stack

I've personally applied the following techniques to a Node.js microservice written in NestJS.

Which solution do we choose?

The first thing that comes to mind is some sort of distributed locking. The topic is quite complex, with many possible solutions and trade-offs, but our brains are wired to explore familiar solutions first.

In my case, Redis.

Unfortunately, it doesn't fit requirement #4. Our application doesn't have Redis, which means we would have to add a new piece of infrastructure with all the related costs and complexity.

After a bit of digging, it turns out that Postgres has a nifty feature: advisory locks.

You can create database locks using arbitrary keys (with some meaning for your application, like a resource ID), but the system does not enforce their use; it is up to the application to use them correctly.

We basically have to acquire a lock for a given resource identifier, do our work and then release it:

SELECT pg_try_advisory_lock(1);

After we've done our work:

SELECT pg_advisory_unlock(1);

BINGO. This solution seems to fit all our requirements. In particular #4 since we already use Postgres (all our instances share the same underlying PostgreSQL database).

There's more to the story though, please read on.

First Implementation

In the first iteration I used session-level advisory locks. Once acquired, they are held until they are explicitly released or the session ends. This is very important, as you will see later. Also, a session-level lock can be acquired multiple times by its owning session; for each completed lock request there must be a corresponding unlock request before the lock is actually released.

Meaning, if I do the following:

SELECT pg_try_advisory_lock(1); --> returns true
SELECT pg_try_advisory_lock(1); --> returns true
SELECT pg_try_advisory_lock(1); --> returns true

I also have to call unlock 3 times:

SELECT pg_advisory_unlock(1); --> returns true
SELECT pg_advisory_unlock(1); --> returns true
SELECT pg_advisory_unlock(1); --> returns true

SELECT pg_advisory_unlock(1); --> returns false (no lock)
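The counting rule above can be pictured with a small in-memory model. This is only a sketch of the bookkeeping for a single session, not how Postgres implements it, and `SessionLockCounter` is a hypothetical name used for illustration; nothing here talks to a database.

```typescript
// Toy model of ONE session's advisory-lock bookkeeping (assumption: this only
// mirrors the counting behaviour described above, it is not Postgres itself).
class SessionLockCounter {
  private readonly holds = new Map<number, number>();

  // Mirrors pg_try_advisory_lock when called by the owning session:
  // it succeeds again and increments the hold count.
  tryLock(key: number): boolean {
    this.holds.set(key, (this.holds.get(key) ?? 0) + 1);
    return true;
  }

  // Mirrors pg_advisory_unlock: returns false when the session holds nothing.
  unlock(key: number): boolean {
    const count = this.holds.get(key) ?? 0;
    if (count === 0) return false;
    if (count === 1) this.holds.delete(key);
    else this.holds.set(key, count - 1);
    return true;
  }

  isHeld(key: number): boolean {
    return this.holds.has(key);
  }
}

const session = new SessionLockCounter();
session.tryLock(1);
session.tryLock(1);

session.unlock(1);
const heldAfterOneUnlock = session.isHeld(1); // true: one hold remains

session.unlock(1);
const heldAfterTwoUnlocks = session.isHeld(1); // false: fully released

const extraUnlock = session.unlock(1); // false: nothing left to unlock
```

The practical consequence is that every successful lock call needs exactly one matching unlock, otherwise the lock stays held.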

Code: LockManager

I want the API for it to look like this:

await lockManagerService.withLock("myKey", async () => {
  // Lock will always be released automatically
  await accessExternalResource();
});

The below code is a simplified version of the real code.

The class PostgresClient is just a wrapper that returns a Knex.js instance (PostgresClient.getPool(): Knex<any, any[]>) with a configured connection pool:

pool: {
  min: 0,
  max: 50,
}

@Injectable()
export class PsqlLockManager implements LockManager {
  constructor(private readonly postgresClient: PostgresClient) {}

  async withLock(
    resourceIdentifier: string,
    callback: WithLockFunction
  ): Promise<void> {
    const isLockAcquired = await this.acquireSessionAdvisoryLock(
      resourceIdentifier
    );

    if (!isLockAcquired) {
      throw new LockingError(
        `Resource with key ${resourceIdentifier} still locked`
      );
    }

    try {
      await callback();
    } finally {
      // Only release a lock that was actually acquired
      await this.releaseLock(resourceIdentifier);
    }
  }
  private async releaseLock(resourceIdentifier: string): Promise<void> {
    const lockKey = this.generateLockKey(resourceIdentifier);
    try {
      const releasedLock = await this.postgresClient
        .getPool()
        .select<{ lock_status: boolean }>(
          this.postgresClient
            .getPool()
            .raw("pg_advisory_unlock(?) AS lock_status", [lockKey])
        )
        .first();
    } catch (error) {
      throw new DeadlockError(
        `Resource with Lock ${resourceIdentifier} can not be released`
      );
    }
  }

  private async acquireSessionAdvisoryLock(
    resourceIdentifier: string
  ): Promise<boolean> {
    const lockKey = this.generateLockKey(resourceIdentifier);

    try {
      const obtainedLock = await this.postgresClient
        .getPool()
        .select<{ lock_status: boolean }>(
          this.postgresClient
            .getPool()
            .raw("pg_try_advisory_lock(?) AS lock_status", [lockKey])
        )
        .first();

      if (!obtainedLock) {
        throw new InternalError("System is not able to lock the resource");
      }

      return obtainedLock.lock_status;
    } catch (error) {
      throw error;
    }
  }

  /**
   * @description The lock unique identifier must be a 64-bit big int or a pair of 32-bit integers. This function transforms a string into a 32-bit integer.
   * @param str
   * @returns number
   */
  private hashStringToInt(str: string): number {
    let hash = 0;

    for (let i = 0; i < str.length; i++) {
      const character = str.charCodeAt(i);
      hash = (hash << 5) - hash + character;
      hash |= 0; // Convert to 32-bit integer
    }

    return hash;
  }

  private generateLockKey(resourceIdentifier: string) {
    return this.hashStringToInt(resourceIdentifier);
  }
}
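One thing worth keeping in mind with a 32-bit string hash like the one above: two different resource identifiers can map to the same lock key, in which case they will contend for the same advisory lock. The snippet below is a standalone copy of the hash function, shown only to illustrate such a collision with two short strings.

```typescript
// Standalone copy of the hash from the class above, for illustration only.
function hashStringToInt(str: string): number {
  let hash = 0;
  for (let i = 0; i < str.length; i++) {
    hash = (hash << 5) - hash + str.charCodeAt(i); // same as hash * 31 + char
    hash |= 0; // truncate to a signed 32-bit integer
  }
  return hash;
}

const keyA = hashStringToInt("Aa"); // 2112
const keyB = hashStringToInt("BB"); // 2112, same key as "Aa"

// Different identifiers, same advisory lock key.
const collides = keyA === keyB; // true
```

A collision does not break mutual exclusion; it only makes unrelated resources wait on (or fail against) each other. If that matters, one option is the two-argument form of the advisory lock functions, which takes two 32-bit integers and so lets you use one of them as a namespace.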

The Pitfall of Using PostgreSQL Session Advisory Locks with Knex.js DB Connection Pool

As I was testing the locking mechanism of the PsqlLockManager class I encountered a problem:

A session-level advisory lock can only be released in the same database session in which it was obtained!

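Our Knex.js instance uses a connection pool for performance reasons, so each query may run on a different connection from the pool. The unlock query is therefore not guaranteed to reach the session that holds the lock, which makes session-level advisory locks unreliable here.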

Long story short, you must unlock a session-level advisory lock on the same connection that was used to lock it; otherwise the lock is never released and the resource stays locked for as long as that connection is alive.

It's not possible, as far as I know, to get the same connection back from the DB pool so that it can be used for the unlock query.
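To make the failure mode concrete, here is a toy in-memory model of the server side, where each key has at most one owning session. It is a sketch for illustration only: `AdvisoryLockTable` and the connection names are hypothetical, and no database is involved.

```typescript
// Toy model (assumption: simplified view of session-level advisory locks).
class AdvisoryLockTable {
  private readonly owners = new Map<number, { session: string; count: number }>();

  // Succeeds if the key is free or already owned by the same session.
  tryLock(session: string, key: number): boolean {
    const owner = this.owners.get(key);
    if (owner && owner.session !== session) return false;
    this.owners.set(key, { session, count: (owner?.count ?? 0) + 1 });
    return true;
  }

  // Only the owning session can release; any other session gets false.
  unlock(session: string, key: number): boolean {
    const owner = this.owners.get(key);
    if (!owner || owner.session !== session) return false;
    if (owner.count === 1) this.owners.delete(key);
    else owner.count -= 1;
    return true;
  }

  // Ending a session drops every lock it still holds.
  closeSession(session: string): void {
    this.owners.forEach((owner, key) => {
      if (owner.session === session) this.owners.delete(key);
    });
  }
}

const table = new AdvisoryLockTable();

// The lock query happens to run on pooled connection A...
const acquired = table.tryLock("connection-A", 42); // true

// ...but the pool hands the unlock query to connection B.
const released = table.unlock("connection-B", 42); // false: wrong session

// Every other request is now rejected, even though the work is done.
const laterAttempt = table.tryLock("connection-C", 42); // false

// Only closing connection A frees the key again.
table.closeSession("connection-A");
const afterClose = table.tryLock("connection-C", 42); // true
```

With a pool, connections are kept alive and reused, so in practice the key can stay locked long after the request that took it has finished.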

Bummer :(

Solutions

You have 2 solutions:

  1. You can have a dedicated Knex instance with a pool size of 1 and send your lock queries only through it. That way lock and unlock always run on the same connection. Knex will still reuse that connection between requests (no TCP connection setup required), but it is still not very efficient or scalable.
  2. Use transaction-level advisory locks:

begin;
SELECT pg_advisory_xact_lock(1);
-- Do work in between
commit;
-- Lock is released automatically when the transaction ends

From the docs:

Transaction-level lock requests behave more like regular lock requests: they are automatically released at the end of the transaction, and there is no explicit unlock operation. This behavior is often more convenient than the session-level behavior for short-term usage of an advisory lock.

And that's what I ended up adopting.

The code for it is not much different (let me know if you are interested in another blog post with it) and all the tests were green now. We have a working solution!
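In the meantime, here is a minimal sketch of what a transaction-level variant could look like. It is not the real code from the project. The `DbLike` and `TrxLike` interfaces are hypothetical, minimal shapes so the snippet compiles without Knex installed; the assumption is that a Knex instance configured for Postgres fits them (possibly with a cast), since `knex.transaction(handler)` runs the handler on a single connection and `trx.raw(...)` resolves to a result object with `rows`. `withXactLock` is likewise an illustrative name.

```typescript
// Hypothetical minimal shapes; a real Knex instance is assumed to fit them.
interface RawResult {
  rows: Array<{ lock_status: boolean }>;
}
interface TrxLike {
  raw(sql: string, bindings: readonly unknown[]): PromiseLike<RawResult>;
}
interface DbLike {
  transaction<T>(handler: (trx: TrxLike) => Promise<T>): Promise<T>;
}

class LockingError extends Error {}

// Runs `callback` inside a transaction that holds a transaction-level
// advisory lock on `lockKey`. There is no explicit unlock: the lock goes
// away when the transaction commits or rolls back.
async function withXactLock<T>(
  db: DbLike,
  lockKey: number,
  callback: (trx: TrxLike) => Promise<T>
): Promise<T> {
  return db.transaction(async (trx) => {
    // Non-blocking variant: returns false instead of waiting for the lock.
    const result = await trx.raw(
      "SELECT pg_try_advisory_xact_lock(?) AS lock_status",
      [lockKey]
    );

    if (!result.rows[0]?.lock_status) {
      // Throwing makes the transaction roll back.
      throw new LockingError(`Resource with key ${lockKey} still locked`);
    }

    return callback(trx);
  });
}
```

Because the lock query and the work share one transaction, they also share one connection, which sidesteps the pool problem. The trade-off is that the connection stays checked out of the pool for as long as the callback runs, so this suits short-lived work best.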


Conclusion

One big takeaway from this experience is that the "devil is in the details", as the saying goes. If we had deployed the first iteration of the code, it would have been riddled with hard-to-catch bugs (since these types of scenarios won't happen very often).

Thorough testing is the key!

āš ļø This solution might not fit your requirements. Make sure you evalute all the trade-offs and alternatives before copy/pasting ;)

That's all folks! :)


If you have any suggestions, questions, corrections or if you want to add anything please DM or tweet me: @zanonnicola