(This is a follow-up to “Improving support for non-daemon background threads” and “Getting Rid of Daemon Threads”, after recently running into a case where I had to set daemon=True in my own code)
Working with httpx-ws recently, I wanted to provide both synchronous and asynchronous versions of a websocket client API. I also wanted to allow easy interactive use of the synchronous API by handling the case where callers didn’t explicitly close their client sockets. Trying to do this without daemon threads runs directly into the “threads are joined before atexit handlers run” problem. However, having a concrete use case to work with gave me a better idea of which features I wish the standard library had provided to let me solve the resource management problem without daemon threads.
Thread termination callbacks
I still wanted the functionality of atexit.register_early (suggested in the previous thread), but with a narrower framing: thread termination. For anything other than terminating threads, the existing atexit machinery is fine, since it won’t block the “join all non-daemon threads” shutdown step.
Framing the registration problem this way leads to the following design proposal:
- Add terminate=None to the threading.Thread constructor
- Add a default threading.Thread.__terminate__ method that calls the terminate callback (if set) and does nothing otherwise
- Add an additional step to threading._shutdown (after marking the main thread as shutting down, but before joining the non-daemon threads) that calls __terminate__ on all the active threads reported by threading.enumerate()
A dunder-name is suggested to avoid backwards compatibility issues with thread subclasses that already define terminate() methods that may not be suitable for calling in this shutdown use case. Compatible subclasses can opt in to the new feature by setting __terminate__ = terminate.
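To make the proposal concrete, here is a rough sketch of how it could be emulated in user code today. None of this exists in the standard library: TerminableThread and request_termination are hypothetical names, and request_termination stands in for the extra step that would live inside threading._shutdown.

```python
import threading


class TerminableThread(threading.Thread):
    """Hypothetical stand-in for a Thread that accepts a `terminate` callback."""

    def __init__(self, *args, terminate=None, **kwargs):
        super().__init__(*args, **kwargs)
        self._terminate_callback = terminate

    def __terminate__(self):
        # Default behaviour from the proposal: call the callback if one was set
        if self._terminate_callback is not None:
            self._terminate_callback()


def request_termination():
    """Emulates the proposed step: ask every active thread to terminate."""
    for thread in threading.enumerate():
        # Threads without the hook (e.g. the main thread) are simply skipped
        hook = getattr(thread, "__terminate__", None)
        if hook is not None:
            hook()


def demo():
    stop = threading.Event()
    worker = TerminableThread(target=stop.wait, terminate=stop.set)
    worker.start()
    request_termination()  # in the proposal, the interpreter does this before joining
    worker.join(timeout=5)
    return worker.is_alive()
```

In the actual proposal the hook would be invoked automatically at interpreter shutdown, so the explicit request_termination() call in demo() would not be needed.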
In the absence of this feature, I emulated it with atexit by marking the affected threads as daemon threads (so they didn’t block the implicit join step) and then triggering their termination from the atexit handler.
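A minimal sketch of that workaround is below. It is not my actual code: StoppableWorker and its stop method are illustrative names, and a threading.Event stands in for whatever mechanism really tells the thread to shut down.

```python
import atexit
import threading


class StoppableWorker(threading.Thread):
    """Daemon thread that is asked to stop from an atexit handler."""

    def __init__(self):
        # daemon=True so the implicit "join non-daemon threads" step skips us
        super().__init__(daemon=True)
        self._stop_requested = threading.Event()
        self.cleaned_up = False
        # atexit handlers run after that join step, while daemon threads
        # are still alive, so this is where termination gets triggered
        atexit.register(self.stop)

    def run(self):
        self._stop_requested.wait()  # placeholder for the real work
        self.cleaned_up = True       # placeholder for releasing resources

    def stop(self, timeout=5.0):
        self._stop_requested.set()
        if self.is_alive():
            self.join(timeout)
```

Callers that do remember to clean up can still call stop() explicitly; the atexit handler then finds the thread already finished and returns immediately.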
Asynchronous background threads
This is technically a separate idea, but it is what makes the thread termination callback approach more generally useful. The problem with synchronous background threads that perform blocking operations is that it isn’t always easy (or reliable) to interrupt those blocking calls from the main thread. Instead, the idea is most effective when the background thread is running an asynchronous event loop, so “please terminate now” is just another event to be processed (and the termination handling can be entirely abstracted away via task groups).
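As a small illustration of “termination is just another event”, the sketch below runs an event loop in a plain thread and asks it to stop from the calling thread. The function and variable names are made up for the example; the only real APIs involved are asyncio.run, asyncio.Event, and loop.call_soon_threadsafe.

```python
import asyncio
import threading


def run_loop_in_thread():
    ready = threading.Event()
    state = {}

    async def main():
        stop = asyncio.Event()
        state["loop"] = asyncio.get_running_loop()
        state["stop"] = stop
        ready.set()
        # Stands in for real work that is multiplexed with the stop request
        await stop.wait()
        state["stopped_cleanly"] = True

    thread = threading.Thread(target=lambda: asyncio.run(main()))
    thread.start()
    ready.wait()
    # asyncio.Event is not thread-safe, so the set() call is handed over
    # to the loop's own thread rather than being made directly
    state["loop"].call_soon_threadsafe(state["stop"].set)
    thread.join(timeout=5)
    return state.get("stopped_cleanly", False) and not thread.is_alive()
```

No blocking call has to be interrupted here: the loop wakes up, sees the event, and the coroutine finishes normally.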
This part of the suggestion would be to provide a BackgroundThread class in a new asyncio.threading submodule:
import asyncio
import threading
from typing import NoReturn


class BackgroundThread(threading.Thread):
    def __init__(self, group=None, task_target=None, name=None, args=(), kwargs=None):
        # Accepts the same args as `threading.Thread`, *except*:
        #   * a `task_target` coroutine function replaces the `target` function
        #   * No `daemon` option
        #   * No `terminate` option (always sets the termination event)
        # Variant: accept `debug` and `loop_factory` options to forward to `asyncio.run`
        # Alternative: accept a `task_runner` callback, defaulting to `asyncio.run`
        self._task_target = task_target
        self._terminate = asyncio.Event()
        self._event_loop = None
        # Relies on the `terminate` constructor parameter proposed above
        super().__init__(group, None, name, args, kwargs, terminate=self.terminate)

    def run(self):
        """Run an async event loop in the background thread"""
        asyncio.run(self._run_until_terminated())

    async def run_task(self):
        try:
            if self._task_target is not None:
                await self._task_target(*self._args, **self._kwargs)
        finally:
            # Drop references once the task is done, as `threading.Thread.run` does
            del self._target, self._args, self._kwargs

    def terminate(self):
        loop = self._event_loop
        if loop is None:
            return  # Event loop is not running, so there is nothing to interrupt
        # Called from another thread, so hand the `set()` call over to the loop
        loop.call_soon_threadsafe(self._terminate.set)

    async def _run_until_terminated(self):
        """Run task in the background thread until termination is requested."""
        self._event_loop = asyncio.get_running_loop()
        try:
            raise_on_termination, terminated_exc = self._raise_on_termination()
            async with asyncio.TaskGroup() as tg:
                tg.create_task(raise_on_termination)
                tg.create_task(self.run_task())
        except* terminated_exc:
            pass  # Graceful shutdown request
        finally:
            self._event_loop = None

    def _raise_on_termination(self):
        # A per-call exception type, so only this thread's request is caught
        class TerminateTask(Exception):
            pass

        async def raise_on_termination() -> NoReturn:
            await self._terminate.wait()
            raise TerminateTask

        return raise_on_termination(), TerminateTask
This part of the idea is taken directly from what I actually implemented to solve my synchronous API design problem (although my current code doesn’t cleanly separate concerns the way this code does - the termination support is implemented directly in the thread class that implements the rest of the background thread behaviour).
Edit: fixed the thread termination request implementation (I had oversimplified it when extracting the generalised proposal from my actual code)