Hello the following code either early terminates with error code (the window terminated unexpectedly (reason ‘clean exit’, code ‘0’). I have tried several variations of the same code to either completely freeze my laptop or receive that message. Here are my laptop specs (4 core AMD A6-3400 APU w/1thread per core and 5.5 gig available RAM) Note: huge file 1&2 have 1 billion lines of random numbers. Any help or suggestions are greatly appreciated.
import multiprocessing as mp
from multiprocessing import Manager
import time
import os
from math import ceil
class FileProcessor:
def __init__(self, file1_path, file2_path, output_path, chunk_size=10000):
"""
Initialize the file processor with file paths and processing parameters.
Args:
file1_path (str): Path to first input file
file2_path (str): Path to second input file
output_path (str): Path to output file
chunk_size (int): Number of lines to process in each chunk
"""
self.file1_path = file1_path
self.file2_path = file2_path
self.output_path = output_path
self.chunk_size = chunk_size
self.num_processes = mp.cpu_count() # Get number of CPU cores
def get_file_size(self, filepath):
"""Get the number of lines in a file."""
with open(filepath, 'r') as f:
return sum(1 for _ in f)
def process_chunk(self, start_line, chunk_size, shared_dict):
"""
Process a chunk of lines from both input files.
Args:
start_line (int): Starting line number
chunk_size (int): Number of lines to process
shared_dict (dict): Shared dictionary to store results
"""
results = {}
with open(self.file1_path, 'r') as f1, open(self.file2_path, 'r') as f2:
# Skip to the start line
for _ in range(start_line):
next(f1, None)
next(f2, None)
# Process the chunk
for i in range(chunk_size):
line1 = f1.readline().strip()
line2 = f2.readline().strip()
if not line1 or not line2: # End of file
break
try:
num1 = float(line1)
num2 = float(line2)
results[start_line + i] = num1 + num2
except ValueError:
results[start_line + i] = "ERROR"
# Update shared dictionary with results
shared_dict.update(results)
def run_parallel(self):
"""
Run the file processing in parallel using multiple processes.
Returns:
float: Execution time in seconds
"""
start_time = time.time()
# Get total number of lines
total_lines = self.get_file_size(self.file1_path)
# Create a manager for shared dictionary
with Manager() as manager:
shared_dict = manager.dict()
# Calculate chunk sizes and create processes
processes = []
for i in range(self.num_processes):
start = i * ceil(total_lines / self.num_processes)
chunk = ceil(total_lines / self.num_processes)
p = mp.Process(
target=self.process_chunk,
args=(start, chunk, shared_dict)
)
processes.append(p)
# Start all processes
for p in processes:
p.start()
# Wait for all processes to complete
for p in processes:
p.join()
# Write results to output file
with open(self.output_path, 'w') as out:
for i in range(total_lines):
value = shared_dict.get(i, "ERROR")
out.write(f"{value}\n")
end_time = time.time()
execution_time = end_time - start_time
return execution_time
def main():
# File paths
file1_path = "hugefile1.txt"
file2_path = "hugefile2.txt"
output_path = "totalfile1.txt"
# Create processor instance
processor = FileProcessor(file1_path, file2_path, output_path)
# Run processing and get execution time
print(f"Starting processing using {processor.num_processes} CPU cores...")
execution_time = processor.run_parallel()
print(f"\nProcessing completed:")
print(f"- Execution time: {execution_time:.2f} seconds")
print(f"- Output file: {output_path}")
print(f"- CPU cores used: {processor.num_processes}")
if __name__ == "__main__":
main()