How do I avoid nested awaits during async list-download?

How do I write an async file list/download? I can’t think of a way to get around the double await (i.e. awaiting a list of awaitables).

Here’s a synchronous version of my logic:

def process_prefixes(prefixes: t.Iterable[str]) -> t.List[int]:
    results = []
    for prefix in prefixes:
        paths = list_paths(prefix)
        for path in paths:
            data = download(path)
            result = process_data(data)
            results.append(result)
    return results

Here’s my attempt at using async:

async def process_path(path: str) -> int:
    loop = asyncio.get_running_loop()
    data = await loop.run_in_executor(None, download, path)
    return process_data(data)


async def process_paths(
    paths: t.Awaitable[t.List[str]]
) -> t.List[t.Awaitable[int]]:
    results = []
    for path in await paths:
        result = process_path(path)
        results.append(result)
    return results


async def process_prefixes(prefixes: t.Iterable[str]) -> t.List[int]:
    loop = asyncio.get_running_loop()
    futures = []
    for prefix in prefixes:
        paths = loop.run_in_executor(None, list_paths, prefix)
        prefix_results = process_paths(paths)
        futures.append(prefix_results)
    results = []
    for prefix_results in futures:
        for result in await prefix_results:
            results.append(await result)
    return results
Full example:
import sys
import time
import random
import asyncio
import typing as t


def _random_name():
    codes = [random.randint(97, 122) for _ in range(random.randint(3, 9))]
    return bytearray(codes).decode()


def list_paths(prefix: str) -> t.List[str]:
    time.sleep(random.gauss(1.0, 0.1))
    paths = []
    for _ in range(random.randint(0, 2)):
        name = _random_name()
        paths.append(f"{prefix}/{name}")
    return paths


def download(path: str) -> str:
    time.sleep(random.gauss(1.2, 0.1))
    return path * random.randint(1, 10)


def process_data(text: str) -> int:
    time.sleep(random.gauss(0.1, 0.01))
    return len(text)


async def process_path(path: str) -> int:
    loop = asyncio.get_running_loop()
    data = await loop.run_in_executor(None, download, path)
    return process_data(data)


async def process_paths(paths: t.Awaitable[t.List[str]]) -> t.List[t.Awaitable[int]]:
    results = []
    for path in await paths:
        print("  Path:", path, file=sys.stderr)
        result = process_path(path)
        results.append(result)
    return results


async def process_prefixes(prefixes: t.Iterable[str]) -> t.List[int]:
    loop = asyncio.get_running_loop()
    futures = []
    for prefix in prefixes:
        print("Prefix:", prefix, file=sys.stderr)
        paths = loop.run_in_executor(None, list_paths, prefix)
        prefix_results = process_paths(paths)
        futures.append(prefix_results)
    results = []
    for prefix_results in futures:
        for result in await prefix_results:
            results.append(await result)
    return results


def process_prefixes_sync(prefixes: t.Iterable[str]) -> t.List[int]:
    results = []
    for prefix in prefixes:
        print("Prefix:", prefix, file=sys.stderr)
        paths = list_paths(prefix)
        for path in paths:
            print("  Path:", path, file=sys.stderr)
            data = download(path)
            result = process_data(data)
            results.append(result)
    return results


def main():
    prefixes = [_random_name() for _ in range(10)]
    start = time.monotonic()
    results = asyncio.run(process_prefixes(prefixes))
    # results = process_prefixes_sync(prefixes)
    end = time.monotonic()
    print(sum(results) / len(results))
    print(f"Took {end - start:.3f} s")


if __name__ == "__main__":
    main()

There’s nothing wrong with nesting async operations like that. It’s rather hard to follow your code example though, since it has mismatched function names (there is no list_paths function, for example).

You may also want to look into using async for, so that your generator operations can themselves be asynchronous.
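For example, list_paths could be wrapped in an async generator (a hypothetical iter_paths, reusing the list_paths and process_path from your full example); the consumer then becomes a plain async for loop. A minimal sketch:

async def iter_paths(prefix: str) -> t.AsyncIterator[str]:
    # Run the blocking list_paths call in the default executor, then
    # yield each path so callers can consume the listing with async for.
    loop = asyncio.get_running_loop()
    for path in await loop.run_in_executor(None, list_paths, prefix):
        yield path


async def process_prefix(prefix: str) -> t.List[int]:
    # Still awaits each path in turn; this only shows the shape of an
    # async-for consumer.
    results = []
    async for path in iter_paths(prefix):
        results.append(await process_path(path))
    return results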

Finally, you can call asyncio.wait() on an iterable of awaitables, and each time it returns it will give you two sets: the awaitables that have completed, and those that are still pending. With your current construction, you wait for the first future to complete and don’t process any of the others until it has finished… so your code is partially blocking.
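A rough sketch of that pattern with asyncio.wait(), again assuming the list_paths and process_path from your full example:

async def process_prefixes(prefixes: t.Iterable[str]) -> t.List[int]:
    loop = asyncio.get_running_loop()
    # Submit every listing up front; these run in executor threads.
    pending = {loop.run_in_executor(None, list_paths, p) for p in prefixes}
    results = []
    while pending:
        # Wake as soon as any listing finishes, not in submission order.
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        for future in done:
            coros = [process_path(path) for path in future.result()]
            results.extend(await asyncio.gather(*coros))
    return results

The gather() call still waits for one listing’s downloads before harvesting the next completed listing, but the other listings keep running in their executor threads in the meantime.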


My full example (which is hidden in a collapsible) has all the functions needed for a running program.

There may be nothing wrong, but I would like to have a single asynchronous generator which yields results. Maybe I simply have to make process_prefixes one of these. I just have the feeling that there’s a more elegant solution.

I guess I’m working under the assumption that called coroutines run immediately until they block (on I/O), the way threads do, rather than running only when awaited.
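A toy sketch of what I mean (with a made-up work coroutine): under my assumption both calls would print right away, but only the one wrapped in asyncio.create_task() actually starts before being awaited:

import asyncio


async def work(name: str) -> str:
    print(name, "started")
    await asyncio.sleep(0.1)  # stand-in for blocking I/O
    return name


async def main() -> None:
    coro = work("bare")                        # creating a coroutine does not start it
    task = asyncio.create_task(work("task"))   # scheduled on the loop immediately
    await asyncio.sleep(0)                     # yield once: prints "task started"
    print(await coro)                          # only now does "bare started" print
    print(await task)


asyncio.run(main())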

Since you are using run_in_executor, you may be getting that behaviour for those calls; normally coroutines do not operate that way, they run when the loop has time to run them. While you are waiting on coroutine A, coroutine B will run if it can, but if you aren’t waiting for its result, then when it finishes its result will just sit there, and your generator can’t return it because it is still waiting on A. Definitely check out asyncio.wait() as the means to get the next result from whichever coroutine provides it, instead of waiting on each one in turn. Using that method, your generator could indeed yield results as they become available.
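A sketch of that idea as a single async generator, again assuming the helpers from the question (it still waits for all listings before the first download, to keep it short):

async def iter_results(prefixes: t.Iterable[str]) -> t.AsyncIterator[int]:
    loop = asyncio.get_running_loop()
    # All listings run concurrently in executor threads.
    listings = [loop.run_in_executor(None, list_paths, p) for p in prefixes]
    tasks = set()
    for paths in await asyncio.gather(*listings):
        for path in paths:
            # create_task() schedules each download immediately.
            tasks.add(asyncio.create_task(process_path(path)))
    while tasks:
        done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            yield task.result()

Consuming it is then a single async for result in iter_results(prefixes): loop, and results arrive in completion order rather than prefix order.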


Indeed it is; sorry, I had not noticed that.