Implementing a Real-Time Anomaly Detection Pipeline on OCI Using Streaming Data, Oracle Autonomous Database & ML

Detecting unusual patterns in real time is critical to preventing outages, catching fraud, ensuring SLA compliance, and maintaining high-quality user experiences.
In this post, we build a real working pipeline on OCI that:

  • Ingests streaming data
  • Computes features in near-real time
  • Stores results in Autonomous Database
  • Runs anomaly detection logic
  • Sends alerts and exposes dashboards

This guide contains every technical step, including:
Streaming → Function → Autonomous DB → Anomaly Logic → Notifications → Dashboards

1. Architecture Overview

Components Used

  • OCI Streaming
  • OCI Functions
  • Oracle Autonomous Database
  • DBMS_SCHEDULER for anomaly detection job
  • OCI Notifications
  • Oracle Analytics Cloud / Grafana

2. Step-by-Step Implementation


2.1 Create OCI Streaming Stream

oci streaming stream create \
  --compartment-id $COMPARTMENT_OCID \
  --display-name "anomaly-events-stream" \
  --partitions 3

2.2 Autonomous Database Table

CREATE TABLE raw_events (
  event_id       VARCHAR2(50),
  event_time     TIMESTAMP,
  metric_value   NUMBER,
  feature1       NUMBER,
  feature2       NUMBER,
  processed_flag CHAR(1) DEFAULT 'N',
  anomaly_flag   CHAR(1) DEFAULT 'N',
  CONSTRAINT pk_raw_events PRIMARY KEY(event_id)
);

2.3 OCI Function – Feature Extraction

func.py:

import oci
import cx_Oracle
import json
from datetime import datetime

def handler(ctx, data: bytes=None):
    event = json.loads(data.decode('utf-8'))

    evt_id = event['id']
    evt_time = datetime.fromisoformat(event['time'])
    value = event['metric']

    # DB Connection
    conn = cx_Oracle.connect(user='USER', password='PWD', dsn='dsn')
    cur = conn.cursor()

    # Fetch previous value if exists
    cur.execute("SELECT metric_value FROM raw_events WHERE event_id=:1", (evt_id,))
    prev = cur.fetchone()
    prev_val = prev[0] if prev else 1.0

    # Compute features
    feature1 = value - prev_val
    feature2 = value / prev_val

    # Insert new event
    cur.execute("""
        INSERT INTO raw_events(event_id, event_time, metric_value, feature1, feature2)
        VALUES(:1, :2, :3, :4, :5)
    """, (evt_id, evt_time, value, feature1, feature2))

    conn.commit()
    cur.close()
    conn.close()

    return "ok"

Deploy the function and attach the streaming trigger.


2.4 Anomaly Detection Job (DBMS_SCHEDULER)

BEGIN
  FOR rec IN (
    SELECT event_id, feature1
    FROM raw_events
    WHERE processed_flag = 'N'
  ) LOOP
    DECLARE
      meanv NUMBER;
      stdv  NUMBER;
      zscore NUMBER;
    BEGIN
      SELECT AVG(feature1), STDDEV(feature1) INTO meanv, stdv FROM raw_events;

      zscore := (rec.feature1 - meanv) / NULLIF(stdv, 0);

      IF ABS(zscore) > 3 THEN
        UPDATE raw_events SET anomaly_flag='Y' WHERE event_id=rec.event_id;
      END IF;

      UPDATE raw_events SET processed_flag='Y' WHERE event_id=rec.event_id;
    END;
  END LOOP;
END;

Schedule this to run every 2 minutes:

BEGIN
  DBMS_SCHEDULER.CREATE_JOB (
    job_name        => 'ANOMALY_JOB',
    job_type        => 'PLSQL_BLOCK',
    job_action      => 'BEGIN anomaly_detection_proc; END;',
    repeat_interval => 'FREQ=MINUTELY;INTERVAL=2;',
    enabled         => TRUE
  );
END;


2.5 Notifications

oci ons topic create \
  --compartment-id $COMPARTMENT_OCID \
  --name "AnomalyAlerts"

In the DB, add a trigger:

CREATE OR REPLACE TRIGGER notify_anomaly
AFTER UPDATE ON raw_events
FOR EACH ROW
WHEN (NEW.anomaly_flag='Y' AND OLD.anomaly_flag='N')
BEGIN
  DBMS_OUTPUT.PUT_LINE('Anomaly detected for event ' || :NEW.event_id);
END;
/


2.6 Dashboarding

You may use:

  • Oracle Analytics Cloud (OAC)
  • Grafana + ADW Integration
  • Any BI tool with SQL

Example Query:

SELECT event_time, metric_value, anomaly_flag 
FROM raw_events
ORDER BY event_time;

2. Terraform + OCI CLI Script Bundle

Terraform – Streaming + Function + Policies

resource "oci_streaming_stream" "anomaly" {
  name           = "anomaly-events-stream"
  partitions     = 3
  compartment_id = var.compartment_id
}

resource "oci_functions_application" "anomaly_app" {
  compartment_id = var.compartment_id
  display_name   = "anomaly-function-app"
  subnet_ids     = var.subnets
}

Terraform Notification Topic

resource "oci_ons_notification_topic" "anomaly" {
  compartment_id = var.compartment_id
  name           = "AnomalyAlerts"
}

CLI Insert Test Events

oci streaming stream message put \
  --stream-id $STREAM_OCID \
  --messages '[{"key":"1","value":"{\"id\":\"1\",\"time\":\"2025-01-01T10:00:00\",\"metric\":58}"}]'

Leave a comment

This site uses Akismet to reduce spam. Learn how your comment data is processed.