Concurrent downloader, ideas needed

I’ve written a tool that speeds up downloads of large datasets by parallelizing the requests with multiprocessing.pool.ThreadPool. My primary use case is downloading historical price data, such as crypto prices, but I want the tool to be open-ended enough that it could be used for any similar use case. I’m looking for ideas on how to make the library easy to adapt to whatever API a developer is working with.

One challenge I haven’t solved yet is how to cap the number of connections made within a given timeframe. Any help on this would be appreciated.

The other matter I’m contemplating is how to simplify the post-processing step after the download completes. The tool writes one file for every payload received, so the end result is a folder of files, which then need to be merged into a single CSV. I’d like to make it easy for developers to provide a set of instructions that the library applies to the data before merging it all together. Some data providers will give us JSON, and others will give us CSV. Some will form each JSON element as key-value pairs, while others will form each element as a list. In the first scenario the column name is included in every row, but in the second we may not have the column names in our data at all. Additionally, for JSON data, we probably need to specify the branch of the JSON object that we want to keep. For example, with data from Bitstamp, the code to extract just the array of historical records is:

items = r.json()['data']['ohlc']

I don’t want end users of the tool to be forced to process the output files themselves, but I do think they should have the option. What would be a way to implement a tool that could do this? I’m thinking along the lines of a YAML or JSON config file, where the operations the tool can perform are specified as elements in an array, and some operations accept options. Another method would be to have the developer provide a function that receives the file content as a string and returns an object of a certain type, but what type that should be, I’m not sure. A string, or a list of strings, may work, but a pandas dataframe would probably be more useful for ensuring proper row order. Thoughts?
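To make the config-style option concrete, here is a minimal sketch using only the standard library. Everything in it is an assumption for illustration: the function name extract_rows, the option names fmt, branch and columns, and the choice of a list of dicts as the common row type (which pandas could consume later if wanted).

```python
import csv
import io
import json


def extract_rows(text, fmt="json", branch=(), columns=None):
    """Turn one downloaded payload into a list of dict rows.

    Hypothetical options (not from any existing library):
      fmt     -- "json" or "csv"
      branch  -- keys to walk into the JSON object, e.g. ("data", "ohlc")
      columns -- column names, needed when rows arrive without them
    """
    if fmt == "csv":
        # With columns=None, DictReader takes the names from the header row.
        return [dict(row) for row in csv.DictReader(io.StringIO(text), fieldnames=columns)]

    data = json.loads(text)
    for key in branch:
        data = data[key]  # keep only the requested branch

    rows = []
    for item in data:
        if isinstance(item, dict):
            rows.append(dict(item))  # key-value style: names are in every row
        elif columns is None:
            raise ValueError("list-style rows need explicit column names")
        else:
            rows.append(dict(zip(columns, item)))  # list style: attach names
    return rows
```

With this shape, the Bitstamp case above would be a call like extract_rows(text, branch=("data", "ohlc")), and a provider that sends bare lists would additionally pass columns=[...]. A user-supplied function with the same return type could be accepted as an alternative to the options.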

To make the tool open-ended, I would keep it in three parts:

  1. The data collector itself.
  2. The manifest, which the collector works from and which prescribes the format the data source should be interpreted as.
  3. The interpreter that reads the manifests and hands each one to the data collector as a task.
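A rough sketch of how those three parts could fit together, assuming a manifest is just a small record. The names Manifest, interpret and collect, and all of the fields, are made up for illustration, and the fetch callable stands in for whatever HTTP client the tool uses.

```python
from dataclasses import dataclass
from multiprocessing.pool import ThreadPool


@dataclass
class Manifest:
    """Hypothetical manifest: what to fetch and how to read it."""
    url_template: str    # e.g. "https://example.invalid/ohlc?start={start}"
    params: list         # one dict of template values per request
    fmt: str = "json"    # how the payload should be interpreted
    branch: tuple = ()   # JSON branch to keep, if any


def interpret(manifest):
    """Interpreter: expand a manifest into the list of URLs to download."""
    return [manifest.url_template.format(**p) for p in manifest.params]


def collect(urls, fetch, max_workers=4):
    """Collector: call fetch(url) for every URL in a thread pool.

    Results come back in the same order as the URLs.
    """
    with ThreadPool(max_workers) as pool:
        return pool.map(fetch, urls)
```

The collector stays ignorant of any particular API: supporting a new data source means writing a new manifest, not new collector code.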

Finally I would shy away from using CSV. I’ve spent 20 years parsing CSV files and they contain an intractable problem summarised by the W3C: CSV on the Web: Use Cases and Requirements
See more here: github pandas issue 42479 and here: root-11 notes on text as tables

Instead I would recommend collecting the data in HDF5, using h5py as the interface, as HDF5 gives concurrent reads/writes and has a searchable structure.

… Just my 5p :wink:


Do you mean the connection event itself? Or the whole connect/download process?

I would be inclined to use a Semaphore for this. Set the semaphore to the max connections in your timeframe. Surround the whole connection/fetch with your semaphore:

with conn_sem:  # e.g. conn_sem = threading.Semaphore(max_connections)
    ...  # connect/fetch/download/etc.

… or just the “connect” if you’re just rate limiting the connections.

Have a separate ticker function which polls the semaphore value as time passes, and calls sem.release() as the time allows more connections. You’ll need a little mucking about to avoid an underflow (too many release() calls). I once wrote an AdjustableSemaphore class for this kind of thing, code here: Bitbucket
It is not heavily tested, but I’d use it to give the semaphore more capacity as the time passed in the ticker.
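For the "connections per timeframe" case, a minimal sketch of the ticker idea with only the standard library might look like the following. It is not the AdjustableSemaphore class mentioned above: it swaps in threading.BoundedSemaphore, whose release() raises ValueError when the semaphore is already full, as the guard against too many release() calls. The class name TickingLimiter and its parameters are hypothetical.

```python
import threading


class TickingLimiter:
    """Allow at most max_calls acquisitions per period (in seconds)."""

    def __init__(self, max_calls, period):
        self.max_calls = max_calls
        self.period = period
        self._sem = threading.BoundedSemaphore(max_calls)
        self._stop = threading.Event()
        self._ticker = threading.Thread(target=self._refill, daemon=True)
        self._ticker.start()

    def _refill(self):
        # Event.wait returns False on timeout, so this loops once per period
        # until close() sets the event.
        while not self._stop.wait(self.period):
            for _ in range(self.max_calls):
                try:
                    self._sem.release()
                except ValueError:
                    break  # already at full capacity

    def acquire(self, timeout=None):
        """Take one slot; workers never release it, only the ticker does."""
        return self._sem.acquire(timeout=timeout)

    def close(self):
        self._stop.set()
        self._ticker.join()
```

Each worker would call limiter.acquire() just before connecting. Note that refilling everything at once on a fixed tick still permits a burst of up to twice max_calls straddling a tick boundary, so a provider with a strict sliding window would need a smaller max_calls or a finer-grained refill.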

Cheers,
Cameron Simpson


@root-11 I hear you on the CSV issue. Thanks for sharing this link, this is really on point. I think you’re right, CSV should not be the recommended or only output option from this tool. I picked CSV because it seemed to be preferred by pandas, and I’m just recently wading into the data science world. I’ll experiment with h5py, thanks.

@cameron yes, I need to rate-limit the connections. Certain data providers ban IPs that exceed the maximum number of connections allowed in a given time period. Thanks for the advice and the Bitbucket link, this looks like just the thing I needed!

FYI, you can pip install cs.threads if you want an easy way to get the class.

The other crude but simple approach you could take is like this:

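import time
from threading import Lock, Thread

mutex = Lock()   # create once and share between all workers
mutex.acquire()  # blocks while an earlier connect is still inside its mintime window
connect(.....)
# Release mintime seconds later from a helper thread: Thread(...), not Thread.run(...).
Thread(target=lambda: (time.sleep(mintime), mutex.release())).start()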

i.e. require a mutex for the connect, release it the right amount of time later to prevent a too-fast connect afterwards.
