I want to control the input and output streams through subprocess.Popen, but when I encounter the input function, I always need to input before I can get the input parameters from stdout, which is not consistent with the actual flow。Here is the code involved:
def thread_monitoring(self):
process = Popen(
# Enable unbuffered mode
["python", "-u", self.script_path],
stdin=PIPE,
stdout=PIPE,
stderr=PIPE,
text=True,
bufsize=1, # 行缓冲
universal_newlines=True,
encoding="utf-8",
# Force the child process to use unbuffered output, but will affect the encoding scheme, abandoned
# env={"PYTHONUNBUFFERED": "1"}
)
def read_stream(stream, stream_type, output_queue):
"""Read the output stream of the child process (stdout or stderr)"""
while True:
try:
line = stream.readline() # .encode("utf-8").decode("latin1")
if not line:
if process.poll() is not None:
break
continue
output_queue.put((stream_type, line.rstrip()))
except Exception as e:
output_queue.put((stream_type, f"Error: {str(e)}"))
break
# Create an output stream queue
output_queue = Queue()
# Start the output read thread
stdout_thread = Thread(
target=read_stream,
args=(process.stdout, "[stdout] " if self.stream_id else "", output_queue),
daemon=True
)
stderr_thread = Thread(
target=read_stream,
args=(process.stderr, "[stderr] " if self.stream_id else "", output_queue),
daemon=True
)
stdout_thread.start()
stderr_thread.start()
# The main thread processes the input
while process.poll() is None and self.thread_lock:
try:
# Processing output
try:
stream_type, output = output_queue.get(timeout=0.1)
self._receiver(f"{stream_type}{output}\n")
except Empty:
pass
# Processing input
try:
user_input = self.input_queue.get(timeout=0.1)
if process.stdin and not process.stdin.closed:
process.stdin.write(user_input[10:] if self.stream_id else user_input + "\n")
process.stdin.flush()
except Empty:
if len(self.user_input) > 0:
if process.stdin and not process.stdin.closed:
process.stdin.write(self.user_input[0][10:] if self.stream_id else self.user_input[0] + "\n")
process.stdin.flush()
del self.user_input[0]
except Exception as e:
self._receiver(f"Error in main loop: {str(e)}\n")
break
if process.poll() is None:
process.terminate()
try:
process.wait(timeout=5)
except:
process.kill()
# Processing other outputs
while not output_queue.empty():
stream_type, output = output_queue.get()
self._receiver(f"[{stream_type}] {output}\n")
# Pipe closure
for pipe in [process.stdout, process.stderr, process.stdin]:
if pipe and not pipe.closed:
try:
pipe.close()
except:
pass
# Wait for the output thread to finish
stdout_thread.join()
stderr_thread.join()
In addition to this one bug, other functions can be implemented, for example, there is a print(“123”) in front of input,123 can be output normally, but input parameters need to be input content before stdout