Wrapping async functions for use in sync code

I have a frustrating problem. I’m writing an application that downloads files, unzips them and extracts a single file from the archive. I want to use asyncio to manage the downloading. See below for the background on why I’m using asyncio, but for now let’s take asyncio as given.

Because I’m ignoring 99% of the downloaded zipfile, I’d rather not download what I don’t need. Pip includes a rather nice “lazy zipfile” implementation that handles this using a filehandle that’s backed by a temporary file where chunks of data are downloaded on demand using range requests.

I’d like to use that if I could. It’s easy to implement the downloads as async code, but the problem is that the downloads are triggered by calls to the file object’s read() method, which is not async. And I don’t really have control over that - the stdlib zipfile module isn’t async, so it expects to make sync calls.

So the control flow is:

My code (async) → zipfile.read(name) (sync) → download (async)

What I’d like to be able to do is something like:

async def getfile(url):
    with LazyZipfile(url) as z:
        await somehow_tell_sync_code_its_being_awaited(z.read(name))

class LazyHandle:
    def __init__(self, url):
        self.url = url
        self.fh = make_tempfile()
    def read(self, n):
        start, end = self.calculate_range(n)
        somehow_await_in_sync_code(self.download(start, end))
        return self.fh.read(n)
    async def download(self, start, end):
        ...  # Make a range request to self.url

class LazyZipfile:
    def __init__(self, url):
        self.lazyhandle = LazyHandle(url)
        self.prepare() # Problem - see below, this really wants to be async too
        return ZipFile(self.lazyhandle)

But of course this doesn’t work as read() isn’t async.

Short of rewriting zipfile to add await before all read calls, and cascading that upwards (anything with an await now needs an async def and itself needs to be called with await), is there anything I can do to make this work? I fear the answer is “no”, and I’m going to have to abandon asyncio for this project 🙁

I could defer the sync code to a thread somehow, but I’m actively trying to avoid threads here, because if I’m going to use them, I might as well just do the whole thing with threading. Also, it’s not at all obvious to me how I’d defer just the middle bit of that async → sync → async “sandwich” to a thread.

The only other thing I can think of is to wrap read in loop.run_until_complete, but that defeats the object - asyncio won’t run a nested event loop while one is already running, and even if it could, all other downloads would be blocked until the inner loop completed.
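For what it’s worth, the “middle bit” of the sandwich can be handed to a single worker thread while the downloads stay on the event loop, by hopping back with asyncio.run_coroutine_threadsafe. A minimal sketch, with a stand-in download coroutine (sync_read here stands for the whole sync call chain, e.g. ZipFile.read):

```python
import asyncio

async def download(start, end):
    # Stand-in for the real range-request download.
    await asyncio.sleep(0)
    return b"x" * (end - start)

def sync_read(loop):
    # Runs in a worker thread: blocks this thread only, not the loop.
    future = asyncio.run_coroutine_threadsafe(download(0, 10), loop)
    return future.result()

async def main():
    loop = asyncio.get_running_loop()
    # The whole sync call chain (e.g. ZipFile.read) runs in the executor;
    # the loop stays free to service the download coroutine meanwhile.
    return await loop.run_in_executor(None, sync_read, loop)

if __name__ == "__main__":
    print(asyncio.run(main()))
```

There is no deadlock here because the loop is only awaiting the executor future, so it is free to run the scheduled coroutine while the worker thread blocks on its result.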

Going back to why I’m using asyncio, there are fundamentally two reasons. First, to work out when asyncio is a good fit for a problem. My feeling is that this experience says “as long as you’re willing to rewrite any part of the stack that is currently sync, but might need to call back to async code”, which is a rather big blocker for anything that might want to treat files and web requests similarly. My second reason is that interrupt handling is so much cleaner in async code - handling Ctrl-C in threaded code is an utter pain in my experience, and not being able to stop massive download jobs is hardly ideal. If it wasn’t for the inability to cancel threads when I get a Ctrl-C, I’d probably have just used threads for this project and never looked at asyncio (apart from the “I wonder what all the fuss is about” learning exercise).

I don’t think any of this is particularly new - I’ve read lots of articles that talk about “async all the way down” - but I think I’d hoped it would be less of a problem in practice, and I’m finding that might not be the case, unfortunately.

As something of a side issue, I also discovered that __init__ can’t be declared as async. Given that my prepare method above does some downloading, what’s the correct approach to writing classes that need to call async code during initialisation? I really don’t like the pattern of creating incomplete instances that need a finish_init call before they are usable. All I can think of is to use a factory function like the following, but that just hides the problem rather than solving it…

class Example:
    def __init__(self):
        ...  # do sync init
    async def async_init(self):
        ...  # do async init

async def make_example():
    e = Example()
    await e.async_init()
    return e

# Now, always use this (from async code):
my_obj = await make_example()

@pf_moore Regarding initializing a class that needs async setup, this could be a better way:

import asyncio


class Example:
    def __init__(self, bar) -> None:
        self.foo = bar

    @classmethod
    async def create(cls, bar) -> "Example":
        self = cls(bar)
        return self

    def __repr__(self) -> str:
        return f"{self.__class__.__name__} bar={self.foo}"


async def main():
    example = await Example.create("baz")
    print(example)


asyncio.run(main())


I think you’ve hit it on the head: when you employ async techniques, they have to start at the bottom and go upwards. If they don’t go all the way to the top, that’s fine, but once you’ve switched to sync you can’t switch back without involving threads to simulate the async behavior.

Also, the idea of having the dunder methods (not just __init__) be somehow aware of the caller’s indication that they should be invoked asynchronously is quite interesting. When using HTTPX, various operations have both sync and async flavors with different names, and it would be nice if that didn’t need to be the case.

How does this strike you?

async def getfile(url):
    with await LazyZipfile.new(url) as z:
        await z.read(name)

class LazyHandle:
    def __init__(self, url):
        self.url = url
        self.fh = make_tempfile()
    async def read(self, n):
        start, end = self.calculate_range(n)
        await self.download(start, end)
        if disk_is_fast_enough_for_you:
            return self.fh.read(n)
        elif python_39:
            return await asyncio.to_thread(self.fh.read, n)
        else:
            return await asyncio.get_event_loop().run_in_executor(
                None, self.fh.read, n
            )
    async def download(self, start, end):
        ...  # Make a range request to self.url


class LazyZipfile:

    @classmethod
    async def new(cls, url):
        self = cls()
        self.lazyhandle = LazyHandle(url)
        await self.prepare()
        # Not sure what you wanted here, but I think the above is
        # enough to transfer the idea :)
        #return ZipFile(self.lazyhandle)
        return self

I don’t think I was sufficiently clear. The problem is that I want to pass LazyHandle to the stdlib ZipFile class - which isn’t written to deal with a read() method that’s async.

Your rewrite of LazyZipfile omits the call to ZipFile, which is the bit I’m having a problem with…

You could try this:


Thanks. It feels like it sort of defeats the point, though, in much the same way that using defer_to_thread does. And given that it describes itself as “in some ways an awful hack”, it’s not something I’d want to rely on.

I don’t think it alters my feeling that the “async all the way down” requirement is a definite problem for some types of application, and an ongoing risk to consider when you make the decision to use asyncio. For example, I’ve been wondering whether using asyncio in pip would be worthwhile, as download times are a definite bottleneck. But if it means we’re blocked from using the fast-deps code that uses the LazyZip technique I described above, then it’s much less attractive. And worse still, it would have been easy to dive into async without realising that LazyZip was going to be an issue, and waste a lot of time working on the change before we found the problem.

Meh. I’m a bit sad, because asyncio seems so attractive in many ways. But every time I try to really use it, I hit difficulties. I guess it just proves the point that you need to be careful to make sure that the tool suits your needs, and not just use it because it’s trendy.

But at this point, I think I’ll just go back to “the devil I know” and get a threaded version of my PyPI metadata downloader working - I think I’ll make more progress that way than by fighting with asyncio any more.


Sync From Async
Remember when I said this was dangerous a few sections ago? I meant it. Because await is just a Python statement that you can pass a coroutine to and get the result back, it means you’re free to ignore it and just call a good, old, synchronous function directly, like this:

def get_chat_id(name):
    time.sleep(3)
    return "chat-%s" % name

async def main():
    result = get_chat_id("django")
Do that, and you don’t give the event loop any chance to run - you haven’t paused the current coroutine and given the event loop control using await. That means every other coroutine that might want to run - maybe one has some bytes waiting for it on a socket, or another one was sleeping for a few seconds - doesn’t even get a chance; your coroutine just ignores them all and keeps running synchronous code. The event loop doesn’t have some special power inside Python to interrupt you; it needs you to yield control back to it.

Now, there’s a subtle distinction here between blocking and non-blocking calls. It’s not going to ruin your day if you call a non-blocking synchronous function, like this:

def get_chat_id(name):
    return "chat-%s" % name

async def main():
    result = get_chat_id("django")
However, if you call a blocking function, like the Django ORM, the code inside the async function will look identical, but now it’s dangerous code that might block the entire event loop as it’s not awaiting:

def get_chat_id(name):
    return Chat.objects.get(name=name).id

async def main():
    result = get_chat_id("django")
You can see how it’s easy to have a non-blocking function that “accidentally” becomes blocking if a programmer is not super-aware of everything that calls it. This is why I recommend you never call anything synchronous from an async function without doing it safely, or without knowing beforehand it’s a non-blocking standard library function, like os.path.join.

But what is safe? In the sync world, threading is our only built-in option for concurrency, so what we can do is spin up a new thread, get the sync function running inside it, and then have our coroutine pause and give back control to the event loop until its thread is finished and there’s a result.
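In asyncio terms, that thread-based escape hatch is asyncio.to_thread (Python 3.9+; loop.run_in_executor on older versions). A minimal sketch of the article’s blocking example made safe:

```python
import asyncio
import time

def get_chat_id(name):
    time.sleep(0.1)  # blocking sync call, safe inside a worker thread
    return "chat-%s" % name

async def main():
    # Runs get_chat_id in a worker thread; the event loop stays
    # responsive and other coroutines can run during the sleep.
    return await asyncio.to_thread(get_chat_id, "django")

if __name__ == "__main__":
    print(asyncio.run(main()))
```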


you can use .__new__ but mypy doesn’t allow returning arbitrary objects yet:

from __future__ import annotations

import sys

import anyio

class Example:
    async def __new__(cls, *args: object, **kwargs: object) -> Example:
        v = super().__new__(cls)  # object.__new__ takes no extra args
        await type(v).__init__(v, *args, **kwargs)
        return v

    async def __init__(self) -> None:
        await anyio.sleep(0)
        self.foo = 0


async def amain() -> int:
    v = await Example()
    print(v)
    return v.foo


def main():
    return anyio.run(amain)

if __name__ == "__main__":
    sys.exit(main())

For the zipfile, you could use something like this:

async def getfile(url):
    async with LazyZipfile(url) as z:
        await anyio.to_thread.run_sync(z.read, name)

class LazyHandle:
    def __init__(self, url):
        self.url = url
        self.fh = make_tempfile()
    def read(self, n):
        start, end = self.calculate_range(n)
        anyio.from_thread.run(self.download, start, end)
        return self.fh.read(n)
    async def download(self, start, end):
        ...  # Make a range request to self.url

class LazyZipfile:
    def __init__(self, url):
        self.lazyhandle = LazyHandle(url)
    async def __aenter__(self):
        await self.prepare()
        self._zf = ZipFile(self.lazyhandle)
        return self

    def read(self, name):
        return self._zf.read(name)

    async def __aexit__(self, *exctype):
        await anyio.to_thread.run_sync(self._zf.close)

Instead of using "with await …", Python has the async with context manager, which you can use here.

I don’t know if you’ve solved this problem yet, but rather than giving code, I’m going to offer some explanations. Mainly I want to talk about what you asked: how to wrap async code in sync code.
I don’t think it’s advisable to place async code inside sync code, because it defeats the whole purpose of writing async code in the first place.
However, you can place sync code in async code, which is generally what you need.

You mentioned the read() method of file-like objects. This is among the harder functions to make async because it’s built from the ground up to be synchronous. One way to approach this challenge is to wrap the method call in an asynchronous generator, which you can place in a coroutine and schedule as a task.
OR:
You can use the trampoline pattern as described by Lukasz Langa in his series on asyncio. That’s how tasks are implemented as well, and it’s the best way of interacting with the sync world from async code. You can create a function which calls read() and then schedule it on the loop with loop.call_soon(func).
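As a rough illustration of loop.call_soon bridging sync and async (note the callback runs on the loop thread, so it must not block; read_chunk is a hypothetical quick callback):

```python
import asyncio

def read_chunk():
    # A quick, non-blocking sync callback (hypothetical).
    return b"chunk"

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    # Schedule the sync function on the loop; deliver its result
    # to waiting async code via a Future.
    loop.call_soon(lambda: fut.set_result(read_chunk()))
    return await fut

if __name__ == "__main__":
    print(asyncio.run(main()))
```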

The patterns for merging sync code into async code are endless; you just have to use the existing tools in the asyncio library to build them. Don’t rely only on coroutines with async/await - you can use futures, queues, tasks, etc. to implement the patterns.

If you still have some questions, then I’ll be happy to answer some more.

This would still block the event loop for the full duration of the read; you have to use an executor, e.g. via asyncio.to_thread, to avoid blocking the loop.

Yeah, true… But I was suggesting that one only reads a few chunks and then calls asyncio.sleep().
Something like:

network.read(CHUNKS)
await asyncio.sleep(0.5)

So that you have some sort of round robin implementation.
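That round-robin idea, sketched out (copy_politely, CHUNK, and the file-like src/dst are illustrative names; each individual read still blocks the loop while it runs):

```python
import asyncio

CHUNK = 64 * 1024

async def copy_politely(src, dst):
    # Read one chunk at a time, yielding to the event loop in between
    # so other tasks get a turn. Each read still blocks the loop for
    # the duration of that one chunk.
    while True:
        data = src.read(CHUNK)
        if not data:
            break
        dst.write(data)
        await asyncio.sleep(0)  # give other tasks a chance to run
```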

Not exactly the case in the discussion, but what if I want a wrapper that works with both sync and async functions? Is this a valid way to go, or will I encounter some underlying problems in real code?

This is some very quick code, just as a proof of concept, with a “tracing” decorator:

import asyncio
from functools import wraps


def trace(func):
    def log(*args):
        """This should be some complex function so I will not duplicate code for sync and async code"""
        print("return:", *args)

    @wraps(func)
    def wrapper(*args, **kwargs):
        print("call:", func.__name__, "args:", args, "kwargs:", kwargs)

        if asyncio.iscoroutinefunction(func):
            task = asyncio.create_task(func(*args, **kwargs))
            task.add_done_callback(lambda task: log(task.result()))
            return task
        else:
            result = func(*args, **kwargs)
            log(result)
            return result

    return wrapper

Your function turns ‘lazy’ coroutine functions into eager, task-returning functions. I would return an async or sync wrapper based on whether the function is a coroutine function, like this:

import asyncio
from functools import wraps


def trace(func):
    def log(*args):
        """This should be some complex function so I will not duplicate code for sync and async code"""
        print("return:", *args)

    if asyncio.iscoroutinefunction(func):

        @wraps(func)
        async def awrapper(*args, **kwargs):
            print("call:", func.__name__, "args:", args, "kwargs:", kwargs)
            result = await func(*args, **kwargs)
            log(result)  # consider: await asyncio.to_thread(log, result)
            return result

        return awrapper

    @wraps(func)
    def wrapper(*args, **kwargs):
        print("call:", func.__name__, "args:", args, "kwargs:", kwargs)
        result = func(*args, **kwargs)
        log(result)
        return result

    return wrapper

you may run into issues if you do more than print in your sync log function, so you might want to do await asyncio.to_thread(log, result)
