Stuck with psycopg2 + multithreading!

Hi all!

This is my first post here. I am trying to understand locking and multithreading concepts with Python and PostgreSQL. I have built a simple function to book a seat (say, for an airline or theater) and I am trying to run it with 100 concurrent users. Here is some code for reference.


def book_seat(TRIP_ID, user_id, conn):
    try:
        with closing(conn.cursor()) as cur:
            # Query parameters must be a sequence, hence the one-element tuple (TRIP_ID,)
            cur.execute("SELECT id FROM seats WHERE TRIP_ID = %s AND user_id is NULL ORDER BY id LIMIT 1 FOR UPDATE", (TRIP_ID,))
            seat = cur.fetchone()
            seat_id = seat[0]
            print("Trying to book seat {} for user {}".format(seat_id, user_id))
            cur.execute("UPDATE seats SET user_id = %s WHERE TRIP_ID = %s AND id = %s", (user_id, TRIP_ID, seat_id))
            if cur.rowcount == 0:
                print("Failed to book seat {} for user {}".format(seat_id, user_id))
                return False
            print("Booked seat {} for user {}".format(seat_id, user_id))
            conn.commit()
            return True
    except Exception as e:
        print(f"Error in booking for user {user_id}: {e}")
    finally:
        pool.release_conn(conn)

My expectation was that without FOR UPDATE in the SELECT query, all the threads would see the same first empty seat and keep booking it until one of them commits, so only the first few seats would get booked, over and over, by all the threads. That is what happens. But when I add FOR UPDATE, I see the same behaviour!

I read the psycopg2 documentation, and it states that from v2.9.3 onwards the connection object is thread-safe, so we should be able to use the same conn object with multiple cursors, which is what I am doing in my code. But it looks like something is missing?

I have also tried using multiple connection objects (by creating a ThreadedConnectionPool), but then my code books one seat at a time and takes a lot of time; it’s almost like sequential execution. I have initialized my pool with 100 workers.

Here is how my book_seat is being invoked for reference.


conn = pool.get_conn()
# autocommit is also False by default; set it explicitly because I want to run
# my entire function as a single transaction
conn.autocommit = False
with concurrent.futures.ThreadPoolExecutor(max_workers=180) as executor:
    for user_id in users:
        executor.submit(book_seat, TRIP_ID, user_id, conn)

I have tried a lot to debug this and have asked AI tools for help, but I am not able to understand this one. Please give me some pointers!

Thanks :slight_smile:

Isn’t this a PostgreSQL SQL question rather than a Python question?

It’s been too long since I did SQL stuff to recall details.

I think you have to start a transaction that takes out an update lock - but as I say, it’s been too long since I worked on PostgreSQL.

Or you have to make the update and select be one SQL query.
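The single-query suggestion above could look roughly like the following sketch. The table and column names come from the original post; the function name, the SQL constant, and the assumption that each call gets a DB-API connection of its own are hypothetical, and this has not been run against the poster's schema.

```python
# Hypothetical single-statement version of the booking logic.
# Table/column names (seats, trip_id, user_id, id) follow the original post.
BOOK_ONE_SEAT_SQL = """
UPDATE seats
   SET user_id = %(user_id)s
 WHERE trip_id = %(trip_id)s
   AND id = (
        SELECT id
          FROM seats
         WHERE trip_id = %(trip_id)s AND user_id IS NULL
         ORDER BY id
         LIMIT 1
           FOR UPDATE
       )
RETURNING id
"""


def book_seat_single_statement(conn, trip_id, user_id):
    """Book the lowest free seat; return its id, or None if nothing was booked.

    `conn` is assumed to be a DB-API connection used by this thread only.
    """
    cur = conn.cursor()
    try:
        cur.execute(BOOK_ONE_SEAT_SQL, {"user_id": user_id, "trip_id": trip_id})
        row = cur.fetchone()  # None when the UPDATE matched no row
        conn.commit()
        return row[0] if row else None
    except Exception:
        conn.rollback()  # leave the connection in a clean state
        raise
    finally:
        cur.close()
```

Selecting and updating in one statement removes the window between the two round trips, but each caller still needs its own transaction for the row lock to mean anything.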

The reason I have posted this in Python help is that I suspect the problem is on the psycopg2 side, or in the way I am using it. I am running the same thing in Go and it works fine.

It works with a single query, of course, but I want to run it in two phases to be sure about the behaviour of Python.

Superficially your approach seems sound (based on looking at it and
reading the docs - I haven’t used FOR UPDATE myself though now that
you’ve pointed it out I have a use case for it).

To clarify, you’re saying that the code updates the same row for
multiple threads (because of the usual race) even when you use FOR UPDATE?

My only speculation would be that you haven’t explicitly opened a
transaction, so the FOR UPDATE might be ineffective (no current
transaction to provide a scope for the row lock)?

The other thing which occurs to me is that you’re using the same
connection in all the threads. I’d have taken that to imply that there
can only be one cursor in play at a time, effectively serialising the
SQL in book_seat(). Maybe stick some global counters in
book_seat()'s with closing(...) clause to verify that.
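A minimal sketch of such a counter, using only the standard library. The class and attribute names are hypothetical and nothing here is part of psycopg2.

```python
import threading
from contextlib import contextmanager


class ConcurrencyCounter:
    """Track how many threads are inside a code block at the same time."""

    def __init__(self):
        self._lock = threading.Lock()
        self.active = 0       # threads currently inside the block
        self.max_active = 0   # highest value `active` has reached

    @contextmanager
    def track(self):
        with self._lock:
            self.active += 1
            self.max_active = max(self.max_active, self.active)
        try:
            yield
        finally:
            with self._lock:
                self.active -= 1


counter = ConcurrencyCounter()

# Inside book_seat(), the cursor block would be wrapped like this:
#     with counter.track():
#         with closing(conn.cursor()) as cur:
#             ...
# After the run, inspect counter.max_active.
```

A max_active of 1 would suggest the block is fully serialised. A larger value only shows that several threads entered the block together; they may still have been queued on the shared connection while inside it, so timing the individual execute() calls would be a useful complement.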

I’d have thought that for parallel SQL operations you’d need to open a
connection per thread (or get one from a pool).

I suspect that if the psycopg2 documentation says that connections are
thread safe it may just mean that you don’t need to use mutexes of your
own to allocate and release them and their cursors. I wouldn’t normally
take it to mean that you can run parallel SQL through a connection;
instead I’d take it to imply that a connection will prevent multiple
cursors using it at a time.

All of this is speculation; I haven’t yet tried your code myself.

Cheers,
Cameron Simpson cs@cskk.id.au

It’s confusing to use with closing here since the cursor should already automatically be closed when you simply do:

    with conn.cursor() as curr:
        # more code

I’m not sure what your aim is - If you want to learn more about threading then I would advise (pretty strongly) not using a database API as your material, since that in itself is already pretty difficult (you have to double-check almost everything here: how the schemas are defined, what kind of uniqueness constraints there are, how this interacts with your business logic and actual usage, what kind of locks psycopg2 uses under the covers, how far psycopg2 really is thread-safe - tons of pretty hairy questions).

Another thing that seems problematic in your code is that you let the pool (I assume this is a psycopg2.pool.ThreadedConnectionPool?) release the connection inside the book_seat function. If this function releases connections, it should also acquire them, not take them as an input argument! So, if it takes them as an input argument, then it seems better not to have that finally clause. Whatever connections you have should be automatically closed by the pool itself (in your higher-level code).
(The code as-is seems especially problematic if you only have this one connection - How does that work, if the connection pool releases it in one thread? I’m not familiar with psycopg2, so it may be ok, but the formal design of the code seems suspect to me.)
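One way to keep acquiring and releasing in the same place is a small context manager around the pool, sketched below. It assumes a pool object exposing getconn() and putconn(), as psycopg2.pool.ThreadedConnectionPool does; the poster's pool.get_conn() and pool.release_conn() look like a custom wrapper, so the method names may need adjusting. The name pooled_connection is hypothetical.

```python
from contextlib import contextmanager


@contextmanager
def pooled_connection(pool):
    """Borrow a connection from `pool` and always hand it back.

    Commits if the block finishes normally, rolls back if it raises.
    `pool` is assumed to provide getconn() and putconn(conn).
    """
    conn = pool.getconn()
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        pool.putconn(conn)  # runs on success and on failure


# Hypothetical usage inside a worker function:
#     with pooled_connection(pool) as conn:
#         with conn.cursor() as cur:
#             cur.execute(...)
```

With this shape, book_seat() no longer needs a conn parameter or its own finally clause, and each thread works inside its own transaction.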

According to the docs, the connection objects are thread-safe and shareable among threads. Cursors are not. But cursors created from the same context (connection) are not isolated… (yuck!) So, I don’t know if this code is really correct - or will always work correctly, or is efficient. Assuming it is correct (assuming the underlying connection object implements transactions correctly), I would still assume it should often fail in a multithreaded environment (=lead to rollbacks rather than commits), and I don’t see what kind of retry logic you have.

Have you considered using SQLAlchemy rather than the low-level psycopg2 code? SQLAlchemy is a higher-level Pythonic interface to databases (for instance PostgreSQL), so it is much easier to use. You can express the DB schemas and transactions in actual Python code, instead of going the roundabout way of using SQL query strings.

Hi Hans,

Thank you for your analysis, let me try to answer it briefly.

You are correct, it should not. I was experimenting with one connection and different cursors versus a different connection per thread, which is why the design is like that. Moreover, like I said, if I create and release connections within the book_seat function, it behaves more or less synchronously, as if there was no threading in place, because I see in the logs that user 1 got seat 1, user 2 got seat 2, etc.


def book_seat(TRIP_ID, user_id):
    conn = pool.get_conn()
    try:
        with closing(conn.cursor()) as cur:
            cur.execute("SELECT id FROM seats WHERE TRIP_ID = %s AND user_id is NULL ORDER BY id LIMIT 1 FOR UPDATE", (TRIP_ID,))
            seat = cur.fetchone()
            seat_id = seat[0]
            print("Trying to book seat {} for user {}".format(seat_id, user_id))
            cur.execute("UPDATE seats SET user_id = %s WHERE TRIP_ID = %s AND id = %s", (user_id, TRIP_ID, seat_id))
            if cur.rowcount == 0:
                print("Failed to book seat {} for user {}".format(seat_id, user_id))
                return False
            print("Booked seat {} for user {}".format(seat_id, user_id))
            conn.commit()
            return True
    except Exception as e:
        print(f"Error in booking for user {user_id}: {e}")
    finally:
        pool.release_conn(conn)

Yes, and my guess is that somehow the isolation of cursors is the problem; I feel like the documentation is not very clear. If connection objects are thread-safe, then I should NOT have to explicitly mark my function as a critical section so that only one cursor is active at a time, right?

No I haven’t, I always found running the query directly easier, but hey, let me try that out as well! :slight_smile: will update.

Hi Cameron,

Yes, that is correct.

If you see the psycopg2 documentation, it says that

Warning

By default even a simple SELECT will start a transaction: in long-running programs, if no further action is taken, the session will remain “idle in transaction”, an undesirable condition for several reasons (locks are held by the session, tables bloat…). For long lived scripts, either make sure to terminate a transaction as soon as possible or use an autocommit connection.

So I have not started it explicitly. I have only set autocommit to False so that it doesn’t treat every line as a transaction, instead everything till I manually say conn.commit()

I have tried printing the cursor object in every iteration, and I got different results, so I think we can rule out the existence of single cursor!

<cursor object at 0x000001E4B407E0A0; closed: 0>
<cursor object at 0x000001E4B407DFC0; closed: 0>
<cursor object at 0x000001E4B407E500; closed: 0>
<cursor object at 0x000001E4B407E5E0; closed: 0>
<cursor object at 0x000001E4B407E880; closed: 0>
<cursor object at 0x000001E4B407EDC0; closed: 0>
<cursor object at 0x000001E4B407F140; closed: 0>
<cursor object at 0x000001E4B407F220; closed: 0>
<cursor object at 0x000001E4B407F300; closed: 0>

What you said is the correct interpretation. But then the doc doesn’t tell how to run parallel connections effectively! :frowning:

If you see the psycopg2 documentation, it says that

Warning
By default even a simple SELECT will start a transaction: in
long-running programs, if no further action is taken, the session will
remain “idle in transaction”, an undesirable condition for several
reasons (locks are held by the session, tables bloat…). For long lived
scripts, either make sure to terminate a transaction as soon as
possible or use an autocommit connection.

Ah, ok, good.

So I have not started it explicitly. I have only set autocommit to False so that it doesn’t treat every line as a transaction, instead everything till I manually say conn.commit()

Sounds good.

I have tried printing the cursor object in every iteration, and I got different results, so I think we can rule out the existence of single cursor!

<cursor object at 0x000001E4B407E0A0; closed: 0>
<cursor object at 0x000001E4B407DFC0; closed: 0>
[...]

This just says you’ve got several cursor instances. You get a new one
every time you make one. But I believe they’ll be using the same
connection and that they are serialised, so only one cursor is active at
a time.

WRT making multiple connections, just make another the same way you
made the first one:

 conn = psycopg2.connect(your_dsn)

You could do that per-thread:

 with psycopg2.connect(your_dsn) as conn:
     with conn.cursor() as curs:
         ... do stuff ...

to get parallel db access.
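Combined with a thread pool, the connection-per-thread idea might look like the sketch below. The names run_bookings, connect, and book are hypothetical: connect stands for any zero-argument callable that returns a new connection (for example lambda: psycopg2.connect(your_dsn)), and book for a function taking (conn, trip_id, user_id). One detail worth checking in the psycopg2 docs: using a psycopg2 connection as a context manager ends the transaction on exit but does not close the connection, which is why the sketch closes it explicitly.

```python
from concurrent.futures import ThreadPoolExecutor


def run_bookings(connect, book, trip_id, user_ids, max_workers=10):
    """Run `book` once per user, each call on a connection of its own.

    connect: zero-argument callable returning a new DB-API connection.
    book:    callable(conn, trip_id, user_id) returning the booking result.
    Returns the results in the same order as `user_ids`.
    """

    def worker(user_id):
        conn = connect()  # one connection per task, never shared
        try:
            return book(conn, trip_id, user_id)
        finally:
            conn.close()

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(worker, user_ids))
```

Opening a connection per task is simple but not free; a connection pool sized to max_workers would serve the same purpose with less connection churn.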

Cheers,
Cameron Simpson cs@cskk.id.au

This is always kind of difficult with these kinds of APIs, and it is also not always documented very well (I find the psycopg2 docs lack clarity and details about what is really going on). But it’s not necessarily so that if the “connection” is thread-safe, the “cursor” is also thread-safe.

Even though the docs state that a connection is “thread-safe” they do not clarify what this means (I know what it means in general, but without explicit examples, it’s not at all clear what this means in this context). So, I would be wary of this and just not share connections in different threads.

There is also the issue of what postgres itself supports. See for instance this:

Read Committed is the default isolation level in PostgreSQL. When a transaction uses this isolation level, a SELECT query (without a FOR UPDATE/SHARE clause) sees only data committed before the query began; it never sees either uncommitted data or changes committed by concurrent transactions during the query’s execution. In effect, a SELECT query sees a snapshot of the database as of the instant the query begins to run. However, SELECT does see the effects of previous updates executed within its own transaction, even though they are not yet committed. Also note that two successive SELECT commands can see different data, even though they are within a single transaction, if other transactions commit changes after the first SELECT starts and before the second SELECT starts.

(PostgreSQL: Documentation: 16: 13.2. Transaction Isolation)

Just to follow up to Hans’ remarks, transaction isolation is tricky (and comes in different flavours!)

One temptation I find myself in is to consider transactions like mutexes. Which they’re not! They run in parallel and can easily make bad decisions because of it. (Yes, the OP’s row locking should accommodate this issue - I’m not criticising the OP here.)
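On the row-locking point: with one connection per thread, every transaction running the original SELECT ... ORDER BY id LIMIT 1 FOR UPDATE asks for the same first free row, so each one waits for the previous holder to commit. That would be consistent with the "one seat at a time" behaviour reported earlier in the thread, though this is an interpretation and has not been verified against the poster's setup. PostgreSQL 9.5 and later offer SKIP LOCKED, which lets a transaction pass over rows that are already locked. A hedged sketch of the two-phase version using it follows; the function and constant names are hypothetical, and conn is assumed to be a psycopg2 connection private to the calling thread.

```python
SELECT_FREE_SEAT_SQL = (
    "SELECT id FROM seats "
    "WHERE trip_id = %s AND user_id IS NULL "
    "ORDER BY id LIMIT 1 "
    "FOR UPDATE SKIP LOCKED"
)
UPDATE_SEAT_SQL = "UPDATE seats SET user_id = %s WHERE trip_id = %s AND id = %s"


def book_seat_skip_locked(conn, trip_id, user_id):
    """Lock one free seat, assign it to the user, and return the seat id.

    Returns None when no unlocked free seat was found. With SKIP LOCKED that
    can also mean every remaining free seat is locked by another transaction.
    """
    try:
        with conn.cursor() as cur:
            cur.execute(SELECT_FREE_SEAT_SQL, (trip_id,))
            seat = cur.fetchone()
            if seat is None:
                conn.rollback()  # end the transaction the SELECT opened
                return None
            cur.execute(UPDATE_SEAT_SQL, (user_id, trip_id, seat[0]))
        conn.commit()  # releases the row lock
        return seat[0]
    except Exception:
        conn.rollback()
        raise
```

The trade-off is that seats are no longer handed out strictly in id order under contention, which may or may not matter for a booking system.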