Help with Quattro/Asyncio

Hi all,

I’m trying my hand at using Quattro to keep my sanity but I have a problem I don’t understand. I’ve used Python in the past but never Quattro nor Asyncio.

I’m writing a program for a PA to emulate a sort of dumbed down P2P network. Why choose Python given I’m not that experienced with it? Java was the other option…


I reduced the program down to the following single-file MVP that demonstrates the problem:

import asyncio
import sys
import socket
from socket import socket as Socket
from quattro import TaskGroup

HOST = "127.0.0.1"

async def peer_handler(conn):
    print("Accepted a connection")
    while True:
        print("Still kickin'!")

async def connect_to(handler, port):
    print("DEBUG ({}:{})".format(__file__, sys._getframe().f_lineno))
    with Socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect((HOST, port))
        await handler(s)

async def connect_to_peers(peer_ports):
    async with TaskGroup() as tg:
      print("DEBUG ({}:{})".format(__file__, sys._getframe().f_lineno))
      for peer_port in peer_ports:
          tg.create_task(connect_to(peer_handler, peer_port))

async def listen_to_peers(peers_port):
    print("DEBUG ({}:{})".format(__file__, sys._getframe().f_lineno))
    async with TaskGroup() as tg:
        with Socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            print("DEBUG ({}:{})".format(__file__, sys._getframe().f_lineno))
            s.bind((HOST, peers_port))
            s.listen()

            while True:
                print("DEBUG ({}:{})".format(__file__, sys._getframe().f_lineno))
                conn, addr = s.accept()
                print("DEBUG ({}:{})".format(__file__, sys._getframe().f_lineno))
                print("Accepted {}".format(peers_port))
                args = make_args(conn)
                tg.create_task(peer_handler(conn))

async def main(args):
    # PEERS_PORT [PEER_PORTS...]
    #   PEERS_PORT => Port where we'll listen for peers
    #   PEER_PORTS => Ports of peers we'll try to connect to
    # Example:
    #  server: 4000
    #  client: 4001 4000
    peers_port = args[0]
    peer_ports = args[1:]

    async with TaskGroup() as tg:
        tg.create_task(listen_to_peers(peers_port))
        tg.create_task(connect_to_peers(peer_ports))

    print("The End")

if __name__ == '__main__':
    asyncio.run(main(list(map(int, sys.argv[1:]))))

The plan is to have a thread listening on peers_port and, for each peer in peer_ports, have a thread try to connect to it.

When converting from the previous attempt using threading.Thread I started with connect_to_peers() and it worked! The problem started when I converted listen_to_peers().


The actual problem: in listen_to_peers() no connection is ever accepted. Unfortunately I confirmed with nc localhost 4000 that the listener is working.

So why the heck was the program working after converting connect_to_peers() but stopped after converting listen_to_peers(), even though the problem is not with listen_to_peers()?!

Any help very much appreciated! I’m stuck because of this and the deadline is tightening fast… :s

I’m not entirely sure of your port number usage, but if you want to emulate a P2P system, I would recommend picking a single port number to use everywhere and then having each client run on a unique IP address. That makes the logic a lot easier to get your head around. (For testing, you can use 127.0.0.1 and 127.0.0.2 which are both “yourself”. In fact, all of 127.x.y.z is going to point back to your own computer.)

Your listen_to_peers function seems like a fairly straight-forward socket listen loop, but the trouble is, you haven’t put any await points in it. All you’re doing is standard blocking socket calls, which means you’re stalling out your event loop. With asyncio, until you reach an await (or an async for or async with etc), the current task keeps on running.

Fortunately, asyncio knows how to make socket servers for you! Here’s an example from the docs: Streams — Python 3.11.0 documentation

But every one of your tasks has to be written with await points in it. Anything that spins without awaiting (eg spamming “Still kickin”) will stop the rest of the program from running. Unfortunately, this is one of the fundamentally hard aspects of asyncio; with threads, a borked thread can be more easily isolated, but with tasks, they have to cooperate perfectly or the entire system collapses. It may be easier to first design a server/client system (with the two in separate programs), and then look at combining them to create your P2P system (where every node is simultaneously a server and a client).

2 Likes

I didn’t know that, that’s awesome! In the future I’ll be running all this in CORE. I was using different port numbers just to test things out without CORE (I didn’t know about 127.*.*.*).

I see, that’s good to know. I was under the impression that this “framework” was just syntactic sugar to work with threads in a more obvious (and easier) way.

Thanks a lot for your reply! I think this will be enough to unblock me. I’ll report back later.

Progress, they can connect to each other! Now I can continue work. Thanks again!

1 Like

Yeah! The entire 127/8 block is meant to be to localhost, although some systems cheat and only respond to 127.0.0.1. The only caveat is that, if you’re going to have independent servers on 127.0.0.2, 127.0.0.3, etc, you have to then bind to their IP addresses correctly - and possibly have the clients bind to IP addresses too. That’s definitely a good feature to have eventually, but it’s extra complexity when getting started.

Excellent!

1 Like

Asynchronous I/O is quite different from threading. There are advantages and disadvantages to both. Broadly speaking, async programs have an event loop that asks the system to let it know when “this, this, this, or this” happens, where threaded programs have individual threads that are each going “wait until this happens”, “wait until this happens”, etc.

Threads pay a price in system overhead (they have individual call stacks and other context). There is likely to be a limit to the total number of threads your computer can run at a time, and that number is a LOT lower than the total number of open sockets you can have. This can mean that, in a worst case scenario, your app could get clogged because of a huge number of broken connections that the system hasn’t noticed are broken yet.

Async I/O, instead, pays a price in coding rigour (you absolutely have to go back to an await point or other tasks can’t run). It’s remarkably easy to miss noticing something where you stall out, particularly if your test system is sufficiently fast that it isn’t a problem. One extremely sneaky culprit is printing log messages to the console. Yes, that’s going to be pretty fast in development (printing to the actual console), and in production, you might point that to a good log file for future analysis; but if you accidentally point the log file to a network share, your app could get clogged just waiting for a basic log operation because you never thought to build an async/await friendly logger.

Nothing is perfect. Choose your tradeoffs!

Thanks for the details! I get the picture now.

Given your explanation, I think using tasks alone won’t do for my PA. I thought about it a bit and think that I’ll need at least two OS threads: one for the code I showed before, and another for the actual objective of the PA (streaming video).

So I was wondering what would be the best/most idiomatic way to get this. I came up with something like this:

from concurrent.futures import ThreadPoolExecutor as ThreadPool
from quattro import TaskGroup
import asyncio

def start_network_tasks(host, peers):
    async def _start_network_tasks():
        async with TaskGroup() as tg:
            # Like before -- the listeners &c
            #tg.create_task(listen_to_peers(host))
            #tg.create_task(connect_to_peers(peers))
            tg.create_task(...)

    asyncio.run(_start_network_tasks())

def start_client_tasks():
    # Similar to start_network_tasks()
    pass

def main():
    with ThreadPool(max_workers=2) as tp:
        tp.submit(start_network_tasks, '127.0.0.1', [])
        tp.submit(start_client_tasks)

    print("The End")

if __name__ == '__main__':
    main()

It seems to be working the same as before, so I’m crossing my fingers. In my head this makes complete sense: start two threads, one thread runs a set of tasks and the other another set of tasks.

I read some other options on SO (e.g. multithreading - How to combine python asyncio with threads? - Stack Overflow, multithreading - How to combine python asyncio with threads? - Stack Overflow) but they seemed more complicated and more fine-grained than what I need. Is there anything wrong with this approach?

Possible followup questions:

  1. can tasks of different threads interact safely (i.e. no concurrency problems)?
  2. can tasks of different threads interact without blocking entirely? I imagine that threading.Lock will block tasks, if their thread is trying to acquire the lock. Can asyncio.Lock be used (safely) across different OS threads?

Combining async I/O with threads is definitely possible. The easiest way is to dedicate your main thread to the asyncio event loop, and then spawn other threads as needed (possibly before the event loop starts, possibly after). Those other threads have nothing whatsoever to do with asyncio, and just do their own work.

So in this case, you can have your arbitrary number of P2P connections running through async I/O (and thus safe against socket spam), while the video streaming runs in its own thread - a single other thread. You won’t have to worry about ThreadPool or anything; starting explicit threads is done thus:

def thread():
    ...

t = threading.Thread(target=thread)
t.start()

You can spawn and start the thread from inside your async main() function, or prior to going into asyncio.run() - either way is fine.

asyncio.Lock is not thread-safe. So you’ll need to be using threading.Lock, but you’re right, the most common way to do this IS blocking. Fortunately, non-blocking lock acquisition is possible:

async def some_task():
    if some_lock.acquire(False):
        print("Got the lock!")

How this should be incorporated into your broader logic is another question, though. What exactly are the tasks going to be doing that depends on synchronization with the video thread? It might be that there’s a facility for doing what you want, such as a thread-safe queue or some kind of event signal with a payload.

Hope that helps!

Thanks a lot for your detailed reply again, it does help for sure! Now I know where to look and some of the options available. Have to think how best to do this…

1 Like