Add `filter()` method to `multiprocessing.Pool` class

Pitch

Currently, `multiprocessing.Pool.map` is a parallel replacement for the `map` builtin. It would be nice to add `multiprocessing.Pool.filter` as a parallel replacement for the `filter` builtin.

Implementation

Similar to the `Pool.map()` method, it should divide the input sequence into multiple chunks and feed them to different pool workers. But the filtering after mapping should also happen per chunk and in parallel; finally, the filtered chunks are joined together. This maximizes the filtering parallelism.
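
A minimal sketch of that design, assuming a hypothetical `pool_filter` helper (not part of the stdlib): it chunks the input, filters each chunk inside a worker so rejected items never travel back to the parent, and chains the survivors:

```python
from itertools import chain
from multiprocessing import Pool

def _filter_chunk(args):
    # Runs in a worker: apply the predicate to one chunk and return
    # only the items that pass, so rejects are dropped worker-side.
    predicate, chunk = args
    return [item for item in chunk if predicate(item)]

def pool_filter(pool, predicate, iterable, chunksize=10_000):
    # Hypothetical stand-in for the proposed Pool.filter(): split the
    # input into chunks, filter each chunk in parallel, join the results.
    # The predicate must be a picklable top-level function.
    items = list(iterable)
    chunks = [(predicate, items[i:i + chunksize])
              for i in range(0, len(items), chunksize)]
    return list(chain.from_iterable(pool.map(_filter_chunk, chunks)))
```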

How would you use it? Could you please show a simple realistic example?

@storchaka
Very simple:

For number theory: getting the prime numbers under 10000000:

```python
import multiprocessing

# isprime is a user-supplied primality test
with multiprocessing.Pool(8) as pool:
    primes = pool.filter(isprime, range(2, 10000000))
```

Any place where `filter` is used could be replaced by this.

If a user currently wants to filter a sequence in parallel (see https://stackoverflow.com/a/34059823), they have to add an extra piece of data (a flag or sentinel return value) to the result list, and the actual filtering pass has to run sequentially. This is inefficient.
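
For illustration, a sketch of that flag-based workaround (the `test_item` helper and trial-division `isprime` are stand-ins, not taken from the linked answer):

```python
from multiprocessing import Pool

def isprime(n):
    # Simple trial-division stand-in for an expensive test.
    return n >= 2 and all(n % d for d in range(2, int(n ** 0.5) + 1))

def test_item(n):
    # Each worker returns a (flag, value) pair instead of dropping misses.
    return (isprime(n), n)

if __name__ == "__main__":
    with Pool(8) as pool:
        flagged = pool.map(test_item, range(2, 100_000))
    # The screening pass runs sequentially in the parent process.
    primes = [n for keep, n in flagged if keep]
```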

I believe this new method is not difficult to implement.

I don’t see how you get a speed-up compared to using map() and then filtering post-execution. Is there a performance win somehow? Or is this just for ergonomics?

If the testing cost is dominant, `Pool.filter()` can be simulated with `Pool.map()`. For example: `primes = filter(None, pool.map(prime_or_none, range(2, 10000000)))`, where `prime_or_none(n)` returns `n` if it is prime and `None` otherwise. It has the overhead of transferring all these `None`s. In this case it is small, but it grows as the ratio of negative to positive test results grows.
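
Spelled out as a runnable sketch (with a trial-division `isprime` standing in for the real test):

```python
from multiprocessing import Pool

def isprime(n):
    return n >= 2 and all(n % d for d in range(2, int(n ** 0.5) + 1))

def prime_or_none(n):
    # Worker-side test: returns n for a hit, None for a miss; every
    # None is still pickled and sent back to the parent process.
    return n if isprime(n) else None

if __name__ == "__main__":
    with Pool(8) as pool:
        primes = list(filter(None, pool.map(prime_or_none, range(2, 100_000))))
```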

But what if the input is expensive too? `filter()` is usually combined with `map()` or another non-trivial generator (a file iterator, a CSV reader generator, etc.). How do you combine `Pool.filter()` with `Pool.map()`? What if you have a more complex flow like `filter(..., map(..., filter(...), filter(..., map(...))))`? Making a simple `Pool.filter()` is easy, but it could only be used in some simple cases; making a tool useful in complex cases is difficult.
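
To make the composition concern concrete, here is a sketch contrasting the lazy builtins with a pooled pipeline (`pred1`, `func`, `pred2` are placeholder callables; the list comprehensions stand in for the proposed `pool.filter`):

```python
from multiprocessing import Pool

def pred1(n): return n % 2 == 0      # placeholder first filter
def func(n):  return n * n           # placeholder mapped function
def pred2(n): return n < 1000        # placeholder second filter

if __name__ == "__main__":
    source = range(100)

    # The builtins compose lazily, streaming one item at a time:
    lazy = list(filter(pred2, map(func, filter(pred1, source))))

    # A pooled pipeline materializes every intermediate stage and
    # round-trips it through the parent process between stages:
    with Pool(4) as pool:
        stage1 = [n for n in source if pred1(n)]  # would be pool.filter(pred1, source)
        stage2 = pool.map(func, stage1)
        pooled = [n for n in stage2 if pred2(n)]  # would be pool.filter(pred2, stage2)

    assert lazy == pooled
```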

I don’t see how you get a speed-up compared to using map() and then filtering post-execution. Is there a performance win somehow? Or is this just for ergonomics?

Because an ideal multiprocessing pool filter kicks negative results out inside each worker, on the fly, as part of the test. With the built-in `filter` you have to wait until all the workers are finished and then run the screening step without any parallelism. The performance difference is significant when the number of workers is large.
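
One way to approximate that on-the-fly behavior today is `imap_unordered`, which lets the parent discard misses while other chunks are still being tested, though it still ships every sentinel back, which `Pool.filter()` would avoid (the trial-division `isprime` is again a stand-in):

```python
from multiprocessing import Pool

def isprime(n):
    return n >= 2 and all(n % d for d in range(2, int(n ** 0.5) + 1))

def prime_or_none(n):
    return n if isprime(n) else None

if __name__ == "__main__":
    with Pool(8) as pool:
        # Results stream back as chunks complete, so misses are dropped
        # while workers are still testing, instead of after a full map().
        # Note: imap_unordered does not preserve input order.
        primes = [n for n in pool.imap_unordered(prime_or_none,
                                                 range(2, 100_000),
                                                 chunksize=1000)
                  if n is not None]
```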

Ergonomics is also an important reason, because Python code should be Pythonic.

When the input is expensive too, I don’t think there is any disadvantage of `Pool.filter(...)` in comparison with `filter(...)`. `Pool.filter(...)` and `filter(...)` are semantically equivalent and can be interchanged in any situation.

I thought about a way to approach the problem without a Pool.filter().

The parallelized function could take in the range of items to process, filter them, and return an iterator or a list that can be merged (`chain`, `zip`) from within the caller, as sketched below.
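
A sketch of that approach, with each worker filtering its own sub-range and the caller chaining the survivors (`primes_in_range` and the trial-division `isprime` are illustrative names):

```python
from itertools import chain
from multiprocessing import Pool

def isprime(n):
    return n >= 2 and all(n % d for d in range(2, int(n ** 0.5) + 1))

def primes_in_range(bounds):
    # Each worker tests and filters its own sub-range; only the
    # survivors are returned to the caller.
    lo, hi = bounds
    return [n for n in range(lo, hi) if isprime(n)]

if __name__ == "__main__":
    step = 10_000
    ranges = [(lo, min(lo + step, 100_000))
              for lo in range(2, 100_000, step)]
    with Pool(8) as pool:
        # Merge the per-range result lists back into one sequence.
        primes = list(chain.from_iterable(pool.map(primes_in_range, ranges)))
```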
