Can `asyncio.Queue` be safely created outside of the event loop thread?

I know that asyncio.Queue is not threadsafe, per the documentation and the idea that it is only intended to be used to allow coroutines to communicate with each other on the event loop and thus within a single thread.

However, to me, this only implies that the methods put, get, etc. are not threadsafe. What isn’t clear to me is whether creating the queue outside of an event loop and the thread the event looop is running on is safe or not.

The context is that I have a PySide6 application, so PySide6 gets the main thread. I will then launch an asyncio event loop on a separate, non-main thread. (As a note, I am still learning the proper way to do this, so any thoughts on that are also welcome.) In order for the GUI application to communicate with the asyncio event loop, my plan is to create an asyncio.Queue in the PySide6 main thread and then pass it to the asyncio event loop coroutine that gets ran on its separate thread. Then I want to use asyncio.run_coroutine_threadsafe to run a coroutine that calls put or put_nowait to pass a “message” from the PySide6 main thread to the asyncio event loop.

1 Like

In general asyncio constructors may capture the running event loop, so it’s probably safest to assume everything needs to be done on a single event loop thread, regardless of whether it is currently fine in the cpython implementation.

You could pass in a threadsafe Future instead of a Queue and pass the queue back through the Future?

Sounds like you want thread-safe queues then. queue — A synchronized queue class — Python 3.12.2 documentation They should do everything you need.

Waiting on a message with such a queue will block the asyncio event loop though. I want to await a queue in the asyncio event loop such that it “wakes up” the coroutine in the event loop that will then process the queue and dispatch it to the correct coroutine/task. The asyncio event loop should continue all its tasks while a task is waiting on a message through the queue. queue is not suitable for that, to my understanding.

1 Like

Interesting. If that’s the case, why can’t one pass asyncio.Queue an optional loop?

The following code works as I expect, at least right now (to your point):

    def send_controller_message(self, message: ControllerMessage) -> None:
        coroutine = partial(send_controller_message, inbox=self._asyncio_queue, message=message)
        asyncio.run_coroutine_threadsafe(
            coro=send_controller_message(inbox=self._asyncio_queue, message=message),
            loop=self._asyncio_event_loop,
        )

What is the core recommended way to communicate between a non-event loop thread and an event loop?

I will need to do some more reading on that method. Because I also need a way to have the asyncio event loop communicate back to the PySide6 or application thread. I was potentially thinking of using asyncio.to_thread for that to put things on a regular threadsafe queue.Queue or using asyncio.to_thread with a PySide6 Signal.

You shouldn’t need to do that, just write to it directly from the coroutine

So encouraged by your comment to actually look at the code, I do see that asyncio.Queue does indeed implement the mixins._LoopBoundMixin.

mixins._LoopBoundMixin has a single method _get_loop which does indeed try to acquire the loop:

Going back to the implementation of asyncio.Queue, self._get_loop is only called inside get and put. So right now, it is safe to at least construct asyncio.Queue outside of the event loop it is intended to be used on.

I think it would be nice if the asyncio developers could chime in to say whether that is an intended invariant or not. If so, we could update the documentation to make this more clear either way, either stating that it can be constructed in a different thread than the event loop’s thread or that the queue must be constructed on the event loop thread it will be used on.

That makes sense since queue.Queue is threadsafe, but I was trying to preclude any possibility of locking up the asyncio event loop. Are your thoughts that putting on an unbounded queue.Queue should be fast enough to not be actually blocking?

I would use Queue.put_nowait.

I don’t think any threadsafe code guarantees to be non-blocking internally, probably not even to_thread, the thing to avoid is blocking waiting for a state to change (e.g. a full queue to have an item dequeued)

2 Likes

you can use asyncio.run_coroutine_threadsafe from another thread, to submit to an asyncio.Queue bound to an event loop using the normal put method, you’ll need a reference to that queue or to something else that has a reference to it, and the future you receive on the thread you are scheduling from can either be awaited or polled if you need to know when it finishes (based on if you have the ability to await in that context, or just check state otherwise).

Unfortunately, I don’t know of any good public examples of mixing asyncio with threading, especially with event loops in background threads, but this is the way I’ve gone about it. I can construct an example later if it would help to see an example, but it would be a bit contrived since I don’t have any pre-existing public code that does this.

1 Like

Follow up: this is an example of handling multiple event loops and thread safety, adapted from a graphics library I never ended up publishing.

# License: Not really a licensable concept IMO, but in case anyone has an overly strict compliance requirement your choice of MIT, Apache-2.0, MPL2.0, or unlicense.
import asyncio
import threading
from contextlib import contextmanager


class LoopWrapper:

    def __init__(self, thread_wrapped_loop):
        self._loop = loop

    def schedule(coro):
        """Schedule a coroutine to run on the wrapped event loop"""
        return asyncio.run_coroutine_threadsafe(coro, self._loop)

    async def run(coro, wait=False):
        """Schedule a coroutine to run on the background loop,
        awaiting it finishing."""

        future = asyncio.run_coroutine_threadsafe(coro, self._loop)
        finished = threading.Event()
        def future_done_callback(_f):
            finished.set()
        future.add_done_callback(future_done_callback)

        await asyncio.get_running_loop().run_in_executor(None, finished.wait)
        return future.result()

@contextmanager
def threaded_loop():
    def run_forever(loop):
        asyncio.set_event_loop(loop)
        loop.run_forever()

    loop = asyncio.new_event_loop()
    try:
        thread = threading.Thread(target=run_forever, args=(loop,))
        thread.start()
        yield LoopWrapper(loop)
    finally:
        loop.call_soon_threadsafe(loop.stop)
        thread.join()

It’s relatively fragile, and you’re responsible for ensuring the thread-safety of all created objects, I suggest if you go this route to come up with what you need to communicate between event loops early on and keep it very contained and limit direct access. asyncio doesn’t have a good high-level story for threading with multiple event loops, but all of the needed tools exist, and this should be a reasonable starting point for anyone working with similar problems.

1 Like

Thanks for the reply!

Yes, this is my exact use case, which is why I was concerned about creating the asyncio.Queue from a different thread than the event loop would run on, which is required for this strategy to work.

Apologies for not getting to your previous reply sooner, as I had already created an example once I looked into the code and indeed saw that asyncio.Queue() (the construction of it) is currently at least threadsafe.

Here is what I am doing in my current prototype, which is a PySide6 GUI application with an asyncio core. The asyncio core currently communicates back to the PySide6 GUI with Signals, which are threadsafe and don’t care if anyone is listening or not.

class MainWindow(QWidget):
    def __init__(self) -> None:
        super().__init__()

        # The the `asyncio` queue and event loop are created here, in the GUI thread (main thread),
        # but they will be passed into a new thread that will actually run the event loop.
        # Under no circumstances should the `asyncio.Queue` be used outside of that event loop. It
        # is only okay to construct it outside of the event loop.
        self._asyncio_queue = asyncio.Queue()
        self._asyncio_event_loop = asyncio.new_event_loop()

       # ...

    # ...

    def send_controller_message(self, message: ControllerMessage) -> None:
        """Send the `asyncio` event loop's `asyncio.Queue` a message by using the coroutine
        `send_controller_message` and sending it to run on the `asyncio` event loop, putting
        the message on the `asyncio.Queue`.
        """
        asyncio.run_coroutine_threadsafe(
            coro=AsyncController.send_controller_message(
                inbox=self._asyncio_queue,
                message=messag
            ),
            loop=self._asyncio_event_loop,
        )


class AsyncController:
    # ...

    @staticmethod
    async def send_controller_message(inbox: asyncio.Queue, message: ControllerMessage):
        inbox.put_nowait(message)


async def read_inbox(queue: asyncio.Queue, controller: AsyncController):
    while True:
        message = await queue.get()
        match message:
            case ControllerMessage.START_CAMERA_EXPOSURE:
                await controller.start_camera_exposure()

            case ControllerMessage.STOP_CAMERA_EXPOSURE:
                await controller.stop_camera_exposure()

            case ControllerMessage.ABORT_CAMERA_EXPOSURE:
                await controller.abort_camera_exposure()

            case ControllerMessage.GET_EXPOSING_TIME:
                await controller.get_exposing_time()


async def periodically_get_status(inbox: asyncio.Queue, controller: AsyncController):
    while True:
        await inbox.put(ControllerMessage.GET_EXPOSING_TIME)
        await asyncio.sleep(0.1)


async def async_controller_main(inbox: asyncio.Queue, signals: list[Signal]):
    controller = AsyncController(initial_state=Idle(), signals=signals)
    await controller.initialize()
    asyncio.gather(
        read_inbox(inbox, controller),
        periodically_get_status(inbox, controller)
    )



def start_asyncio_event_loop(loop: asyncio.AbstractEventLoop) -> None:
    asyncio.set_event_loop(loop)
    loop.run_forever()


def run_event_loop(inbox: asyncio.Queue, loop: asyncio.AbstractEventLoop, signals: list[Signal]) -> None:
    thread = Thread(target=start_asyncio_event_loop, args=(loop,), daemon=True)
    thread.start()

    asyncio.run_coroutine_threadsafe(async_controller_main(inbox, signals), loop=loop)


if __name__ == "__main__":
    application = QApplication(sys.argv)
    window = MainWindow()
    asyncio_queue = window._asyncio_queue
    asyncio_event_loop = window._asyncio_event_loop

    run_event_loop(
        inbox=asyncio_queue,
        loop=asyncio_event_loop,
        signals=window.signals
    )
    sys.exit(application.exec())

Yes, for sure. My plan is to use an asyncio.Queue to communicate into the event loop and either Signals and or a queue.Queue to communicate out. Internal to the event loop, the plan is to use several asyncio.Queues to communicate between the various Tasks acting as workers.

1 Like

If anyone is interested, here’s a more fully featured prototype using asyncio.Queues heavily to communicate between asyncio workers and also as a way for a PySide6 (Qt for Python) GUI to send messages to the asyncio workers.

1 Like