asyncio.run_coroutine_threadsafe does not send websocket messages in time, only after the main loop coroutine has finished

This bug is Windows-specific; on Linux the code works fine. The code below demonstrates the issue:
1. Start the server.
2. Start the client.
3. On Windows, all the messages reach the client at once after about 4 seconds. They should arrive every 0.3 seconds.
How can I work around the bug?

client:

import asyncio
from aiohttp import ClientSession

async def client_run():
    # Close the session when done instead of leaking it.
    async with ClientSession() as session:
        async with session.ws_connect('http://localhost:8000/') as ws:
            async for msg in ws:
                print(msg.data)


if __name__ == '__main__':
    asyncio.run(client_run())

server:

from aiohttp import web
import asyncio
from threading import Thread

ws = None

def sync_send(obj, loop):
    asyncio.run_coroutine_threadsafe(ws.send_str(obj), loop)            

def test_send():
    loop = asyncio.new_event_loop()
    import time
    def f(loop):
        asyncio.set_event_loop(loop)
        loop.run_forever() 

    async_thread = Thread(target=f, args=(loop,))
    async_thread.start()

    for i in range(3):
        time.sleep(0.3)
        sync_send(str(i), loop)

    time.sleep(3)

async def echo_handler(request):
    global ws
    ws = web.WebSocketResponse()

    await ws.prepare(request)

    test_send()
    
    async for msg in ws:
        await ws.send_str(msg.data)

    return ws


app = web.Application()
app.router.add_route('GET', '/', echo_handler)    
web.run_app(app, host='localhost', port=8000)

You’re creating a new event loop AND a new thread for every incoming request. This is very complicated. At the very least, it makes it harder to track down the cause of the issue here.

I presume you can’t replace all of that with asyncio on a single loop, as that would normally be the most obvious way to write this. Can you elaborate on why the need for multiple event loops?

My gut feeling is that this would most likely work better if the thread sent messages back to the server’s one and only asyncio event loop, which then manages everything else. (I’m assuming the thread is necessary; in the toy example here it isn’t, but if the real task depends on something that can’t be done asynchronously, then you’ll need it.) Ideally, spin off a limited number of threads rather than a new one for every request, and have them simply notify the main event loop, which can manage all the actual I/O.

“You’re creating a new event loop AND a new thread for every incoming request.”
No, that is only to simplify the example. The real code does not have this issue and is well optimized: https://github.com/Claus1/unigui/blob/main/unigui/users.py
“Can you elaborate on why the need for multiple event loops?”
Because I have to send messages from outside the main async cycle: broadcasting, etc.
Details: https://github.com/Claus1/unigui/blob/main/unigui/server.py

So the code has to work, and it does work on Linux, but it is buggy on Windows. How can I fix it?

This code is simply undefined behavior. Just because it works on Linux now does not mean it will in the future. You just got lucky that it works there.

Can you share a link explaining why it is undefined? As far as I can see it should work; there is nothing special in it. I do not use hacks…

Your linked code is still working with multiple event loops, though, so that’s a measure of complexity. Are you sure this can’t be done on the main event loop? What is the isolation between event loops providing?

Every bit of complexity in the example makes it harder to work out what’s happening.

“Are you sure this can’t be done on the main event loop? What is the isolation between event loops providing?”

I am sure I have to send messages from sync code, because the user handlers in my custom layer are normal sync functions.
So I need any method, hack, or library that will do this on Windows. Otherwise my framework will remain Linux-only, as it has been for the last 5 years.

Currently, your thread sends a message to a special event loop just for that purpose. What I’m asking is whether that run_coroutine_threadsafe could instead be sent to the primary loop. That way, you only have ONE asyncio thread, plus the threads that are doing the other work.
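A minimal sketch of what I mean, using only the standard library. It assumes the blocking work can be moved into a worker thread; blocking_job and send are hypothetical names, and send stands in for ws.send_str. The worker schedules each send on the one primary loop, which stays free because it is only awaiting the executor:

```python
import asyncio
import time


def blocking_job(loop, send):
    # Ordinary synchronous code, running in a worker thread.
    for i in range(3):
        time.sleep(0.1)  # stands in for real blocking work
        future = asyncio.run_coroutine_threadsafe(send(str(i)), loop)
        future.result()  # optional: wait until the loop has run the send


async def main():
    loop = asyncio.get_running_loop()
    start = time.monotonic()
    sent = []

    async def send(text):
        # Hypothetical stand-in for `await ws.send_str(text)`.
        sent.append((text, time.monotonic() - start))

    # The loop is not blocked while the job runs in the default thread pool.
    await loop.run_in_executor(None, blocking_job, loop, send)
    return sent


sent = asyncio.run(main())
for text, elapsed in sent:
    print(f"{text} sent after {elapsed:.2f}s")
```

The sync function never creates a loop of its own, so the websocket is only ever touched from the loop that owns it.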

Of course it will not work. Messages will not be sent until the main loop is finished. In my example, all the messages will arrive after 4 seconds. That is unacceptable and makes no sense. The second thread has to do the sending; it is the only way I found. So asyncio forced me to use a second loop, and it works only on Linux.

I don’t understand. You have an asyncio event loop, but you can’t add something else to that loop until the main loop finishes?

The entire point of asyncio is to run multiple tasks concurrently. When one’s waiting, others continue. Maybe the issue here is the time.sleep calls?
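To show what I mean about time.sleep, here is a minimal standard-library sketch (no aiohttp; ticker and run_with are names made up for illustration). A background task stands in for the loop's other work, such as flushing websocket frames. With time.sleep the task cannot even start until the sleep is over, whereas with asyncio.sleep it runs during the wait:

```python
import asyncio
import time


async def ticker(stamps, start):
    # Stands in for whatever else the event loop needs to do.
    for _ in range(3):
        await asyncio.sleep(0.05)
        stamps.append(time.monotonic() - start)


async def run_with(blocking):
    start = time.monotonic()
    stamps = []
    task = asyncio.create_task(ticker(stamps, start))
    if blocking:
        time.sleep(0.3)           # blocks the whole event loop
    else:
        await asyncio.sleep(0.3)  # yields to the loop while waiting
    await task
    return stamps


blocked = asyncio.run(run_with(blocking=True))
cooperative = asyncio.run(run_with(blocking=False))
print("first tick with time.sleep:    %.2fs" % blocked[0])
print("first tick with asyncio.sleep: %.2fs" % cooperative[0])
```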

I can add send_str calls, but the message will not be sent until the calling function has finished. That is nonsense: messages have to be sent in time, not an hour later. Imagine a long-running process that reports its current state every 5 seconds. It is not async. A framework user writes the process and calls
progress('Progress now is 18%'), then progress('Progress now is 19%'). How do I generate the message in time from the progress function? Force the user to use await and async stuff? That seems silly to me.

The sleep only emulates some code executing. The process can take an hour; how do I report its state to the outside from sync code?

Is it an hour of CPU usage, or is it doing something else? It’s very hard to judge from the example.

Can you simply put messages on a queue for the main thread to retrieve? That would be a far more common pattern.
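Roughly like this, as a standard-library sketch rather than a drop-in fix. The worker function and the None sentinel are choices made for the example. The thread only puts strings on the queue, and the main loop is the only place that would touch the websocket:

```python
import asyncio
import threading
import time


def worker(loop, queue):
    # Synchronous code in its own thread; it never touches the socket.
    for i in range(3):
        time.sleep(0.1)  # stands in for real blocking work
        # asyncio.Queue is not thread-safe, so hand the put to the loop.
        loop.call_soon_threadsafe(queue.put_nowait, str(i))
    loop.call_soon_threadsafe(queue.put_nowait, None)  # sentinel: done


async def main():
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()
    received = []
    threading.Thread(target=worker, args=(loop, queue), daemon=True).start()
    start = time.monotonic()
    while (item := await queue.get()) is not None:
        # A real server would call `await ws.send_str(item)` here.
        received.append((item, time.monotonic() - start))
    return received


received = asyncio.run(main())
for item, elapsed in received:
    print(f"{item} received after {elapsed:.2f}s")
```

This only works if the main loop is free to run while the worker is busy, so the blocking code has to live in the thread, not in the request handler.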

As I said before: "I can add calls send_str but message will not be sent until the adding function is finished." How do I make the server code above work as expected? Maybe I really am missing some magic trick. The main thread's loop does not send messages until the current function has finished, regardless of where they come from, a queue or a direct call.

In your example, you have time.sleep calls in your main thread. I don’t understand your original code well enough to know what’s going on, but the example is definitely going to have problems here.

My main thread has to execute user-layer sync code. From that sync code, the user wants to send a message immediately to inform the remote client (client.py, as in the code above). The message has to be sent before the sync code finishes.
How should I change def test_send in the code above so that messages arrive in time, given that test_send has to stay synchronous?

It appears to be a CPU-bound algorithm that reports its progress. I would recommend using a synchronous HTTP server for this purpose.