How To Handle Bulk File Uploads To An S3 Bucket In Flask?

Sabin Sharma
5 min read · Jun 14, 2021

On my first day as an Associate Software Engineer at Gurzu, I was given a problem to solve: bulk file uploads to an S3 bucket in Flask. The first question that popped into my head was WHY!? I mean, can't we just upload it straight to S3? That is when I realized that Flask handles requests synchronously and only takes the next request after the current one has been completely processed. (No wonder I am an associate. But hey, you learn from your mistakes, right?) So for every upload request, the user had to wait a significant amount of time while the files were processed.

Some background info: back in my college days I solved a few problems like this, but I wasn't exactly sure the same approach would work here. I was also unsure whether to choose threading or multiprocessing.

So after a few hours of frying my brain (I mean googling), I figured we could use threading to perform multiple uploads at a time. I know Python is not famous for its threading skills, but if it works, IT WORKS! For what it's worth, threads are cheaper and faster to spin up than separate processes, but not as reliable: they can end up deadlocked, die prematurely, and cause all sorts of other problems. In my case, though, it worked like a charm!

I tried solving it with many different approaches, but they kind of failed successfully, lol. After spending some time on it, I accepted that there was some delay we could do nothing about, for instance the time it takes for the files to reach the server endpoint. Ignoring that, the solution was to save the files locally first: Flask (via Werkzeug) already spools multipart data larger than roughly 500 KB to a temporary file, so it is fast to save the uploads to disk and then push them to the cloud in a background process. For the background processing I used Celery with a Redis backend.

The idea was that for every request with multiple files, we save them locally and kick off a background process that uploads them to S3 using threads.
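Before diving in: the snippets below assume a Flask app named app2 wired to Celery with Redis as both broker and result backend. The original project's setup isn't shown here, so this is only a minimal sketch of how the wiring could look; names like make_celery, the Redis URL, and the temp directory path are my assumptions, not the production code.

# app_setup.py -- minimal sketch of the assumed Flask + Celery wiring (not the production code)
import os

from celery import Celery
from flask import Flask

app2 = Flask(__name__)

# Assumed: Redis running locally as both broker and result backend
app2.config["CELERY_BROKER_URL"] = "redis://localhost:6379/0"
app2.config["CELERY_RESULT_BACKEND"] = "redis://localhost:6379/0"

def make_celery(app):
    """Create a Celery instance whose tasks run inside the Flask app context."""
    celery = Celery(app.import_name,
                    broker=app.config["CELERY_BROKER_URL"],
                    backend=app.config["CELERY_RESULT_BACKEND"])

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery

celery = make_celery(app2)

# Local staging directory for uploads before they are pushed to S3
os.makedirs(os.path.join(os.getcwd(), "temp"), exist_ok=True)

# Assumed worker command: celery -A app_setup worker --loglevel=info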

So I wrote a function that handles the request and kicks off a background process.

Function to handle the request

import os

from flask import jsonify, request
from werkzeug.utils import secure_filename

# auth_required and allowed_file are app-specific helpers defined elsewhere.

@app2.route('/upload', methods=['POST'])
@auth_required
def upload_file():
    # Get the list of uploaded files from the multipart form data
    uploaded_files = request.files.getlist("file")
    filenames = []
    for file in uploaded_files:
        # Check if the file is one of the allowed types/extensions
        if file and allowed_file(file.filename):
            # Make the filename safe, remove unsupported chars
            filename = secure_filename(file.filename)
            file.save(os.getcwd() + '/temp/' + filename)
            filenames.append(filename)

    # Hand the saved filenames off to the Celery task and respond immediately
    background.delay(filenames, request.remote_addr)

    resp = jsonify({'message': 'File successfully uploaded'})
    resp.status_code = 200
    return resp
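To make the flow concrete, here is roughly how a client could hit this endpoint with several files in one request. The URL, the token header, and the file names are placeholders I made up for illustration; they are not from the original project.

# client_example.py -- hypothetical client for the /upload endpoint
import requests

with open("report_one.csv", "rb") as f1, open("report_two.csv", "rb") as f2:
    # Repeating the "file" field name is what request.files.getlist("file") picks up
    files = [("file", f1), ("file", f2)]
    resp = requests.post("http://localhost:5000/upload",
                         files=files,
                         headers={"Authorization": "Bearer <token>"})

print(resp.status_code, resp.json())  # 200 {'message': 'File successfully uploaded'}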

Function to handle the background process

import os
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime

import boto3

# aws_access_key_id, aws_secret_access_key, region_name and bucket_name
# come from the app's configuration.

@celery.task
def background(filenames, remote_address):
    app2.logger.info("##############################################")
    app2.logger.info("Upload Request Sent From: " + str(remote_address))
    app2.logger.info("##############################################")
    for name in filenames:
        app2.logger.info(f"File {name} Entered For Processing on {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}")
        with open(str(os.getcwd() + '/temp/' + name), 'rb') as upload:
            app2.logger.info(f"File {name} Entered IO Stream on {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}")
            session = boto3.session.Session(aws_access_key_id=aws_access_key_id,
                                            aws_secret_access_key=aws_secret_access_key,
                                            region_name=region_name)
            s3_client = session.client('s3')
            with ThreadPoolExecutor(max_workers=8) as executor:
                app2.logger.info(
                    f"File {name} Entered Thread Processor on {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}")
                future_uploads = executor.submit(handle_bulk_upload_to_s3, s3_client, upload,
                                                 name, app2, bucket_name, 'data/')
                result = future_uploads.result()
                if result:
                    try:
                        os.remove(os.getcwd() + '/temp/' + name)
                        app2.logger.info(
                            f"File {name} Successfully Deleted on {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}")
                    except FileNotFoundError:
                        app2.logger.error(
                            f"File {name} failed to delete locally on {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}")

Explanation of the code line by line

uploaded_files = request.files.getlist("file")

Let's start by collecting all the uploaded file references into a variable so that they are easier to filter.

filenames = []

Here we initialize a list so that only the filtered file names are sent for further processing. You might wonder why we don't just pass the files themselves. The answer is that file objects are not serializable, and Celery only accepts serializable objects as task arguments.
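As a quick illustration (my own example, not from the article): Celery serializes task arguments, JSON by default, so plain strings go through fine while a Werkzeug FileStorage object would blow up the moment you call .delay().

# Fine: a list of plain strings is JSON-serializable
background.delay(["report_one.csv", "report_two.csv"], "127.0.0.1")

# Would fail: FileStorage objects can't be serialized by Celery's JSON serializer
# background.delay(request.files.getlist("file"), request.remote_addr)  # raises kombu EncodeError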

for file in uploaded_files:
    # Check if the file is one of the allowed types/extensions
    if file and allowed_file(file.filename):
        # Make the filename safe, remove unsupported chars
        filename = secure_filename(file.filename)
        file.save(os.getcwd() + '/temp/' + filename)
        filenames.append(filename)

Here we iterate through all the files and check whether each one is valid. If a file is valid, we append its name to the list we initialized earlier and save it locally to a temp directory; otherwise we simply continue the iteration. (A possible allowed_file helper is sketched below.)
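The allowed_file helper never appears in this article, so here is a minimal sketch of what it might look like; the ALLOWED_EXTENSIONS set is an assumption for illustration, not the project's actual whitelist.

# Assumed helper -- not from the original project
ALLOWED_EXTENSIONS = {"txt", "csv", "pdf", "png", "jpg", "jpeg"}

def allowed_file(filename):
    """Return True if the filename has an extension we accept."""
    return "." in filename and \
        filename.rsplit(".", 1)[1].lower() in ALLOWED_EXTENSIONS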

background.delay(filenames, request.remote_addr)

resp = jsonify({'message': 'File successfully uploaded'})
resp.status_code = 200
return resp

So after all the files are checked, the background task is triggered and a 200 status code is returned to the user right away.

For the background process I won't be explaining the logging parts; I will only cover the threading portion.

First, iterate over all the file names collected earlier.

for name in filenames:

Then open each file from the temp directory in binary mode.

with open(str(os.getcwd() + '/temp/' + name), 'rb') as upload:

Next, initialize the S3 session and client.

session = boto3.session.Session(aws_access_key_id=aws_access_key_id,
                                aws_secret_access_key=aws_secret_access_key,
                                region_name=region_name)
s3_client = session.client('s3')

Finally, initialize a thread pool executor with a maximum of 8 workers and submit the upload.

with ThreadPoolExecutor(max_workers=8) as executor:
    future_uploads = executor.submit(handle_bulk_upload_to_s3, s3_client, upload,
                                     name, app2, bucket_name, 'data/')
    result = future_uploads.result()
    if result:
        try:
            os.remove(os.getcwd() + '/temp/' + name)
        except FileNotFoundError:
            pass

In this part we create a thread pool and submit the handle_bulk_upload_to_s3 function to the executor, which runs it in a worker thread. If the upload completes without any exception, future_uploads.result() returns True; otherwise it returns False. After a successful upload we remove that particular file from our temp directory; otherwise we leave it there so that unsuccessful uploads can be examined later. To keep those leftovers from hogging disk space, we could write another function that retries the upload and, after 3 failed retries, deletes the file from the local server; a rough sketch of that idea follows.
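Here is a rough sketch of that retry-then-clean-up idea using Celery's built-in retry mechanism. The task name, the 30-second countdown, and the decision to delete the file after the final failure are my assumptions, not the original implementation; it also reuses the same module-level boto3 config names (aws_access_key_id, bucket_name, and so on) from the code above.

# Assumed retry/cleanup task -- a sketch, not the original project code
import os

import boto3

@celery.task(bind=True, max_retries=3)
def retry_upload(self, name):
    path = os.getcwd() + '/temp/' + name
    session = boto3.session.Session(aws_access_key_id=aws_access_key_id,
                                    aws_secret_access_key=aws_secret_access_key,
                                    region_name=region_name)
    s3_client = session.client('s3')
    with open(path, 'rb') as upload:
        ok = handle_bulk_upload_to_s3(s3_client, upload, name, app2, bucket_name, 'data/')
    if ok:
        os.remove(path)
        return True
    try:
        # Try again after 30 seconds, at most 3 times
        raise self.retry(countdown=30)
    except self.MaxRetriesExceededError:
        # Give up and free the disk space so stale files don't pile up
        app2.logger.error(f"Giving up on {name}; removing local copy")
        os.remove(path)
        return False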

The handle_bulk_upload_to_s3 function looks somewhat like this:

def handle_bulk_upload_to_s3(client, file, filename, app2, bucket_name, path):
    try:
        file_name = filename.split(".")[0] + ".txt"

        response = client.put_object(
            Bucket=bucket_name,
            Body=file,
            Key=path + file_name
        )
        if response['ResponseMetadata']['HTTPStatusCode'] == 200:
            return True
        return False

    except Exception as e:
        app2.logger.error(f"Upload of {filename} failed: {e}")
        return False

I can't share the actual project code due to confidentiality, but you get the rundown.

So now you can handle bulk file uploads to S3 without making your users wait on the request.
