Adding “finalizer” to the threading library

Well, as far as I know, the current way to do some cleanup after a thread ends is to start a separate monitor thread.
But this is limited. E.g. in sqlite3, you can’t use or close a connection object created in another thread.
So this way of cleaning up can’t be used.

As a result, we can only create a subclass of threading.Thread and override the Thread.run() method to do the cleanup job. Here’s an example:

import threading

class MyThread(threading.Thread):
    def __init__(self):
        ...
    def run(self):
        super().run()
        cleanup(...)

I wonder if we could implement a cleanup API in the threading library.
It would surely be a great help in some situations.

1 Like

The run() method doesn’t really do much. You can just as easily put whatever code you need directly into your own run() method and not bother calling super().

Having a callback on a thread would lead to some confusion as to which thread runs that callback. Is it run in the thread itself, or in the thread that set the callback, or in the main thread? All three could be useful. For the semantics you’re describing, where the callback happens in the same thread that ran the original function, overriding run() is easily the best way to do it. (You may want to stick your callback into a try/finally to ensure that it happens even if your main thread function bombs.) If the thread were to grow an actual callback, I would expect that it happens in the caller’s context - something like this (pseudocode):

t = Thread(target=some_func)
t.start()
...
...
t.join(); callback()

but without explicitly joining. However, this sort of thing is easy to get wrong, and if I found myself wanting this, I would most likely reach for ThreadPoolExecutor or some other such API instead.
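For illustration, the try/finally-in-run() approach described above might look like this (the class name and the use of a list to record events are made up for the sketch):

```python
import threading

results = []

class CleanupThread(threading.Thread):
    def run(self):
        try:
            # super().run() invokes the target passed to the constructor
            super().run()
        finally:
            # runs in the worker thread itself, even if the target raises
            results.append("cleaned")

t = CleanupThread(target=lambda: results.append("worked"))
t.start()
t.join()
# results == ["worked", "cleaned"]
```

Because the finally block executes in the worker thread, thread-affine resources (like a sqlite3 connection) can be released safely.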

2 Likes

add_done_callback(callback, *, context=None):

from concurrent.futures import ThreadPoolExecutor


def task():
    # Perform some work
    return "Result"


def callback(future):
    print(f"Callback got: {future.result()}")


executor = ThreadPoolExecutor(max_workers=2)
future = executor.submit(task)
future.add_done_callback(callback)
executor.shutdown(wait=True)

Would you consider moving this thread to Python Help? It will receive much more exposure there.

This is a good idea, and we actually need this to solve some issues in ThreadPoolExecutor (see concurrent.future.ThreadPoolExecutor should parameterize class used for threads · Issue #89502 · python/cpython · GitHub). There is a way to execute user code when initializing a new thread in ThreadPoolExecutor, but there is no way to execute user code when finalizing it, so resources acquired in the initializer are leaked or depend on the garbage collector to be released. It is less needed for explicitly created threads, because there you can simply use try/finally in your target function or Thread.run() method; it would just be a convenience.

It would be great to have something like atexit.register() or TestCase.addCleanup() to register a cleanup code executed when the thread is finalizing. There are only two questions:

  • Where to place it? Should it be the threading function (applied to the current thread) or the Thread method?
  • How to name it?

We can also add a convenient function/method for context managers, like ExitStack.enter_context() and TestCase.enterContext().
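None of this API exists yet, but as a rough sketch, an atexit/addCleanup-style registry could be emulated today with a Thread subclass (the class name and add_cleanup() method are hypothetical):

```python
import threading

class FinalizingThread(threading.Thread):
    """Emulation of a hypothetical per-thread cleanup registry."""
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._cleanups = []

    def add_cleanup(self, func, *args, **kwargs):
        # LIFO order, like atexit.register() and TestCase.addCleanup()
        self._cleanups.append((func, args, kwargs))

    def run(self):
        try:
            super().run()
        finally:
            # run cleanups in the worker thread, in reverse registration order
            while self._cleanups:
                func, args, kwargs = self._cleanups.pop()
                func(*args, **kwargs)

log = []

def work():
    # the running thread can register cleanups for resources it opened
    threading.current_thread().add_cleanup(log.append, "cleanup")
    log.append("work")

t = FinalizingThread(target=work)
t.start()
t.join()
# log == ["work", "cleanup"]
```

The cleanups run in the worker thread itself, which answers the "which thread runs the callback" question for thread-affine resources.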

2 Likes

Currently, I can use a thread-local object (threading.local()) to try to do some cleanup work.

I wrote a special class which only has a __del__() method. When the thread is terminated, the __del__() method is executed, and it does some cleanup.

However, according to my experiments, the __del__() method is executed by the garbage collector, which runs in the main thread, and this is the inconvenient point for me now.
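A minimal version of this trick might look like the following (all names are made up; the timing of __del__ is an implementation detail of CPython):

```python
import gc
import threading

class _Cleanup:
    """Holds a callback and runs it when the object is garbage-collected."""
    def __init__(self, func):
        self._func = func
    def __del__(self):
        self._func()

_local = threading.local()

def register_cleanup(func):
    # Keep one _Cleanup object alive per thread; it is collected (and the
    # callback runs) some time after the thread terminates.
    _local.cleanup = _Cleanup(func)

flag = []
t = threading.Thread(
    target=lambda: register_cleanup(lambda: flag.append("cleaned")))
t.start()
t.join()
gc.collect()  # __del__ may run via the collector, not in the worker thread
```

As noted above, the drawback is that the callback is not guaranteed to run in the thread that created the resource, which rules it out for thread-affine objects like sqlite3 connections.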

1 Like

Agreed this would be nice. It’s not something I’ve had much need for in practice, but it’s a logical addition to the API, and as you say makes cleanup of thread pool workers easier.

A thread method is more flexible, as you can always do threading.current_thread().add_finalizer(). And while I don’t have an actual use case, there may be reasons to want to add a finalizer to a thread that isn’t the current one.

I don’t have strong feelings on the name or API - I went for add_finalizer (with a corresponding remove_finalizer) on the assumption that the API would be like atexit, managing a “stack” of finalizers, but that may be more complexity than we actually need.

1 Like

That’s a good idea, add_finalizer(), pretty clear.

Btw, does anyone know how I can change the title of this topic?

There should be a little pencil icon by the title on the first post:

You can’t, you don’t have the necessary privileges. Specifically, you can only edit your own post for 24 hours after creation. If you tell us what the title should be, we can still do that.

Adding “finalizer” to the threading library

What do you think of this?

It would be appreciated if you could do so.

Would it be valid to add a finalizer to a different thread?

… what if that thread is already in progress (or in the middle of processing other finalizers… or just completed?.. can a finalizer add another finalizer?)

2 Likes

Good question!
In my opinion,

  • If the thread is in progress, it should still be able to add new finalizers, since it may open resources that need to be freed up. Otherwise, if finalizers could only be added before the thread starts, why not just use a try/finally?
  • However, adding another finalizer from inside a finalizer is meaningless, since the finalizer itself is supposed to do all the cleanup work.

I had figured there could be more than one finalizer. Kind of like how there can be multiple atexits.

In that world I assumed that a finalizer could add another finalizer if they were in FIFO order.

Is there just one finalizer allowed? Would there be a getter for it too?

One other random thought is that it might be nice to have a symmetric API on the multiprocessing side.

Supporting a queue of finalizers is more convenient. You can imitate this with an ExitStack, but you need to have control over all the finalizers; that is not so easy if some finalizer was already added and you need to add another. So it is better to support this from the start.

I do not think that there is a need to remove finalizers. This can create new problems due to breaking the FIFO order.

Context managers can in many cases be imitated by add_finalizer(cm.__exit__, None, None, None). But there is more here: a context manager can work differently depending on whether an exception was raised, and it can silence the current exception, so we may need a separate API for context managers if we want to support this feature.

It is easy to implement support of finalizers in the thread created in Python, but there may be problems with other threads (the main thread and threads created in extensions). So this feature may be limited.
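To illustrate why context managers deserve their own treatment: an ExitStack inside the thread already gives LIFO cleanup with full context-manager semantics (exception information, the chance to silence it), which a bare add_finalizer(cm.__exit__, None, None, None) would not. A sketch with a made-up Resource class:

```python
import threading
from contextlib import ExitStack

log = []

class Resource:
    """Toy context manager that records enter/exit order."""
    def __init__(self, name):
        self.name = name
    def __enter__(self):
        log.append(f"open {self.name}")
        return self
    def __exit__(self, exc_type, exc, tb):
        log.append(f"close {self.name}")
        return False  # do not silence exceptions

def worker():
    with ExitStack() as stack:
        stack.enter_context(Resource("a"))
        stack.enter_context(Resource("b"))
        log.append("work")

t = threading.Thread(target=worker)
t.start()
t.join()
# log == ["open a", "open b", "work", "close b", "close a"]
```

The cleanup happens in the worker thread and in reverse acquisition order, mirroring what ExitStack.enter_context() / TestCase.enterContext() provide.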

It’s not something I’ve had much need for in practice

Hi, I recently came across a case that I really need this thread_finalizer for ThreadPoolExecutor.

I have a web service with Django and MySQL, and nightly it recreates the whole database from several CSVs. Some tables have a few million rows, and some have tens of thousands.

A minimal reproducible example is something like

executor = ThreadPoolExecutor(max_workers=100)
# The actual code to generate models is far more complex,
# and some parts use bulk_create.
for model in models:
    executor.submit(lambda x: x.save(), model)
executor.shutdown()

So, here is the problem.
Django creates a connection to database for each thread automatically.
When the threads are finished, there is no way to close these connections, because each connection must be closed on its own thread.

You just need to call a different function. e.g.

def save_and_close(model):
    try:
        model.save()
    finally:
        # close the connection here,
        # e.g. django.db.connections.close_all()
        pass

executor = ThreadPoolExecutor(max_workers=100)
# The actual code to generate models is far more complex,
# and some parts use bulk_create.
for model in models:
    executor.submit(save_and_close, model)
executor.shutdown()

Yes, that works, but then each model opens and closes a connection.
However, a typical local TCP three-way handshake takes a few ms, which is hard to accept when there are many models and speed matters.

I ended up adding a line like
executor.map(lambda x: db.connections.close_all(), range(100))
It is a workaround but not ideal.

I haven’t used Django in a few years but this indicates it uses connection pooling if you tell it to: Databases | Django documentation | Django

Since each thread maintains its own connection, your database must support at least as many simultaneous connections as you have worker threads.

There is an answer in the same document. It is basically hard to share a connection between threads. It is not impossible, but hard.

See also similar (old) feature requests: