Detecting unusual patterns in real time is critical to preventing outages, catching fraud, ensuring SLA compliance, and maintaining high-quality user experiences.
In this post, we build a real working pipeline on OCI that:
- Ingests streaming data
- Computes features in near-real time
- Stores results in Autonomous Database
- Runs anomaly detection logic
- Sends alerts and exposes dashboards
This guide contains every technical step, including:
Streaming → Function → Autonomous DB → Anomaly Logic → Notifications → Dashboards
1. Architecture Overview

Components Used
- OCI Streaming
- OCI Functions
- Oracle Autonomous Database
- DBMS_SCHEDULER for anomaly detection job
- OCI Notifications
- Oracle Analytics Cloud / Grafana
2. Step-by-Step Implementation
2.1 Create OCI Streaming Stream
oci streaming stream create \
--compartment-id $COMPARTMENT_OCID \
--display-name "anomaly-events-stream" \
--partitions 3
2.2 Autonomous Database Table
CREATE TABLE raw_events (
event_id VARCHAR2(50),
event_time TIMESTAMP,
metric_value NUMBER,
feature1 NUMBER,
feature2 NUMBER,
processed_flag CHAR(1) DEFAULT 'N',
anomaly_flag CHAR(1) DEFAULT 'N',
CONSTRAINT pk_raw_events PRIMARY KEY(event_id)
);
2.3 OCI Function – Feature Extraction
func.py:
import oci
import cx_Oracle
import json
from datetime import datetime
def handler(ctx, data: bytes=None):
event = json.loads(data.decode('utf-8'))
evt_id = event['id']
evt_time = datetime.fromisoformat(event['time'])
value = event['metric']
# DB Connection
conn = cx_Oracle.connect(user='USER', password='PWD', dsn='dsn')
cur = conn.cursor()
# Fetch previous value if exists
cur.execute("SELECT metric_value FROM raw_events WHERE event_id=:1", (evt_id,))
prev = cur.fetchone()
prev_val = prev[0] if prev else 1.0
# Compute features
feature1 = value - prev_val
feature2 = value / prev_val
# Insert new event
cur.execute("""
INSERT INTO raw_events(event_id, event_time, metric_value, feature1, feature2)
VALUES(:1, :2, :3, :4, :5)
""", (evt_id, evt_time, value, feature1, feature2))
conn.commit()
cur.close()
conn.close()
return "ok"
Deploy the function and attach the streaming trigger.
2.4 Anomaly Detection Job (DBMS_SCHEDULER)
BEGIN
FOR rec IN (
SELECT event_id, feature1
FROM raw_events
WHERE processed_flag = 'N'
) LOOP
DECLARE
meanv NUMBER;
stdv NUMBER;
zscore NUMBER;
BEGIN
SELECT AVG(feature1), STDDEV(feature1) INTO meanv, stdv FROM raw_events;
zscore := (rec.feature1 - meanv) / NULLIF(stdv, 0);
IF ABS(zscore) > 3 THEN
UPDATE raw_events SET anomaly_flag='Y' WHERE event_id=rec.event_id;
END IF;
UPDATE raw_events SET processed_flag='Y' WHERE event_id=rec.event_id;
END;
END LOOP;
END;
Schedule this to run every 2 minutes:
BEGIN
DBMS_SCHEDULER.CREATE_JOB (
job_name => 'ANOMALY_JOB',
job_type => 'PLSQL_BLOCK',
job_action => 'BEGIN anomaly_detection_proc; END;',
repeat_interval => 'FREQ=MINUTELY;INTERVAL=2;',
enabled => TRUE
);
END;
2.5 Notifications
oci ons topic create \
--compartment-id $COMPARTMENT_OCID \
--name "AnomalyAlerts"
In the DB, add a trigger:
CREATE OR REPLACE TRIGGER notify_anomaly
AFTER UPDATE ON raw_events
FOR EACH ROW
WHEN (NEW.anomaly_flag='Y' AND OLD.anomaly_flag='N')
BEGIN
DBMS_OUTPUT.PUT_LINE('Anomaly detected for event ' || :NEW.event_id);
END;
/
2.6 Dashboarding
You may use:
- Oracle Analytics Cloud (OAC)
- Grafana + ADW Integration
- Any BI tool with SQL
Example Query:
SELECT event_time, metric_value, anomaly_flag
FROM raw_events
ORDER BY event_time;
2. Terraform + OCI CLI Script Bundle
Terraform – Streaming + Function + Policies
resource "oci_streaming_stream" "anomaly" {
name = "anomaly-events-stream"
partitions = 3
compartment_id = var.compartment_id
}
resource "oci_functions_application" "anomaly_app" {
compartment_id = var.compartment_id
display_name = "anomaly-function-app"
subnet_ids = var.subnets
}
Terraform Notification Topic
resource "oci_ons_notification_topic" "anomaly" {
compartment_id = var.compartment_id
name = "AnomalyAlerts"
}
CLI Insert Test Events
oci streaming stream message put \
--stream-id $STREAM_OCID \
--messages '[{"key":"1","value":"{\"id\":\"1\",\"time\":\"2025-01-01T10:00:00\",\"metric\":58}"}]'