Version of multiprocessing.Queue that works with unrelated processes?

I’m looking for an implementation of a cross-process queue, to pass data between two parts of my app. The only one I can find in the stdlib is multiprocessing.Queue, which requires the two processes be managed by multiprocessing (which isn’t the case for me).

Is there a more general queue implementation that I could use? All I’ve found via google is links back to multiprocessing.Queue or heavyweight solutions like RabbitMQ. I’m after something lightweight, reasonably robust, and not terribly slow, which is why I don’t want to just hack something up for myself. Just in case it matters, I also want something self-contained, so (for example) a redis-based solution isn’t any use to me (but a sqlite-based one would be fine). I probably could write something, but that would convert a minor aspect of my current project into a side-project in its own right - and I’m sufficiently easily distracted that I really don’t need any more of those :slightly_smiling_face:

We used Unix domain sockets (UDS) to send pickled messages between processes.
On Windows you would use named pipes instead.
Both guarantee not to lose data.

It's not that much code for two processes; one-to-many is more complex.
You have to manage blocking on both sides.
If you're doing it async, you have to manage retrying the write once the read side pulls data out of the pipe.
You also need to set the size of the pipe with an ioctl, as the defaults are rather small.

That will be a small side project :slight_smile:
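
A very rough sketch of the two-process version (the socket path and payload are made up, and none of the blocking/buffer-size handling mentioned above is shown):

```python
# Sketch only: a consumer process that accepts one connection and reads pickled
# objects, and a producer process that writes them. The path is illustrative.
import os
import pickle
import socket

SOCK_PATH = "/tmp/myapp.sock"  # hypothetical path

# --- consumer process ---
def consume():
    if os.path.exists(SOCK_PATH):
        os.unlink(SOCK_PATH)  # clear a stale socket file from a previous run
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as srv:
        srv.bind(SOCK_PATH)
        srv.listen(1)
        conn, _ = srv.accept()
        with conn, conn.makefile("rb") as f:
            while True:
                try:
                    item = pickle.load(f)  # reads exactly one pickled object
                except EOFError:
                    break
                print("got", item)

# --- producer process ---
def produce(items):
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as cli:
        cli.connect(SOCK_PATH)
        with cli.makefile("wb") as f:
            for item in items:
                pickle.dump(item, f)
                f.flush()
```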

Sounds like you want a socket or a pipe. Both are cross-process and supported on many platforms. Both are language-agnostic (as they’re OS-level facilities). If you control both ends of the pipe (as implied by them being “two parts of [your] app”), the simplest is probably an unnamed pipe but it might be easier to use a named pipe or a TCP socket. A TCP socket has the advantage that it can even go across computers if needed, but naturally that brings with it some extra overhead.

Indeed, that’s what I imagined in terms of implementation. I was looking for a higher level abstraction built on them, though, not just the raw bytestream. Although given what I’m familiar with, I’d honestly probably use a sqlite database to transport the data if I had to write it myself. I could write something like that fairly quickly. It would be clumsy, and wouldn’t perform very well, but it would do the job. If I had to learn socket programming, or design a message framing protocol on a bytestream, what I’d end up with would likely be far worse.
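
Roughly what I have in mind, just as a sketch (made-up table and file names, and the consuming side would have to poll):

```python
# Very rough SQLite-backed queue: INSERT to push, SELECT+DELETE in one
# transaction to pop. Names are illustrative; contention handling is ignored.
import pickle
import sqlite3

def open_queue(path="queue.db"):
    db = sqlite3.connect(path)
    db.execute("CREATE TABLE IF NOT EXISTS q (id INTEGER PRIMARY KEY, item BLOB)")
    db.commit()
    return db

def put(db, obj):
    db.execute("INSERT INTO q (item) VALUES (?)", (pickle.dumps(obj),))
    db.commit()

def get(db):
    """Return the oldest item, or None if the queue is empty."""
    with db:  # read and delete in a single transaction
        row = db.execute("SELECT id, item FROM q ORDER BY id LIMIT 1").fetchone()
        if row is None:
            return None
        db.execute("DELETE FROM q WHERE id = ?", (row[0],))
        return pickle.loads(row[1])
```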

I was hoping for something higher level I could just pick up and use. I can’t quite work out whether the fact that there isn’t something obvious for this is because it’s an unusual requirement, or because there’s actually a really simple way of doing it that I just don’t know (or maybe I’m over-complicating the whole thing in my mind)…

As I said, multiprocessing.Queue is exactly what I want, except that it’s based on (anonymous) pipes, and so doesn’t work for completely independent processes.

Ah, yeah, I’m not aware of something prebuilt. It’s not too hard to throw something together using JSON or pickle through a pipe, and that means there’s sure to be something on PyPI, but I’m not aware of any in particular to recommend.

I have to say that, looking at multiprocessing.Queue, it doesn’t look like something I’d call “not too hard” :slightly_smiling_face: As usual, I suspect “make sure it’s robust” is where all of the complexity comes in…

While I did say I want something lightweight and “reasonably” robust, the application in question is doing a lot of work over a long-ish period (hours, not days), and so “not having the data transfer fail for some weird reason part way through” is important. So protecting against synchronisation errors and that sort of thing is worthwhile to me.

Agreed, but a simple idea of “just push stuff into a pipe”, while not nearly as robust as a proper mp.Queue, is going to cover a lot of your needs.

Fair enough! Sounds to me like the idea of using an external broker (you mentioned SQLite) would be the right choice here. Eliminating the concern of “what if data transfer doesn’t work properly” may well be worth a bit of a reduction in raw throughput.

In a previous life (that is, before I retired), I used Kafka and really liked it. You’d have to set up a broker somewhere, but then the pubsub stuff is straightforward.

Edit: Not “pubsub,” but “producer/consumer.”

Interesting. I’m still reluctant to need a separate service, but maybe I’m just pushing back too hard on something that really is the best way. This originally came from a suggestion that I could push/pop from a redis list, which would be fine except for the need to fire up a redis instance. But it’s not like running redis is hard (as far as I can tell!) - it’s just that the docs tend to talk in terms of setting up a permanent service and setting it to autorun, etc, rather than just firing it up for one job then shutting it down. So it feels like a bigger overhead than maybe it is in practice.
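
For reference, the suggested redis-list approach really is only a few lines, assuming redis-py and a locally running server (key name and payload are made up):

```python
# Sketch of "push/pop from a redis list"; requires the redis-py package and a
# running Redis server. Key name and payload are illustrative.
import pickle
import redis

r = redis.Redis()  # localhost:6379 by default

# producer side
r.rpush("myapp:results", pickle.dumps({"task": 1, "result": 42}))

# consumer side: blocks until an item is available
_key, data = r.blpop("myapp:results")
obj = pickle.loads(data)
```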

From a quick skim of the Kafka “quickstart” document, it feels like there’s quite a lot of setup needed (which is probably what you meant by “set up a broker somewhere”). But maybe there’s a “really quick start” that I’m missing there, as well.

I freely admit that someone else had already set up the broker. I was just the guy doing the producing and consuming. I wouldn’t be surprised if you found a Docker image for a broker though. That might be the quickest start.

Another (kind of odd) solution could be something like RPyC services (see “Part 3: Services and New Style RPyC” in the RPyC docs).

Then expose a ‘service’, connect to it, and call a Python function that just adds to that process’s queue.
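
Something along these lines, perhaps (service name, port, and the queue handling are all made up for illustration):

```python
# Rough sketch: the receiving process exposes an RPyC service whose exposed_put()
# drops items onto a local queue.Queue. Port and names are illustrative.
import queue

import rpyc
from rpyc.utils.server import ThreadedServer

items = queue.Queue()

class QueueService(rpyc.Service):
    def exposed_put(self, item):
        items.put(item)

if __name__ == "__main__":
    ThreadedServer(QueueService, port=18861).start()

# In the sending process, roughly:
#   conn = rpyc.connect("localhost", 18861)
#   conn.root.put({"task": 1, "result": 42})
```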

Have you considered ZeroMQ? This is built on top of sockets, but provides message framing, so the API only gives you data when a complete message is received. The PUSH/PULL socket type gives you something very like a queue; if both sides are running Python, you can pickle one object into one message and unpickle it on the other side. If not, you can use something like JSON or msgpack. The Python bindings (pyzmq) are well maintained.
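
To give a flavour, a minimal PUSH/PULL pair looks roughly like this (the port number and payload are just examples):

```python
# Minimal pyzmq PUSH/PULL sketch; port and payload are illustrative.
import pickle
import zmq

# --- producer process ---
def producer():
    ctx = zmq.Context.instance()
    push = ctx.socket(zmq.PUSH)
    push.bind("tcp://127.0.0.1:5555")
    push.send(pickle.dumps({"task": 1, "result": 42}))  # one object per message

# --- consumer process ---
def consumer():
    ctx = zmq.Context.instance()
    pull = ctx.socket(zmq.PULL)
    pull.connect("tcp://127.0.0.1:5555")
    obj = pickle.loads(pull.recv())  # blocks until a complete message arrives
    print(obj)
```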

Unlike other ‘MQ’ systems, processes using ZeroMQ connect directly, so you don’t have to run a separate broker.

It does have some odd (to my mind) design decisions, though. E.g. it lets you connect to a socket before it has been bound - it will retry making the underlying connection in the background. This is nice if your producer & consumer are starting together and you don’t know which will be ready first - one binds, one connects, and ZMQ makes it work. But if you connect to a socket that doesn’t exist, you have to wait for a timeout rather than getting an error straight away.

I did take a look at it. I’m not sure if I tried PUSH/PULL (it was quite a way down in the ZeroMQ guide, and I got stuck at PUB/SUB, which looked like what I wanted but wasn’t). From what I recall I also had trouble setting up the sockets - I kept getting things like “socket in use” when creating the worker-end sockets. Part of the problem there might have been that I don’t need the “ventilator/worker” part of the example, as I’m using an existing worker scheduling library (huey). But maybe I was hitting that with PUB/SUB, so I should probably have another go.

Thanks for the suggestion. ZeroMQ does look like the right sort of level (once you get past the setup, which makes me do yucky low level things like picking port numbers :wink:) so I’ll take another look and focus on PUSH/PULL this time!

For picking port numbers, you can bind with a port number of zero, which allocates an unused port (or pyzmq has a bind_to_random_port method to make this explicit). But then of course you need to communicate this somehow to the process that will connect to it. :slightly_smiling_face:
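
Roughly (socket type and address are just examples):

```python
import zmq

ctx = zmq.Context.instance()
sock = ctx.socket(zmq.PUSH)

# Ask the OS for an unused port; the chosen port is returned, and you then have
# to pass it to the connecting process somehow.
port = sock.bind_to_random_port("tcp://127.0.0.1")
print(port)
```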

Looks like Huey also has its own queuing system which can use sqlite as a backend (if you don’t want to run Redis). Maybe reusing that would fit together better? I don’t know what your overall system looks like, though.
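
I believe that's just a different Huey class; something like this, with an illustrative filename (a sketch, not something I've run):

```python
# Sketch: Huey with its SQLite storage instead of Redis. Filename is illustrative.
from huey import SqliteHuey

huey = SqliteHuey(filename="/tmp/myapp-huey.db")

@huey.task()
def add(a, b):
    return a + b
```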

This is all to handle the fact that Huey doesn’t have a means of receiving worker results as they complete (apart from polling, which I’m trying to avoid). But yes, there is a distinct element here of me over-engineering things, which is mostly because I want to learn what options are available.

How about celery’s Task Queue with an sqlite backend? I’ve used it with a mysql backend; I think sqlite is possible too.

Thanks, but celery needs a broker setting up. As I said in my original post, I want something that doesn’t need infrastructure like redis or RabbitMQ.

I got your point that you don’t want to install Redis or RabbitMQ. I’ve got an app in prod (for several years) using Celery with a SQLAlchemy/MySQL broker (I was in a similar situation). It seems this works with a SQLAlchemy/SQLite broker too. That said, I see Celery has dropped the older docs describing this experimental feature. The version of Celery I am using is 4.4. It might still be worth a try.
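
From memory of the old experimental docs, the setup was just a different broker URL, roughly like this (the database filename is illustrative, and you need the sqlalchemy package installed):

```python
# Sketch of the experimental SQLAlchemy/SQLite broker setup (Celery 4.x era);
# the database filename is illustrative.
from celery import Celery

app = Celery("myapp", broker="sqla+sqlite:///celery-broker.db")

@app.task
def add(a, b):
    return a + b
```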

In the vein of ZeroMQ, there is also pynng. I found it pretty straightforward to construct multiprocess pipelines with that. I don’t think it would take much to write a little queue wrapper that handled pickle/unpickle and the NNG connections, but I haven’t done that specifically so of course your mileage may vary. :slightly_smiling_face:
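
As a rough sketch (address and payload are made up):

```python
# Rough pynng Push0/Pull0 sketch; address and payload are illustrative.
import pickle

from pynng import Pull0, Push0

ADDR = "ipc:///tmp/myapp.ipc"  # or e.g. "tcp://127.0.0.1:5555"

# producer process
with Push0(listen=ADDR) as push:
    push.send(pickle.dumps({"task": 1, "result": 42}))

# consumer process
with Pull0(dial=ADDR) as pull:
    obj = pickle.loads(pull.recv())
```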

Thanks @takluyver! Having worked through the PUSH/PULL example so I understood it, and then used it to implement what I was doing, it does indeed work perfectly for my use case, and requires very little code to set up. Admittedly I went for a messy “just get it working” approach - packaging it up a bit more nicely would take a little more effort. But it’s a great approach, and I now have another tool in my toolkit, so your help is much appreciated.
