How To Handle Bulk File Uploads To S3 Bucket In Flask?

Starting my first day as an Associate Software Engineer at Gurzu, I was given a problem to solve: bulk file uploads to an S3 bucket in Flask. The first question that popped into my head was WHY!?? I mean, can’t we just upload it to S3 directly? That is when I suddenly came to realize: Flask is a synchronous framework and will only take the next request after the initial request has been completely processed. (No wonder I am an associate. But hey, you learn from your mistakes, right?) So, for every upload request, the user had to wait a significant amount of time while it was processed.

So after a few hours of frying my brain (I mean googling) I figured we could use threads to perform multiple uploads at a time. I know Python is not known for its threading skills, but if it works, IT WORKS! For all that, threads are cheaper to spin up than processes, but not as reliable: they can end up in a deadlock or die prematurely, and there are all sorts of other problems. Since uploads are I/O-bound, though, the GIL is not a bottleneck here, and in my case it worked like a charm!

I tried solving it with many different approaches, but they kinda failed successfully, lol. After spending some time I acknowledged there were some delays we could do nothing about, for instance the time taken for the files to reach the server endpoint. Ignoring that delay, the solution was to save the files locally: in Flask, multipart data larger than 500 KB is already spooled to a temporary file, so it is fast to save the uploads locally and then push them to the cloud with a background process. For background processing I used Celery with a Redis backend.
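Wiring Celery to Flask with Redis takes only a few lines. Here is a minimal sketch of that setup; the `app2` name matches the snippets below, but the broker URL is an assumed local default, not the project's real configuration:

```python
from celery import Celery
from flask import Flask

app2 = Flask(__name__)

# Redis acts as both the message broker and the result backend here.
# redis://localhost:6379/0 is an assumed local default.
celery = Celery(
    app2.import_name,
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0",
)
```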

The catch is that for every request with multiple files, we save them locally and initiate a background process that uploads them to S3 using threads.

So I wrote a function that handles the request and kicks off a background process.

@app2.route('/upload', methods=['POST'])
@auth_required
def upload_file():
    # Get the name of the uploaded files
    uploaded_files = request.files.getlist("file")
    filenames = []
    for file in uploaded_files:
        # Check if the file is one of the allowed types/extensions
        if file and allowed_file(file.filename):
            # Make the filename safe, remove unsupported chars
            filename = secure_filename(file.filename)
            file.save(os.getcwd() + '/temp/' + filename)
            filenames.append(filename)

    background.delay(filenames, request.remote_addr)

    resp = jsonify({'message': 'File successfully uploaded'})
    resp.status_code = 200
    return resp


@celery.task
def background(filenames, remote_address):
    app2.logger.info("##############################################")
    app2.logger.info("Upload Request Sent From: " + str(remote_address))
    app2.logger.info("##############################################")
    for name in filenames:
        app2.logger.info(f"File {name} Entered For Processing on {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}")
        with open(os.getcwd() + '/temp/' + name, 'rb') as upload:
            app2.logger.info(f"File {name} Entered IO Stream on {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}")
            session = boto3.session.Session(aws_access_key_id=aws_access_key_id,
                                            aws_secret_access_key=aws_secret_access_key,
                                            region_name=region_name)
            s3_client = session.client('s3')
            with ThreadPoolExecutor(max_workers=8) as executor:
                app2.logger.info(
                    f"File {name} Entered Thread Processor on {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}")
                future_uploads = executor.submit(handle_bulk_upload_to_s3, s3_client, upload, name, app2,
                                                bucket_name, 'data/')
                result = future_uploads.result()
        if result:
            try:
                os.remove(os.getcwd() + '/temp/' + name)
                app2.logger.info(
                    f"File {name} Successfully Deleted on {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}")
            except FileNotFoundError:
                app2.logger.error(
                    f"File {name} failed to delete locally on {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}")
uploaded_files = request.files.getlist("file")

Let’s start by saving all the file references to a variable so that it is easier to filter them.

filenames = []

Initializing a list so that only the filtered file names are sent for further processing. You might wonder: why not send all the file objects at once? The answer to that question is — file objects are not serializable, and Celery only takes serializable objects as its parameters.
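That claim is easy to verify. Celery's default serializer is JSON-based, so the stdlib `json` module works as a stand-in here: a list of filename strings round-trips cleanly, while a file-like object does not.

```python
import io
import json

# Filenames are plain strings, so the list serializes cleanly.
payload = json.dumps(["report.pdf", "photo.png"])
assert json.loads(payload) == ["report.pdf", "photo.png"]

# A file object (here a stand-in in-memory stream) is not serializable,
# which is why we pass names to the task instead of the uploads themselves.
try:
    json.dumps(io.BytesIO(b"binary data"))
except TypeError:
    print("file objects are not JSON serializable")
```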

for file in uploaded_files:
    # Check if the file is one of the allowed types/extensions
    if file and allowed_file(file.filename):
        # Make the filename safe, remove unsupported chars
        filename = secure_filename(file.filename)
        file.save(os.getcwd() + '/temp/' + filename)
        filenames.append(filename)

So here we are iterating through all the files and checking whether they are valid. If a file is valid, we append its name to the previously initialized list and save it locally to a temp directory; otherwise we continue the iteration.
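The `allowed_file` helper isn't shown in the snippet; a typical extension-whitelist implementation looks something like this (the extension set is an assumption, not the project's real list):

```python
# Assumed whitelist of extensions; adjust for your project.
ALLOWED_EXTENSIONS = {"txt", "pdf", "png", "jpg", "jpeg", "csv"}


def allowed_file(filename):
    # Accept only files that have an extension on the whitelist.
    return "." in filename and \
        filename.rsplit(".", 1)[1].lower() in ALLOWED_EXTENSIONS
```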

background.delay(filenames, request.remote_addr)

resp = jsonify({'message': 'File successfully uploaded'})
resp.status_code = 200
return resp

So after all files are checked, a background process is triggered and a 200 status code is returned to the user.

For the background process I won’t be explaining the logging and such; I will only be explaining the threading portion.

Iterating over all the file names collected earlier:

for name in filenames:

Opening that file from the temp dir:

with open(str(os.getcwd() + '/temp/' + name), 'rb') as upload:

Initializing our S3 session:

session = boto3.session.Session(aws_access_key_id=aws_access_key_id,
                                aws_secret_access_key=aws_secret_access_key,
                                region_name=region_name)
s3_client = session.client('s3')

Initializing the ThreadPoolExecutor with a maximum of 8 workers:

with ThreadPoolExecutor(max_workers=8) as executor:
    future_uploads = executor.submit(handle_bulk_upload_to_s3, s3_client, upload, name, app2,
                                     bucket_name, 'data/')
    result = future_uploads.result()
if result:
    try:
        os.remove(os.getcwd() + '/temp/' + name)
    except FileNotFoundError:
        pass

So in this part we initialize a thread pool and submit a function to the executor, which performs the upload in a worker thread. If the work completes without any exceptions the function returns True, otherwise False. After successful completion of the operation we remove that particular file from our temp directory; otherwise we leave it there so that unsuccessful uploads can be examined later. To keep things tidy, we could write another function that deletes the file from the local server after 3 retries, so that unwanted files do not take up resources.
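That cleanup could be sketched like this — a hypothetical helper, not the project's code — which retries the deletion a fixed number of times and gives up once the budget is spent:

```python
import os
import time


def remove_with_retries(path, retries=3, delay=1.0):
    """Try to delete a leftover temp file, retrying on transient errors.

    Returns True once the file is gone, False after `retries` failures.
    """
    for attempt in range(retries):
        try:
            os.remove(path)
            return True
        except FileNotFoundError:
            # Already gone; nothing left to clean up.
            return True
        except OSError:
            # e.g. the file is still held open; wait and try again.
            time.sleep(delay)
    return False
```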

def handle_bulk_upload_to_s3(client, file, filename, app2, bucket_name, path):
    try:
        file_name = filename.split(".")[0] + ".txt"

        response = client.put_object(
            Bucket=bucket_name,
            Body=file,
            Key=path + file_name
        )
        if response['ResponseMetadata']['HTTPStatusCode'] == 200:
            return True
        return False

    except Exception as e:
        # Log the failure so unsuccessful uploads can be examined later
        app2.logger.error(f"Upload of {filename} failed: {e}")
        return False

I can’t provide the actual code due to project confidentiality, but you get the rundown.

So, now you can easily handle multiple file uploads to S3 without blocking the server.

Life is like gradient descent... learn, calculate, update.