I have a Python Singleton class which exposes an API put_msg_to_queue
to users. This API puts a string message to queue. The Singleton Tester
class creates a thread which gets the message and just prints it.
Complete code used is given below. This code was working fine with Python 3.9.12
, but seems broken with Python 3.9.14
. The queue.get
API throws OSError
when process exits.
Other than handling this exception (the commented out code given below), please suggest how to adapt this code with new python version.
Probably below change mentioned in changelog caused this change in behavior.
Always close the read end of the pipe used by multiprocessing.Queue after the last write of buffered data to the write end of the pipe to avoid BrokenPipeError at garbage collection and at multiprocessing.Queue.close() calls. Patch by Géry Ogam.
# python -V
Python 3.9.12
#
# python sample.py
Closing..
Received msg: sample msg
cleaning
#
# python -V
Python 3.9.14
#
# python sample.py
Closing..
Exception in thread Thread-1:
Traceback (most recent call last):
File "/usr/lib/python3.9/threading.py", line 980, in _bootstrap_inner
self.run()
File "/usr/lib/python3.9/threading.py", line 917, in run
self._target(*self._args, **self._kwargs)
File "/root/sample.py", line 32, in print_data
record = self.myqueue.get(timeout=0.3)
File "/usr/lib/python3.9/multiprocessing/queues.py", line 117, in get
res = self._recv_bytes()
File "/usr/lib/python3.9/multiprocessing/connection.py", line 217, in recv_bytes
self._check_closed()
File "/usr/lib/python3.9/multiprocessing/connection.py", line 141, in _check_closed
raise OSError("handle is closed")
OSError: handle is closed
#
# cat sample.py
#!/usr/bin/python
import queue
import multiprocessing
import time
import threading
import atexit
class Singleton(type):
_instances = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
return cls._instances[cls]
class Tester(metaclass = Singleton):
def __init__(self):
self._is_close = False
atexit.register(self.close)
self.myqueue = multiprocessing.Queue(-1)
self.reader_thread = threading.Thread(target=self.print_data)
self.reader_thread.daemon = True
self.reader_thread.start()
def put_msg_to_queue(self, msg):
self.myqueue.put(msg)
def print_data(self):
while (not self._is_close):
try:
record = self.myqueue.get(timeout=0.3)
print("Received msg: " + str(record))
except (KeyboardInterrupt, SystemExit):
raise
except EOFError:
break
except queue.Empty:
pass
#except OSError as ex:
# if str(ex) == "handle is closed":
# print("Handle is closed, breaking")
# break
print("cleaning")
self.myqueue.close()
self.myqueue.join_thread()
def close(self):
print("Closing..")
self._is_close=True
self.reader_thread.join(5.0)
tester = Tester()
tester.put_msg_to_queue("sample msg")