Seamless async and network messaging

Is there anyone out there using some combination of items below to implement complex and responsive networking?

  • threading.Thread
  • asyncio.start_server
  • reader, writer = await asyncio.open_connection
  • data = await reader.read(100)
  • await reader.read(100)

Have you also mastered the issue of getting instances of complex messages on and off the streams?

Have you also implemented a separation of your streaming machinery and your message handlers, that you are satisfied with?

How do you communicate/interact with your threads?

What happens when multiple threads send messages down the same stream?

If you havent quite conquered all of these issues or you are curious about a pypi library that takes care of all of these things, you might find something interesting in the following code;

	import ansar.connect as ar
	
	class User(object):
		def __init__(self, name='', age=0, height=0.0):
			self.name = name
			self.age = age
			self.height = height
	
	ar.bind(User)
		
	def server(self):
		any = ar.HostPort('127.0.0.1', 0)
		ar.listen(self, any)
		listening = self.select(ar.Listening)
		self.send(listening.listening_ipp, self.parent_address)
		accepted = self.select(ar.Accepted)
		user = self.select(User)
		self.console(f'Hi {user.name}, are you really {user.height}m tall?')
		self.select(ar.Abandoned)
		self.select(ar.Stop)
	
	ar.bind(server)
	
	def client(self, address):
		ar.connect(self, address)
		connected = self.select(ar.Connected)
		server = self.return_address
		self.send(User(name='Cheryl', height=1.5), server)
		self.send(ar.Close(), server)
		self.select(ar.Closed)
	
	ar.bind(client)
	
	def main(self):
		s = self.create(server)
		address = self.select(ar.HostPort)
		c = self.create(client, address)
		self.select(ar.Completed)
		self.send(ar.Stop(), s)
		self.select(ar.Completed)
	
	ar.bind(main)
	
	if __name__ == '__main__':
		ar.create_object(main)

Under the hood this code creates platform threads for main, server and client. The server establishes a network listen and then sends the listen address to its parent (main). The client uses the address parameter to initiate a connection and then it sends an instance of the User object over that connection. The server receives that object and logs a diagnostic. Which covers most of issues mentioned above.

To see this run, put the code in a file called play.py and execute these commands;

	$ python3 -m venv .env
	$ source .env/bin/activate
	$ pip3 install ansar-connect
	$ python3 play.py --debug-level=DEBUG

Both client and server are in a single module mostly to save space, but also as another demonstration of asynchronicity. The send() and reply() methods are used to communicate between threads and across network transports - same methods. The User class can contain just about any type including collections and further user-defined types. Both the client and server can send messages at any time.

Note that function definitions like main, server and client can be replaced with special class definitions. A class can be used to define a FSM;

	class Server(ar.Point, ar.StateMachine):
		def __init__(self):
			..

To switch from using the function version to the FSM version is trivial;

	def main(self):
		s = self.create(Server)
			..

Among other significant runtime differences, FSMs are not allocated threads in the same manner as functions.