Duplexing Pipes/Queues

Hi, I need bi-directional IPC for 2 processes (processes not threads). I’ve ended up creating 2 separate multiprocessing.Queue objects, one for each way.

I started with sockets, until I caused a deadlock by having both sides call sock.send() before either had called sock.recv(). So then I switched to a single Queue, but found that the sender would dequeue its own object before the other process had a chance to Queue.get() it. To avoid that I’ve got 2 Queues.

I’ve failed to find easy duplex examples on Google (mainly Stack Overflow). I know I could write a ping-pong approach so each side knows whose turn it is to send or receive. I’ve avoided that complexity with 2 Queues, but I feel there may be a better way. Let me know if you can help.

You can use sockets, but you cannot use blocking I/O.

Either use async I/O, or poll non-blocking sockets using one of the functions in the select module.
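A minimal sketch of the polling approach, assuming the deadlock came from both peers blocking in send while neither was reading. It uses the standard selectors module (a wrapper over select) on a socket.socketpair(). The names exchange and demo are hypothetical, and the two peers run in threads here only to keep the example short; the socket calls would be the same between two processes.

```python
import selectors
import socket
from concurrent.futures import ThreadPoolExecutor


def exchange(sock, outgoing, expected_len, timeout=5.0):
    """Send `outgoing` and collect `expected_len` bytes without blocking in send or recv."""
    sock.setblocking(False)
    pending = memoryview(outgoing)
    received = bytearray()
    with selectors.DefaultSelector() as sel:
        sel.register(sock, selectors.EVENT_READ | selectors.EVENT_WRITE)
        while len(pending) or len(received) < expected_len:
            events = sel.select(timeout)
            if not events:
                raise TimeoutError("no progress within timeout")
            for _key, mask in events:
                if mask & selectors.EVENT_READ:
                    chunk = sock.recv(65536)
                    if not chunk:
                        raise ConnectionError("peer closed early")
                    received += chunk
                if mask & selectors.EVENT_WRITE and len(pending):
                    # send() accepts only what fits in the buffer right now.
                    sent = sock.send(pending)
                    pending = pending[sent:]
                    if not len(pending):
                        # Nothing left to write: wait for reads only.
                        sel.modify(sock, selectors.EVENT_READ)
    return bytes(received)


def demo(size=1_000_000):
    """Both peers start by sending a payload larger than the socket buffer."""
    a, b = socket.socketpair()
    payload = b"x" * size
    with a, b, ThreadPoolExecutor(max_workers=2) as pool:
        fa = pool.submit(exchange, a, payload, size)
        fb = pool.submit(exchange, b, payload, size)
        return len(fa.result(timeout=10)), len(fb.result(timeout=10))
```

Because each side reads whenever data is available, neither can stall the other by sending first.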

queue.Queue does not work between processes, only between threads.

What about a SimpleQueue? :upside_down_face:

I’m using multiprocessing.Queue. Seems to be working fine. SimpleQueue doesn’t have a put_nowait method so I’ve avoided it. I want to fire and forget between processes.

Do you mean that each of the two processes has its own “messages-in” queue? Or do you mean you have two shared multiprocessing queues, one for “messages-out” and one for “messages-in”?
If you have shared queues, I assume you are using a self-defined protocol for passing and processing objects? (How do your processes know if an object on the queue is sent by them or to them?)

It’s the former, so the processes know which queue to get messages from and which to put messages onto.

I tried the latter with a single queue but I had exactly the problem you describe. The sender would often dequeue packets intended for the other process.
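For reference, the two-queue arrangement (each process reads only from its own inbox and writes only to the other's) might look roughly like this. It is a sketch, not the poster's actual code: worker, run_demo, the upper-casing "work" and the None stop sentinel are all made up for illustration.

```python
import multiprocessing as mp


def worker(inbox, outbox):
    """Child side: consume from inbox only, reply on outbox only."""
    # iter(callable, sentinel) keeps calling inbox.get() until it returns None.
    for msg in iter(inbox.get, None):
        outbox.put(msg.upper())


def run_demo():
    to_child = mp.Queue()   # parent -> child
    to_parent = mp.Queue()  # child -> parent
    child = mp.Process(target=worker, args=(to_child, to_parent))
    child.start()
    for word in ("ping", "pong"):
        to_child.put_nowait(word)  # fire and forget
    replies = [to_parent.get(timeout=5) for _ in range(2)]
    to_child.put(None)  # hypothetical stop sentinel
    child.join(timeout=5)
    return replies
```

Call run_demo() under an `if __name__ == "__main__":` guard so it also works with the spawn start method. Since neither process ever calls get() on the queue it writes to, a sender cannot consume its own message.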

So, it seems to me that in this kind of setup you almost always want to have a little custom protocol, no matter whether you have one queue or multiple queues. So, each process knows its own identifier, and any message would explicitly contain the id of the sender and the receiver. Would that not solve your problem?

No, because a process needs to dequeue the message to read the protocol. If processA dequeues a message and finds out it is not the intended recipient, it is already too late; the message has been removed from the queue and processB has missed its chance.

Yes, that’s a bit of a chicken and egg problem… Well, then why not use multiprocessing.Pipe(duplex=True) for the communication? Then each process would only need its own (non-shared) queue of incoming messages.
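A small sketch of the Pipe(duplex=True) suggestion: Pipe returns two Connection objects, and with duplex=True each end can both send() and recv(). The names echo_upper and run_demo and the None stop sentinel are hypothetical.

```python
import multiprocessing as mp


def echo_upper(conn):
    """Child side: the same Connection is used for receiving and sending."""
    while True:
        msg = conn.recv()
        if msg is None:  # hypothetical stop sentinel
            break
        conn.send(msg.upper())


def run_demo():
    parent_conn, child_conn = mp.Pipe(duplex=True)
    child = mp.Process(target=echo_upper, args=(child_conn,))
    child.start()
    replies = []
    for word in ("ping", "pong"):
        parent_conn.send(word)
        replies.append(parent_conn.recv())
    parent_conn.send(None)
    child.join(timeout=5)
    return replies
```

Call run_demo() under an `if __name__ == "__main__":` guard. What one end sends can only be received at the other end, so there is no way for a process to read back its own message.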

Yes, that’s what I’ve got: 2 processes, 2 queues. But each queue only operates in one direction, so the duplex option can be false. This is what makes me think I’m missing a trick. I expected there to be a simple way to duplex a single queue, but so far I’ve found I’d need to implement a ping-pong algo to make that work.

I still don’t get why you’d need a ping-pong algorithm. By that I suppose you mean one where A sends and then needs to wait for B to respond? Why do you need that kind of synchronization (unless the business logic requires synchronization of course, but then there is also no problem)?

If you have a single shared data structure, then a queue doesn’t seem appropriate. I’m reminded of the Linda and Ease languages – which almost no one seems to know about :slight_smile: These are languages for parallel computing. The basic idea is that you have a “space” or “blackboard” (not a queue) in which asynchronous processes or threads can read/write/modify messages. So the processes can take any task that is suitable, that they can handle, and when done they may write a result back into the same shared space. So, it’s a collaborative multi-task environment, without inherent need for a central scheduler. (Kind of like a community forum :slight_smile: )

I need a ping pong algo because of this:

That’s exactly what I don’t understand. Why is the sender putting an object on its own queue of incoming messages? If you have 2 processes, then imo each would need to have its own (non-shared) queue of incoming messages (so the queue would be an internal implementation detail of that process; it doesn’t have to be known outside). Using two shared queues doesn’t make sense to me here, since then yes, you’re losing all the advantages of asynchronous processing and forcing the processes to walk in a ping-pong lockstep… Or am I still missing something?

Btw - as an aside – just to show you that queue usage and design is not trivial even for the best among us, you might find this thread interesting: Queue termination :slight_smile:

Consider implementing a server-client architecture. Utilize a blocking socket in the main thread, as there’s no immediate action required until the next message arrives.
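A rough sketch of that idea, assuming a strict request-reply order: the client always sends first and then waits, while the server always waits first and then replies. The names serve and request_reply are hypothetical. The server runs in a thread over socket.socketpair() only to keep the example in one file; in the setup discussed here it would be the main thread of the second process. Message framing is skipped, so this relies on short messages arriving in a single recv().

```python
import socket
import threading


def serve(conn):
    """Server side: block in recv() until a request arrives, then reply."""
    with conn:
        while True:
            request = conn.recv(1024)
            if not request:  # empty read: the client closed its end
                break
            conn.sendall(request.upper())


def request_reply(messages):
    client, server = socket.socketpair()
    thread = threading.Thread(target=serve, args=(server,))
    thread.start()
    replies = []
    with client:
        for message in messages:
            client.sendall(message)            # client speaks first...
            replies.append(client.recv(1024))  # ...then blocks for the answer
    thread.join(timeout=5)
    return replies
```

With a fixed order like this the two sides are never both waiting in send at once, which is the ping-pong lockstep discussed above, so it only fits when the processes do not need to message each other spontaneously.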