I’ve become a big fan of PostgreSQL in the last year. Window functions, indexed JSON data types, and full text search are all awesome, but lately I was really happy to find a simple feature that did exactly what I needed: pg_advisory_lock
. Postgres advisory locks are stored along with Postgres’ own internal locks ( you can even see them in the pg_locks
table), but their meaning is entirely application-dependent.
In my case, we have a per-client sync process that occurs in a background task. We don’t want two of these stepping on one another’s toes, so we can make an advisory lock. Suppose we are doing the sync for a client with an id of 4. We can run
SELECT pg_try_advisory_lock(4);
That query will return True/False depending on whether or not we were granted the lock on 4. When we’re finished with the lock, we call
SELECT pg_advisory_unlock(4);
So this is pretty neat, but the locks are using a global namespace. What if we need to lock for a sync and also for some other background task? One trick you can use is to scope the locks using a hash.
hasher = hashlib.sha1()
hasher.update('sync({})'.format(client_id))
lock_name = struct.unpack('q', h.digest()[:8]) # the pg_lock accepts an int8, so we have to throw away some bits.
cursor.execute('SELECT pg_try_advisory_lock(%s);', (lock_name,))
We can extract this pattern into a context manager to make it more Pythonic and easier to reuse. Here’s my attempt:
import hashlib
import struct
import contextlib
from django.db import connection # or create a connection directly with psycopg2
@contextlib.contextmanager
def pg_try_advisory_lock(lock):
"""
Context manager to acquire a Postgres advisory lock.
:param lock: The lock name. Can be anything convertible to a string.
Should be scoped to the user/org and action being taken.
:param cur: A database cursor. Optional.
:return True/False whether lock was acquired.
"""
hasher = hashlib.sha1()
hasher.update('{}'.format(lock))
int_lock = struct.unpack('q', hasher.digest()[:8])
cur = connection.cursor()
try:
cur.execute('SELECT pg_try_advisory_lock(%s);', (int_lock,))
acquired = cur.fetchall()[0][0]
yield acquired
finally:
cur.execute('SELECT pg_advisory_unlock(%s);', (int_lock,))
cur.close()
Now we can use that function like this:
with pg_try_advisory_lock('sync({})'.format(client_id)) as acquired:
if acquired:
print 'beginning sync for', client_id
# ... actually do the sync
else:
print 'some other process holds the sync lock for', client_id