Modern cloud applications increasingly depend on real-time processing, especially when dealing with fraud detection, personalization, IoT telemetry, or operational monitoring.
In this post, we’ll build a fully functional AWS pipeline that:
- Streams events using Amazon Kinesis
- Enriches and transforms them via AWS Lambda
- Stores real-time feature data in Amazon DynamoDB
- Performs machine-learning inference using a SageMaker Endpoint
1. Architecture Overview
At a high level, events flow from your producers into the Kinesis stream, a Lambda function consumes each batch and writes per-user features to DynamoDB, and an API layer combines those live features with a SageMaker endpoint to return predictions.
2. Step-By-Step Pipeline Build
2.1. Create a Kinesis Data Stream
aws kinesis create-stream \
--stream-name RealtimeEvents \
--shard-count 2 \
--region us-east-1
This stream will accept incoming events from your apps, IoT devices, or microservices.
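Producers write to this stream with the Kinesis PutRecord API. A minimal Python sketch (the payload fields match what the enrichment Lambda in step 2.3 expects; the user ID and metric values are placeholders):
import json
import boto3
from datetime import datetime, timezone
kinesis = boto3.client("kinesis", region_name="us-east-1")
def send_event(user_id, metric):
    # Event shape consumed by the enrichment Lambda: userId, metric, ISO-8601 timestamp
    event = {
        "userId": user_id,
        "metric": metric,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    # Partitioning by user keeps each user's events ordered within a single shard
    kinesis.put_record(
        StreamName="RealtimeEvents",
        Data=json.dumps(event).encode("utf-8"),
        PartitionKey=user_id,
    )
send_event("user-123", "page_view")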
2.2. DynamoDB Table for Real-Time Features
aws dynamodb create-table \
--table-name UserFeatureStore \
--attribute-definitions AttributeName=userId,AttributeType=S \
--key-schema AttributeName=userId,KeyType=HASH \
--billing-mode PAY_PER_REQUEST \
--region us-east-1
This table holds live user features, updated every time an event arrives.
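Once the enrichment Lambda in the next step has processed a few events for a user, an item in this table looks roughly like this (the field names come from that Lambda; the values are purely illustrative):
{
    "userId": "user-123",
    "lastTimestamp": "2024-05-01T12:34:56+00:00",
    "count": 3,
    "lastMetric": "page_view"
}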
2.3. Lambda Function (Real-Time Data Enrichment)
This Lambda:
- Reads events from Kinesis
- Computes simple features (e.g., last event time, rolling count)
- Saves enriched data to DynamoDB
import base64
import json
import boto3
from datetime import datetime, timedelta
ddb = boto3.resource("dynamodb")
table = ddb.Table("UserFeatureStore")
def lambda_handler(event, context):
    for record in event["Records"]:
        # Kinesis delivers record payloads base64-encoded
        payload = json.loads(base64.b64decode(record["kinesis"]["data"]))
        user = payload["userId"]
        metric = payload["metric"]
        ts = datetime.fromisoformat(payload["timestamp"])
        # Fetch old features
        old = table.get_item(Key={"userId": user}).get("Item", {})
        last_ts = old.get("lastTimestamp")
        count = old.get("count", 0)
        # Update rolling 5-minute count
        if last_ts:
            prev_ts = datetime.fromisoformat(last_ts)
            if ts - prev_ts < timedelta(minutes=5):
                count += 1
            else:
                count = 1
        else:
            count = 1
        # Save new enriched features
        table.put_item(Item={
            "userId": user,
            "lastTimestamp": ts.isoformat(),
            "count": count,
            "lastMetric": metric
        })
    return {"status": "ok"}
Attach the Lambda to the Kinesis stream.
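This is done with an event source mapping, which makes Lambda poll the shards and invoke the handler with batches of records. A boto3 sketch (the function name and the account ID in the stream ARN are placeholders for your own values):
import boto3
lambda_client = boto3.client("lambda", region_name="us-east-1")
lambda_client.create_event_source_mapping(
    FunctionName="RealtimeEnrichment",  # the name you gave the Lambda above
    EventSourceArn="arn:aws:kinesis:us-east-1:123456789012:stream/RealtimeEvents",
    StartingPosition="LATEST",
    BatchSize=100,
)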
2.4. Creating a SageMaker Endpoint for Inference
Train your model offline, then register it and deploy it behind a real-time endpoint.
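The endpoint configuration below references ModelName=MyInferenceModel, so that model must already be registered. A minimal boto3 sketch (the container image URI, model artifact location, and execution role ARN are placeholders you need to replace):
import boto3
sm = boto3.client("sagemaker", region_name="us-east-1")
# Register the trained artifact together with its serving container
sm.create_model(
    ModelName="MyInferenceModel",
    PrimaryContainer={
        "Image": "123456789012.dkr.ecr.us-east-1.amazonaws.com/my-inference-image:latest",
        "ModelDataUrl": "s3://my-bucket/models/model.tar.gz",
    },
    ExecutionRoleArn="arn:aws:iam::123456789012:role/SageMakerExecutionRole",
)
With the model in place, create the endpoint configuration and the endpoint itself: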
aws sagemaker create-endpoint-config \
--endpoint-config-name RealtimeInferenceConfig \
--production-variants VariantName=AllInOne,ModelName=MyInferenceModel,InitialInstanceCount=1,InstanceType=ml.m5.large
aws sagemaker create-endpoint \
--endpoint-name RealtimeInference \
--endpoint-config-name RealtimeInferenceConfig
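Endpoint creation usually takes several minutes. If you script the deployment, you can block until the endpoint is ready with the built-in waiter:
import boto3
sm = boto3.client("sagemaker", region_name="us-east-1")
# Polls until the endpoint status reaches InService (or raises on failure)
sm.get_waiter("endpoint_in_service").wait(EndpointName="RealtimeInference")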
2.5. API Layer Performing Live Inference
Your application now requests predictions like this:
import boto3
import json
runtime = boto3.client("sagemaker-runtime")
ddb = boto3.resource("dynamodb").Table("UserFeatureStore")
def predict(user_id, extra_input):
    # Fall back to an empty dict for users with no stored features yet
    user_features = ddb.get_item(Key={"userId": user_id}).get("Item", {})
    payload = {
        "userId": user_id,
        "features": user_features,
        "input": extra_input
    }
    # default=str serializes the Decimal values DynamoDB returns for numbers
    response = runtime.invoke_endpoint(
        EndpointName="RealtimeInference",
        ContentType="application/json",
        Body=json.dumps(payload, default=str)
    )
    return json.loads(response["Body"].read())
This combines the live enriched features from DynamoDB with model inference, so every prediction reflects the user's most recent activity.
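Calling it is straightforward; the shape of extra_input depends entirely on what your model was trained to accept, so the value here is just an illustrative placeholder:
result = predict("user-123", {"amount": 42.5})
print(result)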
3. Production Considerations
Performance
- Tune Lambda concurrency (reserved concurrency and, for the Kinesis trigger, the parallelization factor)
- Use DynamoDB DAX caching for read-heavy lookups
- Use Kinesis Enhanced Fan-Out for high read throughput (see the sketch below)
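For Enhanced Fan-Out, you register a dedicated consumer on the stream (each consumer gets its own 2 MB/s of read throughput per shard) and point the Lambda event source mapping at the consumer ARN instead of the stream ARN. A boto3 sketch (the account ID and consumer name are placeholders):
import boto3
kinesis = boto3.client("kinesis", region_name="us-east-1")
# Register a dedicated consumer for the enrichment Lambda
consumer = kinesis.register_stream_consumer(
    StreamARN="arn:aws:kinesis:us-east-1:123456789012:stream/RealtimeEvents",
    ConsumerName="RealtimeEnrichmentConsumer",
)
print(consumer["Consumer"]["ConsumerARN"])  # use this ARN as the Lambda event source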
Security
- Use IAM roles with least privilege
- Encrypt Kinesis, Lambda, DynamoDB, and SageMaker with KMS
Monitoring
- CloudWatch Metrics
- CloudWatch Logs Insights queries
- DynamoDB throttling/capacity alarms (see the sketch below)
- SageMaker endpoint invocation and model error metrics
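As an example of the DynamoDB alarm, a boto3 sketch that alerts when the feature table starts throttling writes (the alarm name and threshold are illustrative):
import boto3
cw = boto3.client("cloudwatch", region_name="us-east-1")
# Fire when the feature table throttles any writes within a minute
cw.put_metric_alarm(
    AlarmName="UserFeatureStore-WriteThrottles",
    Namespace="AWS/DynamoDB",
    MetricName="WriteThrottleEvents",
    Dimensions=[{"Name": "TableName", "Value": "UserFeatureStore"}],
    Statistic="Sum",
    Period=60,
    EvaluationPeriods=1,
    Threshold=1,
    ComparisonOperator="GreaterThanOrEqualToThreshold",
    TreatMissingData="notBreaching",
)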
Cost Optimization
- Use PAY_PER_REQUEST DynamoDB
- Use Lambda Power Tuning
- Scale SageMaker endpoints with Application Auto Scaling (see the sketch below)
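For the last point, a boto3 sketch that scales the endpoint variant between 1 and 4 instances based on invocations per instance (the instance limits and target value are illustrative):
import boto3
aas = boto3.client("application-autoscaling", region_name="us-east-1")
resource_id = "endpoint/RealtimeInference/variant/AllInOne"
# Make the variant's instance count a scalable target
aas.register_scalable_target(
    ServiceNamespace="sagemaker",
    ResourceId=resource_id,
    ScalableDimension="sagemaker:variant:DesiredInstanceCount",
    MinCapacity=1,
    MaxCapacity=4,
)
# Track a target of roughly 70 invocations per instance per minute
aas.put_scaling_policy(
    PolicyName="RealtimeInference-Scaling",
    ServiceNamespace="sagemaker",
    ResourceId=resource_id,
    ScalableDimension="sagemaker:variant:DesiredInstanceCount",
    PolicyType="TargetTrackingScaling",
    TargetTrackingScalingPolicyConfiguration={
        "TargetValue": 70.0,
        "PredefinedMetricSpecification": {
            "PredefinedMetricType": "SageMakerVariantInvocationsPerInstance"
        },
    },
)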