Parallel reduction using concurrent.futures

In the past I’ve often worked with the ProcessPoolExecutor from concurrent.futures and found its map utility very convenient for clean parallel execution. When you want to perform a reduction operation on the results (e.g. adding all the results), you’d have to await the map-results and then perform it in the main thread. While this might only be useful for computationally expensive reductions, this type of operation can easily be carried out in parallel.

I’ve written an implementation that worked nicely with my use cases of adding a sequence of numpy arrays coming from the executor’s map (see with more details below). I wanted to suggest adding sth. similar to the concurrent.futures module, possibly for both, ProcessPoolExecutor and ThreadPoolExecutor, but I’d like to hear your feedback first.
My main concern is lack of relevance because the huge overhead that comes with this parallelization only yields a speedup in very specific cases, namely for expensive reductions that on the other hand often come with a lot of memory that has to be distributed to the workers. Just adding integers will most likely always be faster just using standard reduce and even for numpy arrays it’s not always clear if it will yield a speedup.

from concurrent.futures import ProcessPoolExecutor
import itertools

class MyPPE(ProcessPoolExecutor):

    def reduce(self, fn, seq, n=None, sync=False, *args, **kwargs):
        """Implementation of parallel reduction using
        the process pool;
        takes a reduction function and a sequence (either
        generator/iterator or list. If no. of elements
        is known, provide with argument 'n', else the
        iterator will be evaluated once at the beggining 
        to infer the no. of elements.
        The reduction is carried out assuming associativity
        while ensuring the correct order of elements, i.e.,
        retaining commutativity.
        The call of 'next' on the previous step's return generator
        of 'map' for submission of the next task implicitly 
        awaits the evaluation of the previous step. Thus,
        dependencies of computations are resolved before 
        submission. This makes deadlocks impossible while
        ensuring that the computation may proceed before full
        evaluation of the last operation on the previous sequence.
        The queue's FIFO implementation supports this cause, as
        the new sequence will be submitted by 'map' in the
        correct order (i.e., blocking).
        If explicit synchronization between steps is desired,
        simply toggle the 'sync' argument.
        If the number of elements in the sequence is odd,
        the first element is carried over to the next iteration
        and prepended to the result of 'map' for the next iteration.
        Note: Performance-wise using parallel reduce is only
        more efficient than standard reduce as it comes with a
        huge overhead for distributing the computation. 
        Consequently, parallel reduce is only useful for 
        computationally expensive reductions. Mind also that
        these computations often come with a lot of memory 
        transfer that additionally slows down the submission.
        In order to actually find out, you probably have to 
        try it out.
            fn : callable
                reduction function, takes exactly two arguments and
                returns one
            seq : iterable
                iterable to act the reduction on
            n : int (default None)
                no. of elements in seq, will be inferred internally
                if not provided
            sync : bool
                flag to toggle synchronization between reduction steps
            result : dynamic
                result of reduction operatorion, type of elements of seq
        # Computation of gaussian formula, i.e.,
        # summing up all contiguous elements between
        # 1 and n
        import operator
        n = 27
        a = range(1,n+1)
        with MyPPE() as executor:
            res = executor.reduce(operator.add, a, n)
        print("Result of reduction: ", res)
        print("Result of analytic formula: ", n*(n+1)//2)
        # knowledge of n is required 
        # to take care of remainders;
        # inference of n requires evaluation 
        # of iterator
        if n is None:
            seq = list(seq)
            n = len(seq)
        # prepare initial iterator, will return seq if
        # seq already is an iterator/generator
        res = iter(seq)
            # check for remainder
            remainder = [next(res)] if n%2 else []
            # submit to job queue
            mapres =, res, res, *args, **kwargs)

            # append remainder
            res = itertools.chain(remainder, mapres)
            # apply synchronization
            if sync:
                res = iter(list(res))
            # decrease element counter
            n = (n+1)//2
        return next(res)