INTEGRATING S3 EVENT NOTIFICATIONS WITH AWS LAMBDA FOR AUTOMATED JSON TO CSV CONVERSION.

Okey Ebere
6 min read · Jun 16, 2024


Repetitive tasks are a significant aspect of the IT world. In an environment where every second counts, automating tasks as soon as they arise is crucial for IT operations and businesses. This need drives the adoption of event-driven architecture, especially in data engineering, where data from third parties and other sources is frequently stored in systems like S3 buckets. Among the various methods for building data pipelines for file processing, a combination of S3, SQS, and AWS Lambda stands out as both efficient and straightforward.

CASE STUDY

In organization XYZ, the current architecture provisions an On-Demand EC2 instance that runs a cron job periodically. The cron job starts a bash script that makes a curl call to an API endpoint to fetch data and uses jq to convert the JSON data to CSV. This approach is not cost-effective and wastes resources, because the data is dynamic and new data does not arrive at the endpoint on a predictable schedule.

You have been tasked with automating the conversion of dynamic JSON data, stored in an S3 bucket by a third-party application, into a CSV file. Additionally, you need to filter the JSON data based on the ‘status_reason’ attribute not being “think_it_passed,” count occurrences, group the data by status and status reason, and format it into CSV. Finally, the output in CSV format should be sent to another S3 bucket.
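As a minimal sketch of the filter, count, and group requirement, assume each record is a dictionary with `status` and `status_reason` keys. The helper name `summarize` and the sample records are made up for illustration and are not part of the original project:

```python
from collections import Counter

EXCLUDED_REASON = "think_it_passed"

def summarize(records):
    """Count records per (status, status_reason), skipping the excluded reason."""
    counts = Counter(
        (r.get("status"), r.get("status_reason"))
        for r in records
        if r.get("status_reason") != EXCLUDED_REASON
    )
    # One row per group, in first-seen order: [status, status_reason, count]
    return [[status, reason, n] for (status, reason), n in counts.items()]

# Hypothetical sample records
sample = [
    {"status": "failed", "status_reason": "timeout"},
    {"status": "passed", "status_reason": "think_it_passed"},
    {"status": "failed", "status_reason": "timeout"},
    {"status": "passed", "status_reason": "verified"},
]

for row in summarize(sample):
    print(row)
```

Each returned row maps directly to one line of the final CSV (status, status reason, count).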

In this article, we will explore how to set up and utilize S3 event notifications and AWS Lambda configuration to streamline data processing efficiently.

SERVICE OVERVIEW

  1. AWS S3 BUCKET - S3 is a secure object storage service. It stores many types of data, such as files and photos. In this setup, every s3:ObjectCreated:* event for an object whose key ends in .json triggers the Lambda function.
  2. AWS LAMBDA - Lambda is the processing engine of this setup. It runs code without a dedicated server running constantly, and you pay only for the compute time used, which makes it cost-effective. When a new file is uploaded to the S3 bucket, Lambda is invoked, retrieves the file from S3, and processes it immediately, whether that means transforming the data, transferring it elsewhere, or analyzing it.
  3. AMAZON CLOUDWATCH - Collects the Lambda function’s logs and metrics for monitoring.
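To make the trigger more concrete, here is a trimmed-down sketch of the event that S3 hands to Lambda and how the bucket name and object key can be read from it. The bucket name, the key, and the helper name `extract_location` are hypothetical; real events carry many more fields:

```python
from urllib.parse import unquote_plus

# Trimmed-down example of an S3 event (bucket and key are hypothetical)
sample_event = {
    "Records": [
        {
            "eventName": "ObjectCreated:Put",
            "s3": {
                "bucket": {"name": "example-source-bucket"},
                "object": {"key": "reports/daily+report.json"},
            },
        }
    ]
}

def extract_location(event):
    """Return (bucket, key) for the first record of an S3 event."""
    record = event["Records"][0]["s3"]
    bucket = record["bucket"]["name"]
    # Keys are URL-encoded in the event, e.g. a space arrives as '+'
    key = unquote_plus(record["object"]["key"])
    return bucket, key

print(extract_location(sample_event))
```

Decoding the key matters for file names that contain spaces or special characters; without it, the download from S3 may fail with a missing-object error.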

Prerequisites

Before you begin, ensure you have:

i. An AWS account with appropriate privileges.

ii. Fundamental understanding of S3, Lambda, and Python.

STEP ONE — Provision an S3 bucket

We will create two S3 buckets: one as the source bucket for storing the .json files and another as the destination bucket for storing the .csv files.

First, let’s create the source S3 bucket to store our JSON files:

i. Sign in to the AWS Management Console and open the Amazon S3 console.

ii. Click **Create bucket**

iii. Enter a unique **Bucket name**.

iv. Enable versioning to protect against accidental deletion and to keep track of changes.

v. Keep the other settings as default and click **Create bucket** at the bottom.

Repeat the same process to create the destination bucket.
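If you prefer to script this step instead of using the console, the following is a rough sketch. It assumes an S3 client such as `boto3.client("s3")` is passed in; the helper name `create_versioned_bucket` and the bucket name in the usage comment are made up for illustration:

```python
def create_versioned_bucket(s3_client, bucket_name, region="us-east-1"):
    """Create a bucket and enable versioning on it."""
    params = {"Bucket": bucket_name}
    # Outside us-east-1, S3 expects an explicit location constraint
    if region != "us-east-1":
        params["CreateBucketConfiguration"] = {"LocationConstraint": region}
    s3_client.create_bucket(**params)
    s3_client.put_bucket_versioning(
        Bucket=bucket_name,
        VersioningConfiguration={"Status": "Enabled"},
    )

# Usage (requires boto3 and AWS credentials; bucket name is a placeholder):
#   import boto3
#   create_versioned_bucket(boto3.client("s3"), "example-source-bucket")
```

Calling the helper twice, once per bucket name, covers both the source and the destination bucket.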

STEP TWO - Create an AWS Lambda function

Next, we’ll create a Lambda function to process the JSON files:

i. Open the AWS Lambda console.

ii. Click on Create function.

iii. Select Author from scratch.

iv. Enter a Function name.

v. Choose Python 3.12 as the runtime.

vi. Under Permissions, choose Create a new role with basic Lambda permissions.

vii. Keep the other settings as default and click Create function.


In the Lambda function code editor, paste the following Python code:

import json
import csv
import os
from urllib.parse import unquote_plus

import boto3

s3 = boto3.client('s3')

def lambda_handler(event, context):
    try:
        # Get the S3 bucket and object key from the event.
        # Object keys are URL-encoded in S3 events, so decode them first.
        source_bucket = event['Records'][0]['s3']['bucket']['name']
        object_key = unquote_plus(event['Records'][0]['s3']['object']['key'])

        # Extract the key without its extension (reused for the output key)
        base_filename = os.path.splitext(object_key)[0]

        # Download the JSON file from S3. Only the file name is used locally,
        # so keys containing '/' still map to a valid path under /tmp.
        download_path = '/tmp/{}'.format(os.path.basename(object_key))
        s3.download_file(source_bucket, object_key, download_path)

        # Read and parse the JSON data
        with open(download_path, 'r') as json_file:
            try:
                data = json.load(json_file)
            except json.JSONDecodeError as e:
                print(f"Error decoding JSON: {e}")
                return {
                    'statusCode': 400,
                    'body': json.dumps('Error decoding JSON')
                }

        # Ensure the 'json_data' key exists and is a list
        json_data = data.get('json_data') if isinstance(data, dict) else None
        if not isinstance(json_data, list):
            print(f"Expected 'json_data' to be a list, but got {type(json_data).__name__}")
            return {
                'statusCode': 400,
                'body': json.dumps('Invalid JSON structure')
            }

        # Filter out excluded records, then count per status and status reason
        filtered_data = [item for item in json_data if item.get('status_reason') != "think_it_passed"]
        grouped_data = {}
        for item in filtered_data:
            status = item.get('status')
            reason = item.get('status_reason')
            if status not in grouped_data:
                grouped_data[status] = {}
            if reason not in grouped_data[status]:
                grouped_data[status][reason] = 0
            grouped_data[status][reason] += 1

        # Convert to CSV rows
        csv_data = []
        for status, reasons in grouped_data.items():
            for reason, count in reasons.items():
                csv_data.append([status, reason, count])

        csv_file_path = '/tmp/{}.csv'.format(os.path.basename(base_filename))
        with open(csv_file_path, 'w', newline='') as csvfile:
            csvwriter = csv.writer(csvfile)
            csvwriter.writerow(['Status', 'Status Reason', 'Count'])
            csvwriter.writerows(csv_data)

        # Upload the CSV file to the destination S3 bucket
        destination_bucket = 'aws-destination-001'
        s3.upload_file(csv_file_path, destination_bucket, '{}.csv'.format(base_filename))

        return {
            'statusCode': 200,
            'body': json.dumps('CSV file created and uploaded successfully!')
        }
    except Exception as e:
        # Log the error so it shows up in CloudWatch, then report a generic failure
        print(f"An error occurred: {e}")
        return {
            'statusCode': 500,
            'body': json.dumps('An internal error occurred')
        }
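The destination bucket name is hardcoded in the function. One possible refinement, shown here only as a sketch, is to read it from a Lambda environment variable so the same code can be reused across environments. The variable name `DESTINATION_BUCKET` and the helper `get_destination_bucket` are assumptions, not part of the original project:

```python
import os

def get_destination_bucket(default="aws-destination-001"):
    """Return the destination bucket, preferring an environment variable if set."""
    # DESTINATION_BUCKET is a hypothetical variable name you would define
    # in the Lambda function's configuration.
    return os.environ.get("DESTINATION_BUCKET", default)

print(get_destination_bucket())
```

When the variable is not set, the helper falls back to the bucket name used in this article.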

STEP THREE — Create an S3 Event Notification.

i. Select the bucket that was created to store the .json files.

ii. Go to the Properties tab, scroll down to Event notifications, and click Create event notification.

iii. Set up the notification: specify a name and the event type, which should be s3:ObjectCreated:*.

iv. Set the suffix to .json.

v. Choose Lambda function as the destination, then select the Lambda function you created above.

vi. Click Save changes.

S3 is now added as a trigger for the Lambda function.
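The same notification can also be set up programmatically. The sketch below assumes a boto3 S3 client is passed in; the helper names are made up for illustration, and the bucket name and function ARN are values you would supply yourself:

```python
def build_notification_config(lambda_arn, suffix=".json"):
    """Build a notification configuration for object-created events on matching keys."""
    return {
        "LambdaFunctionConfigurations": [
            {
                "LambdaFunctionArn": lambda_arn,
                "Events": ["s3:ObjectCreated:*"],
                # Only keys ending with the given suffix trigger the function
                "Filter": {
                    "Key": {"FilterRules": [{"Name": "suffix", "Value": suffix}]}
                },
            }
        ]
    }

def attach_notification(s3_client, bucket_name, lambda_arn):
    """Apply the configuration to a bucket (replaces any existing notification config)."""
    s3_client.put_bucket_notification_configuration(
        Bucket=bucket_name,
        NotificationConfiguration=build_notification_config(lambda_arn),
    )
```

Note that S3 also needs permission to invoke the function. The console normally takes care of this when the notification is saved there; when using the API, that permission would likely have to be granted separately.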

STEP FOUR — Update Lambda Function IAM Role.

Attach the S3 read and write access policy to the execution role associated with your Lambda function. This policy grants the necessary permissions to get and put objects in the S3 bucket.


{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": "*"
        }
    ]
}
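The policy above allows these actions on every bucket in the account (`"Resource": "*"`). As a hedged alternative, the sketch below generates a narrower policy that only permits reading from the source bucket and writing to the destination bucket. The helper name `build_policy` and the source bucket name are hypothetical; `aws-destination-001` is the destination bucket used in the function code:

```python
import json

def build_policy(source_bucket, destination_bucket):
    """Build a policy limited to reading source objects and writing destination objects."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "ReadSourceObjects",
                "Effect": "Allow",
                "Action": ["s3:GetObject"],
                "Resource": f"arn:aws:s3:::{source_bucket}/*",
            },
            {
                "Sid": "WriteDestinationObjects",
                "Effect": "Allow",
                "Action": ["s3:PutObject"],
                "Resource": f"arn:aws:s3:::{destination_bucket}/*",
            },
        ],
    }

# "example-source-bucket" is a placeholder for your own source bucket name
print(json.dumps(build_policy("example-source-bucket", "aws-destination-001"), indent=2))
```

The printed JSON can be pasted into the IAM policy editor in place of the broader policy.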

STEP FIVE - Upload a JSON File and Test

Upload a JSON file to the source S3 bucket and verify that the Lambda function processes it and uploads a CSV file to the destination S3 bucket.

[Screenshots: source S3 bucket, destination S3 bucket, and Lambda success logs]

NOTE

If you encounter a timeout error, consider increasing the Lambda function’s execution timeout, as the default duration is set to 3 seconds.
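The timeout can be changed in the console under the function’s configuration, or with a short script such as the sketch below. It assumes a boto3 Lambda client is passed in; the helper name `set_function_timeout`, the function name in the usage comment, and the 30-second value are illustrative choices:

```python
def set_function_timeout(lambda_client, function_name, timeout_seconds=30):
    """Update a Lambda function's timeout (Lambda allows 1 to 900 seconds)."""
    if not 1 <= timeout_seconds <= 900:
        raise ValueError("timeout_seconds must be between 1 and 900")
    lambda_client.update_function_configuration(
        FunctionName=function_name,
        Timeout=timeout_seconds,
    )

# Usage (requires boto3 and AWS credentials; function name is a placeholder):
#   import boto3
#   set_function_timeout(boto3.client("lambda"), "example-json-to-csv", 30)
```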

GitHub Link to project files.

CONGRATULATIONS

Congratulations! You have successfully automated the conversion of dynamic JSON data stored in an S3 bucket by a third-party application into a CSV file. By implementing S3 event notifications and AWS Lambda, you have enabled real-time filtering, counted occurrences, and grouped the data based on the ‘status_reason’ attribute, ensuring the attribute is not “think_it_passed.” The processed data is then formatted into a CSV file and seamlessly sent to another S3 bucket.

This solution not only enhances data processing efficiency but also optimizes resource utilization and cost-effectiveness for your organization.
