AIOps Log and Incident Analysis Implementation

We design and deploy artificial intelligence systems, from prototype to production-ready solutions. Our team combines expertise in machine learning, data engineering, and MLOps to make AI work not just in the lab, but in real business.

Implementation of AI-based log and incident analysis (AIOps)

AIOps log analysis is the link between raw log data and actionable insights for the SRE team. The system processes billions of log lines in real time, separates signal from noise, and constructs a temporal picture of each incident complete with cause-and-effect relationships.

The scale of the task

Typical volumes:

  • Microservice system of 100 services: 5-50 GB of logs per hour
  • Kubernetes cluster: thousands of pods, events every second
  • Latency requirement: the anomaly must be detected < 2 minutes after occurrence

Processing stack:

Applications/Infra
    → Fluent Bit (lightweight collector, edge filtering)
    → Kafka (buffering, partitioning by service)
    → Flink / Spark Streaming (stream processing)
    → ClickHouse (analytics) + Elasticsearch (search)
    → ML Service (inference)
    → Grafana / Custom UI
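The "partitioning by service" step means all logs from one service land on the same Kafka partition and therefore stay ordered relative to each other. A sketch of the idea (illustrative only; real Kafka clients compute the partition from the record key for you):

```python
import hashlib

def partition_for_service(service: str, num_partitions: int) -> int:
    """Map a service name to a stable partition number so that logs
    from one service always go to the same partition, in every process.
    md5 is used instead of hash() because hash() is salted per process."""
    digest = hashlib.md5(service.encode('utf-8')).digest()
    return int.from_bytes(digest[:4], 'big') % num_partitions
```

In practice the same effect is achieved by setting the service name as the Kafka record key and letting the producer's default partitioner do the hashing.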

Log Structuring Pipeline

Multi-format parsing:

import json
import re
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

@dataclass
class ParsedLog:
    timestamp: datetime
    level: str
    service: str
    trace_id: Optional[str]
    message: str
    parsed_fields: dict

class MultiFormatLogParser:
    PATTERNS = {
        'nginx': r'(?P<ip>\S+) .* \[(?P<time>[^\]]+)\] "(?P<method>\S+) (?P<path>\S+) (?P<proto>\S+)" (?P<status>\d+) (?P<bytes>\d+)',
        'java_log4j': r'(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}) (?P<level>\w+) (?P<class>\S+) - (?P<message>.*)',
        'python_logging': r'(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) (?P<level>\w+) (?P<logger>\S+): (?P<message>.*)',
        'json': None  # handled directly via json.loads
    }

    def parse(self, raw_line, format_hint=None):
        # Try JSON first
        try:
            data = json.loads(raw_line)
            return self.normalize_json_log(data)
        except (json.JSONDecodeError, TypeError):
            pass

        # Try the regex patterns
        for fmt, pattern in self.PATTERNS.items():
            if pattern is None:
                continue
            match = re.match(pattern, raw_line)
            if match:
                return self.normalize_regex_log(match.groupdict(), fmt)

        # Fallback: unstructured line
        return ParsedLog(
            timestamp=datetime.now(),
            level=self.detect_level(raw_line),
            service='unknown',
            trace_id=None,
            message=raw_line,
            parsed_fields={}
        )
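The `normalize_json_log` helper referenced above is not shown; a minimal sketch, assuming common JSON field names (`time`/`timestamp`, `level`, `service`, `trace_id`, `msg`/`message` — real schemas vary and are usually configured per source; the `ParsedLog` dataclass is repeated so the snippet is self-contained):

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

@dataclass
class ParsedLog:
    timestamp: datetime
    level: str
    service: str
    trace_id: Optional[str]
    message: str
    parsed_fields: dict

def normalize_json_log(data: dict) -> ParsedLog:
    """Map a decoded JSON log record onto the common ParsedLog schema.
    The field names here are assumptions, not a fixed standard."""
    raw_ts = data.get('timestamp') or data.get('time')
    try:
        ts = datetime.fromisoformat(raw_ts) if raw_ts else datetime.now()
    except (ValueError, TypeError):
        ts = datetime.now()
    known = {'timestamp', 'time', 'level', 'service', 'trace_id', 'message', 'msg'}
    return ParsedLog(
        timestamp=ts,
        level=str(data.get('level', 'INFO')).upper(),
        service=data.get('service', 'unknown'),
        trace_id=data.get('trace_id'),
        message=data.get('message') or data.get('msg', ''),
        # Everything the schema does not name goes into parsed_fields
        parsed_fields={k: v for k, v in data.items() if k not in known},
    )
```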

Intelligent Alerting

Multi-level scoring:

import numpy as np

class IntelligentAlertScorer:
    def __init__(self):
        self.severity_weights = {
            'ERROR': 3, 'CRITICAL': 5, 'FATAL': 10,
            'WARN': 1, 'WARNING': 1,
            'INFO': 0
        }

    def compute_alert_score(self, log_window, service_profile):
        """
        Composite score: severity × anomalousness × service importance
        """
        # Error severity
        error_score = sum(
            self.severity_weights.get(log.level, 0)
            for log in log_window
        )

        # Anomalousness: ratio to the baseline (+1 avoids division by zero)
        baseline_errors = service_profile['baseline_error_rate_per_minute']
        current_errors = sum(1 for log in log_window if log.level in ['ERROR', 'CRITICAL'])
        spike_ratio = current_errors / (baseline_errors * len(log_window) + 1)

        # Service importance (business criticality)
        business_criticality = service_profile.get('criticality', 1)

        return error_score * np.log1p(spike_ratio) * business_criticality
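The `baseline_error_rate_per_minute` in the service profile has to come from somewhere; one common choice (our assumption — the text does not prescribe a method) is an exponentially weighted moving average over past per-minute error counts:

```python
from typing import Optional

class EwmaBaseline:
    """Exponentially weighted moving average of per-minute error counts.
    alpha controls how fast the baseline adapts to new traffic patterns."""

    def __init__(self, alpha: float = 0.05):
        self.alpha = alpha
        self.value: Optional[float] = None

    def update(self, errors_this_minute: int) -> float:
        if self.value is None:
            # First observation seeds the baseline
            self.value = float(errors_this_minute)
        else:
            self.value = self.alpha * errors_this_minute + (1 - self.alpha) * self.value
        return self.value
```

The current `value` would then be written into `service_profile['baseline_error_rate_per_minute']` on each update tick.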

Contextual suppression:

def should_suppress_alert(alert, suppression_rules):
    """
    Suppress known false positives:
    - Maintenance windows
    - Known flapping services
    - Planned deployment events
    """
    now = datetime.now()

    for rule in suppression_rules:
        if (rule['service'] == alert.service and
            rule['start'] <= now <= rule['end'] and
            rule['pattern'] in alert.message):
            return True, rule['reason']

    return False, None
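Flapping services usually also get time-based deduplication: an identical alert fired again within a short window is dropped rather than re-paged. A minimal sketch (the window length and the (service, message) key are our assumptions):

```python
from datetime import datetime, timedelta

class AlertDeduplicator:
    """Drop repeats of the same (service, message) alert within a window."""

    def __init__(self, window_minutes: int = 10):
        self.window = timedelta(minutes=window_minutes)
        self.last_seen: dict = {}

    def is_duplicate(self, service: str, message: str, now: datetime) -> bool:
        key = (service, message)
        previous = self.last_seen.get(key)
        # Always refresh the timestamp so a flapping alert keeps extending the window
        self.last_seen[key] = now
        return previous is not None and (now - previous) < self.window
```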

Incident Timeline Construction

Chronological reconstruction:

def build_incident_timeline(correlated_logs, metrics, deploys, threshold_events):
    """
    Build the full picture of the incident from every data source
    """
    events = []

    # From logs: the first errors and how they escalate
    for log in correlated_logs:
        events.append({
            'time': log.timestamp,
            'type': 'log_event',
            'service': log.service,
            'description': log.message[:200],
            'severity': log.level
        })

    # From metrics: when degradation began
    for metric_anomaly in metrics:
        events.append({
            'time': metric_anomaly.timestamp,
            'type': 'metric_anomaly',
            'metric': metric_anomaly.name,
            'value': metric_anomaly.value,
            'baseline': metric_anomaly.baseline
        })

    # Threshold breaches (field names assumed analogous to metric anomalies)
    for event in threshold_events:
        events.append({
            'time': event.timestamp,
            'type': 'threshold_breach',
            'metric': event.name,
            'value': event.value
        })

    # Deployments and configuration changes
    for deploy in deploys:
        events.append({
            'time': deploy.timestamp,
            'type': 'deployment',
            'service': deploy.service,
            'version': deploy.new_version,
            'changed_by': deploy.author
        })

    return sorted(events, key=lambda x: x['time'])
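With the timeline sorted, a simple heuristic for root-cause candidates (ours, not prescribed by the text) is to flag deployments that landed shortly before the first error-level event:

```python
from datetime import timedelta

def suspect_deployments(timeline, lookback_minutes=30):
    """Return deployments that occurred within lookback_minutes
    before the first error-level log event in a sorted timeline."""
    first_error = next(
        (e for e in timeline
         if e['type'] == 'log_event'
         and e.get('severity') in ('ERROR', 'CRITICAL', 'FATAL')),
        None,
    )
    if first_error is None:
        return []
    window_start = first_error['time'] - timedelta(minutes=lookback_minutes)
    return [
        e for e in timeline
        if e['type'] == 'deployment'
        and window_start <= e['time'] <= first_error['time']
    ]
```

A deployment two minutes before the first error is a far stronger candidate than one from the previous day, which is exactly what this filter encodes.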

Automatic Runbook Matching

Semantic search in the runbook database:

from sentence_transformers import SentenceTransformer
import faiss
import numpy as np

class RunbookMatcher:
    def __init__(self):
        self.encoder = SentenceTransformer('all-MiniLM-L6-v2')
        self.index = None
        self.runbooks = []

    def index_runbooks(self, runbooks):
        """
        Индексация runbook базы для семантического поиска
        """
        self.runbooks = runbooks
        texts = [f"{rb['title']} {rb['description']} {' '.join(rb['symptoms'])}"
                 for rb in runbooks]
        embeddings = self.encoder.encode(texts)

        dimension = embeddings.shape[1]
        self.index = faiss.IndexFlatL2(dimension)
        self.index.add(embeddings.astype(np.float32))

    def find_relevant_runbooks(self, incident_description, top_k=3):
        """
        По описанию инцидента → похожие runbooks
        """
        query_embedding = self.encoder.encode([incident_description])
        distances, indices = self.index.search(
            query_embedding.astype(np.float32), top_k
        )

        return [
            {
                'runbook': self.runbooks[idx],
                'similarity': 1 / (1 + distance)
            }
            for idx, distance in zip(indices[0], distances[0])
        ]

Generative AI for Incident Summary

Automatic Incident Summary:

def generate_incident_brief(incident_timeline, correlated_services, root_cause_candidates, llm):
    # Only the first 20 events go into the prompt
    timeline_text = format_timeline(incident_timeline[:20])

    prompt = f"""
    Analyze this incident and provide a concise summary for the on-call engineer:

    Timeline of events:
    {timeline_text}

    Affected services: {', '.join(correlated_services)}
    Probable root cause: {root_cause_candidates[0] if root_cause_candidates else 'Unknown'}

    Provide:
    1. One-sentence executive summary
    2. Key symptoms observed
    3. Probable root cause with confidence
    4. Recommended immediate actions
    5. Estimated impact
    """

    return llm.invoke(prompt)

War Room automation:

  • Slack: Automatically create an incident channel with members from affected teams
  • Confluence: Automatic creation of PIR (Post-Incident Review) documents
  • Jira: Create tickets with causal factor tags
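For the Slack step, incident channel names must satisfy Slack's naming rules (lowercase, no spaces or periods, at most 80 characters). A small helper for that; the `inc-<id>-<service>` naming scheme is our convention, not part of the Slack API:

```python
import re

def incident_channel_name(incident_id: str, service: str) -> str:
    """Build a Slack-safe channel name like 'inc-2024-117-checkout-api'.
    Runs of characters outside [a-z0-9_-] are collapsed to a hyphen
    and the result is truncated to Slack's 80-character limit."""
    raw = f"inc-{incident_id}-{service}".lower()
    cleaned = re.sub(r'[^a-z0-9_-]+', '-', raw).strip('-')
    return cleaned[:80]
```

The resulting name is then passed to the Slack API's channel-creation call when opening the war room.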

Incident Tracking and Learning

MTTD/MTTR Tracking:

def calculate_incident_metrics(incident_log):
    """
    Per-incident times; averaged across incidents these give:
    MTTD: Mean Time to Detect
    MTTA: Mean Time to Acknowledge
    MTTR: Mean Time to Resolve
    """
    return {
        'mttd_minutes': (incident_log['detected_at'] - incident_log['started_at']).total_seconds() / 60,
        'mtta_minutes': (incident_log['acknowledged_at'] - incident_log['detected_at']).total_seconds() / 60,
        'mttr_minutes': (incident_log['resolved_at'] - incident_log['started_at']).total_seconds() / 60
    }
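The function above yields per-incident times; the "mean" in MTTD/MTTA/MTTR comes from averaging those values across incidents in a reporting period. A sketch of that aggregation step:

```python
from statistics import mean

def aggregate_incident_metrics(per_incident_metrics):
    """Average per-incident detect/ack/resolve times into MTTD/MTTA/MTTR
    for a reporting period. Input: a list of dicts as produced by
    calculate_incident_metrics; output rounded to 0.1 minutes."""
    if not per_incident_metrics:
        return {}
    return {
        key: round(mean(m[key] for m in per_incident_metrics), 1)
        for key in ('mttd_minutes', 'mtta_minutes', 'mttr_minutes')
    }
```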

Timeframe: Kafka pipeline + log parsing + alert scoring + Slack integration — 4-5 weeks. Timeline builder, runbook matcher, LLM incident summary, MTTD/MTTR analytics, Jira/Confluence automation — 3-4 months.