ThreadPoolExecutor.map with buffersize returns truncated iterable under certain circumstances

Hi folks!

I am wondering why my first example, ThreadPoolExecutor(max_workers=10).map(fn, iterable, buffersize=20), returns only 20 of the 100 mapped items. Using CPython 3.14.0.

Looking at the implementation of map(), there is some weakref wizardry. Is the reason for the truncated iterable that the executor has already been garbage collected by the time the buffer needs to be appended to? https://github.com/python/cpython/blob/a005835f699b5ba44beb8c856db1f62454522e1e/Lib/concurrent/futures/_base.py#L633

Is this a CPython bug perhaps? Or expected behavior? Or am I holding it wrong?

import concurrent.futures


def get_results():
    ints = range(100)
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        return executor.map(str, ints, buffersize=20)

print(len(list(get_results())))  # -> 20 (???)


def get_results_without_buffersize():
    ints = range(100)
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        return executor.map(str, ints)


print(len(list(get_results_without_buffersize())))  # -> 100

def get_results_2():
    ints = range(100)
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        yield from executor.map(str, ints, buffersize=20)

print(len(list(get_results_2())))  # -> 100

def get_results_3():
    ints = range(100)
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        print(len(list(executor.map(str, ints, buffersize=20))))

get_results_3()  # -> 100

What happens if you collect all the results inside the context manager? I suspect this is because the function returns before all the work is finished, and the thread pool has already been shut down.
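Something like this, say (just a sketch; the function name is made up, the point is consuming the map before the with block exits):

import concurrent.futures


def get_results_collected():
    ints = range(100)
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        # Consume the map inside the with block, so every task is submitted
        # and finished before the executor shuts down.
        return list(executor.map(str, ints, buffersize=20))


print(len(get_results_collected()))  # expect 100 if the early shutdown is the cause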

That works as expected, see last example.


Oh yeah :person_facepalming: sorry for not reading through. So I think you fully identified the issue. I wouldn’t call it a bug in Python; exiting from a context manager early is going to do that.
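For context, exiting the with block is documented to behave like calling shutdown(wait=True), and as I understand it a buffered map() only submits enough tasks to fill the buffer before you start consuming it. Unrolled roughly as a sketch:

import concurrent.futures


def get_results_unrolled():
    ints = range(100)
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=10)
    try:
        # With buffersize, map() submits work lazily: roughly buffersize
        # tasks up front, and more only as results are consumed.
        return executor.map(str, ints, buffersize=20)
    finally:
        # This is what exiting the with block does: shutdown(wait=True),
        # which can only wait for tasks that were already submitted.
        executor.shutdown(wait=True)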

Not sure. Removing the buffersize argument makes the example work as expected (added this example above too):

def get_results_without_buffersize():
    ints = range(100)
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        return executor.map(str, ints)


print(len(list(get_results_without_buffersize())))  # -> 100

This might just be a race, because str() on an int is so fast. You could try a slower function (e.g. something with a time.sleep in it) and see how many results you get.
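Something along these lines (a sketch; the 0.1s sleep is arbitrary):

import concurrent.futures
import time


def slow_str(i):
    time.sleep(0.1)  # make each task noticeably slower than plain str()
    return str(i)


def get_results_slow():
    ints = range(100)
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        return executor.map(slow_str, ints, buffersize=20)


print(len(list(get_results_slow())))  # how many results survive?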

Hm, I just tested this myself (on 3.11) and it does seem to block until the whole map is complete… so perhaps this is indeed a bug…
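E.g. something like this shows the blocking (a sketch, not necessarily what I ran; slow_str and the timing print are just for illustration):

import concurrent.futures
import time


def slow_str(i):
    time.sleep(0.1)
    return str(i)


def get_results_timed():
    ints = range(100)
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        # No buffersize here, so every task is submitted up front and
        # exiting the with block waits for all of them to finish.
        return executor.map(slow_str, ints)


start = time.monotonic()
results = get_results_timed()
print(f"returned after {time.monotonic() - start:.1f}s")  # close to the full runtime
print(len(list(results)))  # -> 100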