This is the second time I have used this, and I would like feedback on it from more knowledgeable people:
import asyncio

from aiostream import pipe, stream


async def pool_for_each_par_map(loop, pool, f, p, iterator):
    # Wrap the (possibly async) iterable and map each item to an
    # executor future running p(x) in the pool.
    zx = stream.iterate(iterator)
    zx = zx | pipe.map(lambda x: loop.run_in_executor(pool, p, x))
    async with zx.stream() as streamer:
        limit = pool._max_workers
        unfinished = []
        while True:
            # Pull up to `limit` new futures from the stream.
            tasks = []
            for i in range(limit):
                try:
                    task = await streamer.__anext__()
                except StopAsyncIteration:
                    limit = 0
                else:
                    tasks.append(task)
            tasks = tasks + list(unfinished)
            assert len(tasks) <= pool._max_workers
            if not tasks:
                break
            # Wait for at least one future to complete, then call f
            # on each result as it arrives.
            finished, unfinished = await asyncio.wait(
                tasks, return_when=asyncio.FIRST_COMPLETED
            )
            for finish in finished:
                out = finish.result()
                f(out)
            # Refill exactly as many slots as were just freed.
            limit = len(finished)
It is used in my sudopython project, and also in mutation. Note that the last line of the function is not the same in the two; it seems to me the first version is the correct one.
It seems to do what it is supposed to do: lazily submit jobs to multiple processes or threads. It is somewhat like pool.map_unordered, but I really do not want to build a list, because it would be far too big; also, the iterable can be async thanks to aiostream.
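To make the intent concrete, here is a minimal stdlib-only sketch of the same pattern (no aiostream, so no async iterables), with a small usage example. The function name lazy_for_each is hypothetical; the parameters mirror the function above (pool, a result callback f, a worker function p, and a lazy iterator), and it likewise caps in-flight work at pool._max_workers and refills slots only as they free up:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def lazy_for_each(pool, f, p, iterator):
    """Submit at most pool._max_workers jobs at a time, calling f on
    each result as it completes, pulling from the iterator lazily."""
    async def run():
        loop = asyncio.get_running_loop()
        it = iter(iterator)
        pending = set()
        limit = pool._max_workers
        while True:
            # Refill up to the concurrency limit, one item at a time,
            # so the (potentially huge) iterator is never materialized.
            while len(pending) < limit:
                try:
                    x = next(it)
                except StopIteration:
                    break
                pending.add(loop.run_in_executor(pool, p, x))
            if not pending:
                break
            done, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
            for fut in done:
                f(fut.result())

    asyncio.run(run())


results = []
with ThreadPoolExecutor(max_workers=4) as pool:
    lazy_for_each(pool, results.append, lambda x: x * x, range(10))
print(sorted(results))  # → [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

Results can arrive out of order (like map_unordered), which is why the example sorts before printing; the key property is that at no point are more than max_workers items pulled ahead of the consumer.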