Building a Real-Time Recommendation Engine on Oracle Cloud Infrastructure (OCI) Using Generative AI & Streaming

Introduction

In many modern applications, such as e-commerce, media platforms, and SaaS services, real-time personalized recommendations are a key differentiator. With OCI’s streaming, AI/ML, and serverless capabilities, you can build a recommendation engine that:

  • Ingests user events (clicks, views, purchases) in real time
  • Applies a generative-AI model (or fine-tuned model) to generate suggestions
  • Stores, serves, and updates recommendations frequently
  • Closes the feedback loop so the model is refined based on real usage

In this article you’ll learn how to:

  1. Set up a streaming pipeline using OCI Streaming Service to ingest user events.
  2. Use OCI Data Science or OCI AI Services + a generative model (e.g., GPT-style) to produce recommendation outputs.
  3. Build a serving layer to deliver recommendations (via OCI Functions + API Gateway).
  4. Create the feedback loop: capture user interactions, update the model or embeddings, and automate retraining.
  5. Work through code snippets, architectural decisions, best practices, and pitfalls.

1. Architecture Overview

Here’s a high-level architecture for our recommendation engine:

  • Event Ingestion: User activities → publish to OCI Streaming (Kafka-compatible)
  • Processing Layer: A consumer application (OCI Functions or Data Flow) reads events, preprocesses them, and enriches them with user, profile, and context data (from Autonomous Database or NoSQL).
  • Model Layer: A generative model (e.g., fine-tuned GPT or embedding-based recommender) hosted in OCI Data Science. It takes the context plus the user’s history and produces N recommendations.
  • Serving Layer: OCI API Gateway + OCI Functions deliver recommendations to front-end or mobile apps.
  • Feedback Loop: User clicks or ignores recommendations → events fed back into streaming topic → periodic retraining/refinement of model or embedding space.
  • Storage / Feature Store: Use Oracle NoSQL Database Cloud Service or Autonomous Database to store user profiles, item embeddings, and transaction history.

2. Setting Up Streaming Ingestion

Create an OCI Streaming stream (the counterpart of a Kafka topic)

oci streaming admin stream create \
  --compartment-id $COMPARTMENT_OCID \
  --name "user-event-stream" \
  --partitions 4

Produce events (example with Python)

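import base64

import oci
from oci.streaming import StreamClient
from oci.streaming.models import PutMessagesDetails, PutMessagesDetailsEntry

config = oci.config.from_file()
stream_id = "<your_stream_OCID>"
# The messages endpoint is shown on the stream's details page in the console.
messages_endpoint = "<your_stream_messages_endpoint>"
stream_client = StreamClient(config, service_endpoint=messages_endpoint)

def b64(text):
    # OCI Streaming expects keys and values as base64-encoded strings.
    return base64.b64encode(text.encode("utf-8")).decode("utf-8")

def send_event(user_id, item_id, event_type, timestamp):
    # Keying by user_id keeps each user's events in the same partition, in order.
    entry = PutMessagesDetailsEntry(
        key=b64(user_id),
        value=b64(f"{user_id},{item_id},{event_type},{timestamp}"),
    )
    resp = stream_client.put_messages(
        stream_id,
        PutMessagesDetails(messages=[entry]),
    )
    return resp

# Example
send_event("U123", "I456", "view", "2025-10-19T10:15:00Z")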
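On the consuming side, the processing layer has to turn each message back into a structured event. Message values read from OCI Streaming are base64-encoded, so a consumer decodes them before splitting the comma-separated payload built by send_event above. The following is a minimal sketch under that assumption; UserEvent and parse_event are hypothetical names, and the message is simulated locally rather than read from a stream.

```python
import base64
from dataclasses import dataclass


@dataclass(frozen=True)
class UserEvent:
    user_id: str
    item_id: str
    event_type: str
    timestamp: str


def parse_event(encoded_value: str) -> UserEvent:
    """Decode one base64 message value and split the 4-field CSV payload."""
    payload = base64.b64decode(encoded_value).decode("utf-8")
    fields = payload.split(",")
    if len(fields) != 4:
        raise ValueError(f"expected 4 fields, got {len(fields)}: {payload!r}")
    return UserEvent(*fields)


# Simulate the value a consumer would receive for one event.
raw = base64.b64encode(b"U123,I456,view,2025-10-19T10:15:00Z").decode("ascii")
event = parse_event(raw)
print(event)
```

Rejecting malformed payloads early keeps bad records out of the enrichment and model steps. A CSV payload is compact, but JSON may be easier to evolve if you expect to add fields later.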

3. Model Layer: Generative/Embedding-Based Recommendations

Option A: Embedding + similarity lookup

We pre-compute embeddings for users and items (e.g., using a transformer or collaborative model) and store them in a vector database (or NoSQL). When a new event arrives, we update the user embedding (incrementally) and compute top-K similar items.
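To make Option A concrete, here is a small pure-Python sketch of the two steps: nudging the user embedding toward an item the user just interacted with, then ranking items by cosine similarity. The moving-average update, the alpha value, and the toy 2-dimensional vectors are illustrative assumptions, not a prescribed method; in production the lookup would typically run in a vector store.

```python
import math


def cosine(a, b):
    """Cosine similarity of two equal-length vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def update_user_embedding(user_vec, item_vec, alpha=0.2):
    """Move the user vector a fraction alpha toward the item just interacted with."""
    return [(1 - alpha) * u + alpha * i for u, i in zip(user_vec, item_vec)]


def top_k_items(user_vec, item_vecs, k=5, exclude=()):
    """Return the k item IDs most similar to the user vector, skipping excluded IDs."""
    scored = [
        (cosine(user_vec, vec), item_id)
        for item_id, vec in item_vecs.items()
        if item_id not in exclude
    ]
    scored.sort(reverse=True)
    return [item_id for _, item_id in scored[:k]]


# Toy embeddings with made-up values.
items = {
    "I234": [1.0, 0.0],
    "I456": [0.9, 0.1],
    "I890": [0.0, 1.0],
}
user = [0.0, 1.0]
user = update_user_embedding(user, items["I234"], alpha=0.5)  # the user viewed I234
print(top_k_items(user, items, k=2, exclude={"I234"}))  # ['I456', 'I890']
```

Excluding the item that triggered the update avoids recommending what the user is already looking at.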

Option B: Fine-tuned generative model

We fine-tune a GPT-style model on historical user → recommendation sequences so that given “User U123 last 5 items: I234, I456, I890… context: browsing category Sports” we get suggestions like “I333, I777, I222”.

Example snippet using OCI Data Science and Python

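import oci
import requests

# Assumes the model is already deployed as an OCI Data Science Model Deployment.
config = oci.config.from_file()
# Sign each HTTP request with the API key from the OCI config file.
signer = oci.signer.Signer(
    tenancy=config["tenancy"],
    user=config["user"],
    fingerprint=config["fingerprint"],
    private_key_file_location=config["key_file"],
    pass_phrase=config.get("pass_phrase"),
)
endpoint = "<model_endpoint_url>"  # the deployment's invoke URL, ending in /predict

def get_recommendations(user_id, recent_items, context, top_k=5):
    prompt = f"""User: {user_id}
RecentItems: {','.join(recent_items)}
Context: {context}
Provide {top_k} item IDs with reasons:"""
    # The request and response shapes are defined by the deployed model's score.py.
    # The "prompt" field and the "recommendations" key are assumptions here.
    response = requests.post(endpoint, json={"prompt": prompt}, auth=signer, timeout=30)
    response.raise_for_status()
    return response.json()["recommendations"]

# Example
recs = get_recommendations("U123", ["I234","I456","I890"], "Looking for running shoes", 5)
print(recs)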
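A generative model returns free text and can suggest IDs that do not exist in the catalog, so it is worth validating the output before serving it. The sketch below is one possible post-processing step, assuming item IDs look like the examples in this article (the letter I followed by digits); extract_item_ids and the sample catalog are hypothetical.

```python
import re

# Assumed ID format, taken from the examples in this article.
ITEM_ID_PATTERN = re.compile(r"\bI\d+\b")


def extract_item_ids(model_text, catalog, recent_items=(), top_k=5):
    """Pull item IDs out of model text, keeping only known, unseen, unique IDs."""
    seen = set(recent_items)
    result = []
    for item_id in ITEM_ID_PATTERN.findall(model_text):
        if item_id in catalog and item_id not in seen:
            seen.add(item_id)
            result.append(item_id)
        if len(result) == top_k:
            break
    return result


catalog = {"I222", "I333", "I456", "I777"}
text = (
    "1. I333 (similar to I456)\n"
    "2. I777 (popular in Sports)\n"
    "3. I999 (new arrival)\n"
    "4. I333 again"
)
print(extract_item_ids(text, catalog, recent_items=["I456"], top_k=3))  # ['I333', 'I777']
```

Here I999 is dropped because it is not in the catalog, I456 because the user has already seen it, and the repeated I333 is returned only once.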

Model deployment

  • Train/fine-tune in OCI Data Science environment
  • Deploy as a real-time endpoint (OCI Data Science Model Deployment)
  • Or optionally use OCI Functions for lightweight inference, keeping cold-start latency in mind

4. Serving Layer & Feedback Loop

Serving via API Gateway + Functions

  • Create an OCI Function getRecommendations that takes user_id & context and returns recommendations by calling the model endpoint or embedding lookup
  • Expose via OCI API Gateway for external apps
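The core of such a function is request validation plus a call into the model layer. The sketch below shows only that core so it runs without the Functions runtime: handle_request and lookup_recommendations are hypothetical names, and the lookup returns canned IDs in place of a real model call. In a Python OCI Function, the FDK entry point handler(ctx, data) would pass data.getvalue() to this logic and wrap the result in an FDK response.

```python
import json


def lookup_recommendations(user_id, context, top_k):
    """Hypothetical stand-in for the model endpoint call or embedding lookup."""
    return ["I333", "I777", "I222"][:top_k]


def handle_request(body):
    """Validate a JSON request body and return (status_code, response_json)."""
    try:
        payload = json.loads(body or b"{}")
    except json.JSONDecodeError:
        return 400, json.dumps({"error": "body must be valid JSON"})
    if not isinstance(payload, dict) or not payload.get("user_id"):
        return 400, json.dumps({"error": "user_id is required"})

    top_k = payload.get("top_k", 5)
    if not isinstance(top_k, int) or top_k < 1:
        return 400, json.dumps({"error": "top_k must be a positive integer"})

    recs = lookup_recommendations(payload["user_id"], payload.get("context", ""), top_k)
    return 200, json.dumps({"user_id": payload["user_id"], "recommendations": recs})


status, body = handle_request(
    b'{"user_id": "U123", "context": "Looking for running shoes", "top_k": 2}'
)
print(status, body)
```

Keeping the validation separate from the runtime wiring also makes the function easy to unit-test before deploying it behind API Gateway.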

Feedback capture

  • After the user sees recommendations and clicks, ignores, or purchases, capture that as a rec_click, rec_ignore, or purchase event and publish it back to the streaming topic
  • Use this feedback to:
    • Incrementally update the user embedding
    • Record a reinforcement signal for later batch retraining
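One simple way to turn these feedback events into a reinforcement signal is to assign each event type a weight and sum the weights per user and item. The weights below are made-up placeholders and aggregate_feedback is a hypothetical helper; treat this as a sketch of the idea rather than a tuned reward scheme.

```python
from collections import defaultdict

# Hypothetical reward weights; tune these against your own conversion data.
FEEDBACK_WEIGHTS = {"rec_click": 1.0, "purchase": 3.0, "rec_ignore": -0.5}


def aggregate_feedback(events):
    """Sum reward weights per (user_id, item_id); unknown event types are skipped."""
    totals = defaultdict(float)
    for user_id, item_id, event_type in events:
        weight = FEEDBACK_WEIGHTS.get(event_type)
        if weight is not None:
            totals[(user_id, item_id)] += weight
    return dict(totals)


events = [
    ("U123", "I333", "rec_click"),
    ("U123", "I333", "purchase"),
    ("U123", "I777", "rec_ignore"),
    ("U123", "I456", "view"),  # not a feedback event, so it is skipped
]
signals = aggregate_feedback(events)
print(signals)
```

The resulting per-pair scores can be stored alongside the raw events and used as sample weights or labels in the next batch retraining run.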

Scheduled retraining / embedding update

  • Use OCI Data Science scheduled jobs or Data Flow to run nightly or weekly batch jobs: aggregate events, update embeddings, fine-tune model
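  • Example pseudo-code:

from datetime import datetime, timedelta, timezone
import pandas as pd

# load_events is a placeholder for your own loader (for example, a query against the event store)
# fetch events from the last 7 days
events = load_events(start=datetime.now(timezone.utc) - timedelta(days=7))
# update embeddings, retrain model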
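The windowing step of such a batch job can be sketched without any OCI dependency. The example below assumes events are dictionaries with ISO 8601 timestamps like the ones published earlier; events_in_window and history_by_user are hypothetical helpers, and the sample events are made up.

```python
from datetime import datetime, timedelta, timezone


def parse_ts(ts):
    # datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11 on.
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))


def events_in_window(events, now, days=7):
    """Keep events whose timestamp falls within the last `days` days before `now`."""
    cutoff = now - timedelta(days=days)
    return [e for e in events if cutoff <= parse_ts(e["timestamp"]) <= now]


def history_by_user(events):
    """Group item IDs per user, ordered by event time (oldest first)."""
    history = {}
    for e in sorted(events, key=lambda e: parse_ts(e["timestamp"])):
        history.setdefault(e["user_id"], []).append(e["item_id"])
    return history


all_events = [
    {"user_id": "U123", "item_id": "I456", "timestamp": "2025-10-19T10:15:00Z"},
    {"user_id": "U123", "item_id": "I234", "timestamp": "2025-10-15T09:00:00Z"},
    {"user_id": "U123", "item_id": "I890", "timestamp": "2025-10-10T08:00:00Z"},
]
now = datetime(2025, 10, 19, 12, 0, tzinfo=timezone.utc)
recent = events_in_window(all_events, now, days=7)
print(history_by_user(recent))  # {'U123': ['I234', 'I456']}
```

Passing now in explicitly, rather than reading the clock inside the function, makes the job reproducible when you rerun it for a past date.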

Conclusion

Building a real-time recommendation engine on OCI, combining streaming ingestion, generative AI or embedding-based models, and serverless serving, enables you to deliver personalized experiences at scale. By capturing user behaviour in real time, serving timely recommendations, and closing the feedback loop, you shift from static “top N” lists to dynamic, context-aware suggestions. With careful architecture, you can deliver high performance, relevance, and scalability.


Power of the OCI AI
Enjoy
Osama