Feeding data generated via asyncio into a synchronous main loop

I’m trying to build an application that scrapes webpages and loads the resulting data into a sqlite database. The synchronous version of it is basically as follows:

for page in get_webpage_data():
    insert_into_db(connection)

There’s basically no point in making the database insert asynchronous as sqlite doesn’t support multiple readers. And putting the database work into a thread has problems as well (tidying up the thread on errors, interrupt handling) which are basically just unnecessary work and trouble. (And in case anyone suggests it, aiosqlite puts the database on a thread, and the threading problems I hit from this is why I’m looking for alternatives…)

But getting the webpages is absolutely the sort of thing I’d want to use asyncio for - an async version of that code runs far faster than a synchronous version.

What I’d ideally like to do is to have a way of getting the asyncio loop to “run long enough to return one value” then pause while I process that value, then continue, etc. That way, I could write the internals ofget_webpage_data() asynchronously, without needing to change the rest of my code. Yes, this would mean my web scraping can’t happen while my database updates are running, but I’m willing to pay that price (at least initially - if the price turns out to be too high, I can still switch to running the DB in a thread if I need to).

Is it possible to do something like this? I’ve tried checking the documentation, and couldn’t see anything, but honestly I don’t know if my lack of understanding means I can’t frame the question well enough, or if there’s a fundamental reason why what I want to do isn’t possible.

Can anyone help?

1 Like

I think if you just do your database writes directly from inside your asyncio tasks, that’s effectively what you’d get?

1 Like

So basically just call the blocking DB updates within the async code? Hmm, I’d never thought of that. I guess you’re right, that would work. It’s a bit of an inversion of the logic I would normally use (the “iterator feeding data as it arrives to the processing step” logic I showed above) but that’s an aesthetic point, not a technical one.

I’d still be interested in something that worked the way I suggested - it seems like a “natural” way to integrate sync and async processing without the all-or-nothing “async def main” model.

Although I guess that an async iterator like this:

async def get_webpage_data():
    tasks = [get_page(url) for url in my_huge_list]
    for coro in asyncio.as_completed(tasks):
        yield await coro

async def main():
    async for data in get_webpage_data():
        process(data)

is pretty close to what I was describing initially.

As I said, I think my biggest problem is not really having understood the basic concepts (I hope I don’t offend anyone by saying this, but the asyncio docs aren’t the best at explaining the “big picture” ideas)

Thanks for the pointer - I think I’ve got a good solution now (and I’ve definitely learned something :slight_smile:)

1 Like

AFAIK, that is what EventLoop.run_until_complete does, if you feed it a coroutine that returns the next value.
The newer API, run, creates a new event loop, runs a coroutine in it to completion, and finalizes the loop. That’s better for most cases – you’re not mixing up event loops and the finalization is more predictable.
The old way of doing things involves a (usually global) event loop whose scheduled tasks persist between calls to run_*. A bit more flexible, but you need to remember to close the loop manually.

I guess so, I hadn’t really thought of it like that. To be honest, I hadn’t really got a good picture of how run_until_complete would be used in real life except for the “run my async stuff” use case that’s now better handled by asyncio.run (apart from the bug with proactor closedown that seems to exist in 3.8).

So I guess the question is what would be the “coroutine that returns the next value”. It feels like this would be what got returned from asyncio.as_completed(tasks), so I guess something like:

# Synchronous code
def task_results(tasks, loop):
    # Maybe get the default loop rather than having it passed as a parameter?
    # Is that even possible as there isn't a currently running loop...?
    for coro in asyncio.as_completed(tasks):
        yield loop.run_until_complete(coro)

But having to manage the loop variable seems messy, and I’ve completely lost track of whether the remaining tasks will continue in the background if I do this (given that “continue in the background” really means “sit there suspended until I’m done” as I’m back running blocking code at this point).

I get this a lot with asyncio - it all seems fine until I need to do something unusual, at which point I trip up on some detail, need to dig deeper, and my head explodes :slightly_smiling_face: It feels like “simple” asyncio is easy (and great!), “complex” asyncio needs guru-level knowledge (and that’s fine) but there’s no really accessible path to “intermediate” level knowledge, where you understand how the bits fit together without necessarily understanding enough to implement your own event loop or protocol…

Asyncio has a thread-local default event loop, but it’s problematic. While sharing all the running tasks/coroutines across an application is OK, you need to finalize it exactly once in the entire app. Not good for composable libraries.

The new asyncio.run creates, startes, runs, and finalizes a loop all in one synchronous call. During that time, that loop is the currently running event loop. No global state, but all coroutines still have access to their loop.

If you need to run coroutines in more than one synchronous call, then yes, unfortunately you’re writing lower-level code :‍(
AFAIU, all synchronous code should handle the loop explicitly. So I’d write something like:

import random
import asyncio
import time


def task_results(tasks_maker):
    loop = asyncio.new_event_loop()

    tasks = loop.run_until_complete(loop.create_task(tasks_maker))

    try:
        for coro in asyncio.as_completed(tasks, loop=loop):
            yield loop.run_until_complete(coro)
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()

###

async def example(i):
    sleep_for = random.uniform(0, 2)
    await asyncio.sleep(sleep_for)
    print('Task', i, 'slept for', sleep_for, 'seconds')
    await asyncio.sleep(0)
    return sleep_for

async def make_tasks():
    return [example(i) for i in range(10)]

for result in task_results(make_tasks()):
    print('Synchronous code got', result)
    time.sleep(0.1)


Hmm, but my understanding that “synchronous code needs to handle the loop explicitly” is not that compatible with the loop argument being deprecated from the synchrouous as_completed.
I do wonder what the replacement here should be.

In the case of the loop argument deprecated across asyncio members such as asyncio.as_completed(), the primary purpose (from my understanding, at least) is to limit the amount of passing of the loop in general; only getting it when you specifically need it (via get_running_loop(), not get_event_loop() which can have unexpected behavior). The long-term goal is to only require explicit access to the event loop when working with lower-level parts of asyncio in libraries rather than user code, but the API is currently in a bit of a state of gradual transition as higher-level equivalents are being added. An example of this was loop.run_in_executor()asyncio.to_thread()to_thread() uses the common case of passing None as the first arg, for using the default ThreadPoolExecutor).

Eventually, I’d imagine also adding an equivalent for loop.run_until_complete(), which could effectively replace it. But as a library that’s considered to be mature, adding too many new members at once can be a bit much. This might be reasonable to include in 3.10 or 3.11 though, given the rather common use case for running a function synchronously. @yselivanov?

Can’t speak for Yury, but I don’t see much point in replacing run_until_complete, given that there’s already a simpler way to run sync functions, i.e., by just… calling them.

The problem is one of running a declared async function synchronously (in effect, “I don’t have anything else for you to do while you await, so please just block until you’re all done”).

That said, I do see people unnecessarily nervous about calling a blocking function inside an async stack when they know they really do want to block. I guess we’ve successfully traumatised enough people with threads and then failed to explain how async is a totally different thing :slight_smile:

I’d say that’s definitely me :slight_smile:

In my case, I don’t actually want to block, per se, rather I want to serialise calls to sqlite. But I also want to avoid putting sqlite calls onto a thread because, well honestly just because I’m fed up of dealing with threads¹ :wink: So blocking on the database calls is a trade-off.

But it’s definitely true that I don’t really have a good picture of what async is, so I sort of think in terms of “less painful threading, but you’re not allowed to do anything that blocks”. And as a result I don’t actually know how to evaluate the blocking-on-the-database trade-off.

Because Python async grew from generators, coroutines and futures, a lot of the documentation I’ve found (at least stuff that goes beyond the basic “just do this and it works by magic” level that I used to get me started) tends to dive into that rather than discussing async as a concept. I wish I knew what the documentation I’d like to see would look like - then I’d be able to offer something more constructive - but unfortunately it feels like one of those “I’ll know it when I see it” situations…

Actually, trio’s documentation is a pretty good example of what I’m after, but the problem there is that it’s about trio, and we have the framework issue - if I want to use a library that only supports asyncio, I have to use asyncio, so I need to re-learn the stuff that trio does differently than asyncio. (I initially started writing my program with trio, but hit exactly this and had to switch to asyncio).

I was going to say that I’d like to see something framework-agnostic, but I don’t even know if that’s possible (is there a framework-agnostic way to discuss asyncio.gather or asyncio.as_completed, or trio nurseries, or are even those concepts so locked to their particular frameworks that you have to choose a mental model and stick with it?)

But - isn’t run_until_complete the means of calling an async function from sync code, not the other way around? And that’s the one (thinking about how the trio docs explain it) that needs the framework to provide the support for? (But asyncio.run is the higher level version, that fires up a new event loop for the duration of the function, so maybe run_until_complete is only needed in code that’s explicitly passing a loop around?)

¹ For example, aiosqlite uses a database thread and doesn’t handle Ctrl-C properly, so I don’t have much confidence that I’ll be more likely to get that right than they were…

Yeah, the dirty little secret is that async is already serialised, which is why just calling it synchronously is fine. (Sending it out to a separate thread and waiting on it using asyncio locks is somewhat better, but only if there is other work to do besides writing to the DB.) It’s really not at all like threads, async is far more about “what will I do while I wait” than “how many things can I do at once”.

1 Like

Yep, that’s what I was specifically referring to – calling an async def/coroutine function synchronously using loop.run_until_complete(). Running a non-coroutine function synchronously is done just by calling it of course. asyncio.run() doesn’t really work as a higher-level version of loop.run_until_complete() in that context because it creates a new event loop every time instead of using the existing one. It’d be convenient to have a version that solely uses asyncio.get_running_loop() (raises when called outside of a running event loop) internally but doesn’t require the user to fetch a reference to the event loop. Assuming my interpretation of the long-term goal of removing the loop arg throughout asyncio is correct (see prev post for details), it seems to be in line with that.

In general, dealing with the event loop is also something that’s commonly mishandled by users, especially in the context of working with multiple threads. For example, you can quickly find yourself in the realm of undefined behaviors from passing around a reference of an event loop to other threads, since it’s intended for an event loop to be used only within the thread that it’s created. See Andrew’s “there be dragons” post.

I think that asyncio would be better off in terms of making it easier to work with if the majority of users didn’t have to worry about dealing with the event loop directly, and potentially getting it wrong in a way that’s not easy to detect.

That’s what run_until_complete does, yeah. But the reason you would use it instead of asyncio.run is if you want to alternate between running async code for a while, and then freezing the async tasks while running some sync code, and then going back to running the same async code, etc. And for that use case, you can accomplish the same effect more simply by just calling your sync code from inside some async task.

Nothing will explode if you call sync code from inside async context. It’s just that while you’re doing that, none of the other tasks can make progress, because they all share the same thread and your sync code is hogging that thread until it finishes.

4 Likes

I know you’ve specifically said this is not what you’re looking for, but if it was me doing this, I’d put the sqlite logic into a thread and use something like https://github.com/aio-libs/janus to push data to it (the logic is just waiting on the queue until a poison pill comes out and doing inserts). Conceptually it’s the cleanest way. Maybe encapsulate the entire thing into a class so shutdown is easier.

My main problem with this is handling Ctrl-C. If I get a KeyboardInterrupt in the main thread, how does that cleanly get the sqlite thread to clean up the database connection and then terminate the thread? Because unless you do something special, threaded applications typically hang when you hit Ctrl-C because the main thread exits, but the worker threads still exist and are not daemons, so the process waits forever for the workers to complete. (And making the worker a daemon doesn’t help in this case because the DB needs to be tidied up).

This is why I prefer not to use threads - but if there’s a clean, easy to use idiom for making sure that Ctrl-C handling works, I’m happy to use it. I’m not aware of anything that isn’t fiddly to get right, though.

I can’t specifically attest for how clean it is because I don’t frequently use signal handling myself, but it might be worth registering a specific signal handler function for SIGINT so that the first Ctrl+C can close your db connections and join the worker threads instead of just exiting the main thread (with signal.signal(signal.SIGINT, custom_handler)). I don’t believe there’s much you can/should do about multiple Ctrl+C, such as if it’s sent while in the handler (other than except KeyboardInterrupt); but that would likely be an improvement.

Yeah, without cancellation being properly plumbed through the system (in a way that even the DB query can be interrupted when cancellation is requested) there’s no good way to handle this.

C# has a pretty good set of primitives for this, though they were deemed unnecessary for Python (until it was discovered that they are, in fact, necessary…).

Cancelling the wait, rather than the underlying operation, is the best we have. In this case, it basically means use a daemon thread and hope the DB survives.

Unfortunately, if the worker thread is in the middle of a long blocking DB operation, the interrupt in the main thread is not passed to the DB, so nothing can interrupt that. Even making the thread a daemon doesn’t fix that, all it does is allow the whole process to die before the DB operation is complete, but that gives no chance for cleanup. Databases are built to handle unexpected termination, but there’s still consequences (sqlite, for example, leaves a journal file lying around which is tidied up when you next open the database and do an update - and if you lose the journal file before then you’re screwed).

Here’s a simple reproducer (that doesn’t involve asyncio):

import threading
from time import sleep

def big_block():
    try:
        sleep(10)
    finally:
        print("cleanup")

if __name__ == '__main__':
    t = threading.Thread(target=big_block)
    t.start()
    t.join()

There’s no way that I know of to modify this so that a Ctrl-C will terminate the process cleanly (printing “cleanup”) before 10 seconds have passed. And that’s basically why blocking operations on threads are problematic. (Disclaimer: I’ve only tried this on Windows, I don’t know Linux signal handling so it may be different there - but that’s no help to me).

Yes, that seems to be the best option we have right now - or I do what I’d basically decided to do which is do the blocking DB operations on the main thread.

Hmm, I wonder if it’s possible to have the main thread be my DB consumer loop, with the async stuff happening on a worker thread? That might be viable, I guess, although it starts to seem fairly over-engineered (block on the main thread is 100% sufficient for my use case, at this point I’m just speculating on other options “in case I ever need them”…)

Note that even if you run blocking sqlite queries on the main thread, control-C won’t actually interrupt any sqlite database queries, since Python doesn’t run signal handlers while the main thread is blocked in C code. So if you hit control-C while sqlite is working, then you’re still going to be stuck waiting until sqlite finishes before Python notices and you get the KeyboardInterrupt.

In practice I’m guessing this is probably fine for your use case, because all your sqlite queries probably complete in a few milliseconds. But the point is that running in the main thread isn’t actually magic :-).

You can get the same semantics with threads by running each sqlite query in its own thread, and then when control-C arrives, wait until the thread completes before raising KeyboardInterrupt. I agree that implementing this by hand is probably super annoying and would have lots of tricky edge cases. But it’s the sort of thing a framework can handle for you, e.g. these are the semantics that you’d get “for free” if you used trio.to_thread.run_sync. And in general, this is about the best you can hope for when working with arbitrary C code.

With asyncio specifically, I don’t think it can handle this, because in general when you hit control-C on an asyncio program then the results are undefined. Usually it makes the event loop stop at some arbitrary point and whatever happens from that happens, including background threads continuing to run, maybe. So if you’re using asyncio then keeping the db access in the main thread is a reasonable idea.

In theory, the ideal solution would be to delegate the sqlite queries into threads, but also integrate it into some kind of overarching cancellation system so that when control-C comes in, it eventually invokes sqlite_interrupt to tell sqlite that we want the thread to terminate ASAP. Regular synchronous Python and asyncio don’t currently have the plumbing to make this possible. In Trio it’s possible in principle, though you’d definitely have to do some work to wire everything up.

1 Like

In practice, blocking on the main thread is fine, but not because my queries are small. I’m bulk-updating a 2.5 GB database, with a 3-million row table, so queries taking a long time is definitely a problem. Blocking is fine because I don’t need to interrupt that often (and when I do, proper cleanup is more important to me than aborting the current update).

As a side note, I did think that Ctrl-C interrupted C calls - it does interrupt time.sleep, but apparently it doesn’t interrupt sqlite3 calls. I’d argue that’s a bug in the sqlite3 module (albeit not a particularly critical one).

That bothers me a lot, though. Are you saying that Ctrl-C is unsafe to use in an async program? That sounds like a pretty worrying limitation. I’d hope that the event loop is exception-safe, in the sense that if an exception happens (ctrl-C or anything else) it would propagate out of the asyncio.run (or equivalent) cleanly, without the asyncio internal data structures being corrupted.

I’d assumed that something like the following was true. Within async code, when an exception occurs everything bar at most one routine is sitting in an await. That one routine would see the exception just as normal synchronous code does, and if it doesn’t handle the exception, it would propagate back up out of an (essentially arbitrary) await in some other routine. The code calling that await then gets to handle or ignore the exception, and so on back to asyncio.run.

That’s not IMO “undefined” even though the exact flow is “arbitrary”. And certainly an exception could mean that any number of await calls never actually return. But it’s possible to reason with, and I’d certainly hope that something like this was the case and not “if you hit Ctrl-C on your async program, say goodbye to any assurance of data integrity”! I do admit that it’s a rather low-level description of what’s going on, and may not be easily translated into something higher level, but “in the face of exceptions, not every await will resume before the exception propagates out of asyncio.run” seems sufficient…

Aha! I hadn’t noticed the existence of sqlite3.Connection.interrupt(), but using that, I suspect it’s possible to put together some sort of cancellation system, as you suggest. I’m not sure why you say “regular synchronous Python and asyncio don’t currently have the plumbing to make this possible”, but I suspect if I try I’ll find out soon enough :slightly_smiling_face:

By the way, thanks to everyone for all the information - it’s given me a lot of food for thought, and significantly advanced my understanding of asyncio.