In the past I’ve often worked with the ProcessPoolExecutor from concurrent.futures and found its
map utility very convenient for clean parallel execution. When you want to perform a reduction on the results (e.g. summing them all), you have to await the map results and then perform the reduction in the main thread. While this is probably only worthwhile for computationally expensive reductions, this type of operation can easily be carried out in parallel as well.
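For context, the status quo looks roughly like this (a minimal sketch; I use ThreadPoolExecutor only to keep the example self-contained, and `square` is just a stand-in workload):

```python
from concurrent.futures import ThreadPoolExecutor
from functools import reduce
import operator

def square(x):  # stand-in for an expensive per-item computation
    return x * x

with ThreadPoolExecutor() as executor:
    # the mapping runs in parallel, but the reduction below
    # runs sequentially in the main thread afterwards
    results = executor.map(square, range(10))
    total = reduce(operator.add, results)

print(total)  # 0 + 1 + 4 + ... + 81 = 285
```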
I’ve written an implementation that worked nicely with my use case of adding a sequence of numpy arrays coming from the executor’s
map (see details below). I wanted to suggest adding something similar to the concurrent.futures module, possibly for both ProcessPoolExecutor and ThreadPoolExecutor, but I’d like to hear your feedback first.
My main concern is a possible lack of relevance: the huge overhead that comes with this parallelization only yields a speedup in very specific cases, namely for expensive reductions, which on the other hand often involve a lot of memory that has to be distributed to the workers. Just adding integers will most likely always be faster with the standard
reduce, and even for numpy arrays it’s not always clear whether it will yield a speedup.
```python
from concurrent.futures import ProcessPoolExecutor
import itertools


class MyPPE(ProcessPoolExecutor):

    def reduce(self, fn, seq, n=None, sync=False, *args, **kwargs):
        """Implementation of parallel reduction using the process pool.

        Takes a reduction function and a sequence (either a
        generator/iterator or a list). If the number of elements is
        known, provide it with the argument 'n'; otherwise the iterator
        will be evaluated once at the beginning to infer the number of
        elements.

        The reduction is carried out assuming associativity while
        ensuring the correct order of elements, i.e., commutativity is
        not required. The call of 'next' on the previous step's result
        generator of 'map' during submission of the next task implicitly
        awaits the evaluation of the previous step. Thus, dependencies
        of computations are resolved before submission. This makes
        deadlocks impossible while allowing the computation to proceed
        before full evaluation of the last operation on the previous
        sequence. The queue's FIFO implementation supports this, as the
        new sequence will be submitted by 'map' in the correct order
        (i.e., blocking). If explicit synchronization between steps is
        desired, simply toggle the 'sync' argument.

        If the number of elements in the sequence is odd, the first
        element is carried over to the next iteration and prepended to
        the result of 'map' for that iteration.

        Note: Performance-wise, parallel reduce is only more efficient
        than standard reduce in specific cases, as it comes with a huge
        overhead for distributing the computation. Consequently,
        parallel reduce is only useful for computationally expensive
        reductions. Mind also that these computations often come with a
        lot of memory transfer that additionally slows down the
        submission. To actually find out, you probably have to try it.

        Args:
            fn : callable
                reduction function, takes exactly two arguments and
                returns one
            seq : iterable
                iterable to act the reduction on
            n : int (default None)
                no. of elements in seq, will be inferred internally if
                not provided
            sync : bool
                flag to toggle synchronization between reduction steps

        Returns:
            result : dynamic
                result of the reduction operation, type of elements of
                seq

        Examples:
            # Computation of the Gaussian sum formula, i.e.,
            # summing up all contiguous elements between 1 and n
            import operator
            n = 27
            a = range(1, n + 1)
            with MyPPE() as executor:
                res = executor.reduce(operator.add, a, n)
            print("Result of reduction: ", res)
            print("Result of analytic formula: ", n * (n + 1) // 2)
        """
        # knowledge of n is required to take care of remainders;
        # inference of n requires evaluation of the iterator
        if n is None:
            seq = list(seq)
            n = len(seq)
        # prepare the initial iterator; returns seq unchanged if
        # seq already is an iterator/generator
        res = iter(seq)
        while n > 1:
            # check for remainder
            remainder = [next(res)] if n % 2 else []
            # submit to job queue; passing the same iterator twice
            # pairs up neighbouring elements
            mapres = self.map(fn, res, res, *args, **kwargs)
            # prepend remainder
            res = itertools.chain(remainder, mapres)
            # apply synchronization
            if sync:
                res = iter(list(res))
            # decrease element counter
            n = (n + 1) // 2
        return next(res)
```
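To illustrate the reduction tree without involving an executor, here is a hypothetical sequential helper (`pairwise_rounds` is just a name I made up for this sketch, not part of the proposal) that performs the same neighbour pairing and odd-remainder carry-over per round:

```python
def pairwise_rounds(fn, seq):
    """Sequentially emulate the rounds of the parallel reduce:
    each round pairs up neighbouring elements; if the count is
    odd, the first element is carried over and prepended."""
    items = list(seq)
    while len(items) > 1:
        remainder = [items.pop(0)] if len(items) % 2 else []
        paired = [fn(items[i], items[i + 1])
                  for i in range(0, len(items), 2)]
        items = remainder + paired
    return items[0]

# order is preserved, so non-commutative operations work too
print(pairwise_rounds(lambda a, b: a + b, "abcde"))  # abcde
```

With `range(1, 28)` and addition this reproduces the Gaussian sum 27 * 28 // 2 = 378 from the docstring example, just without any parallelism.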