Request for review: pool_for_each_par_map

This is the second time I have used this function, and I would like feedback on it from people more knowledgeable than me:

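import asyncio

from aiostream import pipe, stream


async def pool_for_each_par_map(loop, pool, f, p, iterator):
    # Each item pulled from the stream is submitted to the pool, where p runs.
    zx = stream.iterate(iterator)
    zx = zx | pipe.map(lambda x: loop.run_in_executor(pool, p, x))
    async with zx.stream() as streamer:
        # NOTE: _max_workers is a private attribute of the executor.
        limit = pool._max_workers
        unfinished = []
        while True:
            tasks = []
            # Pull at most `limit` new jobs, one per free worker slot.
            for i in range(limit):
                try:
                    task = await streamer.__anext__()
                except StopAsyncIteration:
                    # The source is exhausted: stop pulling.
                    break
                else:
                    tasks.append(task)
            tasks = tasks + list(unfinished)
            assert len(tasks) <= pool._max_workers
            if not tasks:
                break
            finished, unfinished = await asyncio.wait(
                tasks, return_when=asyncio.FIRST_COMPLETED
            )
            # Hand each completed result to the consumer f.
            for finish in finished:
                out = finish.result()
                f(out)
            # Each finished job frees one slot for the next round.
            limit = len(finished)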

It is used in my sudopython project.

It is also used in mutation.

Note that the last line of the function differs between the two. I believe the first version is the correct one.

It seems to do what it is supposed to do: lazily submit jobs to multiple processes or threads. It is somewhat like pool.map_unordered, but I really do not want to build a list because it would be far too big. Also, the iterable can be async thanks to aiostream.
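To make the idea easier to review, here is a minimal sketch of the same bounded, lazy submission pattern using only the standard library and a plain (synchronous) iterable. The names bounded_unordered_map, worker, consume, limit and demo are made up for this illustration, and the explicit limit argument is an assumption that replaces reading pool._max_workers; it is not the function above.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


async def bounded_unordered_map(pool, worker, consume, items, limit):
    """Run worker(item) in pool for each item, with at most `limit` jobs in flight."""
    loop = asyncio.get_running_loop()
    iterator = iter(items)
    pending = set()
    exhausted = False
    while True:
        # Top up the window: pull a new item only while a slot is free.
        while not exhausted and len(pending) < limit:
            try:
                item = next(iterator)
            except StopIteration:
                exhausted = True
            else:
                pending.add(loop.run_in_executor(pool, worker, item))
        if not pending:
            break
        # Wait for at least one job, then pass its result to the consumer.
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED
        )
        for future in done:
            consume(future.result())


def demo():
    pulled = []  # records which items were drawn from the source

    def numbers():
        for n in range(10):
            pulled.append(n)
            yield n

    results = []
    with ThreadPoolExecutor(max_workers=3) as pool:
        asyncio.run(
            bounded_unordered_map(
                pool, lambda n: n * n, results.append, numbers(), limit=3
            )
        )
    # Results arrive in completion order, so sort them before comparing.
    return sorted(results), pulled
```

Because results are consumed as they complete, their order is not guaranteed, and the source is never advanced further than the number of free slots.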

Maybe you could ask in the Users category?

I do not have enough creds. About the Async-SIG category

These might be of interest: