I am trying to write a snippet of code to learn asyncio together with the Azure SDK for Python, specifically its Blob Storage client. So far, I have created a simple class to manage Azure resources and access, which also uses asyncio to download multiple blobs (files) faster.
My concern is that when I create an instance of this class, the `container_client` is created, and then in `main` it is managed by the `async with BSA.container_client:` line, i.e. the resource is closed once that block exits. What if I want to call another function that depends on that resource afterwards?
How can I move the creation of the resource out of the `__init__` function, and its management with the `with` keyword out of the `main` function, into the `download_blobs_async` function?
import asyncio
import logging
from io import BytesIO
from typing import Iterable, List, Optional

from azure.core.exceptions import ResourceNotFoundError
from azure.storage.blob.aio import ContainerClient
# Application logger: INFO and above is sent to a StreamHandler
# (which writes to stderr unless another stream is supplied).
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)

logger = logging.getLogger('app')
logger.addHandler(console_handler)
logger.setLevel(logging.INFO)
class BlobStorageAsync:
    """Download blobs from a single Azure Blob Storage container with asyncio.

    The instance builds one async ``ContainerClient`` from the connection
    string and exposes it as ``container_client``. The instance is itself an
    async context manager that enters and exits that client, so callers do
    not have to reach into ``container_client`` to manage its lifetime::

        async with BlobStorageAsync(conn_str, container) as storage:
            first = await storage.download_blobs_async(names_a)
            second = await storage.download_blobs_async(names_b)

    Keep every call that needs the client inside the same ``async with``
    block; the client is released when the block exits.
    """

    def __init__(self, connection_string, container_name):
        """Create the container client for *container_name*.

        Args:
            connection_string: Azure Storage connection string.
            container_name: Name of the container the blobs are read from.
        """
        self.connection_string = connection_string
        self.container_name = container_name
        self.container_client = ContainerClient.from_connection_string(
            conn_str=connection_string,
            container_name=container_name,
        )

    async def __aenter__(self) -> "BlobStorageAsync":
        """Enter the underlying container client and return this instance."""
        await self.container_client.__aenter__()
        return self

    async def __aexit__(self, exc_type, exc_value, traceback) -> None:
        """Exit the underlying container client; exceptions are not suppressed."""
        await self.container_client.__aexit__(exc_type, exc_value, traceback)

    async def close(self) -> None:
        """Close the underlying container client without using ``async with``."""
        await self.container_client.close()

    async def download_blob_async(self, blob_name: str) -> Optional[BytesIO]:
        """Download one blob and return its content as an in-memory buffer.

        Args:
            blob_name: Name of the blob inside the container.

        Returns:
            A ``BytesIO`` holding the blob's bytes, or ``None`` when the
            service reports the blob as not found (the miss is logged).
        """
        try:
            blob_client = self.container_client.get_blob_client(blob=blob_name)
            async with blob_client:
                stream = await blob_client.download_blob()
                data = await stream.readall()
            logger.info("The file %s was downloaded", blob_name)
            return BytesIO(data)
        except ResourceNotFoundError:
            # A missing blob is an expected outcome here, not an error.
            logger.info("The file %s was not found", blob_name)
            return None

    async def download_blobs_async(
        self, blobs_list: Iterable[str]
    ) -> List[Optional[BytesIO]]:
        """Download several blobs concurrently.

        Args:
            blobs_list: Names of the blobs to download.

        Returns:
            One entry per requested name, in the same order as *blobs_list*;
            an entry is ``None`` when that blob was not found.
        """
        # gather() schedules each coroutine as a task and keeps input order.
        return await asyncio.gather(
            *(self.download_blob_async(blob_name) for blob_name in blobs_list)
        )
async def main():
    """Download a fixed list of parquet blobs and keep those that were found."""
    # Placeholders: supply the real connection details before running.
    connection_string = ...
    container_name = ...
    wanted_blobs = ['one.parquet', 'two.parquet', 'three.parquet']

    storage = BlobStorageAsync(connection_string, container_name)

    # The container client is entered here and released when the block exits.
    async with storage.container_client:
        downloads = await storage.download_blobs_async(wanted_blobs)

    # Entries that are None are dropped; only actual downloads are kept.
    found = [item for item in downloads if item is not None]
# Start the event loop only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())