Implementation of AI-based log and incident analysis (AIOps)
AIOps log analysis is the link between raw log data and actionable insights for the SRE team. The system processes billions of rows in real time, distinguishing signal from noise, and constructing a temporal picture of the incident with cause-and-effect relationships.
The scale of the task
Typical volumes:
- Microservice system of 100 services: 5-50 GB of logs per hour
- Kubernetes cluster: thousands of pods, events every second
- Latency requirement: the anomaly must be detected < 2 minutes after occurrence
Processing stack:
Applications/Infra
→ Fluent Bit (lightweight collector, edge filtering)
→ Kafka (buffering, partitioning by service)
→ Flink / Spark Streaming (stream processing)
→ ClickHouse (analytics) + Elasticsearch (search)
→ ML Service (inference)
→ Grafana / Custom UI
Log Structuring Pipeline
Multi-format parsing:
import re
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
@dataclass
class ParsedLog:
    """Normalized representation of a single parsed log line.

    Produced by MultiFormatLogParser for every input line regardless of the
    original format (JSON, nginx, log4j, python logging, or unstructured).
    """

    timestamp: datetime        # event time; the unstructured fallback uses ingestion time
    level: str                 # severity label, e.g. "INFO", "ERROR"
    service: str               # originating service name; 'unknown' when not derivable
    trace_id: Optional[str]    # distributed-trace id; None when the line carries none
    message: str               # extracted (or raw) human-readable message
    parsed_fields: dict        # format-specific extra fields (regex groups / JSON keys)
class MultiFormatLogParser:
    """Parse raw log lines of several common formats into ParsedLog records.

    Attempt order in parse(): JSON first (cheap and unambiguous to detect),
    then the regex patterns below, finally an unstructured fallback.
    """

    PATTERNS = {
        'nginx': r'(?P<ip>\S+) .* \[(?P<time>[^\]]+)\] "(?P<method>\S+) (?P<path>\S+) (?P<proto>\S+)" (?P<status>\d+) (?P<bytes>\d+)',
        # The millisecond separator dot must be escaped: an unescaped '.'
        # matches ANY character, so e.g. '10:00:00X123' would wrongly parse.
        'java_log4j': r'(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}) (?P<level>\w+) (?P<class>\S+) - (?P<message>.*)',
        'python_logging': r'(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) (?P<level>\w+) (?P<logger>\S+): (?P<message>.*)',
        'json': None,  # handled directly via json.loads, no regex needed
    }

    def parse(self, raw_line, format_hint=None):
        """Parse one raw log line into a ParsedLog.

        NOTE(review): format_hint is accepted but not used here — confirm
        whether it should short-circuit format detection.
        """
        import json

        # Try JSON first.  Catch only decoding failures: the original bare
        # 'except:' also swallowed errors raised inside normalize_json_log.
        try:
            data = json.loads(raw_line)
        except (ValueError, TypeError):
            data = None
        # Only a JSON *object* is a structured record; scalars such as "123"
        # are valid JSON but must fall through to the regex formats.
        if isinstance(data, dict):
            return self.normalize_json_log(data)

        # Try the regex patterns in declaration order.
        for fmt, pattern in self.PATTERNS.items():
            if pattern is None:
                continue
            match = re.match(pattern, raw_line)
            if match:
                return self.normalize_regex_log(match.groupdict(), fmt)

        # Fallback: unstructured line.  Uses ingestion time, not event time.
        return ParsedLog(
            timestamp=datetime.now(),
            level=self.detect_level(raw_line),
            service='unknown',
            trace_id=None,
            message=raw_line,
            parsed_fields={},
        )
Intelligent Alerting
Multi-level scoring:
class IntelligentAlertScorer:
    """Scores how alert-worthy a log window is.

    Composite score = error severity x anomalousness x business criticality.
    """

    def __init__(self):
        # Weight per log level; levels absent from this map contribute 0.
        self.severity_weights = {
            'ERROR': 3, 'CRITICAL': 5, 'FATAL': 10,
            'WARN': 1, 'WARNING': 1,
            'INFO': 0
        }

    def compute_alert_score(self, log_window, service_profile):
        """Composite score: severity x anomalousness x service importance."""
        weights = self.severity_weights

        # Severity: total weight of everything in the window.
        severity_total = sum(weights.get(entry.level, 0) for entry in log_window)

        # Anomalousness: observed error count relative to the baseline rate.
        expected_rate = service_profile['baseline_error_rate_per_minute']
        observed_errors = len([e for e in log_window if e.level in ('ERROR', 'CRITICAL')])
        spike_ratio = observed_errors / (expected_rate * len(log_window) + 1)

        # Business criticality of the service (defaults to 1).
        criticality = service_profile.get('criticality', 1)

        return severity_total * np.log1p(spike_ratio) * criticality
Contextual suppression:
def should_suppress_alert(alert, suppression_rules):
    """Suppress known false positives.

    Typical rule sources: maintenance windows, known flapping services,
    planned deployment events.

    Returns (True, reason) for the first matching rule, else (False, None).
    """
    now = datetime.now()

    def _matches(rule):
        # Short-circuit order preserved: cheap service check first, the
        # time-window test next, substring scan of the message last.
        return (rule['service'] == alert.service
                and rule['start'] <= now <= rule['end']
                and rule['pattern'] in alert.message)

    for rule in suppression_rules:
        if _matches(rule):
            return True, rule['reason']
    return False, None
Incident Timeline Construction
Chronological reconstruction:
def build_incident_timeline(correlated_logs, metrics, deploys, threshold_events):
    """Assemble a chronologically sorted incident picture from all data sources.

    Merges log events, metric anomalies, and deployments into one list of
    event dicts sorted by their 'time' key.

    NOTE(review): threshold_events is accepted but never used here — confirm
    whether it should also contribute events to the timeline.
    """
    timeline = []

    # From logs: the first errors and their escalation.
    timeline.extend({
        'time': entry.timestamp,
        'type': 'log_event',
        'service': entry.service,
        'description': entry.message[:200],  # cap very long messages
        'severity': entry.level,
    } for entry in correlated_logs)

    # From metrics: when the degradation started.
    timeline.extend({
        'time': anomaly.timestamp,
        'type': 'metric_anomaly',
        'metric': anomaly.name,
        'value': anomaly.value,
        'baseline': anomaly.baseline,
    } for anomaly in metrics)

    # Deployments and configuration changes.
    timeline.extend({
        'time': rollout.timestamp,
        'type': 'deployment',
        'service': rollout.service,
        'version': rollout.new_version,
        'changed_by': rollout.author,
    } for rollout in deploys)

    timeline.sort(key=lambda event: event['time'])
    return timeline
Automatic Runbook Matching
Semantic search in the runbook database:
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np
class RunbookMatcher:
    """Semantic search over a runbook base via sentence embeddings + FAISS."""

    def __init__(self):
        self.encoder = SentenceTransformer('all-MiniLM-L6-v2')
        self.index = None   # FAISS index; built lazily by index_runbooks()
        self.runbooks = []

    def index_runbooks(self, runbooks):
        """Embed and index the runbook base for semantic retrieval."""
        self.runbooks = runbooks
        corpus = [
            f"{rb['title']} {rb['description']} {' '.join(rb['symptoms'])}"
            for rb in runbooks
        ]
        vectors = self.encoder.encode(corpus)
        self.index = faiss.IndexFlatL2(vectors.shape[1])
        self.index.add(vectors.astype(np.float32))

    def find_relevant_runbooks(self, incident_description, top_k=3):
        """Return the top_k runbooks most similar to the incident description."""
        query = self.encoder.encode([incident_description])
        distances, indices = self.index.search(query.astype(np.float32), top_k)
        results = []
        for idx, dist in zip(indices[0], distances[0]):
            # Map L2 distance to a bounded similarity in (0, 1].
            results.append({
                'runbook': self.runbooks[idx],
                'similarity': 1 / (1 + dist),
            })
        return results
Generative AI for Incident Summary
Automatic Incident Summary:
def generate_incident_brief(incident_timeline, correlated_services, root_cause_candidates, llm):
    """Ask the LLM for a concise on-call incident summary.

    Parameters: incident_timeline — chronological event list;
    correlated_services — names of affected services;
    root_cause_candidates — ranked hypotheses (may be empty);
    llm — any object exposing .invoke(prompt) -> str.
    """
    # BUG FIX: in the original, the "first 20 events" comment was written
    # INSIDE the f-string, so the literal comment text leaked into the prompt
    # sent to the model.  Hoisting the call keeps the prompt clean.
    timeline_excerpt = format_timeline(incident_timeline[:20])  # first 20 events only
    probable_root_cause = root_cause_candidates[0] if root_cause_candidates else 'Unknown'
    prompt = f"""
Analyze this incident and provide a concise summary for the on-call engineer:
Timeline of events:
{timeline_excerpt}
Affected services: {', '.join(correlated_services)}
Probable root cause: {probable_root_cause}
Provide:
1. One-sentence executive summary
2. Key symptoms observed
3. Probable root cause with confidence
4. Recommended immediate actions
5. Estimated impact
"""
    return llm.invoke(prompt)
War Room automation:
- Slack: Automatically create an incident channel with members from affected teams
- Confluence: Automatic creation of PIR (Post-Incident Review) documents
- Jira: Create tickets with causal factor tags
Incident tracking and organizational learning
MTTD/MTTR Tracking:
def calculate_incident_metrics(incident_log):
    """Compute incident-response KPIs, all in minutes.

    MTTD: Mean Time to Detect       (started_at  -> detected_at)
    MTTA: Mean Time to Acknowledge  (detected_at -> acknowledged_at)
    MTTR: Mean Time to Resolve      (started_at  -> resolved_at)
    """
    def _minutes_between(earlier_key, later_key):
        # Timestamps are datetime objects; the difference is a timedelta.
        elapsed = incident_log[later_key] - incident_log[earlier_key]
        return elapsed.total_seconds() / 60

    return {
        'mttd_minutes': _minutes_between('started_at', 'detected_at'),
        'mtta_minutes': _minutes_between('detected_at', 'acknowledged_at'),
        'mttr_minutes': _minutes_between('started_at', 'resolved_at'),
    }
Timeframe: Kafka pipeline + log parsing + alert scoring + Slack integration — 4-5 weeks. Timeline builder, runbook matcher, LLM incident summary, MTTD/MTTR analytics, Jira/Confluence automation — 3-4 months.







