Why isn't this BoundedThreadPoolExecutor bounded as expected?

There’s a Stack Overflow answer for creating a BoundedThreadPoolExecutor that accepts a limited number of pending work items, blocking additional .submit calls until an existing work item is finished.

I have written another one using a Semaphore to avoid referencing the internal queue of ThreadPoolExecutor. When I wrote this little test to verify that they both worked, I was, and still am, confused about why my version, BoundedThreadPoolExecutor2, behaves as I expect but the implementation suggested on Stack Overflow, BoundedThreadPoolExecutor, doesn’t. It seems to me that the fixed-size Queue should also block additional submit calls.

Am I missing something obvious?

When I trace this with pdb, it shows that the work queue size is 0 whenever BoundedThreadPoolExecutor.submit is entered, so it makes sense that it’s not being blocked, but I don’t see what is clearing it. This happens even with perpetually running tasks…

#!/bin/python3

from concurrent import futures
import queue
import threading
import time


class BoundedThreadPoolExecutor(futures.ThreadPoolExecutor):
  """A ThreadPoolExecutor with a work queue of bounded size."""

  def __init__(
      self, max_workers: int | None = None, max_pending_calls: int = 1, **kwargs
  ):
    if max_pending_calls < 1:
      raise ValueError('Minimum `max_pending_calls` is 1.')
    super().__init__(max_workers, **kwargs)
    # Note that this assumes internals of futures.ThreadPoolExecutor but is
    # recommended in StackOverflow.
    self._work_queue = queue.Queue(maxsize=max_pending_calls)


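class BoundedThreadPoolExecutor2(futures.ThreadPoolExecutor):
  """A ThreadPoolExecutor that blocks submit() while too many items are unfinished."""

  def __init__(self, *args, max_pending_items, **kwargs):
    super().__init__(*args, **kwargs)
    self._semaphore = threading.Semaphore(max_pending_items)

  def submit(self, fn, /, *args, **kwargs):
    # Blocks until fewer than max_pending_items submitted items are unfinished.
    self._semaphore.acquire()
    try:
      f = super().submit(fn, *args, **kwargs)
    except BaseException:
      # Give the slot back if the item was never accepted (e.g. after shutdown).
      self._semaphore.release()
      raise
    # Free the slot when the item finishes, fails, or is cancelled.
    f.add_done_callback(lambda _: self._semaphore.release())
    return f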

def job():
  for i in range(3):
    time.sleep(10)
    print(f'...{i}')
  print('Done')

def test_executor(executor):
  for i in range(5):
    print(f'{executor} submitting job {i}')
    executor.submit(job)
  executor.shutdown()

def main():
  # Works as expected (blocking)
  executor = BoundedThreadPoolExecutor2(max_pending_items=2)
  test_executor(executor)

  # Expected to block at 3rd submission but doesn't
  executor2 = BoundedThreadPoolExecutor(max_workers=5, max_pending_calls=2)
  test_executor(executor2)

if __name__ == '__main__':
  main()

This outputs

<__main__.BoundedThreadPoolExecutor2 object at 0x7f991cf35a90> submitting job 0
<__main__.BoundedThreadPoolExecutor2 object at 0x7f991cf35a90> submitting job 1
<__main__.BoundedThreadPoolExecutor2 object at 0x7f991cf35a90> submitting job 2
...0
...0
...1
...1
...2
Done
<__main__.BoundedThreadPoolExecutor2 object at 0x7f991cf35a90> submitting job 3
...2
Done
<__main__.BoundedThreadPoolExecutor2 object at 0x7f991cf35a90> submitting job 4
...0
...0
...1
...1
...2
Done
...2
Done
...0
...1
...2
Done
<__main__.BoundedThreadPoolExecutor object at 0x7ff776b93f90> submitting job 0
<__main__.BoundedThreadPoolExecutor object at 0x7ff776b93f90> submitting job 1
<__main__.BoundedThreadPoolExecutor object at 0x7ff776b93f90> submitting job 2
<__main__.BoundedThreadPoolExecutor object at 0x7ff776b93f90> submitting job 3
<__main__.BoundedThreadPoolExecutor object at 0x7ff776b93f90> submitting job 4
...0
...0
...1
...1
...2
Done
...2
Done
...0
...0
...1
...1
...2
Done
...2
Done
...0
...1
...2
Done

Obvious? Maybe not. But yours is the better implementation, and it does
what you’re after. Theirs (a) relies on an internal detail (the
executor’s private work queue) and (b) doesn’t do what you want.

Let’s take each in turn.

Your approach, using a semaphore to limit the number of unfinished
submit() calls, is direct and does exactly what you expected. Use it.
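
A minimal sketch of why the semaphore version counts running jobs as
well as queued ones. The class is a condensed copy of yours; the name
SemaphoreBoundedExecutor and the helper has_free_slot_after are made up
for this illustration, and peeking at the private _semaphore attribute
is only done here to probe the state without blocking.

```python
from concurrent import futures
import threading


class SemaphoreBoundedExecutor(futures.ThreadPoolExecutor):
    """Condensed copy of the semaphore-based executor (hypothetical name)."""

    def __init__(self, *args, max_pending_items, **kwargs):
        super().__init__(*args, **kwargs)
        self._semaphore = threading.Semaphore(max_pending_items)

    def submit(self, fn, /, *args, **kwargs):
        self._semaphore.acquire()  # one slot per unfinished item
        try:
            f = super().submit(fn, *args, **kwargs)
        except BaseException:
            self._semaphore.release()
            raise
        f.add_done_callback(lambda _: self._semaphore.release())
        return f


def has_free_slot_after(n_submits, max_pending_items=2):
    """Submit n never-ending jobs, then report whether another submit could proceed."""
    release = threading.Event()
    executor = SemaphoreBoundedExecutor(
        max_workers=5, max_pending_items=max_pending_items)
    for _ in range(n_submits):
        executor.submit(release.wait)  # stays unfinished until release is set
    # Probe without blocking: try to take a slot and hand it straight back.
    free = executor._semaphore.acquire(blocking=False)
    if free:
        executor._semaphore.release()
    release.set()  # let the jobs finish so shutdown() can return
    executor.shutdown()
    return free
```

With max_pending_items=2, the slots are used up after two submits even
though three of the five worker threads are still idle, because a slot
is only returned when a job is done, not when a worker picks it up.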

Their approach puts a limited-capacity work queue behind the submit
call, but that only blocks submit when the queue is full. Because
you’re using max_workers=5, the executor pulls items off the queue as
soon as they arrive until all 5 worker threads are busy, so the queue
doesn’t fill up until the workers are used up. You therefore get to
submit about max_workers + max_pending_calls items (5 + 2 = 7 here)
before a submit blocks, because the first max_workers items get pulled
off the queue immediately for execution. Your test only submits 5 jobs,
so it never gets that far.
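
To make the arithmetic concrete, here is a rough sketch, assuming
CPython's current ThreadPoolExecutor internals (the private _work_queue
attribute, which the Stack Overflow version already depends on). The
helper name fill_to_capacity is invented for this example. It submits
max_workers + max_pending_calls never-ending jobs and then checks that
the bounded queue has only just become full.

```python
from concurrent import futures
import queue
import threading


class BoundedThreadPoolExecutor(futures.ThreadPoolExecutor):
    """The bounded-queue variant from the question (relies on a private attribute)."""

    def __init__(self, max_workers=None, max_pending_calls=1, **kwargs):
        super().__init__(max_workers, **kwargs)
        self._work_queue = queue.Queue(maxsize=max_pending_calls)


def fill_to_capacity(max_workers=5, max_pending_calls=2):
    """Submit max_workers + max_pending_calls jobs and inspect the queue."""
    release = threading.Event()
    started = threading.Semaphore(0)

    def task():
        started.release()  # a worker thread has taken this item off the queue
        release.wait()     # keep the worker busy until the demo is over

    executor = BoundedThreadPoolExecutor(max_workers, max_pending_calls)
    accepted = 0
    for _ in range(max_workers + max_pending_calls):
        executor.submit(task)  # every one of these returns
        accepted += 1
    for _ in range(max_workers):
        started.acquire()      # wait until all workers are running a job
    queued = executor._work_queue.qsize()
    full = executor._work_queue.full()  # True means the next submit would block
    release.set()  # let the jobs finish so shutdown() can return
    executor.shutdown()
    return accepted, queued, full
```

With the defaults, all 7 submits are accepted, 2 of them are still
sitting in the queue, and the queue reports full, so an 8th submit is
the first one that would have to wait.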

Also, your test doesn’t pass max_workers to BoundedThreadPoolExecutor2, so that executor runs with the default pool size. That might also be relevant.

Ah, that makes so much sense: the workers pop items off the queue to start work, so the queue never includes in-progress items, only unstarted ones.
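
For anyone who wants to see this without pdb, here is a small sketch using a plain ThreadPoolExecutor. The helper name queue_sizes is made up, and it reads the private _work_queue attribute purely for illustration, so it assumes the current CPython implementation.

```python
from concurrent import futures
import threading


def queue_sizes(max_workers=2, extra=2):
    """Return the work queue size with all workers busy, then after `extra` more submits."""
    release = threading.Event()
    started = threading.Semaphore(0)

    def task():
        started.release()  # a worker thread has taken this item off the queue
        release.wait()     # stay "in progress" until the demo is over

    with futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        for _ in range(max_workers):
            executor.submit(task)
        for _ in range(max_workers):
            started.acquire()  # wait until every worker is running a task
        # Private attribute, read here only to illustrate the point.
        busy = executor._work_queue.qsize()
        for _ in range(extra):
            executor.submit(task)  # no idle worker left, so these stay queued
        backlog = executor._work_queue.qsize()
        release.set()  # let everything finish so the with-block can exit
    return busy, backlog
```

The first number is 0 even though every worker is busy, and the second only counts the jobs that no worker has picked up yet.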

It also solves the second mystery of why I still see the SO version used without much issue: it still imposes a bound and blocks jobs from being submitted, just at a higher bound.

If you have an SO account, consider adding an answer with your version.
