Asynchronous generator to io.BufferedReader

I’m looking to take Starlette’s request.stream() from an incoming request and feed it straight into tarfile.open, along the lines of tarfile.open(fileobj=request.stream(), mode="r|*"). But this doesn’t work: request.stream() is an async generator, while tarfile.open expects a file-like object (e.g. an io.BufferedReader).

Any suggestions on how one goes about converting an async generator to a BufferedReader?

For anyone reading after the fact, the solution I found was to bridge the async generator into a file-like object using a plain queue and a separate reader thread that runs tarfile.open.
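In outline it looked something like this (a simplified sketch, not the exact code; handle and _QueueReader are illustrative names, and real code needs error handling):

```python
import asyncio
import io
import queue
import tarfile
import threading


class _QueueReader(io.RawIOBase):
    """File-like object that reads byte chunks from a queue.

    A None sentinel on the queue marks end-of-stream.
    """

    def __init__(self, q: queue.Queue):
        self._q = q
        self._buf = b""
        self._eof = False

    def readable(self):
        return True

    def readinto(self, b):
        while not self._buf and not self._eof:
            chunk = self._q.get()
            if chunk is None:
                self._eof = True
            else:
                self._buf = chunk
        n = min(len(b), len(self._buf))
        b[:n] = self._buf[:n]
        self._buf = self._buf[n:]
        return n  # returning 0 signals EOF to the BufferedReader


async def handle(request):
    q: queue.Queue = queue.Queue()

    def consume():
        # tarfile runs in a plain thread, pulling from the queue-backed reader.
        reader = io.BufferedReader(_QueueReader(q))
        with tarfile.open(fileobj=reader, mode="r|*") as tar:
            for member in tar:
                ...  # process each member

    worker = threading.Thread(target=consume)
    worker.start()
    async for chunk in request.stream():
        q.put(chunk)  # unbounded queue, so this never blocks the event loop
    q.put(None)  # end-of-stream sentinel
    await asyncio.to_thread(worker.join)  # don't block the loop while joining
```

The thread is what lets tarfile's blocking reads happen off the event loop, while the async side just feeds chunks into the queue.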

Not exactly elegant, but it gets the job done.

I don’t quite understand why you are using a separate queue and thread – it seems kind of a pity to (have to?) add those extra layers. Would you still need that if all processing is done in async functions?

As mentioned above, tarfile won’t work with an async generator and Starlette’s request.stream() is an async generator.

If you have another approach that would work and not necessitate the aforementioned convoluted approach, I’m all ears!

Do you actually need the streaming functionality? I.e., would it work to just read the entire generator, concatenate the blocks into a single bytes object and wrap that in BytesIO? Otherwise, I’m not surprised it’s tricky, because tarfile was written long before anyone even conceived of async in Python.
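For reference, a minimal sketch of that non-streaming variant (assuming an async Starlette route, whose request.body() reads the whole payload into memory; handle is an illustrative name):

```python
import io
import tarfile


async def handle(request):
    # Buffers the entire request body in memory -- fine for small archives,
    # a non-starter for multi-GiB ones.
    body = await request.body()
    with tarfile.open(fileobj=io.BytesIO(body), mode="r|*") as tar:
        for member in tar:
            ...  # process each member
```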

I do. The tarfiles I’m handling are on the order of 20 GiB.

I don’t have an immediate alternative – and I understand that you need some bridge between the async generator and a file-like stream in order to use tarfile.open as-is – but what if you use tarfile.open itself (with that bridge) inside an async function? Couldn’t you then get rid of the extra queue plus reader thread? Or do you need to use tarfile.open itself inside a regular, non-async function? The extra queuing plus thread, I assume, counteracts any efficiency gains you might get from async Starlette streams. If so, then I would also have opted for a non-streaming solution, or for a synchronous stream (as supported by requests, for instance – see the sketch below). Or – as Karl suggested – reading out the whole stream and wrapping it up in a BytesIO buffer.
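For comparison, the synchronous-client route needs no bridging at all, since requests exposes a plain file-like stream. A sketch (the URL is a placeholder):

```python
import tarfile

import requests

# resp.raw is the underlying urllib3 response, which is file-like,
# so tarfile can stream from it directly -- no async/sync bridge needed.
with requests.get("https://example.com/archive.tar", stream=True) as resp:
    resp.raise_for_status()
    with tarfile.open(fileobj=resp.raw, mode="r|*") as tar:
        for member in tar:
            ...  # process each member
```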

One completely different approach might be to make an async version of the _Stream class (https://github.com/python/cpython/blob/main/Lib/tarfile.py#L330) and then add an async version of the open function (tarfile.async_open or so, added to a subclass of TarFile). That might be a lot more work or not, depending on how often the internal stream interface is used in non-async functions, which would all need to become async… (Plus, you already have a working solution now :slight_smile: )

It turns out that aiofiles (async read/write of local files) doesn’t support tarfiles, but I wonder if it could be used to provide a bridge that is purely in memory (meaning not requiring an extra queue + consumer thread)…
Or perhaps anyio’s memory object streams (Streams — AnyIO 4.1.0.post3 documentation) could be used instead of the queue, since Starlette is already built on anyio – see the sketch below. Python queues, afaik, are fairly slow because of their safety guarantees (locking).
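A rough sketch of what that could look like (untested; it assumes anyio’s memory object streams plus its to_thread/from_thread helpers, and reuses the queue-backed-reader idea from above):

```python
import io
import tarfile

import anyio
import anyio.from_thread
import anyio.to_thread


class _StreamReader(io.RawIOBase):
    """File-like view over an anyio memory object receive stream.

    Only usable from a worker thread started via anyio.to_thread,
    because it calls back into the event loop with anyio.from_thread.
    """

    def __init__(self, receive):
        self._receive = receive
        self._buf = b""
        self._eof = False

    def readable(self):
        return True

    def readinto(self, b):
        while not self._buf and not self._eof:
            try:
                self._buf = anyio.from_thread.run(self._receive.receive)
            except anyio.EndOfStream:
                self._eof = True
        n = min(len(b), len(self._buf))
        b[:n] = self._buf[:n]
        self._buf = self._buf[n:]
        return n  # returning 0 signals EOF


async def handle(request):
    # A bounded buffer gives backpressure without queue.Queue's locking.
    send, receive = anyio.create_memory_object_stream(max_buffer_size=8)

    def consume():
        reader = io.BufferedReader(_StreamReader(receive))
        with tarfile.open(fileobj=reader, mode="r|*") as tar:
            for member in tar:
                ...  # process each member

    async with anyio.create_task_group() as tg:
        tg.start_soon(anyio.to_thread.run_sync, consume)
        async with send:  # closing the send stream signals EndOfStream
            async for chunk in request.stream():
                await send.send(chunk)
```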

> what if you use tarfile.open itself (with that bridge) inside an async function

The separate thread helps me cross the async/sync boundary. If I keep it all in the same thread, I don’t see how I can get the async generator bridged into either a synchronous generator or an io.BufferedReader (the problem in the initial post).

> Or do you need to use tarfile.open itself inside a regular, non-async function?

My scenario is not unlike the one in Requests - Starlette, wherein we have a Starlette route (async) which provides request.stream() (also async).

> reading out the whole stream and wrapping it up in a BytesIO buffer.

Unfortunately not an option given the size of the tarfiles I’m handling, which is the entire impetus behind finding reasonable streaming solutions.

Writing the appropriate layers for tarfile to be able to handle async is something I’ve considered. It’s just a larger undertaking than I would have expected given the problem. The performance I’m getting at the moment is fine, but I agree this would likely be the best course of action.


I think this architecture is sound, but you’ll likely want to make the bridge queue bounded, so that you get backpressure and don’t exhaust your memory on large uploads. The standard-library queue.Queue, however, doesn’t have an async blocking put.

I’ve heard of the janus library, which should handle this case. Barring that, you might want to run the blocking put itself in a thread too (e.g. via asyncio.to_thread).
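For the record, a sketch of the janus variant (assuming janus’s split sync/async queue API; the sync side could feed the same queue-backed reader as in the earlier bridge):

```python
import asyncio

import janus


async def handle(request):
    # Bounded queue: the async put suspends once the buffer is full,
    # giving backpressure instead of unbounded memory growth.
    q: janus.Queue = janus.Queue(maxsize=16)

    def consume():
        # q.sync_q quacks like a regular queue.Queue, so the tarfile-reading
        # thread from the earlier sketch could use it unchanged.
        while True:
            chunk = q.sync_q.get()
            if chunk is None:
                return
            ...  # hand chunk to the file-like bridge

    task = asyncio.create_task(asyncio.to_thread(consume))
    async for chunk in request.stream():
        await q.async_q.put(chunk)  # suspends (not blocks) when full
    await q.async_q.put(None)  # end-of-stream sentinel
    await task
    q.close()
    await q.wait_closed()
```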