Modern cloud infrastructure needs continuous monitoring and automated governance to keep deployed configurations in line with their intended designs. Oracle Cloud Infrastructure (OCI) Resource Manager, Oracle's managed Terraform service, supports GitOps workflows: it can read Terraform configurations from Git and report drift between a stack's recorded configuration and its deployed resources. This guide covers patterns for infrastructure state management, policy enforcement, and automated compliance restoration, using custom tooling where the service itself stops at detection.
GitOps Architecture with OCI Resource Manager
GitOps treats Git repositories as the single source of truth for infrastructure definitions. OCI Resource Manager supports this model through configuration source providers, which let a stack read its Terraform configuration directly from a Git repository so that plan and apply jobs run against committed code.
The architecture centers on declarative infrastructure definitions stored in Git. Plan and apply jobs run against the repository whenever changes are merged, typically triggered from a CI/CD pipeline. This approach reduces configuration drift, provides an audit trail through commit history, and allows rollback by reverting a commit and re-applying.
Unlike traditional infrastructure management approaches, GitOps with Resource Manager provides immutable infrastructure deployments where every change goes through version control, peer review, and automated testing before reaching production environments.
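The reconciliation decision at the heart of this workflow can be sketched in a few lines. This is a minimal illustration, not Resource Manager's API: `StackStatus`, `next_action`, and the commit hashes are hypothetical names, and the assumption is that a pipeline records which commit was last applied to each stack.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class StackStatus:
    """Hypothetical record of what was last applied to a stack."""
    stack_name: str
    last_applied_commit: Optional[str]  # None if the stack was never applied


def next_action(status: StackStatus, head_commit: str, is_reviewed: bool) -> str:
    """Decide what a pipeline should do for the current Git HEAD."""
    if status.last_applied_commit == head_commit:
        return "noop"   # deployed configuration already matches Git
    if not is_reviewed:
        return "plan"   # show the proposed changes without applying them
    return "apply"      # reviewed change: reconcile the stack with Git


status = StackStatus("oci-production", last_applied_commit="a1b2c3d")
print(next_action(status, "a1b2c3d", is_reviewed=True))   # noop
print(next_action(status, "e4f5a6b", is_reviewed=False))  # plan
print(next_action(status, "e4f5a6b", is_reviewed=True))   # apply
```

Keeping the decision as a pure function makes it easy to test separately from whatever actually launches the plan or apply job.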
Advanced Drift Detection Mechanisms
Infrastructure drift occurs when deployed resources deviate from their intended configurations because of manual changes, external automation, or service updates. OCI Resource Manager's drift detection reports, for each resource in a stack, whether it still matches the configuration that was applied and which properties differ.
Drift detection compares the actual state of each deployed resource with the configuration recorded when the stack was last applied, and reports the discrepancies per resource. In Resource Manager this runs on demand against a stack rather than continuously, so near-real-time coverage means scheduling it or supplementing it with custom tooling such as the monitoring script later in this guide, which also checks tags and security rules.
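At its simplest, a drift report is a list of per-resource statuses. The sketch below summarizes such a list. The dictionary shape and the helper name `summarize_drift` are illustrative, and the status values are an assumption about what Resource Manager reports, so check them against the current API reference before relying on them.

```python
from collections import Counter
from typing import Dict, List

# Assumed status values; verify against the Resource Manager API reference.
DRIFTED_STATUSES = {"MODIFIED", "DELETED"}


def summarize_drift(resources: List[Dict[str, str]]) -> Dict[str, object]:
    """Count statuses and list the resources that no longer match the stack."""
    counts = Counter(r["status"] for r in resources)
    drifted = sorted(r["name"] for r in resources if r["status"] in DRIFTED_STATUSES)
    return {"counts": dict(counts), "drifted": drifted, "in_sync": not drifted}


report = [
    {"name": "production-vcn", "status": "IN_SYNC"},
    {"name": "production-security-list", "status": "MODIFIED"},
    {"name": "prod-instance-1", "status": "DELETED"},
]
summary = summarize_drift(report)
print(summary["drifted"])  # ['prod-instance-1', 'production-security-list']
print(summary["in_sync"])  # False
```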
A drift report says what has changed, not how much the change matters. Deciding which deviations might indicate a security incident or an operational problem is left to custom tooling. The script later in this guide records a baseline configuration for each resource and scores changes with keyword-based heuristics, so that security-relevant deviations get attention first and low-impact changes generate fewer alerts.
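A baseline can be as simple as a fingerprint of each resource's configuration, recomputed on every check. The following is a minimal sketch under stated assumptions: the `fingerprint` helper and the `VOLATILE_KEYS` set are illustrative, and only top-level keys are filtered.

```python
import hashlib
import json
from typing import Any, Dict

# Illustrative set of fields that change without indicating drift.
VOLATILE_KEYS = {"time_created", "time_updated", "etag"}


def fingerprint(config: Dict[str, Any]) -> str:
    """Hash a configuration in a key-order-independent way."""
    stable = {k: v for k, v in config.items() if k not in VOLATILE_KEYS}
    canonical = json.dumps(stable, sort_keys=True, default=str)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


baseline = fingerprint({"display_name": "production-vcn", "cidr_blocks": ["10.0.0.0/16"], "etag": "abc"})
same = fingerprint({"cidr_blocks": ["10.0.0.0/16"], "display_name": "production-vcn", "etag": "xyz"})
changed = fingerprint({"display_name": "production-vcn", "cidr_blocks": ["10.0.0.0/8"], "etag": "abc"})
print(baseline == same)     # True: key order and volatile fields are ignored
print(baseline == changed)  # False: the CIDR block differs
```

A matching fingerprint lets a monitor skip the more expensive field-by-field comparison for resources that have not changed.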
Production Implementation with Automated Remediation
Here’s a comprehensive implementation demonstrating GitOps workflows with advanced drift detection and automated remediation capabilities:
Terraform Configuration with Policy Enforcement
# main.tf - Infrastructure with built-in compliance checks
terraform {
  # precondition, postcondition, and replace_triggered_by need Terraform 1.2+
  required_version = ">= 1.2.0"

  required_providers {
    oci = {
      source  = "oracle/oci"
      version = "~> 5.0"
    }
  }

  backend "remote" {
    organization = "your-org"
    workspaces {
      name = "oci-production"
    }
  }
}

# Data source for compliance validation
data "oci_identity_policies" "required_policies" {
  compartment_id = var.tenancy_ocid

  filter {
    name   = "name"
    values = ["security-baseline-policy"]
  }
}

# Compliance validation resource
resource "null_resource" "compliance_check" {
  triggers = {
    always_run = timestamp()
  }

  provisioner "local-exec" {
    command = "python3 ${path.module}/scripts/compliance_validator.py --compartment ${var.compartment_id}"
  }

  lifecycle {
    precondition {
      condition     = length(data.oci_identity_policies.required_policies.policies) > 0
      error_message = "Required security baseline policy not found."
    }
  }
}

# VCN with mandatory security configurations
resource "oci_core_vcn" "production_vcn" {
  compartment_id = var.compartment_id
  display_name   = "production-vcn"
  cidr_blocks    = ["10.0.0.0/16"]
  dns_label      = "prodvcn"

  # Mandatory tags for compliance
  defined_tags = {
    "Security.Classification" = "Confidential"
    "Operations.Environment"  = "Production"
    "Compliance.Required"     = "True"
  }

  lifecycle {
    prevent_destroy = true

    postcondition {
      # The tag value is a string, so compare it directly; contains() expects a list.
      condition     = self.defined_tags["Security.Classification"] == "Confidential"
      error_message = "Production VCN must be tagged as Confidential."
    }
  }
}

# Security list with drift detection webhook
resource "oci_core_security_list" "production_security_list" {
  compartment_id = var.compartment_id
  vcn_id         = oci_core_vcn.production_vcn.id
  display_name   = "production-security-list"

  # Ingress rules with compliance requirements
  dynamic "ingress_security_rules" {
    for_each = var.allowed_ingress_rules
    content {
      protocol    = ingress_security_rules.value.protocol
      source      = ingress_security_rules.value.source
      source_type = "CIDR_BLOCK"

      tcp_options {
        min = ingress_security_rules.value.port_range.min
        max = ingress_security_rules.value.port_range.max
      }
    }
  }

  # Egress rules with monitoring
  egress_security_rules {
    destination      = "0.0.0.0/0"
    destination_type = "CIDR_BLOCK"
    protocol         = "6"

    tcp_options {
      min = 443
      max = 443
    }
  }

  # Custom webhook for change notifications
  provisioner "local-exec" {
    when    = create
    command = "curl -X POST ${var.webhook_url} -H 'Content-Type: application/json' -d '{\"event\":\"security_list_created\",\"resource_id\":\"${self.id}\"}'"
  }

  lifecycle {
    postcondition {
      condition = length([
        for rule in self.egress_security_rules : rule
        if rule.destination == "0.0.0.0/0" && rule.protocol == "6"
      ]) <= 2
      error_message = "Security list has too many permissive egress rules."
    }
  }
}

# Compute instance with security baseline
resource "oci_core_instance" "production_instance" {
  count = var.instance_count

  # Spread instances across however many availability domains the region has,
  # rather than assuming exactly three.
  availability_domain = data.oci_identity_availability_domains.ads.availability_domains[count.index % length(data.oci_identity_availability_domains.ads.availability_domains)].name
  compartment_id      = var.compartment_id
  display_name        = "prod-instance-${count.index + 1}"
  shape               = var.instance_shape

  shape_config {
    ocpus         = var.instance_ocpus
    memory_in_gbs = var.instance_memory
  }

  create_vnic_details {
    subnet_id        = oci_core_subnet.production_subnet.id
    display_name     = "primaryvnic-${count.index + 1}"
    assign_public_ip = false
    nsg_ids          = [oci_core_network_security_group.production_nsg.id]
  }

  source_details {
    source_type = "image"
    source_id   = data.oci_core_images.oracle_linux.images[0].id
  }

  metadata = {
    ssh_authorized_keys = var.ssh_public_key
    user_data = base64encode(templatefile("${path.module}/templates/cloud-init.yaml", {
      monitoring_agent_config = var.monitoring_config
      security_agent_config   = var.security_config
    }))
  }

  # Anti-drift configuration
  defined_tags = {
    "Security.Classification" = "Confidential"
    "Operations.Environment"  = "Production"
    "Drift.Monitor"           = "Enabled"
    "Compliance.Baseline"     = "CIS-1.0"
  }

  lifecycle {
    postcondition {
      condition     = self.state == "RUNNING"
      error_message = "Instance must be in RUNNING state after creation."
    }

    replace_triggered_by = [
      oci_core_security_list.production_security_list
    ]
  }
}

# Network Security Group with policy enforcement
resource "oci_core_network_security_group" "production_nsg" {
  compartment_id = var.compartment_id
  vcn_id         = oci_core_vcn.production_vcn.id
  display_name   = "production-nsg"

  defined_tags = {
    "Security.Purpose" = "Application-Tier"
    "Drift.Monitor"    = "Enabled"
  }
}

# Dynamic NSG rules based on compliance requirements
resource "oci_core_network_security_group_security_rule" "application_rules" {
  for_each = {
    for rule in var.security_rules : "${rule.direction}-${rule.protocol}-${rule.port}" => rule
    if rule.enabled
  }

  network_security_group_id = oci_core_network_security_group.production_nsg.id
  direction                 = each.value.direction
  protocol                  = each.value.protocol
  source                    = each.value.direction == "INGRESS" ? each.value.source : null
  source_type               = each.value.direction == "INGRESS" ? "CIDR_BLOCK" : null
  destination               = each.value.direction == "EGRESS" ? each.value.destination : null
  destination_type          = each.value.direction == "EGRESS" ? "CIDR_BLOCK" : null

  dynamic "tcp_options" {
    for_each = each.value.protocol == "6" ? [1] : []
    content {
      destination_port_range {
        min = each.value.port
        max = each.value.port
      }
    }
  }
}

# Load balancer with health monitoring
resource "oci_load_balancer_load_balancer" "production_lb" {
  compartment_id = var.compartment_id
  display_name   = "production-lb"
  shape          = "flexible"
  subnet_ids     = [oci_core_subnet.production_subnet.id]

  shape_details {
    minimum_bandwidth_in_mbps = 10
    maximum_bandwidth_in_mbps = 100
  }

  defined_tags = {
    "Security.Classification" = "Confidential"
    "Drift.Monitor"           = "Enabled"
    "HealthCheck.Critical"    = "True"
  }

  lifecycle {
    postcondition {
      condition     = self.state == "ACTIVE"
      error_message = "Load balancer must be in ACTIVE state."
    }
  }
}

# Backend set with automated health checks
resource "oci_load_balancer_backend_set" "production_backend" {
  load_balancer_id = oci_load_balancer_load_balancer.production_lb.id
  name             = "production-backend"
  policy           = "ROUND_ROBIN"

  health_checker {
    port              = 8080
    protocol          = "HTTP"
    url_path          = "/health"
    interval_ms       = 10000
    timeout_in_millis = 3000
    retries           = 3
    return_code       = 200
  }

  session_persistence_configuration {
    cookie_name      = "X-Oracle-BMC-LBS-Route"
    disable_fallback = false
  }
}

# Backend instances with drift monitoring
resource "oci_load_balancer_backend" "production_backends" {
  count = length(oci_core_instance.production_instance)

  load_balancer_id = oci_load_balancer_load_balancer.production_lb.id
  backendset_name  = oci_load_balancer_backend_set.production_backend.name
  ip_address       = oci_core_instance.production_instance[count.index].private_ip
  port             = 8080
  backup           = false
  drain            = false
  offline          = false
  weight           = 1

  # Note: this resource does not appear to export a health status attribute,
  # and a new backend is rarely healthy the moment it is created. Check backend
  # health separately (for example with the oci_load_balancer_backend_health
  # data source) instead of in a postcondition on this resource.
}
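
The configuration above calls `scripts/compliance_validator.py`, which is not shown. As a hypothetical sketch of one check such a script might perform, the following verifies the mandatory tags on a resource. The `REQUIRED_TAGS` table and `tag_violations` helper are assumptions for illustration; the nested namespace-to-key layout is how the monitoring script later in this guide reads `defined_tags`.

```python
from typing import Dict, List, Tuple

# Hypothetical policy: (namespace, key) -> required value.
REQUIRED_TAGS: Dict[Tuple[str, str], str] = {
    ("Security", "Classification"): "Confidential",
    ("Operations", "Environment"): "Production",
}


def tag_violations(defined_tags: Dict[str, Dict[str, str]]) -> List[str]:
    """Return one message per required tag that is missing or has the wrong value."""
    problems = []
    for (namespace, key), expected in REQUIRED_TAGS.items():
        actual = defined_tags.get(namespace, {}).get(key)
        if actual != expected:
            problems.append(f"{namespace}.{key}: expected {expected!r}, found {actual!r}")
    return problems


compliant = {"Security": {"Classification": "Confidential"},
             "Operations": {"Environment": "Production"}}
drifted = {"Security": {"Classification": "Public"}}

print(tag_violations(compliant))  # []
for message in tag_violations(drifted):
    print(message)
```

Returning messages instead of raising lets the caller decide whether a violation should fail the Terraform run or only be logged.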
Advanced Drift Detection and Remediation Script
#!/usr/bin/env python3
"""
Advanced Infrastructure Drift Detection and Remediation System
Provides comprehensive monitoring, analysis, and automated remediation
of infrastructure configuration drift in OCI environments.
"""
import json
import logging
import asyncio
import hashlib
import difflib
from datetime import datetime, timedelta
from typing import Dict, List, Tuple, Optional, Any
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path

import oci
import git       # GitPython
import requests
import yaml      # PyYAML

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class DriftSeverity(Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


class RemediationAction(Enum):
    ALERT_ONLY = "alert_only"
    AUTO_REMEDIATE = "auto_remediate"
    MANUAL_APPROVAL = "manual_approval"
    ROLLBACK = "rollback"


@dataclass
class DriftDetection:
    """Container for drift detection results"""
    resource_id: str
    resource_type: str
    resource_name: str
    expected_config: Dict[str, Any]
    actual_config: Dict[str, Any]
    drift_items: List[str]
    severity: DriftSeverity
    confidence_score: float
    detected_at: datetime
    remediation_action: RemediationAction = RemediationAction.ALERT_ONLY
    tags: Dict[str, str] = field(default_factory=dict)


@dataclass
class RemediationResult:
    """Container for remediation operation results"""
    drift_detection: DriftDetection
    action_taken: str
    success: bool
    error_message: Optional[str] = None
    execution_time: float = 0.0
    rollback_info: Optional[Dict] = None


class InfrastructureDriftMonitor:
    def __init__(self, config_file: str = 'drift_config.yaml'):
        """Initialize the drift monitoring system"""
        self.config = self._load_config(config_file)
        self.signer = oci.auth.signers.get_resource_principals_signer()

        # Initialize OCI clients
        self.compute_client = oci.core.ComputeClient({}, signer=self.signer)
        self.network_client = oci.core.VirtualNetworkClient({}, signer=self.signer)
        self.lb_client = oci.load_balancer.LoadBalancerClient({}, signer=self.signer)
        self.resource_manager_client = oci.resource_manager.ResourceManagerClient({}, signer=self.signer)

        # Drift detection state
        self.baseline_configs = {}
        self.drift_history = []
        self.remediation_queue = asyncio.Queue()

        # Initialize Git repository for state tracking
        self.git_repo = self._initialize_git_repo()

    def _load_config(self, config_file: str) -> Dict:
        """Load configuration from YAML file"""
        try:
            with open(config_file, 'r') as f:
                return yaml.safe_load(f)
        except FileNotFoundError:
            logger.error(f"Configuration file {config_file} not found")
            return {}

    def _initialize_git_repo(self) -> git.Repo:
        """Initialize Git repository for configuration tracking"""
        repo_path = self.config.get('git_repo_path', './infrastructure-state')
        try:
            if Path(repo_path).exists():
                repo = git.Repo(repo_path)
            else:
                repo = git.Repo.clone_from(
                    self.config['git_repo_url'],
                    repo_path
                )
            return repo
        except Exception as e:
            logger.error(f"Failed to initialize Git repository: {str(e)}")
            raise

    async def discover_resources(self) -> Dict[str, List[Dict]]:
        """Discover all monitored resources in the compartment"""
        compartment_id = self.config['compartment_id']
        discovered_resources = {
            'compute_instances': [],
            'vcns': [],
            'security_lists': [],
            'network_security_groups': [],  # declared but not populated in this listing
            'load_balancers': []
        }
        try:
            # Note: each list_* call below returns a single page of results.
            # For large compartments, wrap the call in
            # oci.pagination.list_call_get_all_results(...) to fetch every page.

            # Discover compute instances
            instances = self.compute_client.list_instances(
                compartment_id=compartment_id,
                lifecycle_state='RUNNING'
            ).data
            for instance in instances:
                if self._should_monitor_resource(instance):
                    discovered_resources['compute_instances'].append({
                        'id': instance.id,
                        'name': instance.display_name,
                        'type': 'compute_instance',
                        'config': await self._get_instance_config(instance.id)
                    })

            # Discover VCNs
            vcns = self.network_client.list_vcns(
                compartment_id=compartment_id,
                lifecycle_state='AVAILABLE'
            ).data
            for vcn in vcns:
                if self._should_monitor_resource(vcn):
                    discovered_resources['vcns'].append({
                        'id': vcn.id,
                        'name': vcn.display_name,
                        'type': 'vcn',
                        'config': await self._get_vcn_config(vcn.id)
                    })

            # Discover Security Lists
            security_lists = self.network_client.list_security_lists(
                compartment_id=compartment_id,
                lifecycle_state='AVAILABLE'
            ).data
            for sl in security_lists:
                if self._should_monitor_resource(sl):
                    discovered_resources['security_lists'].append({
                        'id': sl.id,
                        'name': sl.display_name,
                        'type': 'security_list',
                        'config': await self._get_security_list_config(sl.id)
                    })

            # Discover Load Balancers
            load_balancers = self.lb_client.list_load_balancers(
                compartment_id=compartment_id,
                lifecycle_state='ACTIVE'
            ).data
            for lb in load_balancers:
                if self._should_monitor_resource(lb):
                    discovered_resources['load_balancers'].append({
                        'id': lb.id,
                        'name': lb.display_name,
                        'type': 'load_balancer',
                        'config': await self._get_load_balancer_config(lb.id)
                    })

            logger.info(f"Discovered {sum(len(resources) for resources in discovered_resources.values())} resources")
            return discovered_resources
        except Exception as e:
            logger.error(f"Failed to discover resources: {str(e)}")
            return discovered_resources

    def _should_monitor_resource(self, resource) -> bool:
        """Determine if a resource should be monitored for drift"""
        if not hasattr(resource, 'defined_tags'):
            return False
        defined_tags = resource.defined_tags or {}
        drift_monitor = defined_tags.get('Drift', {}).get('Monitor', 'Disabled')
        return drift_monitor.lower() == 'enabled'

    async def _get_instance_config(self, instance_id: str) -> Dict:
        """Get detailed configuration for a compute instance"""
        try:
            instance = self.compute_client.get_instance(instance_id).data

            # Get VNIC attachments
            vnic_attachments = self.compute_client.list_vnic_attachments(
                compartment_id=instance.compartment_id,
                instance_id=instance_id
            ).data
            vnics = []
            for attachment in vnic_attachments:
                vnic = self.network_client.get_vnic(attachment.vnic_id).data
                vnics.append({
                    'vnic_id': vnic.id,
                    'subnet_id': vnic.subnet_id,
                    'private_ip': vnic.private_ip,
                    'public_ip': vnic.public_ip,
                    'nsg_ids': vnic.nsg_ids
                })

            return {
                'display_name': instance.display_name,
                'lifecycle_state': instance.lifecycle_state,
                'availability_domain': instance.availability_domain,
                'shape': instance.shape,
                # oci.util.to_dict yields plain attribute names; __dict__ on an
                # SDK model exposes private, underscore-prefixed fields instead.
                'shape_config': oci.util.to_dict(instance.shape_config) if instance.shape_config else {},
                'defined_tags': instance.defined_tags,
                'freeform_tags': instance.freeform_tags,
                'vnics': vnics,
                'metadata': instance.metadata
            }
        except Exception as e:
            logger.error(f"Failed to get instance config for {instance_id}: {str(e)}")
            return {}

    async def _get_vcn_config(self, vcn_id: str) -> Dict:
        """Get detailed configuration for a VCN"""
        try:
            vcn = self.network_client.get_vcn(vcn_id).data
            return {
                'display_name': vcn.display_name,
                'cidr_blocks': vcn.cidr_blocks,
                'dns_label': vcn.dns_label,
                'lifecycle_state': vcn.lifecycle_state,
                'defined_tags': vcn.defined_tags,
                'freeform_tags': vcn.freeform_tags
            }
        except Exception as e:
            logger.error(f"Failed to get VCN config for {vcn_id}: {str(e)}")
            return {}

    async def _get_security_list_config(self, security_list_id: str) -> Dict:
        """Get detailed configuration for a security list"""
        try:
            sl = self.network_client.get_security_list(security_list_id).data
            return {
                'display_name': sl.display_name,
                'lifecycle_state': sl.lifecycle_state,
                # Serialize SDK models with oci.util.to_dict rather than __dict__
                'ingress_security_rules': [oci.util.to_dict(rule) for rule in sl.ingress_security_rules],
                'egress_security_rules': [oci.util.to_dict(rule) for rule in sl.egress_security_rules],
                'defined_tags': sl.defined_tags,
                'freeform_tags': sl.freeform_tags
            }
        except Exception as e:
            logger.error(f"Failed to get security list config for {security_list_id}: {str(e)}")
            return {}

    async def _get_load_balancer_config(self, lb_id: str) -> Dict:
        """Get detailed configuration for a load balancer"""
        try:
            lb = self.lb_client.get_load_balancer(lb_id).data

            # Get backend sets
            backend_sets = {}
            for name, backend_set in lb.backend_sets.items():
                backend_sets[name] = {
                    'policy': backend_set.policy,
                    'health_checker': oci.util.to_dict(backend_set.health_checker),
                    'backends': [oci.util.to_dict(backend) for backend in backend_set.backends]
                }

            return {
                'display_name': lb.display_name,
                'shape_name': lb.shape_name,
                'lifecycle_state': lb.lifecycle_state,
                'backend_sets': backend_sets,
                'listeners': {name: oci.util.to_dict(listener) for name, listener in lb.listeners.items()},
                'defined_tags': lb.defined_tags,
                'freeform_tags': lb.freeform_tags
            }
        except Exception as e:
            logger.error(f"Failed to get load balancer config for {lb_id}: {str(e)}")
            return {}

    async def detect_drift(self, current_resources: Dict[str, List[Dict]]) -> List[DriftDetection]:
        """Detect configuration drift across all monitored resources"""
        detected_drifts = []

        # Load baseline configurations from Git
        baseline_configs = await self._load_baseline_configs()

        for group_name, resources in current_resources.items():
            for resource in resources:
                resource_id = resource['id']
                current_config = resource['config']
                # Use the singular per-resource type ('security_list'), not the
                # plural group key ('security_lists'), so that it matches the
                # type checks in the remediation methods.
                resource_type = resource.get('type', group_name)

                # Get baseline configuration. Stored entries wrap it as
                # {'config': ..., 'established_at': ..., 'checksum': ...}.
                baseline_key = f"{resource_type}:{resource_id}"
                baseline_config = baseline_configs.get(baseline_key, {}).get('config', {})
                if not baseline_config:
                    # First time seeing this resource, establish baseline
                    await self._establish_baseline(resource_type, resource_id, current_config)
                    continue

                # Compare configurations
                drift_items = self._compare_configurations(baseline_config, current_config)
                if drift_items:
                    # Calculate drift severity and confidence
                    severity, confidence = self._analyze_drift_severity(drift_items, resource_type)

                    # Determine remediation action
                    remediation_action = self._determine_remediation_action(
                        severity, resource_type, resource['config']
                    )

                    drift_detection = DriftDetection(
                        resource_id=resource_id,
                        resource_type=resource_type,
                        resource_name=resource.get('name', 'Unknown'),
                        expected_config=baseline_config,
                        actual_config=current_config,
                        drift_items=drift_items,
                        severity=severity,
                        confidence_score=confidence,
                        detected_at=datetime.utcnow(),
                        remediation_action=remediation_action,
                        tags=current_config.get('defined_tags', {})
                    )
                    detected_drifts.append(drift_detection)
                    logger.warning(f"Drift detected in {resource_type} {resource_id}: {len(drift_items)} changes")

        return detected_drifts

    async def _load_baseline_configs(self) -> Dict[str, Dict]:
        """Load baseline configurations from Git repository"""
        try:
            # Pull latest changes
            self.git_repo.remotes.origin.pull()
            baseline_file = Path(self.git_repo.working_dir) / 'baseline_configs.json'
            if baseline_file.exists():
                with open(baseline_file, 'r') as f:
                    return json.load(f)
            else:
                return {}
        except Exception as e:
            logger.error(f"Failed to load baseline configurations: {str(e)}")
            return {}

    async def _establish_baseline(self, resource_type: str, resource_id: str, config: Dict):
        """Establish baseline configuration for a new resource"""
        try:
            baseline_file = Path(self.git_repo.working_dir) / 'baseline_configs.json'

            # Load existing baselines
            if baseline_file.exists():
                with open(baseline_file, 'r') as f:
                    baselines = json.load(f)
            else:
                baselines = {}

            # Add new baseline
            baseline_key = f"{resource_type}:{resource_id}"
            baselines[baseline_key] = {
                'config': config,
                'established_at': datetime.utcnow().isoformat(),
                # default=str keeps the checksum from failing on values that
                # are not JSON-serializable (for example datetimes)
                'checksum': hashlib.sha256(
                    json.dumps(config, sort_keys=True, default=str).encode()
                ).hexdigest()
            }

            # Save to file
            with open(baseline_file, 'w') as f:
                json.dump(baselines, f, indent=2, default=str)

            # Commit to Git
            self.git_repo.index.add([str(baseline_file)])
            self.git_repo.index.commit(f"Establish baseline for {resource_type} {resource_id}")
            self.git_repo.remotes.origin.push()
            logger.info(f"Established baseline for {resource_type} {resource_id}")
        except Exception as e:
            logger.error(f"Failed to establish baseline: {str(e)}")

    def _compare_configurations(self, baseline: Dict, current: Dict) -> List[str]:
        """Compare two configurations and identify differences"""
        drift_items = []

        def join(path, key):
            # Avoid a leading dot for top-level keys
            return f"{path}.{key}" if path else key

        def deep_compare(base_obj, curr_obj, path=""):
            if isinstance(base_obj, dict) and isinstance(curr_obj, dict):
                # Compare dictionary keys
                base_keys = set(base_obj.keys())
                curr_keys = set(curr_obj.keys())
                # Added keys
                for key in curr_keys - base_keys:
                    drift_items.append(f"Added: {join(path, key)} = {curr_obj[key]}")
                # Removed keys
                for key in base_keys - curr_keys:
                    drift_items.append(f"Removed: {join(path, key)} = {base_obj[key]}")
                # Changed values
                for key in base_keys & curr_keys:
                    deep_compare(base_obj[key], curr_obj[key], join(path, key))
            elif isinstance(base_obj, list) and isinstance(curr_obj, list):
                # Compare lists position by position
                if len(base_obj) != len(curr_obj):
                    drift_items.append(f"List size changed: {path} from {len(base_obj)} to {len(curr_obj)}")
                for i, (base_item, curr_item) in enumerate(zip(base_obj, curr_obj)):
                    deep_compare(base_item, curr_item, f"{path}[{i}]")
            else:
                # Compare primitive values
                if base_obj != curr_obj:
                    drift_items.append(f"Changed: {path} from '{base_obj}' to '{curr_obj}'")

        # Exclude timestamp and volatile fields
        baseline_filtered = self._filter_volatile_fields(baseline)
        current_filtered = self._filter_volatile_fields(current)
        deep_compare(baseline_filtered, current_filtered)
        return drift_items

    def _filter_volatile_fields(self, config: Dict) -> Dict:
        """Filter out volatile fields that change frequently"""
        volatile_fields = {
            'time_created', 'time_updated', 'etag', 'lifecycle_details',
            'system_tags', 'time_maintenance_begin', 'time_maintenance_end'
        }

        def filter_recursive(obj):
            if isinstance(obj, dict):
                return {
                    k: filter_recursive(v)
                    for k, v in obj.items()
                    if k not in volatile_fields
                }
            elif isinstance(obj, list):
                return [filter_recursive(item) for item in obj]
            else:
                return obj

        return filter_recursive(config)

    def _analyze_drift_severity(self, drift_items: List[str], resource_type: str) -> Tuple[DriftSeverity, float]:
        """Analyze drift severity based on changes and resource type"""
        severity_scores = {
            'security': 0,
            'availability': 0,
            'performance': 0,
            'compliance': 0
        }

        # Analyze each drift item
        for item in drift_items:
            if any(keyword in item.lower() for keyword in ['security', 'ingress', 'egress', 'port', 'protocol']):
                severity_scores['security'] += 10
            if any(keyword in item.lower() for keyword in ['availability_domain', 'fault_domain', 'backup']):
                severity_scores['availability'] += 8
            if any(keyword in item.lower() for keyword in ['shape', 'cpu', 'memory', 'bandwidth']):
                severity_scores['performance'] += 6
            if any(keyword in item.lower() for keyword in ['tag', 'compliance', 'classification']):
                severity_scores['compliance'] += 7

        # Calculate overall severity
        total_score = sum(severity_scores.values())
        confidence = min(len(drift_items) * 0.1, 1.0)

        if total_score >= 30 or severity_scores['security'] >= 20:
            return DriftSeverity.CRITICAL, confidence
        elif total_score >= 20:
            return DriftSeverity.HIGH, confidence
        elif total_score >= 10:
            return DriftSeverity.MEDIUM, confidence
        else:
            return DriftSeverity.LOW, confidence

    def _determine_remediation_action(self, severity: DriftSeverity, resource_type: str, config: Dict) -> RemediationAction:
        """Determine appropriate remediation action based on severity and resource type"""
        tags = config.get('defined_tags', {})
        auto_remediate = tags.get('Drift', {}).get('AutoRemediate', 'False').lower() == 'true'

        if severity == DriftSeverity.CRITICAL:
            if auto_remediate and resource_type in ['security_list', 'network_security_group']:
                return RemediationAction.AUTO_REMEDIATE
            else:
                return RemediationAction.MANUAL_APPROVAL
        elif severity == DriftSeverity.HIGH:
            if auto_remediate:
                return RemediationAction.AUTO_REMEDIATE
            else:
                return RemediationAction.MANUAL_APPROVAL
        elif severity == DriftSeverity.MEDIUM:
            return RemediationAction.MANUAL_APPROVAL
        else:
            return RemediationAction.ALERT_ONLY

    async def remediate_drift(self, drift_detections: List[DriftDetection]) -> List[RemediationResult]:
        """Execute remediation actions for detected drift"""
        remediation_results = []
        for drift in drift_detections:
            start_time = datetime.utcnow()
            try:
                if drift.remediation_action == RemediationAction.AUTO_REMEDIATE:
                    result = await self._auto_remediate_drift(drift)
                elif drift.remediation_action == RemediationAction.MANUAL_APPROVAL:
                    result = await self._request_manual_approval(drift)
                elif drift.remediation_action == RemediationAction.ROLLBACK:
                    result = await self._rollback_changes(drift)
                else:  # ALERT_ONLY
                    result = await self._send_drift_alert(drift)

                execution_time = (datetime.utcnow() - start_time).total_seconds()
                result.execution_time = execution_time
                remediation_results.append(result)
            except Exception as e:
                logger.error(f"Remediation failed for {drift.resource_id}: {str(e)}")
                remediation_results.append(RemediationResult(
                    drift_detection=drift,
                    action_taken="remediation_failed",
                    success=False,
                    error_message=str(e),
                    execution_time=(datetime.utcnow() - start_time).total_seconds()
                ))
        return remediation_results

    async def _auto_remediate_drift(self, drift: DriftDetection) -> RemediationResult:
        """Automatically remediate detected drift"""
        try:
            # Create backup before remediation
            backup_info = await self._create_resource_backup(drift.resource_id, drift.resource_type)

            # Apply expected configuration
            if drift.resource_type == 'security_list':
                success = await self._remediate_security_list(drift)
            elif drift.resource_type == 'network_security_group':
                success = await self._remediate_network_security_group(drift)
            elif drift.resource_type == 'load_balancer':
                success = await self._remediate_load_balancer(drift)
            else:
                raise NotImplementedError(f"Auto-remediation not implemented for {drift.resource_type}")

            if success:
                # Update baseline configuration
                await self._update_baseline_config(drift.resource_id, drift.resource_type, drift.expected_config)
                # Send success notification
                await self._send_remediation_notification(drift, "success")

            return RemediationResult(
                drift_detection=drift,
                action_taken="auto_remediated",
                success=success,
                rollback_info=backup_info
            )
        except Exception as e:
            return RemediationResult(
                drift_detection=drift,
                action_taken="auto_remediation_failed",
                success=False,
                error_message=str(e)
            )

    async def _remediate_security_list(self, drift: DriftDetection) -> bool:
        """Remediate security list configuration drift"""
        try:
            security_list_id = drift.resource_id
            expected_config = drift.expected_config

            # Prepare update details.
            # Assumption: each stored rule dict contains only keyword arguments
            # accepted by the rule model. Nested option dicts (tcp_options and
            # similar) may need to be rebuilt as SDK model objects first.
            update_details = oci.core.models.UpdateSecurityListDetails(
                display_name=expected_config.get('display_name'),
                ingress_security_rules=[
                    oci.core.models.IngressSecurityRule(**rule)
                    for rule in expected_config.get('ingress_security_rules', [])
                ],
                egress_security_rules=[
                    oci.core.models.EgressSecurityRule(**rule)
                    for rule in expected_config.get('egress_security_rules', [])
                ],
                defined_tags=expected_config.get('defined_tags'),
                freeform_tags=expected_config.get('freeform_tags')
            )

            # Update security list
            self.network_client.update_security_list(
                security_list_id=security_list_id,
                update_security_list_details=update_details
            )

            # Wait for update to complete
            oci.wait_until(
                self.network_client,
                self.network_client.get_security_list(security_list_id),
                'lifecycle_state',
                'AVAILABLE'
            )
            logger.info(f"Successfully remediated security list {security_list_id}")
            return True
        except Exception as e:
            logger.error(f"Failed to remediate security list {drift.resource_id}: {str(e)}")
            return False

    async def _create_resource_backup(self, resource_id: str, resource_type: str) -> Dict:
        """Create backup of current resource configuration before remediation"""
        try:
            backup_info = {
                'resource_id': resource_id,
                'resource_type': resource_type,
                'backup_time': datetime.utcnow().isoformat(),
                'backup_id': f"backup-{resource_id}-{int(datetime.utcnow().timestamp())}"
            }

            # Get current configuration
            if resource_type == 'security_list':
                current_config = await self._get_security_list_config(resource_id)
            elif resource_type == 'network_security_group':
                # Implementation for NSG backup
                current_config = {}
            else:
                current_config = {}
            backup_info['configuration'] = current_config

            # Store backup in Git repository
            backup_file = Path(self.git_repo.working_dir) / 'backups' / f"{backup_info['backup_id']}.json"
            backup_file.parent.mkdir(exist_ok=True)
            with open(backup_file, 'w') as f:
                json.dump(backup_info, f, indent=2, default=str)

            # Commit backup to Git
            self.git_repo.index.add([str(backup_file)])
            self.git_repo.index.commit(f"Backup {resource_type} {resource_id} before remediation")
            return backup_info
        except Exception as e:
            logger.error(f"Failed to create backup for {resource_id}: {str(e)}")
            return {}

    async def _send_drift_alert(self, drift: DriftDetection) -> RemediationResult:
        """Send drift detection alert"""
        try:
            alert_payload = {
                'event_type': 'infrastructure_drift_detected',
                'severity': drift.severity.value,
                'resource_id': drift.resource_id,
                'resource_type': drift.resource_type,
                'resource_name': drift.resource_name,
                'drift_count': len(drift.drift_items),
                'confidence_score': drift.confidence_score,
                'detected_at': drift.detected_at.isoformat(),
                'drift_details': drift.drift_items[:10],  # Limit details
                'remediation_action': drift.remediation_action.value
            }

            # Send to webhook if configured
            webhook_url = self.config.get('alert_webhook_url')
            if webhook_url:
                response = requests.post(
                    webhook_url,
                    json=alert_payload,
                    timeout=30
                )
                response.raise_for_status()

            # Send to OCI Notifications if configured
            notification_topic = self.config.get('notification_topic_ocid')
            if notification_topic:
                ons_client = oci.ons.NotificationDataPlaneClient({}, signer=self.signer)
                message_details = oci.ons.models.MessageDetails(
                    body=json.dumps(alert_payload, indent=2),
                    title=f"Infrastructure Drift Detected - {drift.severity.value.upper()}"
                )
                ons_client.publish_message(
                    topic_id=notification_topic,
                    message_details=message_details
                )

            logger.info(f"Drift alert sent for {drift.resource_id}")
            return RemediationResult(
                drift_detection=drift,
                action_taken="alert_sent",
                success=True
            )
        except Exception as e:
            return RemediationResult(
                drift_detection=drift,
                action_taken="alert_failed",
                success=False,
                error_message=str(e)
            )

    async def generate_drift_report(self, drift_detections: List[DriftDetection],
                                    remediation_results: List[RemediationResult]) -> str:
        """Generate comprehensive drift detection and remediation report"""
        report_time = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC')

        # Statistics
        total_drifts = len(drift_detections)
        critical_drifts = len([d for d in drift_detections if d.severity == DriftSeverity.CRITICAL])
        high_drifts = len([d for d in drift_detections if d.severity == DriftSeverity.HIGH])
        auto_remediated = len([r for r in remediation_results if r.action_taken == "auto_remediated" and r.success])

        # The confidence score is a heuristic, so it is reported as an average
        # confidence rather than as a measured accuracy.
        report = f"""
# Infrastructure Drift Detection Report
**Generated:** {report_time}
## Executive Summary
- **Total Drift Detections:** {total_drifts}
- **Critical Severity:** {critical_drifts}
- **High Severity:** {high_drifts}
- **Successfully Auto-Remediated:** {auto_remediated}
- **Average Confidence:** {sum(d.confidence_score for d in drift_detections) / max(total_drifts, 1):.2%}
## Drift Analysis by Resource Type
"""

        # Group by resource type
        drift_by_type = {}
        for drift in drift_detections:
            if drift.resource_type not in drift_by_type:
                drift_by_type[drift.resource_type] = []
            drift_by_type[drift.resource_type].append(drift)

        for resource_type, drifts in drift_by_type.items():
            report += f"""
### {resource_type.replace('_', ' ').title()}
- **Count:** {len(drifts)}
- **Average Confidence:** {sum(d.confidence_score for d in drifts) / len(drifts):.2%}
- **Severity Distribution:**
  - Critical: {len([d for d in drifts if d.severity == DriftSeverity.CRITICAL])}
  - High: {len([d for d in drifts if d.severity == DriftSeverity.HIGH])}
  - Medium: {len([d for d in drifts if d.severity == DriftSeverity.MEDIUM])}
  - Low: {len([d for d in drifts if d.severity == DriftSeverity.LOW])}
"""

        # Detailed drift information
        if drift_detections:
            report += "\n## Detailed Drift Analysis\n"
            # Rank by enum definition order (LOW < MEDIUM < HIGH < CRITICAL);
            # sorting on the string value would order severities alphabetically.
            severity_rank = list(DriftSeverity)
            for drift in sorted(drift_detections, key=lambda x: severity_rank.index(x.severity), reverse=True)[:20]:
                report += f"""
### {drift.resource_name} ({drift.resource_type})
- **Severity:** {drift.severity.value.upper()}
- **Confidence:** {drift.confidence_score:.2%}
- **Remediation Action:** {drift.remediation_action.value}
- **Changes Detected:** {len(drift.drift_items)}
**Key Changes:**
"""
                for change in drift.drift_items[:5]:  # Show top 5 changes
                    report += f"- {change}\n"
                if len(drift.drift_items) > 5:
                    report += f"- ... and {len(drift.drift_items) - 5} more changes\n"

        # Remediation results
        if remediation_results:
            report += "\n## Remediation Results\n"
            successful_remediations = [r for r in remediation_results if r.success]
            failed_remediations = [r for r in remediation_results if not r.success]
            report += f"""
- **Successful Actions:** {len(successful_remediations)}
- **Failed Actions:** {len(failed_remediations)}
- **Average Execution Time:** {sum(r.execution_time for r in remediation_results) / max(len(remediation_results), 1):.2f} seconds
"""
if failed_remediations:
report += "\n### Failed Remediations\n"
for result in failed_remediations:
report += f"""
- **Resource:** {result.drift_detection.resource_name}
- **Action:** {result.action_taken}
- **Error:** {result.error_message}
"""
# Recommendations
report += f"""
## Recommendations
### Immediate Actions Required
"""
critical_items = [d for d in drift_detections if d.severity == DriftSeverity.CRITICAL]
if critical_items:
report += "- **Critical drift detected** - Review and remediate immediately\n"
for item in critical_items[:3]:
report += f" - {item.resource_name}: {len(item.drift_items)} critical changes\n"
report += f"""
### Process Improvements
- Enable auto-remediation for {len([d for d in drift_detections if d.remediation_action == RemediationAction.MANUAL_APPROVAL])} resources with manual approval requirements
- Review baseline configurations for {len([d for d in drift_detections if d.confidence_score < 0.7])} resources with low confidence scores
- Implement preventive controls for {len(drift_by_type.get('security_list', []))} security list changes
### Monitoring Enhancements
- Increase monitoring frequency for critical resources
- Implement real-time alerting for security-related changes
- Establish automated testing for configuration changes
"""
return report
# GitOps Integration Functions
async def setup_gitops_pipeline():
    """Set up GitOps pipeline for continuous infrastructure monitoring"""
    pipeline_config = {
        'git_repo': 'https://github.com/your-org/infrastructure-config.git',
        'branch': 'main',
        'polling_interval': 300,  # 5 minutes
        'webhook_url': 'https://your-webhook-endpoint.com/drift-alerts',
        'auto_remediation_enabled': True,
        'notification_topic': 'ocid1.onstopic.oc1..example'
    }

    # Initialize monitoring system
    monitor = InfrastructureDriftMonitor('drift_config.yaml')

    while True:
        try:
            logger.info("Starting drift detection cycle...")

            # Discover current infrastructure
            current_resources = await monitor.discover_resources()

            # Detect drift
            drift_detections = await monitor.detect_drift(current_resources)

            if drift_detections:
                logger.warning(f"Detected {len(drift_detections)} configuration drifts")

                # Execute remediation
                remediation_results = await monitor.remediate_drift(drift_detections)

                # Generate and save report
                report = await monitor.generate_drift_report(drift_detections, remediation_results)

                # Save report to Git repository
                report_file = Path(monitor.git_repo.working_dir) / 'reports' / f"drift_report_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.md"
                report_file.parent.mkdir(exist_ok=True)
                with open(report_file, 'w') as f:
                    f.write(report)

                # Commit report
                monitor.git_repo.index.add([str(report_file)])
                monitor.git_repo.index.commit(f"Drift detection report - {len(drift_detections)} issues found")
                monitor.git_repo.remotes.origin.push()
            else:
                logger.info("No configuration drift detected")

            # Wait for next cycle
            await asyncio.sleep(pipeline_config['polling_interval'])

        except Exception as e:
            logger.error(f"GitOps pipeline error: {str(e)}")
            await asyncio.sleep(60)  # Wait before retrying


if __name__ == "__main__":
    asyncio.run(setup_gitops_pipeline())

Advanced GitOps implementations need policy enforcement that prevents configuration drift before it occurs. Resource Manager workflows can be paired with Open Policy Agent (OPA), for example by evaluating Terraform plan output against Rego policies before an apply job runs, to provide policy-as-code checks that validate infrastructure changes against organizational standards.
Policy definitions stored in Git repositories enable version-controlled governance rules that automatically reject non-compliant infrastructure changes. These policies can enforce security baselines, cost optimization requirements, and operational standards across all infrastructure deployments.
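To make the idea of rejecting a non-compliant change concrete, here is a minimal sketch in plain Python. It is a stand-in for what a Rego rule would express, not OPA itself. It assumes the plan is available in the JSON form produced by `terraform show -json` (a `resource_changes` list with `type`, `address`, and `change.after`), and the function name `find_violations`, the SSH rule, and the sample plan are all illustrative.

```python
from typing import Any, Dict, List

SSH_PORT = 22
ANYWHERE = "0.0.0.0/0"


def find_violations(plan: Dict[str, Any]) -> List[str]:
    """Return one message per ingress rule that exposes SSH to any source."""
    violations: List[str] = []
    for rc in plan.get("resource_changes", []):
        if rc.get("type") != "oci_core_security_list":
            continue
        # "after" is null in the plan JSON when the resource is being deleted.
        after = (rc.get("change") or {}).get("after") or {}
        for rule in after.get("ingress_security_rules") or []:
            if rule.get("source") != ANYWHERE:
                continue
            for opts in rule.get("tcp_options") or []:
                low, high = opts.get("min"), opts.get("max")
                if low is not None and high is not None and low <= SSH_PORT <= high:
                    violations.append(f"{rc.get('address')}: SSH open to {ANYWHERE}")
    return violations


# Hypothetical plan fragment, used only to exercise the check.
sample_plan = {
    "resource_changes": [
        {
            "address": "oci_core_security_list.public",
            "type": "oci_core_security_list",
            "change": {
                "actions": ["create"],
                "after": {
                    "ingress_security_rules": [
                        {
                            "source": "0.0.0.0/0",
                            "protocol": "6",
                            "tcp_options": [{"min": 22, "max": 22}],
                        }
                    ]
                },
            },
        }
    ]
}

if __name__ == "__main__":
    for message in find_violations(sample_plan):
        print("DENY", message)
```

In a pipeline, a non-empty result would fail the check that gates the apply step, so the change never reaches the stack.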
The integration supports both admission control policies that prevent problematic changes and monitoring policies that detect violations in existing infrastructure. This dual approach ensures comprehensive coverage while maintaining operational flexibility.
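The difference between the two modes can be sketched as the same policy set evaluated with different consequences. The example below is an assumption-laden illustration: `PolicyMode`, `PolicyDecision`, `evaluate`, and the `CostCenter` tag rule are hypothetical names invented here, and resources are modeled as plain dictionaries rather than OCI SDK objects.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Dict, List

Resource = Dict[str, Any]
Policy = Callable[[Resource], List[str]]


class PolicyMode(Enum):
    ADMISSION = "admission"    # evaluated before a change is applied
    MONITORING = "monitoring"  # evaluated against resources that already exist


@dataclass
class PolicyDecision:
    allowed: bool
    violations: List[str]


def require_cost_center_tag(resource: Resource) -> List[str]:
    """Hypothetical rule: every resource must carry a CostCenter freeform tag."""
    tags = resource.get("freeform_tags") or {}
    if "CostCenter" not in tags:
        return [f"{resource.get('name', '<unnamed>')}: missing CostCenter tag"]
    return []


def evaluate(resource: Resource, policies: List[Policy], mode: PolicyMode) -> PolicyDecision:
    """Run every policy; block only in admission mode, report in both modes."""
    violations = [msg for policy in policies for msg in policy(resource)]
    allowed = mode is PolicyMode.MONITORING or not violations
    return PolicyDecision(allowed=allowed, violations=violations)


if __name__ == "__main__":
    untagged = {"name": "web-1", "freeform_tags": {}}
    for mode in PolicyMode:
        decision = evaluate(untagged, [require_cost_center_tag], mode)
        print(mode.value, decision.allowed, decision.violations)
```

Keeping one rule set for both modes means a violation found by monitoring is, by construction, something admission control would also have rejected.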
Regards
Osama