Code either terminates early or freezes laptop

Hello, the following code either terminates early with the message "the window terminated unexpectedly (reason: 'clean exit', code: '0')" or completely freezes my laptop. I have tried several variations of the same code and always get one of those two outcomes. My laptop specs: 4-core AMD A6-3400 APU with 1 thread per core and 5.5 GB of available RAM. Note: hugefile1 and hugefile2 each have 1 billion lines of random numbers. Any help or suggestions are greatly appreciated.

import multiprocessing as mp
from multiprocessing import Manager
import time
import os
from math import ceil

class FileProcessor:
    def __init__(self, file1_path, file2_path, output_path, chunk_size=10000):
        """
        Initialize the file processor with file paths and processing parameters.
        
        Args:
            file1_path (str): Path to first input file
            file2_path (str): Path to second input file
            output_path (str): Path to output file
            chunk_size (int): Number of lines to process in each chunk
        """
        self.file1_path = file1_path
        self.file2_path = file2_path
        self.output_path = output_path
        self.chunk_size = chunk_size
        self.num_processes = mp.cpu_count()  # Get number of CPU cores

    def get_file_size(self, filepath):
        """Get the number of lines in a file."""
        with open(filepath, 'r') as f:
            return sum(1 for _ in f)

    def process_chunk(self, start_line, chunk_size, shared_dict):
        """
        Process a chunk of lines from both input files.
        
        Args:
            start_line (int): Starting line number
            chunk_size (int): Number of lines to process
            shared_dict (dict): Shared dictionary to store results
        """
        results = {}
        
        with open(self.file1_path, 'r') as f1, open(self.file2_path, 'r') as f2:
            # Skip to the start line
            for _ in range(start_line):
                next(f1, None)
                next(f2, None)
            
            # Process the chunk
            for i in range(chunk_size):
                line1 = f1.readline().strip()
                line2 = f2.readline().strip()
                
                if not line1 or not line2:  # End of file
                    break
                
                try:
                    num1 = float(line1)
                    num2 = float(line2)
                    results[start_line + i] = num1 + num2
                except ValueError:
                    results[start_line + i] = "ERROR"
        
        # Update shared dictionary with results
        shared_dict.update(results)

    def run_parallel(self):
        """
        Run the file processing in parallel using multiple processes.
        
        Returns:
            float: Execution time in seconds
        """
        start_time = time.time()
        
        # Get total number of lines
        total_lines = self.get_file_size(self.file1_path)
        
        # Create a manager for shared dictionary
        with Manager() as manager:
            shared_dict = manager.dict()
            
            # Calculate chunk sizes and create processes
            processes = []
            for i in range(self.num_processes):
                start = i * ceil(total_lines / self.num_processes)
                chunk = ceil(total_lines / self.num_processes)
                p = mp.Process(
                    target=self.process_chunk,
                    args=(start, chunk, shared_dict)
                )
                processes.append(p)
            
            # Start all processes
            for p in processes:
                p.start()
            
            # Wait for all processes to complete
            for p in processes:
                p.join()
            
            # Write results to output file
            with open(self.output_path, 'w') as out:
                for i in range(total_lines):
                    value = shared_dict.get(i, "ERROR")
                    out.write(f"{value}\n")
        
        end_time = time.time()
        execution_time = end_time - start_time
        
        return execution_time

def main():
    # File paths
    file1_path = "hugefile1.txt"
    file2_path = "hugefile2.txt"
    output_path = "totalfile1.txt"
    
    # Create processor instance
    processor = FileProcessor(file1_path, file2_path, output_path)
    
    # Run processing and get execution time
    print(f"Starting processing using {processor.num_processes} CPU cores...")
    execution_time = processor.run_parallel()
    
    print(f"\nProcessing completed:")
    print(f"- Execution time: {execution_time:.2f} seconds")
    print(f"- Output file: {output_path}")
    print(f"- CPU cores used: {processor.num_processes}")

if __name__ == "__main__":
    main()

If I understand your code, you end up with one entry in shared_dict for each line in the input files. Given that you have 1,000,000,000 lines, the dict will have 1,000,000,000 entries.

You do not have enough RAM (5.5 GiB) to store that amount of data.
RAM will be used up and, depending on your OS setup, the system will either swap to disk or kill the process. If it swaps, that will appear as a hung system. Are you using Windows?
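
As a rough check of that memory claim, here is a minimal sketch that measures the per-entry cost of a small int-to-float dict with tracemalloc and extrapolates to 1,000,000,000 entries. The helper name and the sample size are made up for illustration, the exact figure varies with Python version and platform, and it ignores whatever extra overhead the Manager proxy adds.

```python
import tracemalloc


def estimate_dict_bytes_per_entry(sample_entries=100_000):
    """Measure the average memory cost of one int -> float dict entry.

    Hypothetical helper: builds a sample dict shaped like the results
    dict in the question (line number -> sum) and measures its size.
    """
    tracemalloc.start()
    before, _peak = tracemalloc.get_traced_memory()
    sample = {i: float(i) + 0.5 for i in range(sample_entries)}
    after, _peak = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    return (after - before) / len(sample)


per_entry = estimate_dict_bytes_per_entry()
# Extrapolate linearly to one entry per input line.
total_gib = per_entry * 1_000_000_000 / 2**30
print(f"about {per_entry:.0f} bytes per entry, about {total_gib:.0f} GiB in total")
```

Whatever the exact number on a given machine, the key object, the value object, and the dict slot together cost several tens of bytes per entry, so the total lands far above 5.5 GiB.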

Also, are you sure you mean lines and not bytes for the file?

total_lines is not the number of lines in file1. It’s the size in bytes of the file.

@barry-scott, would a generator function work for this type of application?

Given that the output is a dict, a generator cannot be used.

The algorithm avoids reading all of the input files into memory,
but it then creates a huge dict.
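
If the dict is not actually needed, one way around it is to write each sum to the output file as soon as it is computed. The following is only a sketch under that assumption, not the original poster's method: it drops multiprocessing entirely, and the function name is made up for illustration.

```python
def sum_files_streaming(file1_path, file2_path, output_path):
    """Add corresponding lines of two files, writing each result at once.

    Hypothetical single-process alternative: only one line from each
    input file is held in memory at any time, and nothing is collected
    in a dict.
    """
    lines_written = 0
    with open(file1_path) as f1, open(file2_path) as f2, \
            open(output_path, "w") as out:
        # zip() stops when the shorter file is exhausted.
        for line1, line2 in zip(f1, f2):
            try:
                # float() ignores surrounding whitespace such as "\n".
                value = float(line1) + float(line2)
            except ValueError:
                # Same marker as the code in the question. Note that a
                # blank line becomes "ERROR" here, whereas the original
                # code stops at the first blank line.
                value = "ERROR"
            out.write(f"{value}\n")
            lines_written += 1
    return lines_written
```

Called as sum_files_streaming("hugefile1.txt", "hugefile2.txt", "totalfile1.txt"), this reads each input exactly once, so memory use stays flat regardless of file size. It will still take a long time on a billion lines, but it should not exhaust RAM.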


No, it’s the number of lines.

Oh! Missed that - thanks for the correction.

That means that the files are read many times in the algorithm:
once completely for the total_lines calculation, and then over and over again as each chunk is processed, because every worker skips to its start line by reading from the beginning of both files.
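
To put a number on the repeated reading, here is a small sketch that counts how many line reads the posted code performs. The function name is made up for illustration; the arithmetic mirrors get_file_size, the skip loop, and the chunk loop from the question, and it assumes both files have the same number of lines.

```python
from math import ceil


def lines_read(total_lines, num_processes):
    """Count line reads done by the code in the question (hypothetical helper)."""
    chunk = ceil(total_lines / num_processes)
    counting_pass = total_lines  # get_file_size reads file1 once
    skipped = 0
    processed = 0
    for i in range(num_processes):
        start = i * chunk
        # Each worker reads and discards every line before its start.
        skipped += min(start, total_lines)
        # Then it reads its own chunk (shorter at the end of the file).
        processed += max(0, min(chunk, total_lines - start))
    # Skipping and processing happen on both input files.
    return counting_pass + 2 * (skipped + processed)


print(lines_read(1_000_000_000, 4))  # 6000000000
```

With 1,000,000,000 lines and 4 processes that is 6,000,000,000 line reads, compared with 2,000,000,000 for a single pass over both files.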