In the past I’ve often worked with the ProcessPoolExecutor from concurrent.futures and found its `map` utility very convenient for clean parallel execution. When you want to perform a reduction on the results (e.g. adding them all up), you have to await the map results and then perform the reduction in the calling thread. While this is only worthwhile for computationally expensive reductions, this type of operation can easily be carried out in parallel as well.
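The pattern I mean looks roughly like this (a minimal sketch using ThreadPoolExecutor so it is self-contained; `square` is just an illustrative stand-in for real work):

```python
from concurrent.futures import ThreadPoolExecutor
from functools import reduce

def square(x):
    # stand-in for an expensive per-item computation
    return x * x

with ThreadPoolExecutor() as executor:
    # map runs in parallel, but the reduction happens afterwards,
    # serially, in the calling thread
    results = executor.map(square, range(1, 6))
    total = reduce(lambda a, b: a + b, results)
# total == 1 + 4 + 9 + 16 + 25 == 55
```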

I’ve written an implementation that worked nicely for my use case of summing a sequence of NumPy arrays coming from the executor’s `map` (see details below). I wanted to suggest adding something similar to the concurrent.futures module, possibly for both ProcessPoolExecutor and ThreadPoolExecutor, but I’d like to hear your feedback first.

My main concern is lack of relevance: the considerable overhead of this parallelization only yields a speedup in very specific cases, namely for expensive reductions, which in turn often involve a lot of memory that has to be distributed to the workers. Just adding integers will almost certainly be faster with the standard `reduce`, and even for NumPy arrays it’s not always clear whether there will be a speedup.
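For intuition on the bound: the tree reduction performs the same n - 1 applications of `fn` as a sequential reduce, only spread over ceil(log2(n)) sequential rounds, so the best-case speedup is limited and the per-task overhead eats into it quickly for cheap `fn`. A quick sketch of the round count, derived from the halving rule `n = (n + 1) // 2` used in the implementation below (`reduction_rounds` is an illustrative name, not a proposed API):

```python
import math

def reduction_rounds(n):
    """Number of sequential rounds the pairwise halving scheme needs."""
    rounds = 0
    while n > 1:
        # each round pairs up elements, carrying one over when n is odd
        n = (n + 1) // 2
        rounds += 1
    return rounds

# matches the ceil(log2(n)) depth of a binary reduction tree
assert reduction_rounds(27) == math.ceil(math.log2(27))  # 5 rounds
```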

```
from concurrent.futures import ProcessPoolExecutor
import itertools


class MyPPE(ProcessPoolExecutor):
    def reduce(self, fn, seq, n=None, sync=False, *args, **kwargs):
        """Implementation of parallel reduction using the process pool.

        Takes a reduction function and a sequence (either a
        generator/iterator or a list). If the number of elements
        is known, provide it with the argument 'n'; otherwise the
        iterator will be evaluated once at the beginning to infer
        the number of elements.
        The reduction assumes associativity; the order of elements
        is preserved, so the function does not need to be
        commutative.
        Calling 'next' on the previous step's 'map' result
        generator while submitting the next task implicitly awaits
        the evaluation of the previous step. Thus, dependencies
        between computations are resolved before submission. This
        makes deadlocks impossible while allowing the computation
        to proceed before the previous sequence has been fully
        evaluated. The queue's FIFO implementation supports this,
        as the new sequence will be submitted by 'map' in the
        correct order (i.e., blocking).
        If explicit synchronization between steps is desired,
        simply toggle the 'sync' argument.
        If the number of elements in the sequence is odd, the
        first element is carried over and prepended to the result
        of 'map' for the next iteration.
        Note: performance-wise, parallel reduce is only more
        efficient than standard reduce for computationally
        expensive reductions, as it comes with a huge overhead
        for distributing the computation. Mind also that these
        computations often involve a lot of memory transfer,
        which additionally slows down submission. To actually
        find out, you probably have to try it.

        Args:
            fn : callable
                reduction function, takes exactly two arguments
                and returns one
            seq : iterable
                iterable to perform the reduction on
            n : int (default None)
                number of elements in seq, inferred internally
                if not provided
            sync : bool
                flag to toggle synchronization between reduction
                steps

        Returns:
            result : dynamic
                result of the reduction operation, type of the
                elements of seq

        Examples:
            # Computation of the Gauss formula, i.e., summing
            # all integers between 1 and n
            import operator
            n = 27
            a = range(1, n + 1)
            with MyPPE() as executor:
                res = executor.reduce(operator.add, a, n)
            print("Result of reduction: ", res)
            print("Result of analytic formula: ", n * (n + 1) // 2)
        """
        # knowledge of n is required to take care of remainders;
        # inferring n requires evaluating the iterator once
        if n is None:
            seq = list(seq)
            n = len(seq)
        # prepare the initial iterator; returns seq unchanged if
        # seq already is an iterator/generator
        res = iter(seq)
        while n > 1:
            # check for remainder
            remainder = [next(res)] if n % 2 else []
            # submit to the job queue; passing the same iterator
            # twice pairs up consecutive elements
            mapres = self.map(fn, res, res, *args, **kwargs)
            # prepend the remainder
            res = itertools.chain(remainder, mapres)
            # apply synchronization
            if sync:
                res = iter(list(res))
            # halve the element counter, rounding up
            n = (n + 1) // 2
        return next(res)
```
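Since the method only relies on `self.map`, the same idea also works as a free function over any executor, which is how I imagine the ThreadPoolExecutor variant. A sketch (`pool_reduce` is a hypothetical name, and I use string concatenation because it is associative but not commutative, so it checks that element order is really preserved):

```python
import itertools
import operator
from concurrent.futures import ThreadPoolExecutor
from functools import reduce as seq_reduce

def pool_reduce(executor, fn, seq, n=None, sync=False):
    """Free-function variant of the tree reduction above."""
    if n is None:
        seq = list(seq)
        n = len(seq)
    res = iter(seq)
    while n > 1:
        remainder = [next(res)] if n % 2 else []
        # pairing consecutive elements by zipping the iterator with itself
        mapres = executor.map(fn, res, res)
        res = itertools.chain(remainder, mapres)
        if sync:
            res = iter(list(res))
        n = (n + 1) // 2
    return next(res)

with ThreadPoolExecutor(max_workers=4) as ex:
    parallel = pool_reduce(ex, operator.add, list("abcdefghijk"))
sequential = seq_reduce(operator.add, "abcdefghijk")
# a non-commutative fn must reproduce the sequential result exactly
```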