File copy using python queue and threading from sftp source to Google Cloud storage fails with no error if greater than 4 GB

from apache_beam.io.filesystems import FileSystems
from dataclasses import dataclass
from logging import Logger
from multiprocessing import Queue
from threading import Thread
from typing import Iterable
import queue
from apache_beam import DoFn
@dataclass
class SourceTargetPair:
    source_path: str
    target_path: str
    overwrite: bool = True
class CopyFile(DoFn):
    def process(self, element: SourceTargetPair, *args, **kwargs) -> Iterable[SourceTargetPair]:
        source_path, target_path = element.source_path, element.target_path
        CopyFile._copy_file(
                        source_path,
                        target_path,
                        1048576,
                        32,
                        10,
                        1800,
                    )
    @staticmethod
    def _copy_file(
        source_path,
        target_path,
        chunk_size,
        queue_size,
        queue_max_wait_time_sec,
        process_max_wait_time_sec,
    ):
        with FileSystems.open(source_path) as src_file:
            dst_file = None
            try:
                dst_file = FileSystems.create(target_path)

                data_queue = Queue(maxsize=queue_size)
                error_queue = Queue(maxsize=10)

                CopyFile._copy(
                    src_file,
                    dst_file,
                    chunk_size,
                    data_queue,
                    error_queue,
                    queue_max_wait_time_sec,
                    process_max_wait_time_sec,
                )

                dst_file.close()
            except Exception as ex:
                if dst_file:
                    try:
                        dst_file.close()
                    except:
                        pass

                    # Delete incomplete target file
                    if FileSystems.exists(target_path):
                        FileSystems.delete([target_path])

                raise ex
    @staticmethod
    def _copy(
        src_file,
        dst_file,
        chunk_size,
        data_queue,
        error_queue,
        queue_max_wait_time_sec,
        process_max_wait_time_sec,
    ):
        try:
            reader = Thread(
                target=CopyFile._read,
                args=(
                    src_file,
                    chunk_size,
                    data_queue,
                    error_queue,
                    queue_max_wait_time_sec,
                ),
            )
            reader.start()

            writer = Thread(
                target=CopyFile._write,
                args=(
                    dst_file,
                    data_queue,
                    error_queue,
                    queue_max_wait_time_sec,
                ),
            )
            writer.start()

            reader.join(process_max_wait_time_sec)
            writer.join(process_max_wait_time_sec)

            if not error_queue.empty():
                ex = error_queue.get()
                raise ex
        finally:
            data_queue.cancel_join_thread()
            error_queue.cancel_join_thread()

    @staticmethod
    def _read(src_file, chunk_size, data_queue, error_queue, queue_max_wait_time_sec):
        try:
            data = src_file.read(chunk_size)
            while data and error_queue.empty():
                try:
                    data_queue.put(data, True, queue_max_wait_time_sec)
                    data = src_file.read(chunk_size)
                except queue.Full:
                    pass

            data_queue.put(b"", True, queue_max_wait_time_sec)
        except Exception as ex:
            error_queue.put(ex)

    @staticmethod
    def _write(dst_file, data_queue, error_queue, queue_max_wait_time_sec):
        data = True
        while data and error_queue.empty():
            try:
                data = data_queue.get(True, queue_max_wait_time_sec)
                dst_file.write(data)
            except queue.Empty:
                pass
            except Exception as ex:
                error_queue.put(ex)
                break  

so this is the code. There is no error message if the source is above 4 GB csv. Source is SFTP location and target is google cloud storage GCS location. The target doesnt seem to match the file size of the source. if the file source is less than 4G there is no issue. I am completely stumped on this issue.Please help on this python code as I cant seem to get the flow/ debug this issue. Note that FileSystems is from apache_beam a distributed data processing framework.

After posting this thread I feel like the reader thread might be taking more time for large files, however as we specify reader.join(process_max_wait_time_sec) a wait time,the main thread will continue and will not wait for the reader thread/writer threads. Could that be ? Any hints/suggestions for this

4GB is a suspicious size. It’s 232 bytes, the maximum value that can fit in 32-bits. Could that be something to do with it? Is there a 32-bit limit somewhere?

Try adding some logging so that you can see what it’s doing. Maybe there’s an error somewhere that’s getting swallowed.

Hello :waving_hand:

Yes what you write is correct: using a timeout for joining a thread doesn’t stop the thread, it only allows the joiner to do something when the timeout expires. The docs: threading — Thread-based parallelism — Python 3.14.3 documentation

You may share a stop event between the two threads to signal that the timeout was reached and to allow the writer thread to exit gracefully. Note that this means the joiner thread should call join twice: once with the timeout argument set and the second time without it. Something like:

writer.join(timeout)
timed_out.set()
writer.join()

# in writer:
while … and not timed_out.is_set():
    …