(This is my first post here so apologies if it’s pitched wrong.)
I think something is missing from `ThreadPoolExecutor`, or perhaps its documentation. I have a program doing essentially the example in the documentation, but wrapped in a long-running loop that polls for a list of URLs to fetch and then posts the results back. Using `requests` instead of `urllib` for brevity, it's basically:
```python
import concurrent.futures
import requests

while keep_running:  # Long-running
    urls = requests.get('http://my-server/urls').json()  # List of URLs
    results = {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # load_url(url, timeout) is as in the concurrent.futures docs example
        future_to_url = {executor.submit(load_url, url, 60): url for url in urls}
        for future in concurrent.futures.as_completed(future_to_url):  # Short-running
            url = future_to_url[future]
            heartbeat()  # keep_running set to `False` by signal handler if heartbeat fails
            try:
                data = future.result()
            except Exception as exc:
                results[url] = '%r generated an exception: %s' % (url, exc)
            else:
                results[url] = len(data)
            if not keep_running:
                break
    requests.post('http://my-server/urls', json=results)
```
Sometimes an individual URL fails to load in a way that takes an unacceptably long time to fail; for example, a firewall along the way blocks the TCP SYN packet, and the underlying OS's TCP parameters mean it's many minutes before a timeout is raised. In those cases I'd like a way to break out of the `executor` and `post()` the results that have arrived back to `my-server`. If there are 299 completed results and 1 stuck URL, it's a shame and a pain if I can't execute that last `post()` with the 299.
In my case `keep_running` is set to `False` by a SIGTERM handler; Kubernetes sends the SIGTERM when the lack of `heartbeat()` calls (because `as_completed()` is waiting forever on that stuck URL) causes the liveness check to fail. Other process-management solutions are of course available, but they will usually use some kind of signal to give an application a chance to shut down gracefully when they think it's stuck or dead.
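(For completeness, a minimal version of such a handler, which is all the mechanism involved here, looks like this:)

```python
import signal

def _handle_sigterm(signum, frame):
    # Ask the main loop to stop; it will notice at the next check.
    global keep_running
    keep_running = False

signal.signal(signal.SIGTERM, _handle_sigterm)
```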
The problem is that both the context manager and `shutdown()` won't complete until all currently-executing futures have completed, including the one for the stuck URL, so the program likely won't get to that last `post()` before the process manager's termination grace period expires and it resorts to SIGKILL for a forceful shutdown. Waiting for currently-executing futures is already documented fairly clearly, along with the warning not to use `ThreadPoolExecutor` “for long-running tasks”. Perhaps the documentation should also say not to use `ThreadPoolExecutor` in long-running programs?
But then even in short-running programs the inability to exit gracefully is often a problem. If the list of URLs is given on the command line in a short-running program (i.e. no outer `while` loop) that's used as part of an overnight batch run, allowing one bad URL to cause the results from 299 good ones to be lost is problematic.
I coded something of a workaround by not using either the context manager or `shutdown()`, instead manually cancelling the pending futures (logging only whether or not each cancellation succeeded) and then carrying on after the `break`, but the code was a lot longer. I can post the full version in a reply on this thread if anyone's interested.
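In outline it was something like the sketch below. This is simplified, not the real code; it assumes a polling loop with `concurrent.futures.wait()` in place of `as_completed()` so that `heartbeat()` keeps firing even while a future is stuck:

```python
import concurrent.futures
import logging

executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
future_to_url = {executor.submit(load_url, url, 60): url for url in urls}
results = {}

pending = set(future_to_url)
while pending and keep_running:
    # Wake up every few seconds so heartbeat() keeps being called even
    # when a stuck future is making no progress.
    done, pending = concurrent.futures.wait(pending, timeout=5)
    heartbeat()
    for future in done:
        url = future_to_url[future]
        try:
            results[url] = len(future.result())
        except Exception as exc:
            results[url] = '%r generated an exception: %s' % (url, exc)

# Cancel whatever never started running; futures that are already
# running can't be cancelled, so just log the outcome rather than wait.
for future in pending:
    cancelled = future.cancel()
    logging.info('cancelling %r: %s', future_to_url[future],
                 'cancelled' if cancelled else 'still running')
executor.shutdown(wait=False)  # return immediately, don't join workers

requests.post('http://my-server/urls', json=results)
```

(Even then, `concurrent.futures`' exit handler joins the worker threads, so the process still hangs at interpreter exit until SIGKILL arrives; but at least the final `post()` has gone out by that point.)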
The other way to prevent failing like this is to make sure everything executing via the `ThreadPoolExecutor` has suitable timeouts, but that's actually quite tricky in practice. Every function call has to take a timeout parameter, or agree to get a timeout value from an environment variable or similar, and if third-party libraries are used there's very likely a code path somewhere that won't apply the timeout. And since `RawIOBase.read()` doesn't take a timeout parameter, any program doing “local” file I/O that in fact goes via some cloudy-abstract remote-mounted file system, and that goes wrong without an exception being raised (e.g. a firewall blocks a TCP connection…), is in the same boat as an HTTP request with an over-long timeout or none at all.
Could `shutdown()` have a new kwarg added to allow it to abandon waiting for running futures while exiting, with the same option available via the constructor for the context-manager case (a `with` statement can't pass arguments to `__exit__()`)? Say `running_exit_timeout`, which would default to `None`, i.e. wait forever as per current behaviour, but accept a `float` number of seconds to wait before abandoning the running futures and exiting anyway. While this might mean not cleaning up e.g. database connections, that's what would happen anyway once a process manager resorts to SIGKILL. `running_exit_timeout` lets us get whatever portion of the work has successfully completed out, and shut everything else down gracefully. Misuse of `running_exit_timeout` in scenarios where the program isn't about to exit could lead to resource leakage, but I think that could be documented clearly enough that reasonably-skilled programmers don't make such a mistake.
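To make that concrete, under the proposal (hypothetical, since `running_exit_timeout` doesn't exist in today's API) the fetch loop could end like this:

```python
# Hypothetical API: running_exit_timeout is the kwarg proposed above.
executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
future_to_url = {executor.submit(load_url, url, 60): url for url in urls}
for future in concurrent.futures.as_completed(future_to_url):
    ...  # collect results, heartbeat(), break on SIGTERM, as before

# Wait up to 30 seconds for the running futures, then abandon them,
# leaving time for the final post() before the grace period expires.
executor.shutdown(wait=True, running_exit_timeout=30.0)
requests.post('http://my-server/urls', json=results)
```

A timeout rather than a bare “don't wait” flag keeps the common case graceful: most of the time the running futures finish within the grace period and nothing is abandoned.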