Implementing GitOps with OCI Resource Manager: Advanced Infrastructure Drift Detection and Automated Remediation

Modern cloud infrastructure demands continuous monitoring and automated governance to keep configurations aligned with their intended designs. Oracle Cloud Infrastructure (OCI) Resource Manager provides the building blocks for implementing GitOps workflows with drift detection and automated remediation. This guide explores advanced patterns for infrastructure state management, policy enforcement, and automated compliance restoration.

GitOps Architecture with OCI Resource Manager

GitOps treats Git repositories as the single source of truth for infrastructure definitions. OCI Resource Manager extends this concept with native Git integration, enabling infrastructure updates triggered by code commits and reconciliation of deployed resources against the declared state.

The architecture centers on declarative infrastructure definitions stored in Git repositories, with Resource Manager monitoring for changes and applying updates automatically. This approach curbs configuration drift, provides a complete audit trail, and enables rapid rollback when issues arise.

Unlike traditional infrastructure management approaches, GitOps with Resource Manager provides immutable infrastructure deployments where every change goes through version control, peer review, and automated testing before reaching production environments.
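
For example, the following sketch uses the OCI Python SDK to create a Resource Manager stack whose configuration source is a Git branch. It assumes a configuration source provider has already been connected to your Git host; the OCIDs, repository URL, and working directory below are placeholders.

import oci

# Placeholder OCIDs; substitute your compartment and the configuration
# source provider that connects Resource Manager to your Git host.
COMPARTMENT_ID = "ocid1.compartment.oc1..example"
CONFIG_SOURCE_PROVIDER_ID = "ocid1.ormconfigsourceprovider.oc1..example"

config = oci.config.from_file()
rm_client = oci.resource_manager.ResourceManagerClient(config)

stack = rm_client.create_stack(
    oci.resource_manager.models.CreateStackDetails(
        compartment_id=COMPARTMENT_ID,
        display_name="gitops-production-stack",
        description="Stack sourced from the GitOps repository",
        config_source=oci.resource_manager.models.CreateGitConfigSourceDetails(
            configuration_source_provider_id=CONFIG_SOURCE_PROVIDER_ID,
            repository_url="https://github.com/your-org/infrastructure-config.git",
            branch_name="main",
            working_directory="environments/production"
        )
    )
).data

print(f"Created stack {stack.id} tracking branch 'main'")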

Advanced Drift Detection Mechanisms

Infrastructure drift occurs when deployed resources deviate from their intended configurations due to manual changes, external automation, or service updates. OCI Resource Manager’s drift detection capabilities go beyond simple configuration comparison to provide comprehensive analysis of resource state variations.

The drift detection engine compares actual resource configurations against the Terraform state on each detection run, flagging discrepancies per resource. Analysis of configuration changes, resource dependencies, and policy violations then turns those raw differences into actionable insights about infrastructure deviations.

Machine learning models can further enhance drift detection by establishing baseline behaviors and flagging anomalous configuration changes that might indicate security incidents or operational issues. This kind of analysis reduces false positives while ensuring critical deviations receive immediate attention.
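
Before layering custom tooling on top, the service's built-in drift detection can be driven from the Python SDK. The sketch below, which assumes a placeholder stack OCID and the current ResourceManagerClient surface, triggers a detection run and lists the resources that have drifted:

import time
import oci

STACK_ID = "ocid1.ormstack.oc1..example"  # placeholder stack OCID

config = oci.config.from_file()
rm_client = oci.resource_manager.ResourceManagerClient(config)

# Start an asynchronous drift detection run for the stack
response = rm_client.detect_stack_drift(STACK_ID)
work_request_id = response.headers.get("opc-work-request-id")
print(f"Drift detection started (work request {work_request_id})")

# Naive wait; production code should poll the work request instead
time.sleep(60)

# Report every resource whose live state no longer matches the stack state
drift = rm_client.list_stack_resource_drift_details(STACK_ID).data
for item in drift.items:
    if item.resource_drift_status != "IN_SYNC":
        print(f"{item.resource_address}: {item.resource_drift_status}")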

Production Implementation with Automated Remediation

Here’s a comprehensive implementation demonstrating GitOps workflows with advanced drift detection and automated remediation capabilities:

Terraform Configuration with Policy Enforcement

# main.tf - Infrastructure with built-in compliance checks
terraform {
  required_providers {
    oci = {
      source  = "oracle/oci"
      version = "~> 5.0"
    }
  }
  
  # Note: when runs execute inside OCI Resource Manager, state is managed
  # by the service; this backend block applies only to local executions.
  backend "remote" {
    organization = "your-org"
    workspaces {
      name = "oci-production"
    }
  }
}

# Data source for compliance validation
data "oci_identity_policies" "required_policies" {
  compartment_id = var.tenancy_ocid
  
  filter {
    name   = "name"
    values = ["security-baseline-policy"]
  }
}
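
# Lookups referenced by the instance resources below; the image filters
# are illustrative assumptions
data "oci_identity_availability_domains" "ads" {
  compartment_id = var.tenancy_ocid
}

data "oci_core_images" "oracle_linux" {
  compartment_id           = var.compartment_id
  operating_system         = "Oracle Linux"
  operating_system_version = "8"
  shape                    = var.instance_shape
  sort_by                  = "TIMECREATED"
  sort_order               = "DESC"
}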

# Compliance validation resource
resource "null_resource" "compliance_check" {
  triggers = {
    always_run = timestamp()
  }
  
  provisioner "local-exec" {
    command = "python3 ${path.module}/scripts/compliance_validator.py --compartment ${var.compartment_id}"
  }
  
  lifecycle {
    precondition {
      condition     = length(data.oci_identity_policies.required_policies.policies) > 0
      error_message = "Required security baseline policy not found."
    }
  }
}

# VCN with mandatory security configurations
resource "oci_core_vcn" "production_vcn" {
  compartment_id = var.compartment_id
  display_name   = "production-vcn"
  cidr_blocks    = ["10.0.0.0/16"]
  dns_label      = "prodvcn"
  
  # Mandatory tags for compliance
  defined_tags = {
    "Security.Classification" = "Confidential"
    "Operations.Environment"  = "Production"
    "Compliance.Required"     = "True"
  }
  
  lifecycle {
    prevent_destroy = true
    
    postcondition {
      condition     = self.defined_tags["Security.Classification"] == "Confidential"
      error_message = "Production VCN must be tagged as Confidential."
    }
  }
}

# Security list with drift detection webhook
resource "oci_core_security_list" "production_security_list" {
  compartment_id = var.compartment_id
  vcn_id        = oci_core_vcn.production_vcn.id
  display_name  = "production-security-list"
  
  # Ingress rules with compliance requirements
  dynamic "ingress_security_rules" {
    for_each = var.allowed_ingress_rules
    content {
      protocol    = ingress_security_rules.value.protocol
      source      = ingress_security_rules.value.source
      source_type = "CIDR_BLOCK"

      # Emit TCP options only for TCP (protocol "6") rules
      dynamic "tcp_options" {
        for_each = ingress_security_rules.value.protocol == "6" ? [1] : []
        content {
          min = ingress_security_rules.value.port_range.min
          max = ingress_security_rules.value.port_range.max
        }
      }
    }
  }
  
  # Egress rules with monitoring
  egress_security_rules {
    destination      = "0.0.0.0/0"
    destination_type = "CIDR_BLOCK"
    protocol        = "6"
    
    tcp_options {
      min = 443
      max = 443
    }
  }
  
  # Custom webhook for change notifications
  provisioner "local-exec" {
    when    = create
    command = "curl -X POST ${var.webhook_url} -H 'Content-Type: application/json' -d '{\"event\":\"security_list_created\",\"resource_id\":\"${self.id}\"}'"
  }
  
  lifecycle {
    postcondition {
      condition = length([
        for rule in self.egress_security_rules : rule
        if rule.destination == "0.0.0.0/0" && rule.protocol == "6"
      ]) <= 2
      error_message = "Security list has too many permissive egress rules."
    }
  }
}
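
# Subnet referenced by the instances and load balancer below; the CIDR
# block is an illustrative assumption
resource "oci_core_subnet" "production_subnet" {
  compartment_id             = var.compartment_id
  vcn_id                     = oci_core_vcn.production_vcn.id
  display_name               = "production-subnet"
  cidr_block                 = "10.0.1.0/24"
  dns_label                  = "prodsubnet"
  prohibit_public_ip_on_vnic = true
  security_list_ids          = [oci_core_security_list.production_security_list.id]
}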

# Compute instance with security baseline
resource "oci_core_instance" "production_instance" {
  count               = var.instance_count
  availability_domain = data.oci_identity_availability_domains.ads.availability_domains[count.index % 3].name
  compartment_id     = var.compartment_id
  display_name       = "prod-instance-${count.index + 1}"
  shape             = var.instance_shape
  
  shape_config {
    ocpus         = var.instance_ocpus
    memory_in_gbs = var.instance_memory
  }
  
  create_vnic_details {
    subnet_id        = oci_core_subnet.production_subnet.id
    display_name     = "primaryvnic-${count.index + 1}"
    assign_public_ip = false
    nsg_ids         = [oci_core_network_security_group.production_nsg.id]
  }
  
  source_details {
    source_type = "image"
    source_id   = data.oci_core_images.oracle_linux.images[0].id
  }
  
  metadata = {
    ssh_authorized_keys = var.ssh_public_key
    user_data = base64encode(templatefile("${path.module}/templates/cloud-init.yaml", {
      monitoring_agent_config = var.monitoring_config
      security_agent_config   = var.security_config
    }))
  }
  
  # Anti-drift configuration
  defined_tags = {
    "Security.Classification" = "Confidential"
    "Operations.Environment"  = "Production"
    "Drift.Monitor"          = "Enabled"
    "Compliance.Baseline"    = "CIS-1.0"
  }
  
  lifecycle {
    postcondition {
      condition     = self.state == "RUNNING"
      error_message = "Instance must be in RUNNING state after creation."
    }
    
    replace_triggered_by = [
      oci_core_security_list.production_security_list
    ]
  }
}

# Network Security Group with policy enforcement
resource "oci_core_network_security_group" "production_nsg" {
  compartment_id = var.compartment_id
  vcn_id        = oci_core_vcn.production_vcn.id
  display_name  = "production-nsg"
  
  defined_tags = {
    "Security.Purpose" = "Application-Tier"
    "Drift.Monitor"   = "Enabled"
  }
}

# Dynamic NSG rules based on compliance requirements
resource "oci_core_network_security_group_security_rule" "application_rules" {
  for_each = {
    for rule in var.security_rules : "${rule.direction}-${rule.protocol}-${rule.port}" => rule
    if rule.enabled
  }
  
  network_security_group_id = oci_core_network_security_group.production_nsg.id
  direction                 = each.value.direction
  protocol                  = each.value.protocol
  
  source      = each.value.direction == "INGRESS" ? each.value.source : null
  source_type = each.value.direction == "INGRESS" ? "CIDR_BLOCK" : null
  
  destination      = each.value.direction == "EGRESS" ? each.value.destination : null
  destination_type = each.value.direction == "EGRESS" ? "CIDR_BLOCK" : null
  
  dynamic "tcp_options" {
    for_each = each.value.protocol == "6" ? [1] : []
    content {
      destination_port_range {
        min = each.value.port
        max = each.value.port
      }
    }
  }
}

# Load balancer with health monitoring
resource "oci_load_balancer_load_balancer" "production_lb" {
  compartment_id = var.compartment_id
  display_name   = "production-lb"
  shape         = "flexible"
  subnet_ids    = [oci_core_subnet.production_subnet.id]
  
  shape_details {
    minimum_bandwidth_in_mbps = 10
    maximum_bandwidth_in_mbps = 100
  }
  
  defined_tags = {
    "Security.Classification" = "Confidential"
    "Drift.Monitor"          = "Enabled"
    "HealthCheck.Critical"   = "True"
  }
  
  lifecycle {
    postcondition {
      condition     = self.state == "ACTIVE"
      error_message = "Load balancer must be in ACTIVE state."
    }
  }
}

# Backend set with automated health checks
resource "oci_load_balancer_backend_set" "production_backend" {
  load_balancer_id = oci_load_balancer_load_balancer.production_lb.id
  name            = "production-backend"
  policy          = "ROUND_ROBIN"
  
  health_checker {
    port                = 8080
    protocol           = "HTTP"
    url_path           = "/health"
    interval_ms        = 10000
    timeout_in_millis  = 3000
    retries            = 3
    return_code        = 200
  }
  
  session_persistence_configuration {
    cookie_name      = "X-Oracle-BMC-LBS-Route"
    disable_fallback = false
  }
}

# Backend instances with drift monitoring
resource "oci_load_balancer_backend" "production_backends" {
  count            = length(oci_core_instance.production_instance)
  load_balancer_id = oci_load_balancer_load_balancer.production_lb.id
  backendset_name  = oci_load_balancer_backend_set.production_backend.name
  ip_address      = oci_core_instance.production_instance[count.index].private_ip
  port            = 8080
  backup          = false
  drain           = false
  offline         = false
  weight          = 1
  
  # Backend health is not exposed as an attribute of this resource; verify
  # it after apply with the oci_load_balancer_backend_health data source.
}

Advanced Drift Detection and Remediation Script

#!/usr/bin/env python3
"""
Advanced Infrastructure Drift Detection and Remediation System
Provides comprehensive monitoring, analysis, and automated remediation
of infrastructure configuration drift in OCI environments.
"""

import json
import logging
import asyncio
import hashlib
from datetime import datetime
from typing import Dict, List, Tuple, Optional, Any
from dataclasses import dataclass, field
from enum import Enum
import oci
import git
import requests
import yaml
from pathlib import Path

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

class DriftSeverity(Enum):
    LOW = "low"
    MEDIUM = "medium" 
    HIGH = "high"
    CRITICAL = "critical"

class RemediationAction(Enum):
    ALERT_ONLY = "alert_only"
    AUTO_REMEDIATE = "auto_remediate"
    MANUAL_APPROVAL = "manual_approval"
    ROLLBACK = "rollback"

@dataclass
class DriftDetection:
    """Container for drift detection results"""
    resource_id: str
    resource_type: str
    resource_name: str
    expected_config: Dict[str, Any]
    actual_config: Dict[str, Any]
    drift_items: List[str]
    severity: DriftSeverity
    confidence_score: float
    detected_at: datetime
    remediation_action: RemediationAction = RemediationAction.ALERT_ONLY
    tags: Dict[str, str] = field(default_factory=dict)

@dataclass
class RemediationResult:
    """Container for remediation operation results"""
    drift_detection: DriftDetection
    action_taken: str
    success: bool
    error_message: Optional[str] = None
    execution_time: float = 0.0
    rollback_info: Optional[Dict] = None

class InfrastructureDriftMonitor:
    def __init__(self, config_file: str = 'drift_config.yaml'):
        """Initialize the drift monitoring system"""
        self.config = self._load_config(config_file)
        self.signer = oci.auth.signers.get_resource_principals_signer()
        
        # Initialize OCI clients
        self.compute_client = oci.core.ComputeClient({}, signer=self.signer)
        self.network_client = oci.core.VirtualNetworkClient({}, signer=self.signer)
        self.lb_client = oci.load_balancer.LoadBalancerClient({}, signer=self.signer)
        self.resource_manager_client = oci.resource_manager.ResourceManagerClient({}, signer=self.signer)
        
        # Drift detection state
        self.baseline_configs = {}
        self.drift_history = []
        self.remediation_queue = asyncio.Queue()
        
        # Initialize Git repository for state tracking
        self.git_repo = self._initialize_git_repo()
        
    def _load_config(self, config_file: str) -> Dict:
        """Load configuration from YAML file"""
        try:
            with open(config_file, 'r') as f:
                return yaml.safe_load(f)
        except FileNotFoundError:
            logger.error(f"Configuration file {config_file} not found")
            return {}

    def _initialize_git_repo(self) -> git.Repo:
        """Initialize Git repository for configuration tracking"""
        repo_path = self.config.get('git_repo_path', './infrastructure-state')
        
        try:
            if Path(repo_path).exists():
                repo = git.Repo(repo_path)
            else:
                repo = git.Repo.clone_from(
                    self.config['git_repo_url'], 
                    repo_path
                )
            
            return repo
        except Exception as e:
            logger.error(f"Failed to initialize Git repository: {str(e)}")
            raise

    async def discover_resources(self) -> Dict[str, List[Dict]]:
        """Discover all monitored resources in the compartment"""
        compartment_id = self.config['compartment_id']
        discovered_resources = {
            'compute_instances': [],
            'vcns': [],
            'security_lists': [],
            'network_security_groups': [],
            'load_balancers': []
        }
        
        try:
            # Discover compute instances
            instances = self.compute_client.list_instances(
                compartment_id=compartment_id,
                lifecycle_state='RUNNING'
            ).data
            
            for instance in instances:
                if self._should_monitor_resource(instance):
                    discovered_resources['compute_instances'].append({
                        'id': instance.id,
                        'name': instance.display_name,
                        'type': 'compute_instance',
                        'config': await self._get_instance_config(instance.id)
                    })
            
            # Discover VCNs
            vcns = self.network_client.list_vcns(
                compartment_id=compartment_id,
                lifecycle_state='AVAILABLE'
            ).data
            
            for vcn in vcns:
                if self._should_monitor_resource(vcn):
                    discovered_resources['vcns'].append({
                        'id': vcn.id,
                        'name': vcn.display_name,
                        'type': 'vcn',
                        'config': await self._get_vcn_config(vcn.id)
                    })
            
            # Discover Security Lists
            security_lists = self.network_client.list_security_lists(
                compartment_id=compartment_id,
                lifecycle_state='AVAILABLE'
            ).data
            
            for sl in security_lists:
                if self._should_monitor_resource(sl):
                    discovered_resources['security_lists'].append({
                        'id': sl.id,
                        'name': sl.display_name,
                        'type': 'security_list',
                        'config': await self._get_security_list_config(sl.id)
                    })
            
            # Discover Load Balancers
            load_balancers = self.lb_client.list_load_balancers(
                compartment_id=compartment_id,
                lifecycle_state='ACTIVE'
            ).data
            
            for lb in load_balancers:
                if self._should_monitor_resource(lb):
                    discovered_resources['load_balancers'].append({
                        'id': lb.id,
                        'name': lb.display_name,
                        'type': 'load_balancer',
                        'config': await self._get_load_balancer_config(lb.id)
                    })
            
            logger.info(f"Discovered {sum(len(resources) for resources in discovered_resources.values())} resources")
            return discovered_resources
            
        except Exception as e:
            logger.error(f"Failed to discover resources: {str(e)}")
            return discovered_resources

    def _should_monitor_resource(self, resource) -> bool:
        """Determine if a resource should be monitored for drift"""
        if not hasattr(resource, 'defined_tags'):
            return False
        
        defined_tags = resource.defined_tags or {}
        drift_monitor = defined_tags.get('Drift', {}).get('Monitor', 'Disabled')
        
        return drift_monitor.lower() == 'enabled'

    async def _get_instance_config(self, instance_id: str) -> Dict:
        """Get detailed configuration for a compute instance"""
        try:
            instance = self.compute_client.get_instance(instance_id).data
            
            # Get VNIC attachments
            vnic_attachments = self.compute_client.list_vnic_attachments(
                compartment_id=instance.compartment_id,
                instance_id=instance_id
            ).data
            
            vnics = []
            for attachment in vnic_attachments:
                vnic = self.network_client.get_vnic(attachment.vnic_id).data
                vnics.append({
                    'vnic_id': vnic.id,
                    'subnet_id': vnic.subnet_id,
                    'private_ip': vnic.private_ip,
                    'public_ip': vnic.public_ip,
                    'nsg_ids': vnic.nsg_ids
                })
            
            return {
                'display_name': instance.display_name,
                'lifecycle_state': instance.lifecycle_state,
                'availability_domain': instance.availability_domain,
                'shape': instance.shape,
                'shape_config': oci.util.to_dict(instance.shape_config) if instance.shape_config else {},
                'defined_tags': instance.defined_tags,
                'freeform_tags': instance.freeform_tags,
                'vnics': vnics,
                'metadata': instance.metadata
            }
            
        except Exception as e:
            logger.error(f"Failed to get instance config for {instance_id}: {str(e)}")
            return {}

    async def _get_vcn_config(self, vcn_id: str) -> Dict:
        """Get detailed configuration for a VCN"""
        try:
            vcn = self.network_client.get_vcn(vcn_id).data
            
            return {
                'display_name': vcn.display_name,
                'cidr_blocks': vcn.cidr_blocks,
                'dns_label': vcn.dns_label,
                'lifecycle_state': vcn.lifecycle_state,
                'defined_tags': vcn.defined_tags,
                'freeform_tags': vcn.freeform_tags
            }
            
        except Exception as e:
            logger.error(f"Failed to get VCN config for {vcn_id}: {str(e)}")
            return {}

    async def _get_security_list_config(self, security_list_id: str) -> Dict:
        """Get detailed configuration for a security list"""
        try:
            sl = self.network_client.get_security_list(security_list_id).data
            
            return {
                'display_name': sl.display_name,
                'lifecycle_state': sl.lifecycle_state,
                # to_dict() yields the public attribute names; __dict__ would leak private fields
                'ingress_security_rules': [oci.util.to_dict(rule) for rule in sl.ingress_security_rules],
                'egress_security_rules': [oci.util.to_dict(rule) for rule in sl.egress_security_rules],
                'defined_tags': sl.defined_tags,
                'freeform_tags': sl.freeform_tags
            }
            
        except Exception as e:
            logger.error(f"Failed to get security list config for {security_list_id}: {str(e)}")
            return {}

    async def _get_load_balancer_config(self, lb_id: str) -> Dict:
        """Get detailed configuration for a load balancer"""
        try:
            lb = self.lb_client.get_load_balancer(lb_id).data
            
            # Get backend sets
            backend_sets = {}
            for name, backend_set in lb.backend_sets.items():
                backend_sets[name] = {
                    'policy': backend_set.policy,
                    'health_checker': backend_set.health_checker.__dict__,
                    'backends': [backend.__dict__ for backend in backend_set.backends]
                }
            
            return {
                'display_name': lb.display_name,
                'shape_name': lb.shape_name,
                'lifecycle_state': lb.lifecycle_state,
                'backend_sets': backend_sets,
                'listeners': {name: oci.util.to_dict(listener) for name, listener in lb.listeners.items()},
                'defined_tags': lb.defined_tags,
                'freeform_tags': lb.freeform_tags
            }
            
        except Exception as e:
            logger.error(f"Failed to get load balancer config for {lb_id}: {str(e)}")
            return {}

    async def detect_drift(self, current_resources: Dict[str, List[Dict]]) -> List[DriftDetection]:
        """Detect configuration drift across all monitored resources"""
        detected_drifts = []
        
        # Load baseline configurations from Git
        baseline_configs = await self._load_baseline_configs()
        
        for resource_type, resources in current_resources.items():
            for resource in resources:
                resource_id = resource['id']
                current_config = resource['config']
                
                # Get baseline configuration
                baseline_key = f"{resource_type}:{resource_id}"
                baseline_config = baseline_configs.get(baseline_key, {}).get('config', {})
                
                if not baseline_config:
                    # First time seeing this resource, establish baseline
                    await self._establish_baseline(resource_type, resource_id, current_config)
                    continue
                
                # Compare configurations
                drift_items = self._compare_configurations(baseline_config, current_config)
                
                if drift_items:
                    # Calculate drift severity and confidence
                    severity, confidence = self._analyze_drift_severity(drift_items, resource_type)
                    
                    # Determine remediation action
                    remediation_action = self._determine_remediation_action(
                        severity, resource_type, resource['config']
                    )
                    
                    drift_detection = DriftDetection(
                        resource_id=resource_id,
                        resource_type=resource_type,
                        resource_name=resource.get('name', 'Unknown'),
                        expected_config=baseline_config,
                        actual_config=current_config,
                        drift_items=drift_items,
                        severity=severity,
                        confidence_score=confidence,
                        detected_at=datetime.utcnow(),
                        remediation_action=remediation_action,
                        tags=current_config.get('defined_tags', {})
                    )
                    
                    detected_drifts.append(drift_detection)
                    logger.warning(f"Drift detected in {resource_type} {resource_id}: {len(drift_items)} changes")
        
        return detected_drifts

    async def _load_baseline_configs(self) -> Dict[str, Dict]:
        """Load baseline configurations from Git repository"""
        try:
            # Pull latest changes
            self.git_repo.remotes.origin.pull()
            
            baseline_file = Path(self.git_repo.working_dir) / 'baseline_configs.json'
            
            if baseline_file.exists():
                with open(baseline_file, 'r') as f:
                    return json.load(f)
            else:
                return {}
                
        except Exception as e:
            logger.error(f"Failed to load baseline configurations: {str(e)}")
            return {}

    async def _establish_baseline(self, resource_type: str, resource_id: str, config: Dict):
        """Establish baseline configuration for a new resource"""
        try:
            baseline_file = Path(self.git_repo.working_dir) / 'baseline_configs.json'
            
            # Load existing baselines
            if baseline_file.exists():
                with open(baseline_file, 'r') as f:
                    baselines = json.load(f)
            else:
                baselines = {}
            
            # Add new baseline
            baseline_key = f"{resource_type}:{resource_id}"
            baselines[baseline_key] = {
                'config': config,
                'established_at': datetime.utcnow().isoformat(),
                'checksum': hashlib.sha256(json.dumps(config, sort_keys=True).encode()).hexdigest()
            }
            
            # Save to file
            with open(baseline_file, 'w') as f:
                json.dump(baselines, f, indent=2, default=str)
            
            # Commit to Git
            self.git_repo.index.add([str(baseline_file)])
            self.git_repo.index.commit(f"Establish baseline for {resource_type} {resource_id}")
            self.git_repo.remotes.origin.push()
            
            logger.info(f"Established baseline for {resource_type} {resource_id}")
            
        except Exception as e:
            logger.error(f"Failed to establish baseline: {str(e)}")

    def _compare_configurations(self, baseline: Dict, current: Dict) -> List[str]:
        """Compare two configurations and identify differences"""
        drift_items = []
        
        def deep_compare(base_obj, curr_obj, path=""):
            if isinstance(base_obj, dict) and isinstance(curr_obj, dict):
                # Compare dictionary keys
                base_keys = set(base_obj.keys())
                curr_keys = set(curr_obj.keys())
                
                # Added keys
                for key in curr_keys - base_keys:
                    drift_items.append(f"Added: {path}.{key} = {curr_obj[key]}")
                
                # Removed keys
                for key in base_keys - curr_keys:
                    drift_items.append(f"Removed: {path}.{key} = {base_obj[key]}")
                
                # Changed values
                for key in base_keys & curr_keys:
                    deep_compare(base_obj[key], curr_obj[key], f"{path}.{key}" if path else key)
                    
            elif isinstance(base_obj, list) and isinstance(curr_obj, list):
                # Compare lists
                if len(base_obj) != len(curr_obj):
                    drift_items.append(f"List size changed: {path} from {len(base_obj)} to {len(curr_obj)}")
                
                for i, (base_item, curr_item) in enumerate(zip(base_obj, curr_obj)):
                    deep_compare(base_item, curr_item, f"{path}[{i}]")
                    
            else:
                # Compare primitive values
                if base_obj != curr_obj:
                    drift_items.append(f"Changed: {path} from '{base_obj}' to '{curr_obj}'")
        
        # Exclude timestamp and volatile fields
        baseline_filtered = self._filter_volatile_fields(baseline)
        current_filtered = self._filter_volatile_fields(current)
        
        deep_compare(baseline_filtered, current_filtered)
        return drift_items

    def _filter_volatile_fields(self, config: Dict) -> Dict:
        """Filter out volatile fields that change frequently"""
        volatile_fields = {
            'time_created', 'time_updated', 'etag', 'lifecycle_details',
            'system_tags', 'time_maintenance_begin', 'time_maintenance_end'
        }
        
        def filter_recursive(obj):
            if isinstance(obj, dict):
                return {
                    k: filter_recursive(v) 
                    for k, v in obj.items() 
                    if k not in volatile_fields
                }
            elif isinstance(obj, list):
                return [filter_recursive(item) for item in obj]
            else:
                return obj
        
        return filter_recursive(config)

    def _analyze_drift_severity(self, drift_items: List[str], resource_type: str) -> Tuple[DriftSeverity, float]:
        """Analyze drift severity based on changes and resource type"""
        severity_scores = {
            'security': 0,
            'availability': 0,
            'performance': 0,
            'compliance': 0
        }
        
        # Analyze each drift item
        for item in drift_items:
            if any(keyword in item.lower() for keyword in ['security', 'ingress', 'egress', 'port', 'protocol']):
                severity_scores['security'] += 10
            
            if any(keyword in item.lower() for keyword in ['availability_domain', 'fault_domain', 'backup']):
                severity_scores['availability'] += 8
            
            if any(keyword in item.lower() for keyword in ['shape', 'cpu', 'memory', 'bandwidth']):
                severity_scores['performance'] += 6
            
            if any(keyword in item.lower() for keyword in ['tag', 'compliance', 'classification']):
                severity_scores['compliance'] += 7
        
        # Calculate overall severity
        total_score = sum(severity_scores.values())
        confidence = min(len(drift_items) * 0.1, 1.0)
        
        if total_score >= 30 or severity_scores['security'] >= 20:
            return DriftSeverity.CRITICAL, confidence
        elif total_score >= 20:
            return DriftSeverity.HIGH, confidence
        elif total_score >= 10:
            return DriftSeverity.MEDIUM, confidence
        else:
            return DriftSeverity.LOW, confidence

    def _determine_remediation_action(self, severity: DriftSeverity, resource_type: str, config: Dict) -> RemediationAction:
        """Determine appropriate remediation action based on severity and resource type"""
        tags = config.get('defined_tags', {})
        auto_remediate = tags.get('Drift', {}).get('AutoRemediate', 'False').lower() == 'true'
        
        if severity == DriftSeverity.CRITICAL:
            if auto_remediate and resource_type in ['security_list', 'network_security_group']:
                return RemediationAction.AUTO_REMEDIATE
            else:
                return RemediationAction.MANUAL_APPROVAL
        
        elif severity == DriftSeverity.HIGH:
            if auto_remediate:
                return RemediationAction.AUTO_REMEDIATE
            else:
                return RemediationAction.MANUAL_APPROVAL
        
        elif severity == DriftSeverity.MEDIUM:
            return RemediationAction.MANUAL_APPROVAL
        
        else:
            return RemediationAction.ALERT_ONLY

    async def remediate_drift(self, drift_detections: List[DriftDetection]) -> List[RemediationResult]:
        """Execute remediation actions for detected drift"""
        remediation_results = []
        
        for drift in drift_detections:
            start_time = datetime.utcnow()
            
            try:
                if drift.remediation_action == RemediationAction.AUTO_REMEDIATE:
                    result = await self._auto_remediate_drift(drift)
                
                elif drift.remediation_action == RemediationAction.MANUAL_APPROVAL:
                    result = await self._request_manual_approval(drift)
                
                elif drift.remediation_action == RemediationAction.ROLLBACK:
                    result = await self._rollback_changes(drift)
                
                else:  # ALERT_ONLY
                    result = await self._send_drift_alert(drift)
                
                execution_time = (datetime.utcnow() - start_time).total_seconds()
                result.execution_time = execution_time
                
                remediation_results.append(result)
                
            except Exception as e:
                logger.error(f"Remediation failed for {drift.resource_id}: {str(e)}")
                
                remediation_results.append(RemediationResult(
                    drift_detection=drift,
                    action_taken="remediation_failed",
                    success=False,
                    error_message=str(e),
                    execution_time=(datetime.utcnow() - start_time).total_seconds()
                ))
        
        return remediation_results

    async def _auto_remediate_drift(self, drift: DriftDetection) -> RemediationResult:
        """Automatically remediate detected drift"""
        try:
            # Create backup before remediation
            backup_info = await self._create_resource_backup(drift.resource_id, drift.resource_type)
            
            # Apply expected configuration
            if drift.resource_type == 'security_list':
                success = await self._remediate_security_list(drift)
            
            elif drift.resource_type == 'network_security_group':
                success = await self._remediate_network_security_group(drift)
            
            elif drift.resource_type == 'load_balancer':
                success = await self._remediate_load_balancer(drift)
            
            else:
                success = False
                raise NotImplementedError(f"Auto-remediation not implemented for {drift.resource_type}")
            
            if success:
                # Update baseline configuration
                await self._update_baseline_config(drift.resource_id, drift.resource_type, drift.expected_config)
                
                # Send success notification
                await self._send_remediation_notification(drift, "success")
            
            return RemediationResult(
                drift_detection=drift,
                action_taken="auto_remediated",
                success=success,
                rollback_info=backup_info
            )
            
        except Exception as e:
            return RemediationResult(
                drift_detection=drift,
                action_taken="auto_remediation_failed",
                success=False,
                error_message=str(e)
            )

    async def _remediate_security_list(self, drift: DriftDetection) -> bool:
        """Remediate security list configuration drift"""
        try:
            security_list_id = drift.resource_id
            expected_config = drift.expected_config
            
            # Prepare update details
            update_details = oci.core.models.UpdateSecurityListDetails(
                display_name=expected_config.get('display_name'),
                ingress_security_rules=[
                    # Nested option blocks captured as dicts may need to be
                    # rebuilt as SDK models (e.g. TcpOptions) before the update.
                    oci.core.models.IngressSecurityRule(**rule)
                    for rule in expected_config.get('ingress_security_rules', [])
                ],
                egress_security_rules=[
                    oci.core.models.EgressSecurityRule(**rule)
                    for rule in expected_config.get('egress_security_rules', [])
                ],
                defined_tags=expected_config.get('defined_tags'),
                freeform_tags=expected_config.get('freeform_tags')
            )
            
            # Update security list
            response = self.network_client.update_security_list(
                security_list_id=security_list_id,
                update_security_list_details=update_details
            )
            
            # Wait for update to complete
            oci.wait_until(
                self.network_client,
                self.network_client.get_security_list(security_list_id),
                'lifecycle_state',
                'AVAILABLE'
            )
            
            logger.info(f"Successfully remediated security list {security_list_id}")
            return True
            
        except Exception as e:
            logger.error(f"Failed to remediate security list {drift.resource_id}: {str(e)}")
            return False

    async def _create_resource_backup(self, resource_id: str, resource_type: str) -> Dict:
        """Create backup of current resource configuration before remediation"""
        try:
            backup_info = {
                'resource_id': resource_id,
                'resource_type': resource_type,
                'backup_time': datetime.utcnow().isoformat(),
                'backup_id': f"backup-{resource_id}-{int(datetime.utcnow().timestamp())}"
            }
            
            # Get current configuration
            if resource_type == 'security_list':
                current_config = await self._get_security_list_config(resource_id)
            elif resource_type == 'network_security_group':
                # Implementation for NSG backup
                current_config = {}
            else:
                current_config = {}
            
            backup_info['configuration'] = current_config
            
            # Store backup in Git repository
            backup_file = Path(self.git_repo.working_dir) / 'backups' / f"{backup_info['backup_id']}.json"
            backup_file.parent.mkdir(exist_ok=True)
            
            with open(backup_file, 'w') as f:
                json.dump(backup_info, f, indent=2, default=str)
            
            # Commit backup to Git
            self.git_repo.index.add([str(backup_file)])
            self.git_repo.index.commit(f"Backup {resource_type} {resource_id} before remediation")
            
            return backup_info
            
        except Exception as e:
            logger.error(f"Failed to create backup for {resource_id}: {str(e)}")
            return {}

    async def _send_drift_alert(self, drift: DriftDetection) -> RemediationResult:
        """Send drift detection alert"""
        try:
            alert_payload = {
                'event_type': 'infrastructure_drift_detected',
                'severity': drift.severity.value,
                'resource_id': drift.resource_id,
                'resource_type': drift.resource_type,
                'resource_name': drift.resource_name,
                'drift_count': len(drift.drift_items),
                'confidence_score': drift.confidence_score,
                'detected_at': drift.detected_at.isoformat(),
                'drift_details': drift.drift_items[:10],  # Limit details
                'remediation_action': drift.remediation_action.value
            }
            
            # Send to webhook if configured
            webhook_url = self.config.get('alert_webhook_url')
            if webhook_url:
                response = requests.post(
                    webhook_url,
                    json=alert_payload,
                    timeout=30
                )
                response.raise_for_status()
            
            # Send to OCI Notifications if configured
            notification_topic = self.config.get('notification_topic_ocid')
            if notification_topic:
                ons_client = oci.ons.NotificationDataPlaneClient({}, signer=self.signer)
                
                message_details = oci.ons.models.MessageDetails(
                    body=json.dumps(alert_payload, indent=2),
                    title=f"Infrastructure Drift Detected - {drift.severity.value.upper()}"
                )
                
                ons_client.publish_message(
                    topic_id=notification_topic,
                    message_details=message_details
                )
            
            logger.info(f"Drift alert sent for {drift.resource_id}")
            
            return RemediationResult(
                drift_detection=drift,
                action_taken="alert_sent",
                success=True
            )
            
        except Exception as e:
            return RemediationResult(
                drift_detection=drift,
                action_taken="alert_failed",
                success=False,
                error_message=str(e)
            )

    async def generate_drift_report(self, drift_detections: List[DriftDetection], 
                                  remediation_results: List[RemediationResult]) -> str:
        """Generate comprehensive drift detection and remediation report"""
        
        report_time = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC')
        
        # Statistics
        total_drifts = len(drift_detections)
        critical_drifts = len([d for d in drift_detections if d.severity == DriftSeverity.CRITICAL])
        high_drifts = len([d for d in drift_detections if d.severity == DriftSeverity.HIGH])
        auto_remediated = len([r for r in remediation_results if r.action_taken == "auto_remediated" and r.success])
        
        report = f"""
# Infrastructure Drift Detection Report
**Generated:** {report_time}

## Executive Summary
- **Total Drift Detections:** {total_drifts}
- **Critical Severity:** {critical_drifts}
- **High Severity:** {high_drifts}
- **Successfully Auto-Remediated:** {auto_remediated}
- **Average Detection Confidence:** {sum(d.confidence_score for d in drift_detections) / max(total_drifts, 1):.2%}

## Drift Analysis by Resource Type
"""
        
        # Group by resource type
        drift_by_type = {}
        for drift in drift_detections:
            if drift.resource_type not in drift_by_type:
                drift_by_type[drift.resource_type] = []
            drift_by_type[drift.resource_type].append(drift)
        
        for resource_type, drifts in drift_by_type.items():
            report += f"""
### {resource_type.replace('_', ' ').title()}
- **Count:** {len(drifts)}
- **Average Confidence:** {sum(d.confidence_score for d in drifts) / len(drifts):.2%}
- **Severity Distribution:**
  - Critical: {len([d for d in drifts if d.severity == DriftSeverity.CRITICAL])}
  - High: {len([d for d in drifts if d.severity == DriftSeverity.HIGH])}
  - Medium: {len([d for d in drifts if d.severity == DriftSeverity.MEDIUM])}
  - Low: {len([d for d in drifts if d.severity == DriftSeverity.LOW])}
"""
        
        # Detailed drift information
        if drift_detections:
            report += "\n## Detailed Drift Analysis\n"
            
            # Sort by severity rank, not the string value, so CRITICAL comes first
            severity_rank = {DriftSeverity.CRITICAL: 3, DriftSeverity.HIGH: 2,
                             DriftSeverity.MEDIUM: 1, DriftSeverity.LOW: 0}
            for drift in sorted(drift_detections, key=lambda x: severity_rank[x.severity], reverse=True)[:20]:
                report += f"""
### {drift.resource_name} ({drift.resource_type})
- **Severity:** {drift.severity.value.upper()}
- **Confidence:** {drift.confidence_score:.2%}
- **Remediation Action:** {drift.remediation_action.value}
- **Changes Detected:** {len(drift.drift_items)}

**Key Changes:**
"""
                for change in drift.drift_items[:5]:  # Show top 5 changes
                    report += f"- {change}\n"
                
                if len(drift.drift_items) > 5:
                    report += f"- ... and {len(drift.drift_items) - 5} more changes\n"
        
        # Remediation results
        if remediation_results:
            report += "\n## Remediation Results\n"
            
            successful_remediations = [r for r in remediation_results if r.success]
            failed_remediations = [r for r in remediation_results if not r.success]
            
            report += f"""
- **Successful Actions:** {len(successful_remediations)}
- **Failed Actions:** {len(failed_remediations)}
- **Average Execution Time:** {sum(r.execution_time for r in remediation_results) / max(len(remediation_results), 1):.2f} seconds
"""
            
            if failed_remediations:
                report += "\n### Failed Remediations\n"
                for result in failed_remediations:
                    report += f"""
- **Resource:** {result.drift_detection.resource_name}
- **Action:** {result.action_taken}
- **Error:** {result.error_message}
"""
        
        # Recommendations
        report += f"""
## Recommendations

### Immediate Actions Required
"""
        
        critical_items = [d for d in drift_detections if d.severity == DriftSeverity.CRITICAL]
        if critical_items:
            report += "- **Critical drift detected** - Review and remediate immediately\n"
            for item in critical_items[:3]:
                report += f"  - {item.resource_name}: {len(item.drift_items)} critical changes\n"
        
        report += f"""
### Process Improvements
- Enable auto-remediation for {len([d for d in drift_detections if d.remediation_action == RemediationAction.MANUAL_APPROVAL])} resources with manual approval requirements
- Review baseline configurations for {len([d for d in drift_detections if d.confidence_score < 0.7])} resources with low confidence scores
- Implement preventive controls for {len(drift_by_type.get('security_list', []))} security list changes

### Monitoring Enhancements
- Increase monitoring frequency for critical resources
- Implement real-time alerting for security-related changes
- Establish automated testing for configuration changes
"""
        
        return report
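
    # The handlers below are minimal sketches of the remaining remediation
    # paths referenced in remediate_drift() and _auto_remediate_drift();
    # production implementations would plug into an approval workflow and
    # the per-service update APIs.

    async def _request_manual_approval(self, drift: DriftDetection) -> RemediationResult:
        """Queue the drift for human review (sketch: reuse the alert path)."""
        result = await self._send_drift_alert(drift)
        result.action_taken = "manual_approval_requested"
        return result

    async def _rollback_changes(self, drift: DriftDetection) -> RemediationResult:
        """Restore the expected configuration (sketch: delegate to auto-remediation)."""
        return await self._auto_remediate_drift(drift)

    async def _remediate_network_security_group(self, drift: DriftDetection) -> bool:
        """NSG rule reconciliation would go here; not implemented in this sketch."""
        logger.warning(f"NSG remediation not implemented for {drift.resource_id}")
        return False

    async def _remediate_load_balancer(self, drift: DriftDetection) -> bool:
        """Load balancer reconciliation would go here; not implemented in this sketch."""
        logger.warning(f"Load balancer remediation not implemented for {drift.resource_id}")
        return False

    async def _update_baseline_config(self, resource_id: str, resource_type: str, config: Dict):
        """Re-establish the Git-tracked baseline after a successful remediation."""
        await self._establish_baseline(resource_type, resource_id, config)

    async def _send_remediation_notification(self, drift: DriftDetection, status: str):
        """Notify the drift webhook, if configured, about a remediation outcome."""
        webhook_url = self.config.get('alert_webhook_url')
        if webhook_url:
            requests.post(webhook_url, json={
                'event_type': 'drift_remediation',
                'status': status,
                'resource_id': drift.resource_id,
                'resource_type': drift.resource_type
            }, timeout=30)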

# GitOps Integration Functions
async def setup_gitops_pipeline():
    """Set up GitOps pipeline for continuous infrastructure monitoring"""
    # Illustrative pipeline settings; only polling_interval is consumed below
    pipeline_config = {
        'git_repo': 'https://github.com/your-org/infrastructure-config.git',
        'branch': 'main',
        'polling_interval': 300,  # 5 minutes
        'webhook_url': 'https://your-webhook-endpoint.com/drift-alerts',
        'auto_remediation_enabled': True,
        'notification_topic': 'ocid1.onstopic.oc1..example'
    }
    
    # Initialize monitoring system
    monitor = InfrastructureDriftMonitor('drift_config.yaml')
    
    while True:
        try:
            logger.info("Starting drift detection cycle...")
            
            # Discover current infrastructure
            current_resources = await monitor.discover_resources()
            
            # Detect drift
            drift_detections = await monitor.detect_drift(current_resources)
            
            if drift_detections:
                logger.warning(f"Detected {len(drift_detections)} configuration drifts")
                
                # Execute remediation
                remediation_results = await monitor.remediate_drift(drift_detections)
                
                # Generate and save report
                report = await monitor.generate_drift_report(drift_detections, remediation_results)
                
                # Save report to Git repository
                report_file = Path(monitor.git_repo.working_dir) / 'reports' / f"drift_report_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.md"
                report_file.parent.mkdir(exist_ok=True)
                
                with open(report_file, 'w') as f:
                    f.write(report)
                
                # Commit report
                monitor.git_repo.index.add([str(report_file)])
                monitor.git_repo.index.commit(f"Drift detection report - {len(drift_detections)} issues found")
                monitor.git_repo.remotes.origin.push()
                
            else:
                logger.info("No configuration drift detected")
            
            # Wait for next cycle
            await asyncio.sleep(pipeline_config['polling_interval'])
            
        except Exception as e:
            logger.error(f"GitOps pipeline error: {str(e)}")
            await asyncio.sleep(60)  # Wait before retrying

if __name__ == "__main__":
    asyncio.run(setup_gitops_pipeline())

Advanced GitOps implementations require policy enforcement mechanisms that prevent configuration drift before it occurs. Resource Manager workflows can be combined with Open Policy Agent (OPA) in the delivery pipeline to provide policy-as-code validation of infrastructure changes against organizational standards.

Policy definitions stored in Git repositories enable version-controlled governance rules that automatically reject non-compliant infrastructure changes. These policies can enforce security baselines, cost optimization requirements, and operational standards across all infrastructure deployments.

This approach supports both admission-control policies that block problematic changes before they apply and monitoring policies that detect violations in existing infrastructure. The dual coverage ensures comprehensive protection while maintaining operational flexibility.
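
As a concrete illustration of the admission-control side, the sketch below evaluates a Terraform plan against two baseline rules in Python. It is a simplified stand-in for a full OPA/Rego policy and assumes a plan has already been written locally with terraform plan -out=tfplan:

import json
import subprocess
import sys

# Render the saved plan as JSON (requires 'terraform plan -out=tfplan' first)
plan = json.loads(
    subprocess.run(
        ["terraform", "show", "-json", "tfplan"],
        capture_output=True, text=True, check=True
    ).stdout
)

violations = []
for change in plan.get("resource_changes", []):
    after = (change.get("change") or {}).get("after") or {}

    # Rule 1: taggable resources must carry the Confidential classification
    if "defined_tags" in after:
        tags = after.get("defined_tags") or {}
        if tags.get("Security.Classification") != "Confidential":
            violations.append(f"{change['address']}: missing Confidential tag")

    # Rule 2: no unrestricted ingress from the public internet
    for rule in after.get("ingress_security_rules") or []:
        if rule.get("source") == "0.0.0.0/0" and not rule.get("tcp_options"):
            violations.append(f"{change['address']}: open ingress from 0.0.0.0/0")

if violations:
    print("Policy violations:")
    print("\n".join(f"- {v}" for v in violations))
    sys.exit(1)

print("Plan passes baseline policies")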

Regards
Osama
