Python 3 popen does not work (subprocess hangs waiting for data to be returned)

Good day, i apologise but i spent 4 days straight on trying to get this to work

basically i run a cron job once an hour that taps a pgsql database to decide which back jobs to run and at what frequency

the cron job runs a second python script that actually run freebsd tar (bsdtar) as a 3rd process

simply put tar seems to run ok but the 2nd script (what invokes the tar command) hangs waiting for the data from bsdtar program and never gets back the tar output

cron pythong program runs a straight popen (no pipes etc, meant to fork to the background)

backup.py (that runs tar) uses popen with threading to capture the full output (see commands_large) class below, returns data most of the time but not all of the time

running python 3.7, 3.8, 3.10

on

Freebsd 12.1 / 13.2 / 14.1

just can not get a subprocess to reliabily return the tar output.

more then happy to donate to the cause (just say how much & where)

relative code below :

class commands: #Standard run with shell (no background, smal output)
def init(self,command) :
self.command = command
#print (self.command)
self.output = ‘Error’
self.status = ‘255’
#print (sys.version)

	#result = subprocess.run(self.command, shell=True, capture_output=True, text=True, encoding='ASCII')
	
	result = subprocess.Popen(self.command, close_fds=False, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
	result.wait()
	
	stdout, stderr = result.communicate()
	
	self.status_error = int(result.returncode)
	
	self.output = stdout.decode("UTF-8" ,errors="ignore")
	self.error = stderr.decode("UTF-8",  errors="ignore")
	
	
	
	#print (self.error)
	#sys.exit()

	self.error_cr = self.error.split('\n')
	
	#print ('Error Code   : %s\n' %self.status_error)
	#print ('Output       : %s\n' %self.output)
	#print ('Error Output : %s\n' %self.error)

	try:
	  self.cr = self.output.split('\n')
	except :
	  self.cr = []
	try:
	  self.count = len(self.cr)
	except :
	  self.count = 0

	if self.status_error == 0 :
		self.status = 'OK'
	else : 
		self.status = 'BAD'
	
	#print (self.status)
	
	#return count=number of lines, cr = lines split, getoutput = actual output returned, status = return code

	return

class commands_large: #run a system command return large output. NO shell
def init(self,commands_large) :
self.command = commands_large

	#self.args = commands_large.split(' ',1)[1]
	
	#self.command = ["/usr/bin/tar", "-v", "-y", "--exclude", "/rmt", "--exclude", "/usr/ports", "-c", "-f", "/offsite-1/backup.test/BACKUP.temp.bz2", "/var"]
	
	#print ('Running : %s' %self.command)
	#print ()
	#print ('With    : %s' %self.args)
	#print ()
	#print (self.command)
	self.output = 'Error'
	self.status = '255'
	#print (sys.version)
	import subprocess
	import threading

	def read_stream(pipe, output_list):
		for line in iter(pipe.readline, b''):
			#output_list.append(line.decode().strip()) # Decode bytes and strip whitespace
			output_list.write(line.decode().strip() + '\n')
			
	# Example usage
	#command = "/programs/common3/hddstatus".split(' ') # Replace with your command
	#stdout_lines = []
	#stderr_lines = []
	#print ('Opening tmp files')
	temp_file = str( time.time() )
	stdout_lines = open('/tmp/std_out.%s' %temp_file,'w')
	stderr_lines = open('/tmp/std_err.%s' %temp_file,'w')
	
	
	#self.command must be in format ['xxx','yyy','zzz')
	#print ('Running : %s' %str(self.command) )
	proc = subprocess.Popen(self.command, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

	stdout_thread = threading.Thread(target=read_stream, args=(proc.stdout, stdout_lines))
	stderr_thread = threading.Thread(target=read_stream, args=(proc.stderr, stderr_lines))

	stdout_thread.start()
	stderr_thread.start()

	stdout_thread.join()
	stderr_thread.join()

	# Wait for the subprocess to finish
	#print ('Waiting ....')
	proc.communicate()
	proc.wait()

	stdout_lines.close()
	stderr_lines.close()
	
	f_stdout_lines = open('/tmp/std_out.%s' %temp_file,'r')
	stdout_lines = f_stdout_lines.read().split('\n')
	f_stdout_lines.close()
	os.remove('/tmp/std_out.%s' %temp_file)

	f_stderr_lines = open('/tmp/std_err.%s' %temp_file,'r')		
	stderr_lines = f_stderr_lines.read().split('\n')
	f_stderr_lines.close()	
	os.remove('/tmp/std_err.%s' %temp_file)
	
	stdout_raw = ''
	stderr_raw = ''

	#print (stdout_lines)
	#print (stderr_lines)

	#print("Stdout output:")
	for line in stdout_lines:
		stdout_raw = stdout_raw + line + '\n'
		#print(line)

	#print("\nStderr output:")
	for line in stderr_lines:
		stderr_raw = stderr_raw + line + '\n'
		#print(line)

	self.status_error = int(proc.returncode)
	
	#print (stdout_raw)
	#print (stderr_raw)
	
	#self.output = stdout_raw.decode("UTF-8" ,errors="ignore") #Strip ASCII
	#self.error = stderr_raw.decode("UTF-8",  errors="ignore") #Strip ASCII
	
	self.output = stdout_raw
	self.error = stderr_raw

	#Convert standard output to useable
	try:
	  self.cr = self.output.split('\n')
	except :
	  self.cr = []
	try:
	  self.count = len(self.cr)
	except :
	  self.count = 0

	try:
	  self.error_cr = self.error.split('\n')
	except :
	  self.error_cr = []
	try:
	  self.err_count = len(self.error_cr)
	except :
	  self.err_count = 0

	if self.status_error == 0 :
		self.status = 'OK'
	else : 
		self.status = 'BAD'

	return

if sys_name == 'backup-1.scom.ca' and backup[0] == 'B2' : #found a backup to run on backup-1
	if backup[5] == '' or backup[5] == None:
		command = ['/programs/common3/backup', '-s', '%s' %backup[1], '-d', '%s' %backup[2], '-j', '%s' %backup[4], '-c', '%s' %backup[3]] 
	else :
		command = ['/programs/common3/backup', '-s', '%s' %backup[1], '-d', '%s' %backup[2], '-j', '%s' %backup[4], '-c', '%s' %backup[3], '-t', '%s' %backup[5] ] 
	
	print (command)

	backups_running = commands(command_backups).cr

	for n in range (0,len(backups_running)) :
		if '-j %s' %backup[4] in backups_running[n] or ( (len(backups_running) -2) >= max_runs and backup[9] != 'HIGH' ):
			log_debug ( debug , 'Found Running Report : %s, Exiting ....' %report_name )
			found = 1
			break
	
	if found == 1:
		print ('Condition Found, Continnuing ...')
		continue		
	
	print ('Running : %s' %command)
	
	#Update date/time for this entry in pgdatabase.
	updatecommand = '''update report_check_history set lastdatechange = $$%s$$ where controlid=%s ''' %(current_date,backup[8])
	print (updatecommand)
	pg.execute(updatecommand)
	conn.commit()
	
	#proc = subprocess.Popen(command,stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
	proc = subprocess.Popen(command,stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
	#sys.exit()
	#os.system (command)

class runbackup :
def init(self,os,sys,commands_large,strftime,options,send_text,count,retrycount) :
#Verify jobname
self.jobname = options.jobname
self.send_text = send_text
self.retrycount = retrycount + 1
if self.jobname == None :
log_debug (debug, (‘Bad Job Name - Existing’) )
sys.exit()

	#Verify Source
	self.source = options.source
	self.check = os.path.exists(self.source)
	if self.check == False :
		log_debug (debug,  ('Bad Source Directory : %s' %options.source) )
		sys.exit()

	#Verify Destination
	self.destination = options.destination
	self.check = os.path.exists(self.destination)
	if self.check == False :
		log_debug (debug,  ('Bad Destination Directory : %s' %options.destination) )
		sys.exit()




	log_debug (debug,  ('Processing Source         : %s' %str(self.source) ) )
	log_debug (debug,  ('Processing Destination    : %s' %str(self.destination) ) )
	log_debug (debug,  ('Processing Number of Days : %s' %str(count) ) )
	log_debug (debug,  ('Processing Job Name       : %s' %str(self.jobname) ) )
	

	#sys.exit()

	#print ()
	 
	self.copyfrom = self.source #+ '/*'
	#print (copyfrom)

	self.copyto =  self.destination + '/' + str(self.jobname) + '.1.bz2'

	#Define where the backup temp file is to reside
	
	if options.tempdir == None : #Dump in destination folder as per usual.
		self.copytemp = self.destination + '/' + str(self.jobname) + '.temp.bz2'

	else :
		self.copytemp = options.tempdir + '/' + str(self.jobname) + '.temp.bz2'


	log_debug (debug,  ('Starting Backup Job .... : %s' %str( strftime("%Y-%m-%d %H:%M:%S")  ) ) )

	self.send_text = self.send_text + '\nProcessing Source         : %s' %str(self.source)
	self.send_text = self.send_text + '\nProcessing Destination    : %s' %str(self.destination)
	self.send_text = self.send_text + '\nProcessing Number of Days : %s' %str(count)
	self.send_text = self.send_text + '\nProcessing Job Name       : %s' %str(self.jobname)

	self.send_text = self.send_text + '\n\n'
	self.send_text = self.send_text + '\nStarting Backup Job .... : %s' %str( strftime("%Y-%m-%d %H:%M:%S")  )


	if options.ignore_rmt == True :
		#self.tarball = 'env LC_ALL=C /usr/bin/tar -v -y --exclude /usr/ports -c -f ' + self.copytemp + ' "' + self.copyfrom + '"'
		#	self.tarball = ['/usr/bin/tar','-v -y --exclude /usr/ports -c -f ' + self.copytemp + ' "' + self.copyfrom + '"' 
		self.tarball = ["/usr/bin/bsdtar", "-v", "--exclude", "/rmt", "--exclude", "/usr/ports", "--use-compress-program", "/usr/local/bin/pbzip2", "-c", "-f",self.copytemp, self.copyfrom]
		
	else :
		#self.tarball = 'env LC_ALL=C /usr/bin/tar -v -y --exclude=/rmt --exclude /usr/ports -c -f ' + self.copytemp + ' "' + self.copyfrom + '"'
		#self.tarball = '/usr/bin/tar -v -y --exclude=/rmt --exclude /usr/ports -c -f ' + self.copytemp + ' "' + self.copyfrom + '"'
		self.tarball = ["/usr/bin/bsdtar", "-v", "--exclude", "/usr/ports", "--use-compress-program", "/usr/local/bin/pbzip2", "-c", "-f",self.copytemp, self.copyfrom]
	
	#print ()
	#print (self.tarball)
	#print ()
	
	log_debug (debug,  ('Executing : %s' %str(self.tarball)	) )
	
	self.command = commands_large(self.tarball)
	
	log_debug (debug,  ('Tarball Finished : %s' %str(self.tarball)	) )
	
	self.output = self.command.output + '\n' + self.command.error
	
	#print ('self.output : \n%s\n' %self.output)
	
	self.status = self.command.status
	log_debug (debug,  ('Tarball Status [%s] : %s' %(self.status,str(self.tarball) )	) )
	self.status_error = self.command.status_error
	
	
	return

backup = runbackup(os,sys,commands_large,strftime,options,send_text,count,0)

log_debug (debug, (‘Backup Returned : %s’ %str(backup.tarball) ) )

The docs for Popen.wait includes the warning:

Note This will deadlock when using stdout=PIPE or stderr=PIPE and the child process generates enough output to a pipe such that it blocks waiting for the OS pipe buffer to accept more data. Use Popen.communicate() when using pipes to avoid that.
Note When the timeout parameter is not None, then (on POSIX) the function is implemented using a busy loop (non-blocking call and short sleeps). Use the asyncio module for an asynchronous wait: see asyncio.create_subprocess_exec.

You’re using Popen.wait and then Popen.communicate.

Try using only Popen.communicate.

2 Likes