Interleaving simple file operations with iterators

I have a function that takes an iterator producing byte chunks and sends them over telnet (file transfer over telnet is a pain, but this is for hacking an embedded device with a limited busybox, so I don’t have many alternatives). I’d like to build a tar file on the fly, have it produce the byte chunks, and yield them to the upload function. My understanding is that tarfile supports passing a fileobj to tarfile.open, which appears to be of type BytesIO. How can I set up BytesIO, or whatever this fileobj is, so that execution follows this flow:

  • upload func requests chunk from iterator
  • iterator starts tar compression, passes the fake file obj, gets control back when .write() is called
  • iterator yields chunk from tar to upload func
  • upload does its thing, asks for the next chunk
  • repeat until tar creation finishes

And more generally: for cases where simple file operations are sufficient, i.e. reading and writing chunks sequentially, is there a wrapper in the standard library to interleave these operations? I’m thinking it would look something like this:

reads_bytes_from_file(
  FakeFile(input=lambda writeable: writes_bytes_to_file(writeable))
)

As a side note: tar is not compression. It’s just a standard for representing the contents of multiple files within one file (and knowing where the boundaries are, what the filenames are, and so on). Of course, the tarfile module supports compressing the archive via a few different algorithms. Either way, the embedded device on the other end needs to know what it will receive and how to deal with it.
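For example, with tarfile the choice of compression is made through the mode string (a quick illustration; the file names here are just placeholders):

import tarfile

# Plain, uncompressed tar archive.
with tarfile.open('bundle.tar', mode='w') as t:
    t.add('some_file.txt')

# The same archive, gzip-compressed ('w:bz2' and 'w:xz' modes also exist).
with tarfile.open('bundle.tar.gz', mode='w:gz') as t:
    t.add('some_file.txt')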

I assume that you know the files that will be used for the tarball ahead of time. A generator is a suitable tool for implementing the iterator you want; the control flow you describe is more or less automatic that way. An implementation might look like:

import tarfile
from io import BytesIO

def tarred(chunksize, *filenames):
    buffer = BytesIO()
    with tarfile.open(fileobj=buffer, mode='w') as t:
        # We need to keep track of separate "read" and "write" positions
        # (we always write at the end). Whenever we start yielding chunks,
        # we temporarily move back to the read position; when there is not
        # enough data left for a full chunk, we move to the write position
        # and add more.
        read_pos, write_pos = 0, buffer.tell()
        for name in filenames:
            # Yield as many full chunks as possible, then add the next file.
            buffer.seek(read_pos)
            while (buffer.tell() + chunksize) <= write_pos:
                yield buffer.read(chunksize)
            read_pos = buffer.tell()
            buffer.seek(write_pos)
            t.add(name)
            write_pos = buffer.tell()
    # Closing the tarfile appends the end-of-archive blocks, so yield whatever
    # is left only after the `with` block has exited.
    buffer.seek(read_pos)
    while (result := buffer.read(chunksize)):
        yield result

I’ve deliberately complicated this by interleaving the reads and writes so that files are only added to the in-memory tar as needed, in case you have huge files and need to avoid high memory usage. If you just want to make the tarball and yield it in chunks, that’s far simpler: you can make the complete tar in memory first, and you don’t even need to write the generator logic to iterate over the resulting bytes.
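For example (a minimal sketch; tar_in_memory is just an illustrative name, and chunking the returned bytes is left to the caller):

import tarfile
from io import BytesIO

def tar_in_memory(*filenames):
    # Build the complete archive in memory, then hand back the raw bytes.
    buffer = BytesIO()
    with tarfile.open(fileobj=buffer, mode='w') as t:
        for name in filenames:
            t.add(name)
    # getvalue() returns everything written so far, including the
    # end-of-archive blocks appended when the tarfile was closed.
    return buffer.getvalue()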

If for whatever reason you need to work with the underlying bytes of the BytesIO after writing all the tar data, simply call its getvalue method.

Sorry for the confusion, but yes, I’m using tar mainly to package the files into a single binary; compression is on top of that. I could easily do without compression.

That example you provided is certainly a way to yield chunks, but my point was to do it while tar is building the binary, so that it doesn’t have to store the whole thing in memory. In my specific case, storing the whole tar in memory isn’t really an issue, since I’m dealing with small text files, but it’d be cool to know whether something like stream processing is possible just by interleaving iterators.

I ended up doing this:

import tarfile
from hashlib import md5
from io import BytesIO

# Telnet, mod_files, mod_dir, chunked, pipe_binary, script, assert_command and
# wait_for_command all come from the rest of my script.
def upload_mod(tn: Telnet):
    def reset_file(info: tarfile.TarInfo):
        info.uid = 0
        info.gid = 0
        return info

    # writes completely to ram then pushes, file is small so np
    tar_bytes = BytesIO()
    with tarfile.open(mode="w|gz", fileobj=tar_bytes) as tar_file:
        for file in mod_files:
            tar_file.add(file, file.relative_to(mod_dir), filter=reset_file)

    assert_command(
        pipe_binary(
            # pushing as chunks, could use `.read(n)` too
            tn, chunked(tar_bytes.getvalue(), 1024), *script("cat > /tmp/hack.tar.gz")
        )
    )
    checksum, _ = assert_command(
        wait_for_command(tn, "md5sum", "/tmp/hack.tar.gz")
    ).split(b"  ", 2)
    assert md5(tar_bytes.getvalue()).hexdigest().encode() == checksum
    ...

pipe_binary is the function that sends the chunks. As you can see, in this case it just pushes whatever tarfile has already processed; there’s no on-the-fly pushing.

I wouldn’t do it this way. I’d make a file-like class which only supports a .write(self, bytes) method, and which puts the bytes onto a Queue (import queue for this). No extra storage!

Depending on how you do this, you can consume the queue and send the chunks down the wire.

I’d:

  • dispatch the tarring operation in a Thread
  • have your main code consume from the Queue in your class

Should work just fine. I do this kind of thing a fair bit. So much so that I’ve even got a QueueIterator class which wraps a Queue in an iterator, and its usual factory function IterableQueue, like this:

 Q = IterableQueue(16)   # arbitrary capacity, default is unbounded
 T = Thread(target=tar_to_queue, args=(filenames, Q), daemon=True)
 T.start()
 for bs in Q:
     ...  # write bs to your data stream

You’d need to write tar_to_queue, which makes a "just has a .write" class to consume the tar output and put the results onto the queue.

You’re welcome to IterableQueue if you want, or you could write your own (more fun).
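A minimal sketch of that shape, assuming None as an end-of-stream sentinel (QueueWriter, tar_to_queue and the placeholder file names are all illustrative, and the plain consumer loop at the bottom stands in for IterableQueue):

import queue
import tarfile
from threading import Thread

class QueueWriter:
    """File-like object that only supports write(); every chunk goes onto a Queue."""
    def __init__(self, q):
        self.q = q

    def write(self, data):
        if data:                       # tarfile's stream may flush empty buffers
            self.q.put(bytes(data))
        return len(data)

def tar_to_queue(filenames, q):
    # Stream the gzipped tar straight "into" the queue, then signal end-of-stream.
    try:
        with tarfile.open(fileobj=QueueWriter(q), mode='w|gz') as t:
            for name in filenames:
                t.add(name)
    finally:
        q.put(None)                    # sentinel: no more chunks are coming

filenames = ['a.txt', 'b.txt']         # placeholder paths
q = queue.Queue(16)                    # bounded queue gives backpressure to the tar thread
T = Thread(target=tar_to_queue, args=(filenames, q), daemon=True)
T.start()
while (chunk := q.get()) is not None:
    ...                                # send chunk down the wire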
