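How do I write an async version of a file list/download pipeline? I can’t think of a way to get around the double-await (i.e., awaiting a coroutine whose result is itself a list of awaitables, each of which must then be awaited in turn).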
Here’s a synchronous version of my logic:
def process_prefixes(prefixes: t.Iterable[str]) -> t.List[int]:
    """Synchronously list, download and process every path under each prefix.

    Results are returned in prefix order, then in the order ``list_paths``
    yields the paths for that prefix.
    """
    return [
        process_data(download(path))
        for prefix in prefixes
        for path in list_paths(prefix)
    ]
Here’s my attempt at using async:
async def process_path(path: str) -> int:
    """Download ``path`` and process its contents without blocking the event loop.

    ``download`` and ``process_data`` are both plain synchronous functions, so
    each is handed to the loop's default executor. Running ``process_data``
    inline would stall every other coroutine for as long as it takes.

    Args:
        path: The path to download.

    Returns:
        Whatever ``process_data`` returns for the downloaded data.
    """
    loop = asyncio.get_running_loop()
    data = await loop.run_in_executor(None, download, path)
    # NOTE(review): assumes process_data is safe to call from a worker
    # thread -- confirm before relying on this.
    return await loop.run_in_executor(None, process_data, data)
async def process_paths(
    paths: t.Awaitable[t.List[str]]
) -> t.List[t.Awaitable[int]]:
    """Wait for a path listing and start processing every path in it.

    A bare coroutine object does nothing until it is awaited, so returning
    ``process_path(path)`` directly would delay each download until the caller
    reaches it. Wrapping each one in a task schedules it right away, so all
    paths of the listing are processed concurrently.

    Args:
        paths: An awaitable that resolves to the list of paths.

    Returns:
        One awaitable (an ``asyncio.Task``) per path, in listing order. The
        caller should await (or ``asyncio.gather``) them to collect results
        and surface any exceptions.
    """
    listed = await paths
    return [asyncio.create_task(process_path(path)) for path in listed]
async def process_prefixes(prefixes: t.Iterable[str]) -> t.List[int]:
    """List, download and process every path under each prefix concurrently.

    Awaiting the per-path awaitables one after another runs the downloads
    sequentially. Here each prefix gets its own coroutine that resolves the
    listing and then gathers all of that prefix's results, and the prefixes
    themselves are gathered too. ``asyncio.gather`` returns results in the
    order its arguments were given, so the output order matches the
    synchronous version (prefix order, then path order).

    Args:
        prefixes: The prefixes to list.

    Returns:
        A flat list with one processed result per discovered path.

    Raises:
        Exception: The first exception raised by any listing, download or
            processing step is propagated by ``asyncio.gather``.
    """
    loop = asyncio.get_running_loop()

    async def _process_prefix(listing: t.Awaitable[t.List[str]]) -> t.List[int]:
        # Resolve one listing, then wait for all of its paths at once.
        pending = await process_paths(listing)
        # gather() with no arguments resolves to [], so an empty listing is fine.
        return await asyncio.gather(*pending)

    # run_in_executor is called eagerly for every prefix, so all listings are
    # submitted before anything is awaited.
    per_prefix = await asyncio.gather(
        *(
            _process_prefix(loop.run_in_executor(None, list_paths, prefix))
            for prefix in prefixes
        )
    )
    return [result for group in per_prefix for result in group]
Full example (click to expand)
import sys
import time
import random
import asyncio
import typing as t
def _random_name():
    """Return a random string of 3 to 9 lowercase ASCII letters."""
    length = random.randint(3, 9)
    # Code points 97..122 are 'a'..'z'.
    return "".join(chr(random.randint(97, 122)) for _ in range(length))
def list_paths(prefix: str) -> t.List[str]:
    """Simulate a slow listing call and return 0 to 2 random paths under ``prefix``.

    Blocks for a normally distributed delay (mean 1.0 s, sigma 0.1 s) before
    producing paths of the form ``"<prefix>/<random name>"``.
    """
    delay = random.gauss(1.0, 0.1)
    time.sleep(delay)
    count = random.randint(0, 2)
    return [f"{prefix}/{_random_name()}" for _ in range(count)]
def download(path: str) -> str:
    """Simulate a slow download and return ``path`` repeated 1 to 10 times.

    Blocks for a normally distributed delay (mean 1.2 s, sigma 0.1 s) first.
    """
    delay = random.gauss(1.2, 0.1)
    time.sleep(delay)
    repeats = random.randint(1, 10)
    return path * repeats
def process_data(text: str) -> int:
    """Simulate a short processing step and return the length of ``text``.

    Blocks for a normally distributed delay (mean 0.1 s, sigma 0.01 s) first.
    """
    pause = random.gauss(0.1, 0.01)
    time.sleep(pause)
    return len(text)
async def process_path(path: str) -> int:
    """Download ``path`` and process its contents without blocking the event loop.

    ``download`` and ``process_data`` both block (each calls ``time.sleep``),
    so each is handed to the loop's default executor. Running ``process_data``
    inline would stall every other coroutine for the length of its sleep.

    Args:
        path: The path to download.

    Returns:
        The value ``process_data`` returns for the downloaded text.
    """
    loop = asyncio.get_running_loop()
    text = await loop.run_in_executor(None, download, path)
    # process_data only sleeps and calls len(), so running it on a worker
    # thread is safe here.
    return await loop.run_in_executor(None, process_data, text)
async def process_paths(paths: t.Awaitable[t.List[str]]) -> t.List[t.Awaitable[int]]:
    """Wait for a path listing and start processing every path in it.

    A bare coroutine object does nothing until it is awaited, so returning
    ``process_path(path)`` directly would delay each download until the caller
    reaches it. Wrapping each one in a task schedules it right away, so all
    paths of the listing are processed concurrently.

    Args:
        paths: An awaitable that resolves to the list of paths.

    Returns:
        One awaitable (an ``asyncio.Task``) per path, in listing order. The
        caller should await (or ``asyncio.gather``) them to collect results
        and surface any exceptions.
    """
    tasks: t.List[t.Awaitable[int]] = []
    for path in await paths:
        print(" Path:", path, file=sys.stderr)
        # create_task schedules the coroutine now instead of when it is awaited.
        tasks.append(asyncio.create_task(process_path(path)))
    return tasks
async def process_prefixes(prefixes: t.Iterable[str]) -> t.List[int]:
    """List, download and process every path under each prefix concurrently.

    Awaiting the per-path awaitables one after another runs the downloads
    sequentially. Here each prefix gets its own coroutine that resolves the
    listing and then gathers all of that prefix's results, and the prefixes
    themselves are gathered too. ``asyncio.gather`` returns results in the
    order its arguments were given, so the output order matches
    ``process_prefixes_sync`` (prefix order, then path order).

    Args:
        prefixes: The prefixes to list.

    Returns:
        A flat list with one processed result per discovered path.

    Raises:
        Exception: The first exception raised by any listing, download or
            processing step is propagated by ``asyncio.gather``.
    """
    loop = asyncio.get_running_loop()

    async def _process_prefix(listing: t.Awaitable[t.List[str]]) -> t.List[int]:
        # Resolve one listing, then wait for all of its paths at once.
        pending = await process_paths(listing)
        # gather() with no arguments resolves to [], so an empty listing
        # (list_paths may return zero paths) is fine.
        return await asyncio.gather(*pending)

    jobs = []
    for prefix in prefixes:
        print("Prefix:", prefix, file=sys.stderr)
        # Submit the listing immediately so all prefixes are listed in parallel.
        listing = loop.run_in_executor(None, list_paths, prefix)
        jobs.append(_process_prefix(listing))

    per_prefix = await asyncio.gather(*jobs)
    return [result for group in per_prefix for result in group]
def process_prefixes_sync(prefixes: t.Iterable[str]) -> t.List[int]:
    """Synchronous baseline: list, download and process one path at a time.

    Logs each prefix and path to stderr as it is visited and returns the
    processed results in visiting order.
    """
    processed: t.List[int] = []
    for prefix in prefixes:
        print("Prefix:", prefix, file=sys.stderr)
        for path in list_paths(prefix):
            print(" Path:", path, file=sys.stderr)
            processed.append(process_data(download(path)))
    return processed
def main():
    """Run the pipeline over ten random prefixes and report mean result and timing.

    Prints the mean of the results and the elapsed wall-clock time to stdout.
    """
    prefixes = [_random_name() for _ in range(10)]
    start = time.monotonic()
    results = asyncio.run(process_prefixes(prefixes))
    # Swap in the synchronous baseline to compare timings:
    # results = process_prefixes_sync(prefixes)
    end = time.monotonic()
    if results:
        print(sum(results) / len(results))
    else:
        # list_paths may return zero paths for every prefix, in which case
        # the mean is undefined; report that instead of dividing by zero.
        print("No results to average", file=sys.stderr)
    print(f"Took {end - start:.3f} s")
# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    main()