Lazy collection of iterables for concurrent.futures.Executor.map

Currently concurrent.futures.Executor has a map function which consumer iterables immediately rather than lazily (doc ref: concurrent.futures — Launching parallel tasks — Python 3.11.0 documentation). Is this necessary and can we consume it somewhat lazily?

The benefits are the same as using map(func, iterable) instead of map(func, list(iterable)). Not only does it consume less memory, it also makes items in iterable available for consumption as soon as they are created. Especially when chaining generators, because Executor.map function consumes iterables immediately, it creates an artificial barrier that renders rest of the generators moot. For e.g. when trying to process a large file line by line but by parallelizing over multiple cores, the map function forces the entire file to be read before processing the first line.

Here is a minimal reproducible example:

from concurrent.futures import ThreadPoolExecutor as PoolExecutor
import time

start = time.monotonic()

def log(stage, i):
    print("{stage} {i} after {s} seconds".format(stage=stage, i=i, s=time.monotonic()-start))

def slow_producer(i):
    time.sleep(2)
    return i

def slow_processor(i):
    time.sleep(2)
    log("Processed", i)
    return i

def producer_pool():
    with PoolExecutor(max_workers=2) as executor_produce:
        for i in executor_produce.map(slow_producer, range(12)):
            log("Produced", i)
            yield i

with PoolExecutor(max_workers=2) as executor_func:
    for i in executor_func.map(slow_processor, producer_pool()):
        log("Collected", i)
Produced 0 after 2.0063121980056167 seconds
Produced 1 after 2.007746382849291 seconds
Produced 2 after 4.008534851949662 seconds
Produced 3 after 4.008850650861859 seconds
Processed 0 after 4.011005098000169 seconds
Processed 1 after 4.011235842946917 seconds
...
Produced 10 after 12.017429335974157 seconds
Produced 11 after 12.017529984004796 seconds
Collected 0 after 12.019060699036345 seconds
Collected 1 after 12.01907985494472 seconds
Collected 2 after 12.019086507847533 seconds
Collected 3 after 12.019091666908935 seconds
Collected 4 after 12.019096395932138 seconds
Collected 5 after 12.019107556901872 seconds
Collected 6 after 12.019122063880786 seconds
Collected 7 after 12.019136790884659 seconds
Processed 8 after 12.019883190980181 seconds
Collected 8 after 12.020117720821872 seconds
Processed 9 after 12.02012226684019 seconds
Collected 9 after 12.020301904994994 seconds
Processed 10 after 14.022098492830992 seconds
Processed 11 after 14.022333290893584 seconds
Collected 10 after 14.022667717887089 seconds
Collected 11 after 14.022843645885587 seconds

On the positive side, we are indeed processing two elements at a time, as expected. But the processed elements are collected after 12 seconds, despite the earliest items being processed within 4 seconds. Which means the next generators cannot consume them until 12 seconds, and all of uncollected elements occupy memory.

The immediate consumption of iterable happens in the the map implementation of the base class line

fs = [self.submit(fn, *args) for args in zip(*iterables)]

This map implementation is used in both ThreadPoolExecutor and ProcessPoolExecutor as well.

Attempted fix 1: Change from list comprehension to generator

Simply changing it from list to a generator actually makes it worse. Here’s an implementation

from concurrent.futures import ThreadPoolExecutor, _base, ProcessPoolExecutor
import time
def _result_or_cancel(fut, timeout=None):
    try:
        try:
            return fut.result(timeout)
        finally:
            fut.cancel()
    finally:
        # Break a reference cycle with the exception in self._exception
        del fut

class LazyExecutor(_base.Executor):
    def map(self, fn, *iterables, timeout=None, chunksize=1):
        if timeout is not None:
            end_time = timeout + time.monotonic()
        fs = (self.submit(fn, *args) for args in zip(*iterables))

        def result_iterator():
            try:
                for f in fs:
                    if timeout is None:
                        yield _result_or_cancel(f)
                    else:
                        yield _result_or_cancel(f, end_time - time.monotonic())
            finally:
                for future in fs:
                    future.cancel()
        return result_iterator()

ThreadPoolExecutor.__bases__ = (LazyExecutor, )

produces the following output

Produced 0 after 2.0023396608885378 seconds
Processed 0 after 4.004750154912472 seconds
Collected 0 after 4.00489939795807 seconds
Produced 1 after 6.0071516879834235 seconds
Processed 1 after 8.009405701886863 seconds
Collected 1 after 8.009547287831083 seconds
Produced 2 after 10.011826009955257 seconds
...

In the output, we can see that the first element is indeed collected in 4 seconds, but the second element only starts being produced after the first element is collected. We lose all the benefits of using the ThreadPool. This attempt shows why eager evaluation was needed in the first place – so that there is enough input to keep multiple workers engaged. One way to fix this would be to eagerly evaluate a few items and keeping them in a fifo queue (buffer). But that needs us to make decisions about the buffer size.

Below I propose a different way that avoids making that decision in the base class. Here I issue non-blocking calls to consume the iterable while also yielding the results.

Attempted fix 2: Consume the iterable in a separate thread

Here I submit the items from the iterable in a separate thread. The resulting futures are added to a thread-safe fifo queue. This queue is read in the main thread. This way the input iterator is consumed in the background and we only block on the first future in the queue. And I use a PoisonPill to signal the end of input iterable.

# For brevity, eliding the imports and _result_or_cancel above
import queue
import threading

class PoisonPill:
    pass

class FixedExecutor(_base.Executor):
    def map(self, fn, *iterables, timeout=None, chunksize=1):
        if timeout is not None:
            end_time = timeout + time.monotonic()

        poison_pill = PoisonPill()
        fs = queue.Queue()
        def submit():
            for args in zip(*iterables):
                fs.put(self.submit(fn, *args))
            fs.put(poison_pill)

        submission_thread = threading.Thread(target=submit)

        def result_iterator():
            future = None
            try:
                while True:
                    future = fs.get()
                    if future == poison_pill: return
                    if timeout is None:
                        res = _result_or_cancel(future)
                        yield res
                    else:
                        res = _result_or_cancel(future, end_time - time.monotonic())
                        yield res
            finally:
                if future and future == poison_pill: return
                while True:
                    future = fs.get()
                    if future == poison_pill: return
                    future.cancel()

        submission_thread.start()
        yield from result_iterator() 

This leads to the following output

Produced 0 after 2.002713941037655 seconds
Produced 1 after 2.0030609681271017 seconds
Produced 2 after 4.004857782041654 seconds
Produced 3 after 4.005219548009336 seconds
Processed 0 after 4.005280217155814 seconds
Processed 1 after 4.005323752062395 seconds
Collected 0 after 4.0054829090368 seconds
Collected 1 after 4.005531860981137 seconds
Produced 4 after 6.006993685150519 seconds
Produced 5 after 6.0071618689689785 seconds
Processed 2 after 6.0074516059830785 seconds
Processed 3 after 6.007553400006145 seconds
Collected 2 after 6.007599716074765 seconds
Collected 3 after 6.0077077080495656 seconds
Produced 6 after 8.009138146182522 seconds
Produced 7 after 8.00928759505041 seconds
Processed 4 after 8.009618137963116 seconds
...
Processed 10 after 14.016063918126747 seconds
Processed 11 after 14.016122061060742 seconds
Collected 10 after 14.01627196604386 seconds
Collected 11 after 14.016358396969736 seconds

We can see here that items 0 and 1 are collected in 4 seconds and the throughput is maintained as well (item 11 collected at 14 seconds).

On the face of it, this seems like a strictly better approach. Are there any downsides to it? And if not, is this a viable feature addition for concurrent.futures?

5 Likes