Help with asyncio program freezing during requests

I’m trying to write a program to grab multiple files over HTTP. I’m writing it using asyncio (with httpx as the HTTP library) in the hope of optimising my network throughput, and as a chance to learn more about asyncio.

Because I am fetching a lot of files (hundreds of thousands) it’s not practical to use something like asyncio.gather to fire off all of the requests at once (at least I assume it isn’t - if nothing else, I don’t have the memory to have all the responses in RAM at the same time). So I’m using a queue and workers, following a pattern I found on Stack Overflow:

async def worker(client, queue):
    while True:
        url, file = await queue.get()
        tries = 0
        r = await client.get(url)
        data = r.content
        if data:
            file.write_text(json.dumps([url, data.decode()]))

        queue.task_done()

async def main():
    TODO = set(URLs) - set(ALREADY_READ)
    queue = asyncio.Queue()
    for n, url in enumerate(TODO):
        queue.put_nowait((url, LOC / f"{n}.json"))

    async with httpx.AsyncClient() as client:
        tasks = []
        for i in range(20):
            task = asyncio.create_task(worker(client, queue))
            tasks.append(task)

            await queue.join()

        for task in tasks:
            task.cancel()

This works pretty well, in that it gets files at a very good rate, and my network traffic seems consistently high, so I’m using my available bandwidth effectively. By only having 20 workers, I’m assuming I’ll keep the load on the server reasonable.

The problem is that this runs fine for the first few hundred, maybe a thousand, files, but then it locks up. It’s not timing out (I added a check for a timeout exception, just in case, but that made no difference); it just stops processing.

If I hit Ctrl-C, I get the following error:

File "...\Python39\lib\asyncio\windows_events.py", line 316, in run_forever
    super().run_forever()
  File "...\Python39\lib\asyncio\base_events.py", line 596, in run_forever
    self._run_once()
  File "...\Python39\lib\asyncio\base_events.py", line 1854, in _run_once
    event_list = self._selector.select(timeout)
  File "...\Python39\lib\asyncio\windows_events.py", line 434, in select
    self._poll(timeout)
  File "...\Python39\lib\asyncio\windows_events.py", line 783, in _poll
    status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)

That doesn’t really help me understand where it’s stuck, unfortunately.

I added some prints, and there’s a few things I noted:

  1. The last thing printed is just before the await client.get call. So it does seem to hang while getting a response. But why aren’t the other workers still running?
  2. The output is very regular: Send the request, get the response, get the content, over and over. No overlapping at all - surely I’d expect to occasionally have maybe a few “Send the request” lines before getting a response?

I’m worried I’m not actually getting concurrent processing here, but I can’t see why that would be the case.

Also, if a get is hanging, how do I deal with that? I thought httpx timed requests out automatically (the docs say it does), so why am I pausing indefinitely? Could it be that something is still synchronous (DNS lookups, perhaps?), and if so, is there any way around it? It feels like the only option might be threads, because co-operative multitasking isn’t going to work too well if the tasks won’t co-operate :roll_eyes:
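(For what it’s worth, I assume I could also pin the timeouts down explicitly on the client, something like the line below, but I haven’t verified that it makes any difference. The numbers are arbitrary.)

# assumed explicit timeout configuration for httpx; values are arbitrary
client = httpx.AsyncClient(timeout=httpx.Timeout(10.0, connect=5.0))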

While somebody pointing out what I’ve done wrong would be much appreciated, I’m also interested in suggestions about how to debug things like this.

  1. What’s the best way of finding out exactly where my code is hanging?
  2. How do I confirm things really are happening in parallel? It feels awfully easy to think I’ve written a program that does 20 requests in parallel, and later find it was entirely serial…
  3. Are there common patterns I should be using for things like worker threads, cancelling tasks after a timeout, rate-limiting http requests? I’m currently making stuff up myself or using StackOverflow, but I’d rather not reinvent too many wheels :slight_smile:

Away from a Python interpreter right now, but it looks like you’re joining on your queue after starting your first worker: maybe dedent?
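Something like this, I think (untested, since I’m away from an interpreter; the rest of main() as in your post):

    async with httpx.AsyncClient() as client:
        tasks = []
        for i in range(20):
            tasks.append(asyncio.create_task(worker(client, queue)))

        # join once, after all 20 workers have been started
        await queue.join()

        for task in tasks:
            task.cancel()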

I prefer to have exceptions from my workers raised in the main stack, in my case with concurrent.futures.wait, perhaps awaiting the futures with a timeout and then checking whether the queue is empty in a loop?

Oh, bloody hell!!! Yes, that’s almost certainly the problem with not running in parallel. Thank you, and my apologies for wasting your time with a dumb mistake like that.

Although I will say that I feel like it’s awfully easy to make mistakes like that with asyncio code, possibly more so than with threaded code (where I don’t recall having ever done anything quite that dumb). Maybe there’s something that could be done to make async more idiot-proof? I’ll happily volunteer as the idiot :slightly_smiling_face:

I’m not quite sure what you mean here. I thought concurrent.futures was about threading, not async? But even so, could you explain the structure you’re thinking of in a bit more detail?

It could be that the one worker that got started hit an exception, and your script hung because you were still awaiting queue.join(), which was never going to complete because there were no active workers left. You could do something like:

# after the task creation loop
tasks.append(asyncio.create_task(queue.join()))
done, not_done = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
if queue.empty():
    # all done, clean up
    for task in not_done:
        task.cancel()
else:
    # a worker must have errored out, raise the exception
    for task in done:
        task.result()

That should at least tell you if and how a worker is going wrong :slight_smile:

Yeah, that makes a lot of sense. I think I’ve got spoiled by having higher-level libraries for threading; I’ve forgotten how to do this sort of orchestration properly for worker threads!

Thanks for the help, it’s much appreciated. I’ll give those suggestions a try and see if I can get a bit further.

One thing I played with, but then ended up discarding because my concurrency level was lower…

I think you can create all of those hundreds of thousands of tasks; they don’t take much memory themselves. As I understand the way asyncio works, none of them will actually be scheduled to do any work until await (or some equivalent) is called on them. So you could put the entire list of tasks into some sort of leaky bucket, pulling out the maximum number that you want to allow to run at once, and then pulling more out as some of them complete. You’ll keep adding new tasks to the list that you pass to asyncio.wait, and when a task appears in that list for the first time it will start to get CPU time to run.

This is all predicated on the use of cooperative scheduling; if you are putting the tasks into executor threads, then they’ll start running immediately.

asyncio.create_task schedules execution, so you’d need to only call that after you pull a task (or rather, the description of a task) out of the bucket.
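As a rough sketch of that shape (untested; fetch() here is just a placeholder for whatever one work item actually does):

import asyncio
import itertools

async def fetch(url):
    # placeholder for the real per-URL work
    await asyncio.sleep(0)

async def run_limited(urls, limit=100):
    url_iter = iter(urls)
    # prime the "bucket" with up to `limit` running tasks
    running = {asyncio.create_task(fetch(u)) for u in itertools.islice(url_iter, limit)}
    while running:
        done, running = await asyncio.wait(running, return_when=asyncio.FIRST_COMPLETED)
        for t in done:
            t.result()  # re-raise any exception from a finished task
        # top the bucket back up as tasks complete
        running |= {asyncio.create_task(fetch(u))
                    for u in itertools.islice(url_iter, limit - len(running))}

if __name__ == "__main__":
    asyncio.run(run_limited(f"url-{i}" for i in range(1_000)))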

But, to limit the number of tasks running at once, you could create a Semaphore and acquire it for each task. Then you can start them all at once and use gather.

I’m using threading and queues with concurrent.futures, so I was drawing parallels between it and asyncio. Zachary proposed something similar to (and likely better than) what I was thinking (my current implementation is inefficient and joins on workers with a timeout in an infinite loop, rather than using concurrent.futures.wait).

So something like this?

import asyncio
from random import random

async def task(sem, i):
    async with sem:
        await asyncio.sleep(random()/3)
        if random() < 0.001:
            raise RuntimeError(f"Task {i} aborted!")
        print(f"Task {i} completed")

async def main():
    N = 1_000_000
    sem = asyncio.Semaphore(100)
    print(f"Creating {N} tasks")
    tasks = [task(sem, i) for i in range(N)]
    print("Created")
    await asyncio.gather(*tasks)



if __name__ == "__main__":
    asyncio.run(main())

(I’ll note that in writing this, I missed the await for asyncio.sleep() and the async in async with sem. The errors weren’t particularly obvious, especially the sleep one… I think I’ve seen it mentioned that this is a difficult problem to solve, but I can confirm it’s frustrating when trying to write async code as a newcomer…)

That code seems to work great for N=1000 or even 10,000. At a million, though, there was a pretty long pause when one of the tasks aborted - long enough that I thought it had hung. At first I thought it was because gather says “Other awaitables in the aws sequence won’t be cancelled and will continue to run” when a task raises an exception. But that doesn’t seem to be the case, as increasing the wait in each task didn’t make any difference. Which sort of makes sense - there’s nothing that will await the remaining tasks once the gather propagates the first exception. (In a “real” implementation, I might want to be more robust - aborting everything because one task fails is a bit aggressive, but let’s get the basic stuff sorted first :slightly_smiling_face:)

So I’m not at all sure what’s going on here. Maybe there’s a major overhead to tearing down millions of tasks? Maybe there’s something Windows-specific going on? (The original traceback I quoted referenced GetQueuedCompletionStatus, which is a Windows API; my test here isn’t using the network, but my original code was doing HTTP requests, so I don’t know whether that matters.)

As I said above, my main problem here is that I don’t know how to debug this. I may be doing something wrong, but I don’t know what and I don’t really know how to think about the logic to know where to look for bugs in my code… And I’m still not 100% sure that it wouldn’t just be better to have fewer tasks - I know asyncio is designed to work with a lot more tasks than threading can cope with, but am I going too far with millions?

OK, I did some more debugging. It looks like memory is what makes millions of tasks non-viable (with 2 million tasks, my process was hitting 4GB of RAM).

I instrumented the code to test increasing numbers of tasks:

import asyncio
from random import random
from datetime import datetime

STARTED = False
def start_timestamp():
    global STARTED
    if not STARTED:
        print(f"{datetime.now()}: First task started")
        STARTED = True

async def task(sem, i):
    start_timestamp()
    async with sem:
        await asyncio.sleep(random()/3)
        raise RuntimeError(f"{datetime.now()}: Task {i} aborted!")

async def main():
    #N = 2_000_000
    global STARTED
    sem = asyncio.Semaphore(100)
    for N in (1_000, 10_000, 100_000, 1_000_000, 2_000_000):
        STARTED = False
        print(f"{datetime.now()}: Creating {N} tasks")
        tasks = [task(sem, i) for i in range(N)]
        print(f"{datetime.now()}: Completed creation")
        try:
            await asyncio.gather(*tasks)
        except Exception:
            print(f"{datetime.now()}: Await raised an exception")
        else:
            print(f"{datetime.now()}: Await returned")
        print()


if __name__ == "__main__":
    asyncio.run(main())

The results I got were:

2021-05-05 10:35:48.141158: Creating 1000 tasks
2021-05-05 10:35:48.156758: Completed creation
2021-05-05 10:35:48.170563: First task started
2021-05-05 10:35:48.190632: Await raised an exception

2021-05-05 10:35:48.191567: Creating 10000 tasks
2021-05-05 10:35:48.219568: Completed creation
2021-05-05 10:35:48.320292: First task started
2021-05-05 10:35:49.829851: Await raised an exception

2021-05-05 10:35:49.829851: Creating 100000 tasks
2021-05-05 10:35:50.130283: Completed creation
2021-05-05 10:35:51.217503: First task started
2021-05-05 10:36:08.743601: Await raised an exception

2021-05-05 10:36:08.743601: Creating 1000000 tasks
2021-05-05 10:36:11.144130: Completed creation
2021-05-05 10:36:23.125137: First task started
2021-05-05 10:39:21.774252: Await raised an exception

2021-05-05 10:39:21.774252: Creating 2000000 tasks
2021-05-05 10:39:31.080623: Completed creation
2021-05-05 10:40:40.308741: First task started

The 2M case was still running at 10:56, when I cancelled it. While the last case was running, I checked memory and the Python process was using about 4GB of RAM, enough to have a noticeable effect on my PC. I guess that’s only about 2KB of RAM per task. I don’t know if that counts as unreasonable, or simply a case of “don’t do that, then”…

The huge step between 1M and 2M tasks seems odd, though. I didn’t check RAM on the smaller runs, so I don’t know if the RAM usage went up linearly with the number of tasks. It’s possible 2M tipped the program over into swapping, but if so, I didn’t spot it.

For now, I think I’d take the view that creating millions of tasks is a bad idea, and I should be using a shared worker style of approach for jobs at this sort of scale…


Which I guess raises the question: is anyone aware of a library like concurrent.futures (the worker pool and executor abstractions) for asyncio? I’ve not been able to find anything on a quick search, so does that mean I’m facing writing one of my own? At this scale, robustness and reliability are fairly important, which is why I’d rather use an established library if possible - you’ve seen how my attempts have gone so far :slightly_frowning_face:.

I think you can simply use concurrent.futures itself together with a blocking HTTP library such as requests. Given that you’re not aiming for very large concurrency, it should be fine.
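A rough sketch of what I mean (requests is assumed to be installed, and handle() is a stand-in for whatever post-processing you do):

import concurrent.futures
import requests

def fetch(url):
    # blocking download; runs in a worker thread
    r = requests.get(url, timeout=30)
    r.raise_for_status()
    return url, r.content

def handle(url, data):
    # stand-in for your post-processing
    print(url, len(data))

def main(urls, workers=20):
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(fetch, url) for url in urls]
        for fut in concurrent.futures.as_completed(futures):
            url, data = fut.result()  # re-raises any download error here
            handle(url, data)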

I can, yes, but I’m also trying to learn more about asyncio. At the moment it feels like what I’m learning is “it’s harder than threads and you probably don’t need it” :frowning:

But you’re right, I can manage with threads (I have a working version with threads, it was much easier to write). But that prompts the question, how much concurrency should I be looking at before asyncio is the better option? 200 threads? 2000? To be perfectly honest, I don’t even know how many workers is ideal for my workload (downloading many small files) other than by experimenting.

You want as many workers as needed to cover the latencies involved in downloading, so as to always exploit your connection’s bandwidth. If the latency of issuing a request is equal to the download time for a file, you want ~2 workers; if the latency of issuing a request is twice the actual download time, you want ~3 workers; etc.

The general formula would be something like 1 + latency / download time, or (taking into account your max bandwidth and the file size) 1 + latency * bandwidth / average file size. You probably want to spawn a bit more still (perhaps 50% or 100% more), because not all accesses will have the same latency, and not all files have the same size.
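For example, plugging some made-up numbers into that formula:

# illustrative numbers only, not measurements
latency = 0.2            # seconds from issuing a request to the first byte
bandwidth = 12.5e6       # bytes/second (~100 Mbit/s)
avg_file_size = 50_000   # bytes

workers = 1 + latency * bandwidth / avg_file_size   # 1 + 0.2 * 12.5e6 / 50_000 = 51
workers_with_margin = round(workers * 1.5)          # ~76, with 50% headroom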

As for at which point asyncio becomes preferable to threads for IO, I have no precise idea, but 100 threads is probably a reasonable order of magnitude.

IMO: using the system’s asynchronous HTTP facilities to issue as many requests as the system/network hardware can handle may be better than trying to spin up a bunch of threads that fight over network resources.

If you can clarify the specifics of how that would work (on Windows, in a Python program where I want to post-process the downloaded data and explicitly not store most of it on disk) then I’d be interested. But I’m not sure how to use a general statement like you just made, I’m afraid.

For example, this is one such approach:

In the past this was called async IO, but that term has become very ambiguous these days. So if you search on Google, using “callback”, as in “HTTP request with callback”, may yield slightly better results. This type of method lets the hardware work at its own pace, without the system putting stress on it via unnecessary threading (especially millions of threads, which also overwhelm the system itself).

Scaling issues aside, I like the approach you have when it comes to being quick and easy.

In this thread I’m asking how to use asyncio (it’s the Async-SIG thread, after all). Gevent is a different concurrency option. C# and C++ are different languages. I think you’re going way too off-topic here.


OK, so maybe something simpler.

Create a class to encapsulate one ‘download request’. Instantiate as many of those as you need, and shove them into an asyncio.Queue. Create some number of worker coroutine tasks (10, 20, you choose), each of which will wait on the queue for a work item, process it, and continue doing that until the queue is empty (or the coroutine gets a cancellation exception).

You can then gather() on the list of worker coroutines, or wait() if you want to catch any exception that might occur and cancel the remaining work.
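In rough outline (DownloadRequest and process() are stand-ins for your actual work item and download logic):

import asyncio
from dataclasses import dataclass

@dataclass
class DownloadRequest:
    # stand-in for one 'download request'
    url: str
    dest: str

async def process(item: DownloadRequest):
    ...  # the actual download / post-processing goes here

async def worker(queue: asyncio.Queue):
    while True:
        try:
            item = queue.get_nowait()
        except asyncio.QueueEmpty:
            return              # queue drained, this worker is done
        try:
            await process(item)
        finally:
            queue.task_done()

async def main(items, n_workers=20):
    queue = asyncio.Queue()
    for item in items:
        queue.put_nowait(item)
    workers = [asyncio.create_task(worker(queue)) for _ in range(n_workers)]
    # gather() propagates the first worker exception; use wait() instead if you
    # want to catch it yourself and cancel the remaining workers
    await asyncio.gather(*workers)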

I’m just trying to help us avoid going deep into solving problems created by the approach we chose (the screaming architecture). It’s interesting and educational stuff otherwise.