Real-Time Data Processing with AWS Kinesis, Lambda, and DynamoDB

Many applications today require real-time data processing—whether it’s for analytics, monitoring, or triggering actions. AWS provides powerful services like Amazon Kinesis for streaming data, AWS Lambda for serverless processing, and DynamoDB for scalable storage.

In this blog, we’ll build a real-time data pipeline that:

  1. Ingests streaming data (e.g., clickstream, IoT sensor data, or logs) using Kinesis Data Streams.
  2. Processes records in real time using Lambda.
  3. Stores aggregated results in DynamoDB for querying.

Architecture Overview

(Architecture: Kinesis Data Stream → Lambda function → DynamoDB table)

  1. Kinesis Data Stream – Captures high-velocity data.
  2. Lambda Function – Processes records as they arrive.
  3. DynamoDB Table – Stores aggregated results (e.g., counts, metrics).

Step-by-Step Implementation

1. Set Up a Kinesis Data Stream

Create a Kinesis stream to ingest data:

aws kinesis create-stream --stream-name ClickStream --shard-count 1
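
If you'd rather provision the stream from code, a minimal boto3 sketch does the same thing and waits until the stream is ACTIVE (boto3's stream_exists waiter polls DescribeStream for you):

import boto3

kinesis = boto3.client('kinesis')
kinesis.create_stream(StreamName='ClickStream', ShardCount=1)

# Block until the stream reaches ACTIVE status before writing to it.
kinesis.get_waiter('stream_exists').wait(StreamName='ClickStream')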

Producers (e.g., web apps, IoT devices) can send data like:

{
  "userId": "user123",
  "action": "click",
  "timestamp": "2024-05-20T12:00:00Z"
}
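
For instance, a minimal Python producer might look like the sketch below (send_click is an illustrative helper, not an AWS API; keying the partition by user ID is an assumption that keeps each user's events in order):

import json
import boto3

kinesis = boto3.client('kinesis')

def send_click(user_id, action, timestamp):
    # The partition key decides which shard receives the record;
    # keying by user ID keeps a given user's events in order.
    kinesis.put_record(
        StreamName='ClickStream',
        Data=json.dumps({'userId': user_id, 'action': action,
                         'timestamp': timestamp}).encode('utf-8'),
        PartitionKey=user_id
    )

send_click('user123', 'click', '2024-05-20T12:00:00Z')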

2. Create a Lambda Function to Process Streams

Write a Python Lambda function (process_stream.py) to:

  • Read records from Kinesis.
  • Aggregate data (e.g., count clicks per user).
  • Update DynamoDB.

import base64
import json

import boto3

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('UserClicks')

def lambda_handler(event, context):
    for record in event['Records']:
        # Kinesis base64-encodes each record's payload, so decode it
        # before parsing the JSON.
        payload = json.loads(base64.b64decode(record['kinesis']['data']))
        user_id = payload['userId']

        # ADD atomically increments the counter; the item (and the
        # clicks attribute) is created on first write.
        table.update_item(
            Key={'userId': user_id},
            UpdateExpression="ADD clicks :incr",
            ExpressionAttributeValues={':incr': 1}
        )
    return {"status": "success"}
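
A quick way to sanity-check the handler locally is to hand-build an event in the shape Lambda receives from Kinesis. This is a sketch: real events carry more metadata (partition key, sequence number, etc.), but the handler above only reads kinesis.data, and it assumes the UserClicks table exists and your AWS credentials are configured:

import base64
import json

from process_stream import lambda_handler

# Kinesis delivers the payload base64-encoded under kinesis.data,
# so encode the test payload the same way.
payload = {"userId": "user123", "action": "click",
           "timestamp": "2024-05-20T12:00:00Z"}
event = {"Records": [{"kinesis": {"data": base64.b64encode(
    json.dumps(payload).encode("utf-8")).decode("ascii")}}]}

print(lambda_handler(event, None))  # {'status': 'success'}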

3. Configure Lambda as a Kinesis Consumer

In the AWS Console (a boto3 alternative is sketched after this list):

  • Go to Lambda → Create function and choose a Python runtime.
  • Add Kinesis as the trigger (select your stream).
  • Set batch size (e.g., 100 records per invocation).
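
The same wiring can be done from code. Here is a sketch with boto3; the stream ARN (region and account ID) and the function name are placeholders for your own resources:

import boto3

lambda_client = boto3.client('lambda')

# Placeholders: substitute your region, account ID, and function name.
lambda_client.create_event_source_mapping(
    EventSourceArn='arn:aws:kinesis:us-east-1:123456789012:stream/ClickStream',
    FunctionName='process-stream',
    StartingPosition='LATEST',  # only read records written after the mapping exists
    BatchSize=100               # matches the batch size suggested above
)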

4. Set Up DynamoDB for Aggregations

Create a table with userId as the partition (HASH) key:

aws dynamodb create-table \
    --table-name UserClicks \
    --attribute-definitions AttributeName=userId,AttributeType=S \
    --key-schema AttributeName=userId,KeyType=HASH \
    --billing-mode PAY_PER_REQUEST
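
Note that create-table is asynchronous. If a script provisions the table right before sending traffic, it can block until the table is ACTIVE with boto3's table_exists waiter (a small sketch):

import boto3

dynamodb = boto3.client('dynamodb')

# Poll DescribeTable until UserClicks reaches ACTIVE status.
dynamodb.get_waiter('table_exists').wait(TableName='UserClicks')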

5. Test the Pipeline

Send test data to Kinesis:

aws kinesis put-record \
    --stream-name ClickStream \
    --data '{"userId": "user123", "action": "click"}' \
    --partition-key user123
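
To push more than one event at a time, put_records batches up to 500 records per call (a sketch; the user IDs here are made up):

import json
import boto3

kinesis = boto3.client('kinesis')

response = kinesis.put_records(
    StreamName='ClickStream',
    Records=[{
        'Data': json.dumps({'userId': f'user{i}', 'action': 'click'}).encode('utf-8'),
        'PartitionKey': f'user{i}'
    } for i in range(10)]
)

# A real producer should retry any entries reported as failed.
print(response['FailedRecordCount'])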

Check DynamoDB for aggregated results:

aws dynamodb get-item --table-name UserClicks --key '{"userId": {"S": "user123"}}'

Output (the CLI returns DynamoDB's typed attribute values, where S is a string and N a number):

{
    "Item": {
        "userId": {"S": "user123"},
        "clicks": {"N": "1"}
    }
}

Use Cases

  • Real-time analytics (e.g., dashboard for user activity).
  • Fraud detection (trigger alerts for unusual patterns).
  • IoT monitoring (process sensor data in real time).

Enjoy
Thank you
Osama
