Development of an AI system for detecting anomalies in a telecom network
A telecom network generates millions of metrics per minute. Traditional static thresholds—80% CPU, packet loss > 1%—fail to capture subtle anomalies such as slow drift, correlated degradations across multiple KPIs, and atypical traffic patterns. ML detection operates without preset thresholds, adapting to the normal behavior of each element.
Multivariate anomaly at the network level
Why static thresholds are not sufficient:
- Normal router CPU at peak time = 75% (not anomaly)
- CPU 50% at 3am on Saturday = anomaly (possible attack or memory leak)
- Simultaneous degradation of 5 KPIs on one element = an anomaly, even though each KPI taken individually remains within its normal range
Context-dependent thresholds:
import pandas as pd
import numpy as np
from prophet import Prophet
class ContextualAnomalyDetector:
    """Per-KPI contextual anomaly detector based on a Prophet forecast.

    Instead of a static threshold, the expected value and its 99%
    prediction interval are modelled per time-of-day / day-of-week,
    so the same KPI value can be normal at peak hours and anomalous
    at 3am on a weekend.
    """

    def __init__(self, kpi_name: str):
        self.kpi_name = kpi_name
        # 99% interval width keeps false positives low for routine
        # seasonal fluctuations of the KPI.
        self.prophet_model = Prophet(
            daily_seasonality=True,
            weekly_seasonality=True,
            interval_width=0.99,
        )
        self.fitted = False

    def fit(self, historical_data: pd.DataFrame) -> None:
        """Fit the seasonal baseline.

        historical_data: DataFrame with columns ds (datetime) and
        y (KPI value). At least 4 weeks of history are required for
        the weekly seasonality to be estimated correctly.
        """
        self.prophet_model.fit(historical_data)
        self.fitted = True

    def detect(self, current_value: float, current_time: pd.Timestamp) -> dict:
        """Score a single observation against the forecast interval.

        Raises RuntimeError if called before fit().
        """
        if not self.fitted:
            # Fail fast with a clear message instead of an opaque
            # Prophet error from predicting with an unfitted model.
            raise RuntimeError(
                f"ContextualAnomalyDetector({self.kpi_name}) is not fitted"
            )
        future = pd.DataFrame({'ds': [current_time]})
        forecast = self.prophet_model.predict(future)
        yhat = forecast['yhat'].values[0]
        yhat_lower = forecast['yhat_lower'].values[0]
        yhat_upper = forecast['yhat_upper'].values[0]
        is_anomaly = current_value < yhat_lower or current_value > yhat_upper
        # Relative deviation from the forecast; epsilon guards yhat == 0.
        deviation = (current_value - yhat) / (abs(yhat) + 1e-9)
        return {
            'kpi': self.kpi_name,
            'value': current_value,
            'expected': yhat,
            'bounds': (yhat_lower, yhat_upper),
            'anomaly': is_anomaly,
            'relative_deviation': deviation,
        }
Multivariate Anomaly Detection
Isolation Forest on a node metric vector:
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
class NetworkElementAnomalyDetector:
    """Multivariate anomaly detector for a single network element.

    Each network element gets its own Isolation Forest model.
    Training: 30 days of normal operation.
    Inference: every 5 minutes on the current KPI vector.
    """

    def __init__(self, element_id: str, contamination=0.01):
        self.element_id = element_id
        self.scaler = StandardScaler()
        self.model = IsolationForest(
            contamination=contamination,
            n_estimators=100,
            random_state=42,
        )
        # Calibrated in fit(); None signals "not yet trained".
        self.threshold = None

    def fit(self, normal_kpi_matrix: np.ndarray) -> None:
        """Train on normal data only.

        normal_kpi_matrix: (N_samples x N_kpis)
        """
        X = self.scaler.fit_transform(normal_kpi_matrix)
        self.model.fit(X)
        # Calibrate the decision threshold on the normal data:
        # 1st percentile of the scores ~ 1% false-positive rate.
        scores = self.model.score_samples(X)
        self.threshold = np.percentile(scores, 1)

    def score(self, kpi_vector: np.ndarray) -> dict:
        """Score one KPI vector; raises RuntimeError before fit()."""
        if self.threshold is None:
            raise RuntimeError(
                f"NetworkElementAnomalyDetector({self.element_id}) is not fitted"
            )
        X = self.scaler.transform([kpi_vector])
        raw_score = self.model.score_samples(X)[0]
        anomaly_score = -raw_score  # higher = more anomalous
        return {
            'element_id': self.element_id,
            'anomaly_score': float(anomaly_score),
            'is_anomaly': raw_score < self.threshold,
            'severity': self._score_to_severity(anomaly_score),
        }

    def _score_to_severity(self, score):
        # Map the (negated) isolation score onto NOC severity levels.
        if score > 0.7:
            return 'critical'
        if score > 0.5:
            return 'major'
        if score > 0.3:
            return 'minor'
        return 'normal'
Traffic Pattern Anomaly
Detection of atypical traffic (DDoS, BGP hijack):
def detect_traffic_anomaly(traffic_matrix: pd.DataFrame,
                           baseline_stats: dict) -> list:
    """Detect traffic anomalies in one 5-minute NetFlow/IPFIX window.

    traffic_matrix: per-flow records (src_ip x dst_ip x bytes over
        5 minutes) with at least the columns src_ip, protocol, bytes.
    baseline_stats: learned normal-behavior statistics; requires
        'total_bytes_mean' and 'total_bytes_std', optionally
        'known_src_ips' (set) and 'new_ip_threshold' (int).

    Covered anomaly classes: volumetric (DDoS), structural
    (new-source flood) and protocol-mix (ICMP/UDP flood).
    Returns a list of anomaly dicts (empty if nothing is detected).
    """
    anomalies = []
    # 1. Volumetric anomaly: sharp growth of total traffic vs baseline.
    current_total = traffic_matrix['bytes'].sum()
    baseline_total = baseline_stats['total_bytes_mean']
    baseline_std = baseline_stats['total_bytes_std']
    # Epsilon guards against a zero-variance baseline.
    volume_z_score = (current_total - baseline_total) / (baseline_std + 1e-9)
    if volume_z_score > 5:
        anomalies.append({
            'type': 'volumetric_spike',
            'severity': 'critical',
            'z_score': volume_z_score,
            'possible_cause': 'DDoS attack or flash crowd'
        })
    # 2. New sources: IPs that never appeared in the baseline window.
    current_sources = set(traffic_matrix['src_ip'].unique())
    known_sources = baseline_stats.get('known_src_ips', set())
    new_sources = current_sources - known_sources
    if len(new_sources) > baseline_stats.get('new_ip_threshold', 1000):
        anomalies.append({
            'type': 'new_source_flood',
            'severity': 'major',
            'new_ips_count': len(new_sources)
        })
    # 3. Protocol-mix anomaly: ICMP or UDP dominating the traffic.
    # Guard: with zero total bytes the ratios are undefined (0/0),
    # so the check is skipped instead of comparing against NaN/inf.
    if current_total > 0:
        protocol_ratios = (
            traffic_matrix.groupby('protocol')['bytes'].sum() / current_total
        )
        for proto in ('ICMP', 'UDP'):
            if protocol_ratios.get(proto, 0) > 0.5:
                anomalies.append({
                    'type': f'{proto}_flood',
                    'severity': 'major',
                    'ratio': protocol_ratios[proto]
                })
    return anomalies
BGP and routing
BGP anomaly detection:
def analyze_bgp_events(bgp_updates: pd.DataFrame, baseline_prefix_count: int,
                       known_origins=None) -> dict:
    """Analyze a batch of BGP updates for routing anomalies.

    bgp_updates: DataFrame with at least 'prefix' and 'origin_as' columns.
    baseline_prefix_count: size of the normal routing table (kept for
        interface compatibility; reserved for future leak detection).
    known_origins: mapping prefix -> expected origin AS. Defaults to an
        empty mapping, in which case hijack detection reports nothing
        (the original placeholder behavior).

    Detected classes:
    - BGP hijack: a known prefix suddenly announced by a new origin AS.
    - Route flap: frequent updates of the same prefix = unstable link.
    (BGP leak - routes from one provider re-advertised to another -
    requires AS-relationship data and is out of scope here.)
    """
    if known_origins is None:
        known_origins = {}
    # Route flapping: more than 10 updates of one prefix in this batch.
    prefix_update_counts = bgp_updates.groupby('prefix').size()
    flapping_prefixes = prefix_update_counts[prefix_update_counts > 10].index.tolist()
    # Hijack candidates: announced origin AS differs from the expected one.
    hijack_candidates = []
    for _, row in bgp_updates.iterrows():
        expected_as = known_origins.get(row['prefix'])
        if expected_as is not None and row['origin_as'] != expected_as:
            hijack_candidates.append({
                'prefix': row['prefix'],
                'expected_as': expected_as,
                'detected_as': row['origin_as']
            })
    return {
        'flapping_prefixes': flapping_prefixes,
        'hijack_candidates': hijack_candidates,
        'route_instability': len(flapping_prefixes) > 5
    }
Alert Correlation and Noise Suppression
Anomaly Correlation Graph: a router uplink failure produces hundreds of downstream anomalies. Algorithm: build a dependency graph from the CMDB topology, trace each anomaly upstream to its root-cause element, and group all downstream anomalies into a single incident.
Timeframe: Prophet contextual anomaly + Isolation Forest on nodes + traffic anomaly — 3-4 weeks. BGP anomaly, alert correlation graph, automatic RCA, NOC integration — 2-3 months.







