Many applications today require real-time data processing—whether it’s for analytics, monitoring, or triggering actions. AWS provides powerful services like Amazon Kinesis for streaming data, AWS Lambda for serverless processing, and DynamoDB for scalable storage.
In this blog, we’ll build a real-time data pipeline that:
- Ingests streaming data (e.g., clickstream, IoT sensor data, or logs) using Kinesis Data Streams.
- Processes records in real-time using Lambda.
- Stores aggregated results in DynamoDB for querying.
## Architecture Overview
*(Architecture: Kinesis Data Stream → Lambda → DynamoDB)*
- Kinesis Data Stream – Captures high-velocity data.
- Lambda Function – Processes records as they arrive.
- DynamoDB Table – Stores aggregated results (e.g., counts, metrics).
## Step-by-Step Implementation

### 1. Set Up a Kinesis Data Stream

Create a Kinesis stream to ingest data:
```bash
aws kinesis create-stream --stream-name ClickStream --shard-count 1
```
Producers (e.g., web apps, IoT devices) can send data like:
```json
{
  "userId": "user123",
  "action": "click",
  "timestamp": "2024-05-20T12:00:00Z"
}
```
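Producers typically push these events with an AWS SDK. Below is a minimal Python sketch; the stream name `ClickStream` matches the step above, while `make_record` and `send_click` are illustrative names, not part of any AWS API:

```python
import json


def make_record(user_id: str, action: str, timestamp: str) -> dict:
    """Build the arguments for a Kinesis put_record call.

    Keying by user_id sends all of a user's events to the same shard,
    which preserves their relative order.
    """
    event = {"userId": user_id, "action": action, "timestamp": timestamp}
    return {
        "StreamName": "ClickStream",
        "Data": json.dumps(event).encode("utf-8"),
        "PartitionKey": user_id,
    }


def send_click(user_id: str, action: str, timestamp: str) -> None:
    """Send one click event to the ClickStream stream."""
    import boto3  # imported here so make_record stays usable without AWS deps

    kinesis = boto3.client("kinesis")
    kinesis.put_record(**make_record(user_id, action, timestamp))
```

Partitioning by `userId` matters for the aggregation later: it guarantees per-user ordering within a shard.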
### 2. Create a Lambda Function to Process Streams
Write a Python Lambda function (process_stream.py) to:
- Read records from Kinesis.
- Aggregate data (e.g., count clicks per user).
- Update DynamoDB.
```python
import base64
import json

import boto3

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('UserClicks')


def lambda_handler(event, context):
    for record in event['Records']:
        # Kinesis delivers record data base64-encoded
        payload = json.loads(base64.b64decode(record['kinesis']['data']))
        user_id = payload['userId']

        # Atomically increment this user's click count in DynamoDB
        table.update_item(
            Key={'userId': user_id},
            UpdateExpression="ADD clicks :incr",
            ExpressionAttributeValues={':incr': 1}
        )
    return {"status": "success"}
```
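Note that a Kinesis trigger hands Lambda each payload base64-encoded under `record['kinesis']['data']`, so it must be decoded before `json.loads`. You can check that step locally by hand-building an event in the shape a Kinesis trigger delivers (the field names below mirror the real event structure; the sample payload is made up):

```python
import base64
import json


def decode_kinesis_record(record: dict) -> dict:
    """Decode one record as delivered to Lambda by a Kinesis trigger:
    the payload arrives base64-encoded under record['kinesis']['data']."""
    raw = base64.b64decode(record["kinesis"]["data"])
    return json.loads(raw)


# Hand-built event mimicking what the Kinesis trigger passes to Lambda.
sample_event = {
    "Records": [
        {
            "kinesis": {
                "data": base64.b64encode(
                    json.dumps({"userId": "user123", "action": "click"}).encode()
                ).decode()
            }
        }
    ]
}

payload = decode_kinesis_record(sample_event["Records"][0])
```

Only the DynamoDB call needs real AWS resources; the decoding logic runs anywhere.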
### 3. Configure Lambda as a Kinesis Consumer

In the AWS Console:
- Go to Lambda → Create function and choose a Python runtime.
- Add Kinesis as a trigger and select your stream.
- Set the batch size (e.g., 100 records per invocation).
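The same trigger can be created from the CLI with `create-event-source-mapping`. A sketch, assuming the function is named `ProcessClickStream`; substitute your own region and account ID in the stream ARN:

```bash
aws lambda create-event-source-mapping \
  --function-name ProcessClickStream \
  --event-source-arn arn:aws:kinesis:us-east-1:123456789012:stream/ClickStream \
  --batch-size 100 \
  --starting-position LATEST
```

`--starting-position LATEST` processes only new records; use `TRIM_HORIZON` to replay everything already in the stream.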
### 4. Set Up DynamoDB for Aggregations

Create a table with `userId` as the primary key:
```bash
aws dynamodb create-table \
  --table-name UserClicks \
  --attribute-definitions AttributeName=userId,AttributeType=S \
  --key-schema AttributeName=userId,KeyType=HASH \
  --billing-mode PAY_PER_REQUEST
```
### 5. Test the Pipeline

Send test data to Kinesis (with AWS CLI v2, add `--cli-binary-format raw-in-base64-out` so the JSON is sent as-is rather than being treated as already base64-encoded):

```bash
aws kinesis put-record \
  --stream-name ClickStream \
  --cli-binary-format raw-in-base64-out \
  --data '{"userId": "user123", "action": "click"}' \
  --partition-key user123
```
Check DynamoDB for aggregated results:
```bash
aws dynamodb get-item --table-name UserClicks --key '{"userId": {"S": "user123"}}'
```
Output (the CLI returns DynamoDB's typed attribute values):

```json
{
    "Item": {
        "userId": {"S": "user123"},
        "clicks": {"N": "1"}
    }
}
```
## Use Cases
- Real-time analytics (e.g., dashboard for user activity).
- Fraud detection (trigger alerts for unusual patterns).
- IoT monitoring (process sensor data in real-time).
Enjoy!

Thank you,
Osama