Sorry if this idea has already been brought up.
concurrent.futures.ThreadPoolExecutor is a great way to execute a function in a thread pool. But it’s a bit inconvenient for functions that submit another function to the same thread pool and need to wait for its result.
Take the following code as an example:

    import concurrent.futures
    import time


    class Tree:
        def __init__(self, left=None, right=None):
            self.left = left
            self.right = right


    executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)


    def count_tree(tree):
        time.sleep(1)
        count = 1
        if tree.left:
            f = executor.submit(count_tree, tree.left)
            count += f.result()
        if tree.right:
            f = executor.submit(count_tree, tree.right)
            count += f.result()
        return count


    t = Tree(left=Tree(left=Tree(left=Tree()), right=Tree()))
    f = executor.submit(count_tree, t)
    print(f.result())

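We can’t actually do this: every call blocks its worker thread in f.result() while it waits for a child, so if the tree is deep enough we will use up all the threads and cause a deadlock. The four-level tree above is already too deep for three workers.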
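To see the starvation in isolation, here is a minimal sketch, assuming a pool with a single worker. The names outer and inner are made up for this illustration, and the timeout is only there so the demo finishes instead of hanging:

```python
import concurrent.futures

# One worker is enough to show the problem: outer() occupies it,
# so inner() can never start while outer() is waiting.
executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)


def inner():
    return 42


def outer():
    child = executor.submit(inner)
    try:
        # Without the timeout this call would block forever.
        return child.result(timeout=0.5)
    except concurrent.futures.TimeoutError:
        return "starved"


outcome = executor.submit(outer).result()
executor.shutdown()
print(outcome)  # starved
```

Without the timeout, outer would wait forever for a task that is queued behind it, which is what happens to count_tree once every worker is waiting on a child.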
We can change the code to something like this:

    import concurrent.futures
    import threading
    import time


    class Tree:
        def __init__(self, left=None, right=None):
            self.left = left
            self.right = right


    executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)


    def count_tree_in_thread(tree):
        # Hand out our own future; the one returned by submit() is discarded.
        future = concurrent.futures.Future()
        executor.submit(count_tree_inner, future, tree)
        return future


    def count_tree_inner(future, tree):
        time.sleep(1)
        left_counted = right_counted = False
        count = 1
        # Done-callbacks may run on different worker threads.
        lock = threading.Lock()

        def _set_result():
            with lock:
                if left_counted and right_counted and not future.done():
                    future.set_result(count)

        if tree.left:
            def _left_result(f):
                nonlocal count, left_counted
                with lock:
                    count += f.result()
                    left_counted = True
                _set_result()

            f = count_tree_in_thread(tree.left)
            f.add_done_callback(_left_result)
        else:
            left_counted = True

        if tree.right:
            def _right_result(f):
                nonlocal count, right_counted
                with lock:
                    count += f.result()
                    right_counted = True
                _set_result()

            f = count_tree_in_thread(tree.right)
            f.add_done_callback(_right_result)
        else:
            right_counted = True

        # Covers the case where no callback is left to complete the future.
        _set_result()


    t = Tree(left=Tree(left=Tree(left=Tree()), right=Tree()))
    f = count_tree_in_thread(t)
    print(f.result())

We can use a future to get the result from count_tree_inner so we don’t need to wait for it. The problem is that we already have a future when we call submit; the target function just has no access to it.
We could add a new submit_f method to ThreadPoolExecutor. When the target function is called, its first parameter would be the future returned by submit_f. The code would then look like this:

    import concurrent.futures
    import threading
    import time


    class Tree:
        def __init__(self, left=None, right=None):
            self.left = left
            self.right = right


    executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)


    # submit_f is the proposed method; it does not exist in the standard
    # library today. It passes the future it returns as the first argument.
    def count_tree(future, tree):
        time.sleep(1)
        left_counted = right_counted = False
        count = 1
        # Done-callbacks may run on different worker threads.
        lock = threading.Lock()

        def _set_result():
            with lock:
                if left_counted and right_counted and not future.done():
                    future.set_result(count)

        if tree.left:
            def _left_result(f):
                nonlocal count, left_counted
                with lock:
                    count += f.result()
                    left_counted = True
                _set_result()

            f = executor.submit_f(count_tree, tree.left)
            f.add_done_callback(_left_result)
        else:
            left_counted = True

        if tree.right:
            def _right_result(f):
                nonlocal count, right_counted
                with lock:
                    count += f.result()
                    right_counted = True
                _set_result()

            f = executor.submit_f(count_tree, tree.right)
            f.add_done_callback(_right_result)
        else:
            right_counted = True

        # Covers the case where no callback is left to complete the future.
        _set_result()


    t = Tree(left=Tree(left=Tree(left=Tree()), right=Tree()))
    f = executor.submit_f(count_tree, t)
    print(f.result())

No wrapper method is needed and no future is wasted.
Some potential problems:
- Now it’s the target function’s responsibility to ensure the future will eventually be done.
- The documentation for Future says set_result is meant for use by executor implementations; now target functions would call it too.
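The first point could be softened by failing the future whenever the target raises before completing it. Below is a minimal sketch of that idea; it is not the draft implementation mentioned in this post. Here submit_f is a hypothetical module-level helper built on the existing submit, so unlike the proposed method it still creates (and discards) one extra future per call. The executor argument to count_tree and the pending counter are also assumptions made for the example.

```python
import concurrent.futures
import threading


class Tree:
    def __init__(self, left=None, right=None):
        self.left = left
        self.right = right


def submit_f(executor, fn, *args, **kwargs):
    """Hypothetical helper: run fn(future, *args, **kwargs) in the pool."""
    future = concurrent.futures.Future()

    def runner():
        try:
            fn(future, *args, **kwargs)
        except BaseException as exc:
            # If the target raised before completing its future, fail the
            # future so that nobody waits on it forever.
            if not future.done():
                future.set_exception(exc)

    executor.submit(runner)
    return future


def count_tree(future, executor, tree):
    children = [c for c in (tree.left, tree.right) if c is not None]
    if not children:
        future.set_result(1)
        return

    lock = threading.Lock()  # callbacks may run on different threads
    count = 1
    pending = len(children)

    def on_child_done(child_future):
        nonlocal count, pending
        with lock:
            if future.done():
                return
            exc = child_future.exception()
            if exc is not None:
                # Propagate a child's failure instead of hanging.
                future.set_exception(exc)
                return
            count += child_future.result()
            pending -= 1
            if pending == 0:
                future.set_result(count)

    for child in children:
        submit_f(executor, count_tree, executor, child).add_done_callback(
            on_child_done
        )


with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    tree = Tree(left=Tree(left=Tree(left=Tree()), right=Tree()))
    total = submit_f(executor, count_tree, executor, tree).result()

print(total)  # 5
```

No worker ever blocks on a child here, so the pool size does not limit the tree depth. The wrapper only covers exceptions, though; a target that returns without completing its future would still leave waiters stuck.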
I have a very simple draft implementation here. The test code is here.