Graceful exit from ThreadPoolExecutor when blocked on IO - problem and possible enhancement

(This is my first post here so apologies if it’s pitched wrong.)

I think something is missing from ThreadPoolExecutor, or perhaps its documentation. I have a program doing essentially the example in the documentation, but wrapped in a long-running loop that polls for a list of URLs to fetch and then posts the results back. Using requests instead of urllib for brevity, it’s basically:

while keep_running:  # Long-running
    urls = requests.get('http://my-server/urls').json()  # List of URLs
    results = {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        future_to_url = {executor.submit(load_url, url, 60): url for url in urls}
        for future in concurrent.futures.as_completed(future_to_url):  # Short-running
            url = future_to_url[future]
            heartbeat()  # keep_running set to `False` by signal handler if heartbeat fails
            try:
                data = future.result()
            except Exception as exc:
                results[url] = '%r generated an exception: %s' % (url, exc)
            else:
                results[url] = len(data)
            if not keep_running:
                break
    requests.post('http://my-server/urls', json=results)

Sometimes an individual URL fails to load in a way that takes an unacceptably long time to fail, for example a firewall along the way blocks the TCP SYN packet and the underlying OS has TCP parameters that mean it’s many minutes before a timeout is raised. In those cases I’d like a way to break out of the executor and post() the results that have arrived back to my-server. If there are 299 completed results and 1 stuck URL, it’s a shame and a pain if I can’t execute that last post() with the 299.

In my case keep_running is set to False by a SIGTERM handler, which is triggered by Kubernetes when lack of heartbeat() calls, because as_completed() is waiting forever on that stuck URL, causes the liveness check to fail. Other process-management solutions are of course available but will usually use some kind of signal to give applications a chance to shut down gracefully when it thinks they’re stuck or dead.

The problem is that neither the context manager nor shutdown() will complete until all currently-executing futures complete, including the one for the stuck URL, so the program likely won’t get to that last post() before the process manager’s termination grace period expires and it uses SIGKILL for a forceful shutdown. Waiting for currently-executing futures is already documented fairly clearly, along with the warning not to use ThreadPoolExecutor “for long-running tasks”. Perhaps the documentation should say not to use ThreadPoolExecutor in long-running programs?

But then even in short-running programs the inability to exit gracefully is often a problem. If that list of URLs is given via the command-line in a short-running program (i.e. no outer while loop) that’s used as part of an overnight batch run, allowing one bad URL to cause results from 299 good ones to be lost is problematic.

I coded something of a workaround by not using either the context manager or shutdown(), and instead manually cancelling pending futures, only logging whether they were cancelled successfully or not, then continuing to break, but the code was a lot longer. I can post it in a reply on this thread if anyone’s interested.
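A minimal sketch of what such a workaround might look like, not the code referred to above. The helper name gather_until, its parameters, and the use of a deadline on as_completed() are assumptions made for illustration. It collects whatever finishes within the deadline, tries to cancel the rest (which only succeeds for futures that have not started), logs the outcome, and then shuts the executor down without waiting:

```python
import concurrent.futures
import logging

log = logging.getLogger(__name__)


def gather_until(executor, future_to_key, deadline_s):
    """Collect results that arrive within deadline_s seconds, then abandon the rest.

    Hypothetical helper: future_to_key maps each submitted future to a label
    (for example its URL).
    """
    results = {}
    try:
        for future in concurrent.futures.as_completed(future_to_key, timeout=deadline_s):
            key = future_to_key[future]
            try:
                results[key] = future.result()
            except Exception as exc:
                results[key] = f"{key!r} generated an exception: {exc}"
    except concurrent.futures.TimeoutError:
        # Deadline reached: some futures are still queued or running.
        for future, key in future_to_key.items():
            if not future.done():
                # cancel() returns False for a future that is already running.
                cancelled = future.cancel()
                log.warning("%r: %s", key, "cancelled" if cancelled else "still running, abandoned")
    # Return immediately instead of joining the worker threads.
    executor.shutdown(wait=False)
    return results
```

The abandoned worker thread keeps running in the background, and the interpreter still joins executor threads at normal exit, so this only buys the chance to post() the completed results first; it does not make the stuck call go away.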

The other way to prevent failing this way is to make sure everything executing via the ThreadPoolExecutor has suitable timeouts, but that’s actually quite tricky in practice. Every function call has to take a timeout parameter, or agree to get a timeout value from an environment variable or similar, and if third-party libraries are used they are very likely to have a code path that won’t use the timeout. And since RawIOBase.read() doesn’t have a timeout parameter, any program doing “local” file I/O that finds itself doing so via a cloudy-abstract remote-mounted file-system that goes wrong without an exception being raised (e.g. a firewall blocks a TCP connection…) will find itself in the same boat as an HTTP request with an over-long or no timeout.

Could shutdown() and __enter__() have a new kwarg added to allow them to abandon waiting for running futures while exiting? Say “running_exit_timeout”, which would default to None, i.e. wait forever as in current behaviour, but accept a float number of seconds to wait before abandoning the running futures and exiting anyway? While this might mean not cleaning up e.g. database connections cleanly, that’s what would happen anyway with a process manager resorting to SIGKILL. “running_exit_timeout” allows us to get out whatever portion of work has successfully completed, and shut down everything else gracefully. Misuse of “running_exit_timeout” in scenarios where the program isn’t about to exit could lead to resource leakage, but I think that could be documented clearly enough so that reasonably-skilled programmers don’t make such a mistake.
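For discussion, the proposed semantics can be roughly approximated today with existing calls. This is only a sketch under assumptions: the function name shutdown_with_running_exit_timeout is hypothetical, the caller must pass in the futures it submitted, and cancel_futures requires Python 3.9 or later. It does not stop the running thread; it only stops waiting for it:

```python
import concurrent.futures


def shutdown_with_running_exit_timeout(executor, futures, running_exit_timeout=None):
    """Wait up to running_exit_timeout seconds, then shut down without joining workers.

    Hypothetical helper approximating the proposed kwarg. Returns (done, not_done)
    as sets of futures, as observed when the wait ended.
    """
    # None keeps the current behaviour of waiting for everything.
    done, not_done = concurrent.futures.wait(futures, timeout=running_exit_timeout)
    # Refuse new work, drop anything still queued, and do not block on running tasks.
    executor.shutdown(wait=False, cancel_futures=True)
    return done, not_done
```

Anything left in not_done was either still running or still queued at the deadline; as the post says, resources held by those tasks are not released cleanly.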

The core problem there, regardless of API changes to ThreadPoolExecutor, is that there is no way to stop code running in another thread in Python.

Ideally, we should move to support ways to kill threads - there is the pthread_kill function, and I remember reading somewhere about the interpreter being able to gracefully stop threads - but there is currently no way to do that.

Without that, even if new parameters and controls are added to ThreadPoolExecutor, we’d still have a thread stalled in the executor. As a workaround for unexpected errors, and being able to deliver “299 out of 300” futures when the scenario arises, that might work, though.

I, for one, would like someone better acquainted with low-level or systems programming, or with the Python internals, to weigh in on the feasibility of killing specific threads.

(And, in the meantime, Python’s answer for this kind of problem is asynchronous programming - every I/O call will simply pause and return control to a central place - and the final user code can pick one of several ways of determining a task timeout. If something is stuck forever waiting for a reply, it will leak some resources, but far less than a whole thread and interpreter state.)
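As a rough illustration of that asyncio approach, here is a sketch that keeps whatever finished before a deadline and cancels the rest. The names collect_within and fake_fetch are hypothetical, and fake_fetch merely stands in for a real non-blocking network call:

```python
import asyncio


async def fake_fetch(url, delay):
    """Hypothetical stand-in for an async HTTP request."""
    await asyncio.sleep(delay)
    return len(url)


async def collect_within(coros_by_key, deadline_s):
    """Run all coroutines concurrently; return results finished within deadline_s."""
    tasks = {asyncio.ensure_future(coro): key for key, coro in coros_by_key.items()}
    done, pending = await asyncio.wait(list(tasks), timeout=deadline_s)
    for task in pending:
        task.cancel()  # unlike a thread, a task blocked on I/O can be cancelled
    if pending:
        # Let the cancelled tasks unwind before returning.
        await asyncio.gather(*pending, return_exceptions=True)
    results = {}
    for task in done:
        key = tasks[task]
        exc = task.exception()
        results[key] = task.result() if exc is None else f"{key!r} generated an exception: {exc}"
    return results


if __name__ == "__main__":
    coros = {"a": fake_fetch("a", 0.01), "stuck": fake_fetch("stuck", 3600)}
    print(asyncio.run(collect_within(coros, 0.5)))
```

This only helps if every call on the path is itself awaitable; a blocking call inside a coroutine stalls the whole event loop.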

Well, you can do it on a specific OS (Linux) with some hacking:

#!/usr/bin/env python3
import ctypes
import os
import signal
import sys
import threading
import time

# Load the C standard library
libc = ctypes.CDLL("libc.so.6", use_errno=True)

# Define the tgkill() function
tgkill = libc.tgkill
tgkill.argtypes = [ctypes.c_int, ctypes.c_int, ctypes.c_int]
tgkill.restype = ctypes.c_int

def send_signal_to_thread(pid: int, tid: int, signum: int) -> int:
    """
    Calls the tgkill system call to send a signal to a specific thread.

    :param pid: The process ID (TGID).
    :param tid: The thread ID (TID) to which the signal should be sent.
    :param signum: The signal number to send (named so it does not shadow the signal module).
    :return: 0 on success; raises OSError on failure.
    """
    result = tgkill(pid, tid, signum)
    if result != 0:
        err = ctypes.get_errno()
        raise OSError(err, os.strerror(err))
    return result

def send_signal_to_python_thread(thread: threading.Thread, signal_num: int):
    """
    Sends a signal to a specific Python Thread using its native ID.

    :param thread: The target threading.Thread object.
    :param signal_num: The signal number to send.
    """
    if not thread.is_alive():
        raise ValueError("Cannot send signal to a thread that is not running.")

    pid = os.getpid()
    tid = thread.native_id  # Get the native thread ID
    if tid is None:
        raise RuntimeError("Could not determine the thread's native ID.")

    print(f"Sending signal {signal_num} to thread {tid} in process {pid}")

    try:
        send_signal_to_thread(pid, tid, signal_num)
    except OSError as e:
        print(f"Error sending signal: {e}")

# Example usage:
def worker():
    print(f"Worker thread {threading.get_native_id()} started.")
    try:
        while True:
            print('.',end='')
            sys.stdout.flush()
            time.sleep(0.1)  # Simulate work
    except KeyboardInterrupt:
        print(f"Worker thread {threading.get_native_id()} interrupted.")

if __name__ == "__main__":
    worker_thread = threading.Thread(target=worker)
    worker_thread.start()

    time.sleep(2)  # Give the thread some time to start

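    # Send SIGTERM to the worker thread.
    # Note: no handler is installed here, so SIGTERM's default action ends the
    # whole process. CPython also runs Python-level signal handlers only in the
    # main thread, so the worker will not see KeyboardInterrupt from this.
    send_signal_to_python_thread(worker_thread, signal.SIGTERM)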

    # Allow some time for handling before exiting
    time.sleep(1)

but it’s typically a dirty, bad idea, which may be why it’s not in stdlib.

If load_url is taking too long, the obvious solution is to have it take less time. requests.get() has a timeout parameter, why not use that?

I would love it if we could make threads properly cancellable, just like asyncio tasks are. This would also pave the way for virtual threads in the future. Without cancellable threads, any future virtual thread implementation will either also not support cancellation (bad) or be too different from normal threads, defeating the point (also bad). From what I remember Java does it, so it must not be impossible, right?

I’m the author of aiofiles and I get complaints about this all the time. Folks expect async cancellation semantics, but since I use threads behind the scenes, I can’t really do much about it.

That’s what I did to solve it in my simple case to get some sort of reliability in production. But what if, as I said…

I don’t think “just use timeouts” solves the general problem I’m talking about here.

Would adding the parameter I proposed be a PEP, or is there a lighter-weight process?

It doesn’t sound like I’ve set off a firestorm of demand either way…

Being able to kill threads would indeed be great.

Technically, the “parameter you proposed” is far from being a simple thing,
so I guess we could chat a bit more here to make sure we are not heading the wrong way on an XY problem.

To contextualize - IOBase.read will boil down to simply calling C’s read function in
Python/fileutils.c, and that happens to be a blocking, synchronous call, as in: it can’t be stopped except by an OS signal.
(And then we are back to handling signals per thread, instead of only on the main thread.)

The way to read files in a seemingly non-blocking way is to issue a read call only after using some “Selector technique” to ensure the call won’t block the current thread - and that is what asyncio is about.
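A small sketch of that “Selector technique” using the stdlib selectors module on a socket. The helper name read_with_timeout is hypothetical. This works for sockets and pipes; regular on-disk files are generally reported as always ready (or refused outright by epoll), so the same trick does not give a timeout on ordinary file reads:

```python
import selectors
import socket


def read_with_timeout(sock, nbytes, timeout_s):
    """Return up to nbytes from sock, or None if nothing is readable within timeout_s."""
    with selectors.DefaultSelector() as sel:
        sel.register(sock, selectors.EVENT_READ)
        if not sel.select(timeout=timeout_s):
            return None  # timed out: the blocking recv() was never issued
        return sock.recv(nbytes)


if __name__ == "__main__":
    a, b = socket.socketpair()
    print(read_with_timeout(a, 16, 0.1))  # nothing has been sent yet
    b.sendall(b"hi")
    print(read_with_timeout(a, 16, 1.0))
    a.close()
    b.close()
```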

However, as @Tinche, the author of aiofiles, posted in this very thread, even the most widely used async file library in Python doesn’t use this approach, and relies on threads instead. I am not sure of the motive for that, but it may be that selector-based non-blocking file reads are hard to do cross-platform.

Changing normal file read to use something that would use a timing-out selector is certainly enough change to require a PEP. And more likely, we should get an asynchronous file API in the stdlib using selectors (I am not even sure it is possible under Windows) rather than changing default file access.

For other “stuck threads” when doing network business, like in your real example: using non-blocking, async calls is the modern way to go, and that is one of the main gains of asynchronous programming. A task blocked on an API return can just be cancelled, and that is it. (Async networking code does make use of “Selector” approaches.)

That said, the ability to “kill a thread” seems to be a couple of lines of code away from being complete (as I said, I saw somewhere that the CPython runtime supports it on the native code side, supports as in “it won’t die”).

TL;DR: I am not against a “timeout” parameter to IOBase.read - but my understanding is that there are technical roadblocks that way, and they may not be work-aroundable at all.
On the other hand, a mechanism to “kill threads” could be enough for the very real use cases you depict, along with some Executor refactoring which would allow for respawning dead workers, or some other “healing”.
I perceive both of these as “small” enough changes which could possibly go by without a PEP, but both would certainly need core-dev sponsorship anyway.

And, finally, as a byproduct, maybe it is time to bring about a PEP for selector-based asynchronous file I/O in the stdlib - but that, while desirable for the language, probably wouldn’t fix the issues you had in synchronous code.