Hi,
Is the .add_done_callback method[0] on concurrent.futures.Future thread-safe per executor? Can the callbacks be used to modify shared mutable data without worrying about corruption?
Here’s a quick, silly example using ProcessPoolExecutor. The range is reversed to simulate CPU-intensive tasks completing at different times.
from concurrent.futures import ProcessPoolExecutor
from functools import partial


def main():
    total = {
        'success': 0,
        'errors': 0,
        'total': 0,
        'results': [],
    }
    totaller_cb = partial(totaller, total)
    with ProcessPoolExecutor() as executor:
        for i in range(1_000, 0, -1):
            future = executor.submit(f, i)
            future.add_done_callback(totaller_cb)
            future.add_done_callback(totaller_cb)

    assert total['errors'] == 284
    assert total['success'] == 1_716
    assert total['total'] == 2_000
    assert len(total['results']) == total['success']


def f(x):
    result = x*x
    if result % 7 == 0:
        raise Exception
    return result


def totaller(total, future):
    try:
        result = future.result()
    except Exception:
        total['errors'] += 1
    else:
        total['success'] += 1
        total['results'].append(result)
    finally:
        total['total'] += 1


if __name__ == '__main__':
    main()
As far as I can tell, ProcessPoolExecutor has only a single internal thread that manages results and invokes the callbacks, and it is created only once[1].
If it is as thread-safe as I hope it is, it might be nice to mention it in the documentation of .add_done_callback to make it explicit. The current documentation has:
Added callables are called in the order that they were added and are always called in a thread belonging to the process that added them.
which, if it is thread-safe, could be more explicit as
Added callables are called in the order that they were added and are always called in the managed worker thread belonging to the process that added them. The managed worker thread guarantees thread-safe execution.
I only ever see the .as_completed function mentioned, never .add_done_callback, and I assume uncertainty about thread-safety is why. Personally, I find .add_done_callback easier to maintain, and it makes the program appear more responsive to the user.
For example, the ThreadPoolExecutor example could then be re-written as,
import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://nonexistant-subdomain.python.org/']


def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()


def printer(url):
    def cb(future):
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))
    return cb


with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    for url in URLS:
        future = executor.submit(load_url, url, 60)
        future.add_done_callback(printer(url))
which would log the output as each thread completes its task, even while new tasks are still being submitted. Not a big deal when there are only a few URLs to check but, if there are hundreds, thousands, or millions, it would make a huge difference.
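For contrast, here is a sketch of the usual .as_completed style, using a toy work function of my own rather than the urllib example: with the common dict-comprehension idiom, the consuming loop cannot start until every task has been submitted.

```python
# Sketch of the .as_completed pattern for comparison: as_completed()
# needs the full collection of futures up front, so results are only
# consumed after the submission loop has finished.
import concurrent.futures


def work(x):
    return x * x


results = []
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    futures = {executor.submit(work, i): i for i in range(10)}
    # Yields futures as they finish, in completion order, but only
    # starts once all ten tasks have been submitted above.
    for future in concurrent.futures.as_completed(futures):
        results.append((futures[future], future.result()))
```

With .add_done_callback, by comparison, each result is handled as soon as its future completes, even while later tasks are still being submitted.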
[0] concurrent.futures — Launching parallel tasks — Python 3.12.3 documentation
[1] cpython/Lib/concurrent/futures/process.py at 3.12 · python/cpython · GitHub
Thanks in advance for any help or clarification.
Regards,
Sam