Ok to pass loop to another thread?

Hi all

I read this warning in another thread -

“The reality is: passing a loop instance between threads brings you to the land where dragons live. It is dangerous but you might survive.”

I do this in my project, so I thought I had better find out if this is safe.

I have a class which represents an asynchronous database connection. It works by creating a separate thread to act as a db handler. It uses a queue.Queue to send requests to the handler. A request consists of the actual command, a reference to the running loop, and an asyncio.Queue to send results back.

The handler gets the request from the request_queue, executes the command, and places the results, in chunks of 50 rows at a time, on the return_queue using loop.call_soon_threadsafe(return_queue.put_nowait, rows)

The caller awaits the return_queue and retrieves the results as they appear.

I also use an asyncio.Event so that if the caller sends a ‘commit’ or ‘rollback’, the handler can notify the caller when it has finished, using loop.call_soon_threadsafe(wait_event.set).

Is there any problem with this approach?

Thanks

1 Like

Apologies for the late reply on this, but hopefully my answer can provide some useful information to someone out there.

Is there a particular reason as to why you need to use a thread as the DB handler? This really doesn’t seem like a situation where you would need to use threads, unless you’re using an uncommon DBMS that doesn’t have an async library.

Your situation is pretty much an exact representation of what Andrew was warning about when he was talking about “there be dragons”. It might be okay, but any time your passing an event loop instance between threads (regardless of using call_soon_threadsafe()), you run the risk of encountering undefined behaviors.

If your current approach works perfectly fine and you’re not having issues, great. Just know that if you do encounter problems, there’s not going to be much that we can do to help.

Also, in general, if you can avoid using threads, coroutines are significantly more lightweight and have faster context switching; although it does come at the cost of some degree of added complexity. When using asyncio, it’s typically better to avoid using threads unless you have to (such as when you need to call IO-bound functions from a library that doesn’t have async support). And even in those situations, it’s much better to use loop.run_in_executor() or the upcoming asyncio.to_thread() (in 3.9) than it is to try passing the event loop instance to other threads. It’s suggested to only use one event loop instance per thread, and obtain access to it using asyncio.get_running_loop() when needed.

[aeros] Kyle Stanley Profile - aeros - Discussions on Python.org aeros CPython
core developer
July 18

Apologies for the late reply on this, but hopefully my answer can
provide some useful information to someone out there.

No problem - I appreciate the input.

It works by creating a separate thread to act as a db handler.

Is there a particular reason as to why you need to use a thread as the
DB handler? This really doesn’t seem like a situation where you would
need to use threads, unless you’re using an uncommon DBMS that doesn’t
have an async library.

My project supports 3 databases - PostgreSQL, Sql Server, and sqlite3.
To my knowledge only the first one has native async support.

Is there any problem with this approach?

[…]

Also, in general, if you can avoid using threads, coroutines are
significantly more lightweight and have faster context switching;
although it does come at the cost of some degree of added complexity.
When using asyncio, it’s typically better to avoid using threads unless
you have to (such as when you need to call IO-bound functions from a
library that doesn’t have async support). And even in those situations,
it’s much better to use |loop.run_in_executor()| or the upcoming

asyncio.to_thread()| (in 3.9) than it is to try passing the event loop
instance to other threads. It’s suggested to only use one event loop
instance per thread, and obtain access to it using
asyncio.get_running_loop()| when needed.

I tried using run_in_executor(). It worked, but did not give me what I
wanted.

If you SELECT rows from a database, you have a choice of fetchall(),
fetchmany(), or fetchone(). Or you can use the cursor as an iterator and
fetch the rows one at a time in a for loop.

I definitely want to use the iterator approach. fetchall() would not
block the event loop, but it would effectively block the caller until
all rows were received. With an iterator this does not happen.

I could not see a way to combine that with run_in_executor(). That is
why I came up with my own threaded approach. I found that returning each
row with call_soon_threadsafe() ran slowly, so I changed it so that the
thread returns rows in blocks of 50 at a time, and it runs very smoothly.

I did not know about to_thread(). I just had a quick look, but at first
glance I do not see how it would help in my situation.

By the way, I noticed a typo in the docs for to_thread() -
s/propogate/propagate. Maybe this can be fixed before final release.

Frank

Having said all of that, I realise that I never experimented with using run_in_executor() to fetch 50 rows at a time using fetchmany(50).

That may well give me the same functionality. I will give it a try and let you know. If it works and performance is similar, I will switch to this.

It works, but it runs about 20% slower on retrieving 32k rows. I will stick with my method for now, but will keep this for Plan B in case I run into any dragons!

Ah, somehow I forgot that SQL Server doesn’t have an async library - I’ve been primarily using MySQL (aiomysql) and PostgreSQL (asyncpg) lately.

As for sqlite3, it wouldn’t really benefit as much from async support since it’s limited by local disk I/O (there’s not much mainstream support for async file I/O at the platform level). Async tends to make the biggest difference when dealing with intermittent network delays that vary in duration, but it can still benefit programs with local disk I/O to some degree if you can do some other work in the meantime. In that case, you’re generally using a thread to do the disk I/O instead of using coroutines.

asyncio.to_thread() is effectively a high-level version of loop.run_in_executor() that we just recently added. It behaves similarly to loop.run_in_executor(None, func, *args), but uses the current running event loop instead of calling run_in_executor() on any event loop instance, and directly accepts kwargs.

Thanks for pointing that out, I’ll try to fix that before the next 3.9 beta! I only recently wrote the docs for asyncio.to_thread() within the last couple months, and must have missed that typo.

I suspect it might be a bit slower due to the overhead of ThreadPoolExecutor, which is great for when you’re using a decent number of threads and want to re-use as much as possible, but when dealing with a smaller number of threads, the overhead of managing the workers can result in slightly slower performance compared to using threads directly.

But that’s just my guess based on the info provided, it could also be possible that the job is too large and could be split into multiple workers. That might be worth experimenting with, and could even yield significant performance benefits if the job(s) are long running enough (particularly if the DBMS is running on multiple nodes).

Also, make sure you’re using loop.run_in_executor(None, ...), as this uses the event loop’s default executor (basically just a ThreadPoolExecutor instance in an internal attribute), which is created the first time it’s used and then is re-used for future calls to loop.run_in_executor(). This is opposed to passing in an instance of ThreadPoolExecutor. It’s effectively the same if you have access to the instance globally, but much better than passing a new instance of ThreadPoolExecutor every time you call loop.run_in_executor() (ultimately defeating the purpose of using a pool). I don’t have any reason to suspect that you’re doing this, but I feel that it’s worth mentioning since I’ve seen it happen many times and it can make a very substantial difference.

Either way though, it’s good to know that the loop.run_in_executor() option works in your case as a backup. Thanks for trying it out and reporting back with the results. :slight_smile:

FWIW, you could use loop.run_in_executor() for each row yielded by the cursor, with each iteration effectively being a separate job submitted to the executor. But it might involve overriding the cursor class’s __next__/__anext__, which could be more complex than necessary if your current solution is working without issues.

For now, you could put that aside as another alternative approach to consider. It could also be beneficial to someone else working with a DBMS that has intermittent network delays or has multiple nodes (allowing them to share the workload across separate jobs).