Add a new submit_f to ThreadPoolExecutor

Sorry if this idea has already been brought up.

concurrent.futures.ThreadPoolExecutor is a great way to run functions in a thread pool. But it is inconvenient when a function submits another function to the same pool and has to wait for its result.

Take the following code as an example:

import concurrent.futures
import time

class Tree:
    def __init__(self, left=None, right=None):
        self.left = left
        self.right = right

executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)

def count_tree(tree):
    time.sleep(1)
    count = 1

    if tree.left:
        f = executor.submit(count_tree, tree.left)
        count += f.result()

    if tree.right:
        f = executor.submit(count_tree, tree.right)
        count += f.result()

    return count

t = Tree(left=Tree(left=Tree(left=Tree()), right=Tree()))
f = executor.submit(count_tree, t)
print(f.result())

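We can’t actually do this: if the tree is deep enough, every worker thread ends up blocked in f.result(), waiting for a child task that can never be scheduled, and the pool deadlocks. With max_workers=3, the four-level tree above is already deep enough.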
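The starvation can be reproduced in a smaller sketch that still terminates. The names `inner`, `outer`, and `demo_starvation` are made up for illustration, and the pool is shrunk to a single worker. The timeout is not part of the original code; it is there only so the demo gives up instead of hanging forever.

```python
import concurrent.futures

# A single worker makes the starvation easy to reproduce.
executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)


def inner():
    return 42


def outer():
    # outer occupies the only worker, so inner stays in the queue.
    f = executor.submit(inner)
    try:
        # Without a timeout this call would block forever (the deadlock).
        return f.result(timeout=0.5)
    except concurrent.futures.TimeoutError:
        return "starved"


def demo_starvation():
    return executor.submit(outer).result()


if __name__ == "__main__":
    print(demo_starvation())  # prints "starved"
```

Once `outer` gives up and returns, the worker is free again and `inner` finally runs, which is why the program exits cleanly.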

We can change the code to something like this:

import concurrent.futures
import time

class Tree:
    def __init__(self, left=None, right=None):
        self.left = left
        self.right = right

executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)

def count_tree_in_thread(tree):
    future = concurrent.futures.Future()
    executor.submit(count_tree_inner, future, tree)
    return future

def count_tree_inner(future, tree):
    time.sleep(1)
    left_counted = right_counted = False
    count = 1

    def _set_result():
        if left_counted and right_counted:
            future.set_result(count)

    if tree.left:

        def _left_result(f):
            nonlocal count, left_counted
            left_counted = True
            count += f.result()
            _set_result()

        f = count_tree_in_thread(tree.left)
        f.add_done_callback(_left_result)
    else:
        left_counted = True

    if tree.right:

        def _right_result(f):
            nonlocal count, right_counted
            right_counted = True
            count += f.result()
            _set_result()

        f = count_tree_in_thread(tree.right)
        f.add_done_callback(_right_result)
    else:
        right_counted = True

    _set_result()

t = Tree(left=Tree(left=Tree(left=Tree()), right=Tree()))
f = count_tree_in_thread(t)
print(f.result())

We use a separate future to deliver the result of count_tree_inner, so no worker has to block waiting for it. The problem is that submit already creates a future for us; the target function just has no way to access it.
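One caveat with the callback approach: a done callback runs on whichever thread completes the child future, so the left and right callbacks can run concurrently and touch the shared counter at the same time. Below is a minimal sketch of the same idea with a lock and a pending-children counter. The `state` dict and `_child_done` are illustrative names, and the `time.sleep(1)` stand-in for work is left out to keep it short.

```python
import concurrent.futures
import threading


class Tree:
    def __init__(self, left=None, right=None):
        self.left = left
        self.right = right


executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)


def count_tree_in_thread(tree):
    future = concurrent.futures.Future()
    executor.submit(_count_tree_inner, future, tree)
    return future


def _count_tree_inner(future, tree):
    children = [c for c in (tree.left, tree.right) if c is not None]
    if not children:
        future.set_result(1)
        return

    lock = threading.Lock()
    state = {"count": 1, "pending": len(children)}

    def _child_done(f):
        exc = f.exception()
        with lock:
            if exc is not None:
                # Propagate the first child failure to the parent future.
                if not future.done():
                    future.set_exception(exc)
                return
            state["count"] += f.result()
            state["pending"] -= 1
            # Only the callback that sees the last child finish publishes.
            if state["pending"] == 0 and not future.done():
                future.set_result(state["count"])

    for child in children:
        count_tree_in_thread(child).add_done_callback(_child_done)


if __name__ == "__main__":
    t = Tree(left=Tree(left=Tree(left=Tree()), right=Tree()))
    print(count_tree_in_thread(t).result())  # prints 5
```

No worker ever blocks on a child here, so the pool size does not limit the tree depth.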

We could add a new submit_f method to ThreadPoolExecutor: when the target function is called, its first argument is the future returned by submit_f. The code would then look like this:

import concurrent.futures
import time

class Tree:
    def __init__(self, left=None, right=None):
        self.left = left
        self.right = right

executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)

def count_tree(future, tree):
    time.sleep(1)
    left_counted = right_counted = False
    count = 1

    def _set_result():
        if left_counted and right_counted:
            future.set_result(count)

    if tree.left:

        def _left_result(f):
            nonlocal count, left_counted
            left_counted = True
            count += f.result()
            _set_result()

        f = executor.submit_f(count_tree, tree.left)
        f.add_done_callback(_left_result)
    else:
        left_counted = True

    if tree.right:

        def _right_result(f):
            nonlocal count, right_counted
            right_counted = True
            count += f.result()
            _set_result()

        f = executor.submit_f(count_tree, tree.right)
        f.add_done_callback(_right_result)
    else:
        right_counted = True

    _set_result()

t = Tree(left=Tree(left=Tree(left=Tree()), right=Tree()))
f = executor.submit_f(count_tree, t)
print(f.result())

No wrapper method is needed and no future is wasted.

Some potential problems:

  1. It is now the target function’s responsibility to ensure the future is eventually completed.
  2. The documentation for Future says set_result is meant for use by Executor implementations; with submit_f, the target function would call it too.

I have a very simple draft implementation here. The test code is here.
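For readers without the linked draft, the intended semantics can be approximated in a subclass. This is not the draft implementation, only a sketch under stated assumptions: `SubmitFExecutor`, `add`, and `demo_submit_f` are hypothetical names, and unlike a change inside the executor itself, this version still creates a second, internal future for each call.

```python
import concurrent.futures


class SubmitFExecutor(concurrent.futures.ThreadPoolExecutor):
    """Hypothetical subclass approximating the proposed submit_f."""

    def submit_f(self, fn, /, *args, **kwargs):
        future = concurrent.futures.Future()

        def run():
            # Honour a cancel() issued before the task started.
            if not future.set_running_or_notify_cancel():
                return
            try:
                # The target receives its own future as the first argument
                # and is responsible for completing it.
                fn(future, *args, **kwargs)
            except BaseException as exc:
                # Surface errors unless fn already completed the future.
                if not future.done():
                    future.set_exception(exc)

        # Assumption of this sketch: the future returned by submit() is
        # simply discarded, which is the waste the proposal wants to avoid.
        self.submit(run)
        return future


def add(future, a, b):
    future.set_result(a + b)


def demo_submit_f():
    with SubmitFExecutor(max_workers=2) as executor:
        return executor.submit_f(add, 1, 2).result()


if __name__ == "__main__":
    print(demo_submit_f())  # prints 3
```

If the target returns without completing the future and without raising, the caller waits forever, which is the first potential problem listed above.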

In your sample code, it’s unnecessary for count_tree to set_result manually because it can simply return the result.

Your requirement reminds me of asyncio.current_task(), which returns the enclosing task (future) object for the running coroutine. Maybe a counterpart, such as a current_future() function that returns the enclosing future, would serve your purpose and potentially others?
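To make the suggestion concrete, here is one way such a helper might look. Nothing named `current_future` exists in concurrent.futures today; the helper, the `CurrentFutureExecutor` subclass, and the thread-local storage are all assumptions of this sketch.

```python
import concurrent.futures
import threading

_local = threading.local()


def current_future():
    """Hypothetical helper: the future of the task running in this thread."""
    return getattr(_local, "future", None)


class CurrentFutureExecutor(concurrent.futures.ThreadPoolExecutor):
    def submit(self, fn, /, *args, **kwargs):
        box = {}
        ready = threading.Event()

        def run():
            # The future only exists once submit() has returned, so the
            # worker waits until it has been stored in the box.
            ready.wait()
            _local.future = box["future"]
            try:
                return fn(*args, **kwargs)
            finally:
                del _local.future

        box["future"] = super().submit(run)
        ready.set()
        return box["future"]


def demo_current_future():
    with CurrentFutureExecutor(max_workers=1) as executor:
        f = executor.submit(current_future)
        # Inside the task, current_future() is the future submit() returned.
        return f.result() is f


if __name__ == "__main__":
    print(demo_current_future())  # prints True
```

Every task pays for an Event and a thread-local write in this version, whether or not it ever calls `current_future()`.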

Though it feels like a design problem if a function has to handle how it is executed. Maybe there should be a better solution!

If either tree.left or tree.right is not None, the result is only known once the child callbacks have run, so we have to call _set_result manually. For that reason we use _set_result uniformly.

current_future would definitely solve our problem. But I think it is relatively harder to implement in a threaded environment. More importantly, it would add overhead that normal ThreadPoolExecutor usage does not need.

Interestingly, with coroutines we don’t need to call asyncio.current_task() at all, because we can just use await to let other coroutines run. We actually prefer coroutines to threads in our work, but this task is heavily file I/O bound (both high I/O output and high IOPS), so we have to use threads here.
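For comparison, a coroutine version of the tree count might look like the sketch below. It assumes the per-node work can be awaited (here `asyncio.sleep` stands in for it), which is exactly what does not hold for the blocking file I/O described above. `demo_async_count` is an illustrative name.

```python
import asyncio


class Tree:
    def __init__(self, left=None, right=None):
        self.left = left
        self.right = right


async def count_tree(tree):
    await asyncio.sleep(0.01)  # stand-in for awaitable per-node work
    children = [c for c in (tree.left, tree.right) if c is not None]
    # Awaiting suspends this coroutine without tying up a thread, so
    # nesting depth cannot exhaust a pool.
    counts = await asyncio.gather(*(count_tree(c) for c in children))
    return 1 + sum(counts)


def demo_async_count():
    t = Tree(left=Tree(left=Tree(left=Tree()), right=Tree()))
    return asyncio.run(count_tree(t))


if __name__ == "__main__":
    print(demo_async_count())  # prints 5
```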