I'm trying to write a program to grab multiple files over HTTP. I'm writing it using `asyncio` (with `httpx` as the HTTP library) in the hope of optimising my network throughput, and as a chance to learn more about `asyncio`.
Because I'm fetching a lot of files (hundreds of thousands), it's not practical to use something like `asyncio.gather` to fire off all of the requests at once (at least I assume it isn't - if nothing else, I don't have the memory to hold all the responses in RAM at the same time). So I'm using a queue and workers, following a pattern I found on Stack Overflow:
```python
import asyncio
import json

import httpx

# URLs, ALREADY_READ and LOC are defined elsewhere: the full URL list,
# the URLs already fetched, and the directory to write results into.

async def worker(client, queue):
    while True:
        url, file = await queue.get()
        tries = 0
        r = await client.get(url)
        data = r.content
        if data:
            file.write_text(json.dumps([url, data.decode()]))
        queue.task_done()

async def main():
    TODO = set(URLs) - set(ALREADY_READ)
    queue = asyncio.Queue()
    for n, url in enumerate(TODO):
        queue.put_nowait((url, LOC / f"{n}.json"))
    async with httpx.AsyncClient() as client:
        tasks = []
        for i in range(20):
            task = asyncio.create_task(worker(client, queue))
            tasks.append(task)
        await queue.join()
        for task in tasks:
            task.cancel()

asyncio.run(main())
```
This works pretty well, in that it gets files at a very good rate, and my network traffic seems consistently high, so I’m using my available bandwidth effectively. By only having 20 workers, I’m assuming I’ll keep the load on the server reasonable.
The problem is that this runs fine for the first few hundred (maybe a thousand) files, but then it locks up. It's not timing out (I added a check for a timeout exception, just in case, but that made no difference); it just stops processing.
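For what it's worth, the timeout check looked something like this - a variant of the worker with the `get` wrapped in a try/except (`httpx.TimeoutException` is, per the httpx docs, the base class for all of its timeout errors):

```python
async def worker(client, queue):
    while True:
        url, file = await queue.get()
        try:
            r = await client.get(url)
        except httpx.TimeoutException:
            # Never triggers - the hang isn't reported as a timeout.
            print(f"timed out: {url}")
            queue.task_done()
            continue
        data = r.content
        if data:
            file.write_text(json.dumps([url, data.decode()]))
        queue.task_done()
```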
If I hit Ctrl-C, I get the following error:
File "...\Python39\lib\asyncio\windows_events.py", line 316, in run_forever
super().run_forever()
File "...\Python39\lib\asyncio\base_events.py", line 596, in run_forever
self._run_once()
File "...\Python39\lib\asyncio\base_events.py", line 1854, in _run_once
event_list = self._selector.select(timeout)
File "...\Python39\lib\asyncio\windows_events.py", line 434, in select
self._poll(timeout)
File "...\Python39\lib\asyncio\windows_events.py", line 783, in _poll
status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
That doesn’t really help me understand where it’s stuck, unfortunately.
I added some prints, and there are a few things I noticed:
- The last thing printed is just before the `await client.get` call, so it does seem to hang while getting a response. But why aren't the other workers still running?
- The output is very regular: send the request, get the response, get the content, over and over. There's no overlapping at all - surely I'd expect to occasionally see a few "Send the request" lines before getting a response?
I’m worried I’m not actually getting concurrent processing here, but I can’t see why that would be the case.
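One way I've thought of to check is to count requests in flight - a sketch (the `in_flight` counter is my own addition; it's safe without a lock because all the workers run on the same thread, and the file writing is simplified):

```python
in_flight = 0  # requests currently awaiting a response

async def worker(client, queue):
    global in_flight
    while True:
        url, file = await queue.get()
        in_flight += 1
        print(f"send ({in_flight} in flight): {url}")
        r = await client.get(url)
        in_flight -= 1
        print(f"recv ({in_flight} in flight): {url}")
        file.write_text(r.text)  # simplified: just dump the body
        queue.task_done()
```

If the 20 workers really are overlapping, the printed count should hover near 20; if it never goes above 1, the processing is effectively serial.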
Also, if a `get` is hanging, how do I deal with that? I thought `httpx` timed requests out automatically (the docs say it does), so why am I pausing indefinitely? Could it be that something is still synchronous (DNS, maybe?) and, if so, is there any way around it? It feels like the only option might be threads, because co-operative multitasking isn't going to work too well if the tasks won't co-operate.
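In case it helps frame the question, the belt-and-braces workaround I'm considering is an explicit timeout on the client plus an outer cap with `asyncio.wait_for` - a sketch, with arbitrary 10- and 30-second figures:

```python
# Library-level timeout, applied to the connect/read/write/pool phases:
client = httpx.AsyncClient(timeout=httpx.Timeout(10.0))

async def fetch(client, url):
    # Outer cap: wait_for cancels the get and raises asyncio.TimeoutError
    # after 30s, regardless of whether httpx's own timeout ever fires.
    return await asyncio.wait_for(client.get(url), timeout=30)
```

Though that feels like treating the symptom rather than the cause.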
While somebody pointing out what I’ve done wrong would be much appreciated, I’m also interested in suggestions about how to debug things like this.
- What's the best way of finding out exactly where my code is hanging? (See the watchdog sketch after this list for the most promising idea I've had so far.)
- How do I confirm things really are happening in parallel? It feels awfully easy to think I’ve written a program that does 20 requests in parallel, and later find it was entirely serial…
- Are there common patterns I should be using for things like worker tasks, cancelling tasks after a timeout, or rate-limiting HTTP requests? I'm currently making stuff up myself or using Stack Overflow, but I'd rather not reinvent too many wheels.
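For the first question, the most promising technique I've found so far is a watchdog task that periodically dumps every task's stack, so that if the program locks up, the last dump shows exactly which `await` each worker is stuck on - a sketch using only the stdlib (the 60-second interval is arbitrary):

```python
import asyncio

async def watchdog(interval=60):
    while True:
        await asyncio.sleep(interval)
        print("--- task stacks ---")
        for task in asyncio.all_tasks():
            task.print_stack(limit=5)  # stdlib asyncio.Task.print_stack
```

This would be started with `asyncio.create_task(watchdog())` at the top of `main()`. Running under `PYTHONASYNCIODEBUG=1` (or calling `loop.set_debug(True)`) also looks helpful, since asyncio then logs callbacks that block the event loop for too long.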