Task/concurrent sugar

Doing concurrent work in Python carries enough boilerplate and overhead that it is often difficult, or outright counter-productive, to use it for small units of work.

Take the following:

class T:
    def __init__(self, a1, a2, a3):
        self.v1 = Class1(a1, 42, True)
        self.v2 = Class2(a2, 1.0, False, [])
        self.v3 = Class3(a3, None)

        count = self.v1.load(True)
        self.v1.other()

        na3 = sum(1 for i in a3 if i.initial_immutable_value is True)
        count += self.v2.validate(a3, na3)
        self.v2.other()

        count += self.v3.analyze(a3, ...)
        self.v3.other()

        # __init__ cannot return a value, so keep the count on the instance
        self.count = count
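For contrast, here is a sketch of what hand-rolling that parallelism with plain threading.Thread looks like today; the Class1/Class2/Class3 calls are the ones from the example above, and the shared counts list is just one hypothetical way to plumb results out:

import threading

class T:
    def __init__(self, a1, a2, a3):
        counts = [0, 0, 0]

        def task1():
            self.v1 = Class1(a1, 42, True)
            counts[0] = self.v1.load(True)
            self.v1.other()

        def task2():
            self.v2 = Class2(a2, 1.0, False, [])
            na3 = sum(1 for i in a3 if i.initial_immutable_value is True)
            counts[1] = self.v2.validate(a3, na3)
            self.v2.other()

        def task3():
            self.v3 = Class3(a3, None)
            counts[2] = self.v3.analyze(a3, ...)
            self.v3.other()

        # Start all three, then wait for all three.
        threads = [threading.Thread(target=t) for t in (task1, task2, task3)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        self.count = sum(counts)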

Every time I handle something like this, where some small pocket of code ought to be trivially parallelizable, I find myself wishing for something like:

def __init__(self, a1, a2, a3):
    @async:
        task with count1:
            self.v1 = Class1(a1, 42, True)
            count1 = self.v1.load(True)
            self.v1.other()

        task with count2:
            self.v2 = Class2(...)
            na3 = sum(1 for i in a3 if i.initial_immutable_value is True)
            count2 = self.v2.validate(a3, na3)
            self.v2.other()

        task with count3:
            self.v3 = Class3(...)
            count3 = self.v3.analyze(a3, ...)
            self.v3.other()

        else:
            # runs once all 3 tasks have been joined
            self.count = count1 + count2 + count3

There is plenty of peril here, but this code doesn’t introduce the peril, and the peril is easier to spot when the code spends its lines on what the programmer actually wants to implement, rather than on wrestling the system, through configuration and boilerplate, into giving it a little burst of parallelism.

Async might be the wrong decorator-keyword here; I was just reaching for something that would let me follow an established pattern for breaking things up into clearly delimited units.

Why not just use a concurrent.futures.ThreadPoolExecutor inside the initializer?

from concurrent.futures import ThreadPoolExecutor
from time import sleep

if __name__ == '__main__':
    with ThreadPoolExecutor() as executor:
        executor.submit(sleep, 5)
        executor.submit(sleep, 4)
        executor.submit(sleep, 3)

    # Not reached until all the submitted tasks are done.
    print("Done!")

    # Note that you may want to check the results, etc.

When used as a context manager, it will clean itself up afterwards.
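And if you need the computed values back: executor.submit() returns a Future, and Future.result() blocks until that task finishes, returning its value or re-raising whatever exception it hit. A minimal sketch, with hypothetical stand-in functions for the three blocks in the first post:

from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-ins for the three blocks in the original example.
def load_v1():
    return 1

def validate_v2():
    return 2

def analyze_v3():
    return 3

if __name__ == '__main__':
    with ThreadPoolExecutor() as executor:
        f1 = executor.submit(load_v1)
        f2 = executor.submit(validate_v2)
        f3 = executor.submit(analyze_v3)

    # result() returns the task's return value, or re-raises the
    # exception the task raised; here all three are already done.
    count = f1.result() + f2.result() + f3.result()
    print(count)  # 6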


I’m not really sold on the syntax, nor do I have anything better in mind, but depending on the task this is kind of annoying to do without proper lambdas, since you need standalone functions.
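For example, since a Python lambda is limited to a single expression, a multi-statement task body can’t be passed to submit() inline; each one ends up needing its own named function (a sketch; the names are hypothetical):

from concurrent.futures import ThreadPoolExecutor

# These three statements can't live in a lambda, so the task body
# needs a standalone def just to be submittable.
def task_one(items):
    items = sorted(items)
    items.append(0)
    return len(items)

if __name__ == '__main__':
    with ThreadPoolExecutor() as executor:
        future = executor.submit(task_one, [3, 1, 2])
    print(future.result())  # 4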