Is the .add_done_callback in Future thread-safe?

Hi,

Is the .add_done_callback[0] method in concurrent.futures.Future thread-safe per executor? Can it be used to modify mutable data without worry of corruption?

Here’s a quick, silly example using the ProcessPoolExecutor. The range is reversed to simulate CPU intensive tasks being completed at different times.

from concurrent.futures import ProcessPoolExecutor
from functools import partial


def main():
    total = {
        'success': 0,
        'errors': 0,
        'total': 0,
        'results': [],
    }
    totaller_cb = partial(totaller, total)
    with ProcessPoolExecutor() as executor:
        for i in range(1_000, 0, -1):
            future = executor.submit(f, i)
            future.add_done_callback(totaller_cb)
            future.add_done_callback(totaller_cb)
    assert total['errors'] == 284
    assert total['success'] == 1_716
    assert total['total'] == 2_000
    assert len(total['results']) == total['success']


def f(x):
    result = x*x
    if result % 7 == 0:
        raise Exception
    return result


def totaller(total, future):
    try:
        result = future.result()
    except Exception:
        total['errors'] += 1
    else:
        total['success'] += 1
        total['results'].append(result)
    finally:
        total['total'] += 1


if __name__ == '__main__':
    main()

As far as I can tell, there is only one worker thread in the ProcessPoolExecutor to manage the callbacks and it is only created once[1].

If it is as thread-safe as I hope it is, it might be nice to mention it in the documentation of .add_done_callback to make it explicit. The current documentation has:

Added callables are called in the order that they were added and are always called in a thread belonging to the process that added them.

which, if it is thread-safe, could be more explicit as

Added callables are called in the order that they were added and are always called in the managed worker thread belonging to the process that added them. The managed worker thread guarantees thread-safe execution.

I only ever see the .as_completed function mentioned and never the .add_done_callback and I’m assuming the thread-safety assumption is why. Personally, I find the .add_done_callback easier to maintain as well as appearing more responsive to the user.

For example, the ThreadPoolExecutor example could then be re-written as,

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://nonexistant-subdomain.python.org/']


def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()


def printer(url):
    def cb(future):
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))
    return cb


with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    for url in URLS:
        future = executor.submit(load_url, url, 60)
        future.add_done_callback(printer(url))

which would log the output as each thread completes its task even while new tasks are still to be added. Not a big deal when it’s only a few URLs to check, but, if there hundreds to thousands to millions, it’d make a huge difference.

[0] concurrent.futures — Launching parallel tasks — Python 3.12.3 documentation
[1] cpython/Lib/concurrent/futures/process.py at 3.12 · python/cpython · GitHub

Thanks in advance for any help or clarification.

Regards,

Sam