This is the second time I have used this pattern, and I would like feedback on it from more knowledgeable people:
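
    import asyncio

    from aiostream import pipe, stream


    async def pool_for_each_par_map(loop, pool, f, p, iterator):
        # Wrap the (possibly async) iterable and turn each item into a
        # future that runs p(item) in the executor.
        zx = stream.iterate(iterator)
        zx = zx | pipe.map(lambda x: loop.run_in_executor(pool, p, x))
        async with zx.stream() as streamer:
            # Note: _max_workers is a private attribute of the executor.
            limit = pool._max_workers
            unfinished = []
            while True:
                tasks = []
                # Pull at most `limit` new jobs from the stream.
                for _ in range(limit):
                    try:
                        task = await streamer.__anext__()
                    except StopAsyncIteration:
                        limit = 0
                    else:
                        tasks.append(task)
                tasks = tasks + list(unfinished)
                assert len(tasks) <= pool._max_workers
                if not tasks:
                    break
                finished, unfinished = await asyncio.wait(
                    tasks, return_when=asyncio.FIRST_COMPLETED
                )
                # Hand each completed result to the consumer callback.
                for finish in finished:
                    out = finish.result()
                    f(out)
                # Refill only as many slots as were just freed.
                limit = len(finished)
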
It is used in my
And at in mutation
Note that the last line of the function is not the same in the two places. It seems to me that the first version is the correct one.
It seems to do what it is supposed to do: lazily submit jobs to multiple processes or threads. It is somewhat like pool.map_unordered, but I really do not want to build a list, because it would be far too big. Also, the iterable can be async thanks to aiostream.
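
For comparison, the same "keep at most N jobs in flight, pull new items only when a slot frees up" idea can be sketched with the standard library alone. This is only a minimal sketch under the assumption that the iterable is a plain synchronous one, so it does not cover the async-iterable case that aiostream handles. The names `bounded_for_each`, `consume`, `produce`, and `demo` are hypothetical and chosen for illustration; they are not taken from the code above or from any library.

```python
import asyncio
import itertools
from concurrent.futures import ThreadPoolExecutor


async def bounded_for_each(pool, limit, consume, produce, iterable):
    """Run produce(x) in `pool` for each x, with at most `limit` jobs in flight.

    Hypothetical helper: assumes `iterable` is a synchronous iterable.
    """
    loop = asyncio.get_running_loop()
    iterator = iter(iterable)
    pending = set()
    while True:
        # Top up the in-flight set. islice pulls only as many items as
        # there are free slots, so the input is never materialized as a list.
        for item in itertools.islice(iterator, limit - len(pending)):
            pending.add(loop.run_in_executor(pool, produce, item))
        if not pending:
            # Input exhausted and nothing left running.
            break
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        for future in done:
            consume(future.result())


def demo(count=20, workers=4):
    """Square `count` numbers in a thread pool; return the sorted results."""
    results = []
    with ThreadPoolExecutor(max_workers=workers) as pool:
        asyncio.run(
            bounded_for_each(
                pool, workers, results.append, lambda n: n * n, iter(range(count))
            )
        )
    return sorted(results)
```

Passing the limit explicitly avoids reading the executor's private `_max_workers` attribute, and tracking the pending set directly removes the need for a separate `limit` variable that is recomputed at the end of each loop iteration.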