Implementation of anomaly detection in time series
Detecting anomalies in time series is a problem with many approaches and no universal solution. The choice of algorithm is driven by the anomaly type (point, contextual, or collective), the availability of labels, and computational constraints. In practice, a system combines several methods to reduce both missed detections and false positives.
Typology of anomalies
Point anomalies: a single value deviates sharply from the norm. Example: a temperature sensor reading of 200°C when the norm is 50°C.
Contextual anomalies: the value is normal in itself but abnormal in its context. Example: a temperature of 35°C in January (normal in summer, abnormal in winter).
Collective anomalies: each value in a sequence is normal individually, but the sequence is anomalous as a whole. Example: several standard transactions that together form a fraudulent pattern.
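The three types can be reproduced on a synthetic series (a sketch; the values, units, and dates are arbitrary):

```python
import numpy as np

rng = np.random.default_rng(0)
t = np.arange(365)

# Seasonal baseline: warm in summer, cold in winter (synthetic, arbitrary units)
series = 20 + 15 * np.sin(2 * np.pi * (t - 80) / 365) + rng.normal(0, 1, len(t))

series[100] = 200.0            # point anomaly: a physically impossible spike
series[5] = 35.0               # contextual anomaly: a summer value in January
series[200:210] = series[199]  # collective anomaly: a "stuck sensor" flat run
```

Each injected segment is what the detectors below should flag.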
Statistical methods
Z-Score and MAD:
import numpy as np
from scipy.stats import median_abs_deviation

def zscore_anomalies(series, threshold=3.0):
    """
    Z-score: works well for normally distributed data
    """
    z_scores = np.abs((series - series.mean()) / series.std())
    return z_scores > threshold

def mad_anomalies(series, threshold=3.5):
    """
    MAD (Median Absolute Deviation): robust to outliers in the training data
    Preferable to the z-score for data with artifacts
    """
    median = np.median(series)
    mad = median_abs_deviation(series)
    if mad == 0:
        mad = np.finfo(float).eps  # guard against constant data
    modified_z = 0.6745 * (series - median) / mad
    return np.abs(modified_z) > threshold
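A quick comparison on contaminated data shows why MAD is preferable: a single extreme outlier inflates the sample standard deviation and partially masks itself from the plain z-score, while the MAD-based score stays robust (sketch with synthetic data):

```python
import numpy as np
from scipy.stats import median_abs_deviation

# 500 normal points around 50 plus one extreme outlier
series = np.concatenate([np.random.default_rng(1).normal(50, 2, 500), [200.0]])

# Plain z-score: the outlier inflates the std it is divided by
z = np.abs((series - series.mean()) / series.std())

# Modified z-score based on MAD: the median and MAD barely move
mad = median_abs_deviation(series)
mz = np.abs(0.6745 * (series - np.median(series)) / mad)

print(z[-1], mz[-1])  # the MAD-based score for the outlier is far larger
```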
CUSUM for gradual changes:
def cusum_detector(series, k=0.5, h=5.0):
    """
    CUSUM: accumulates deviations -> detects a shift in the mean
    k: reference value (sensitivity)
    h: decision threshold
    """
    mean = series[:50].mean()  # baseline from the start of the series
    std = series[:50].std()
    S_pos = np.zeros(len(series))
    S_neg = np.zeros(len(series))
    for t in range(1, len(series)):
        xi = (series[t] - mean) / std
        S_pos[t] = max(0, S_pos[t-1] + xi - k)
        S_neg[t] = max(0, S_neg[t-1] - xi - k)
    return (S_pos > h) | (S_neg > h)
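The accumulation logic can be seen on a series with an injected mean shift (an inlined one-sided variant of the detector above, on synthetic data):

```python
import numpy as np

rng = np.random.default_rng(2)
# Flat baseline, then a persistent +1.5-sigma shift starting at t=300
series = np.concatenate([rng.normal(0, 1, 300), rng.normal(1.5, 1, 200)])

mean, std = series[:50].mean(), series[:50].std()
k, h = 0.5, 5.0
S_pos, alarm_at = 0.0, None
for t in range(len(series)):
    xi = (series[t] - mean) / std
    S_pos = max(0.0, S_pos + xi - k)   # deviations below k decay, above k accumulate
    if S_pos > h and alarm_at is None:
        alarm_at = t

print(alarm_at)
```

A point z-score would rarely flag a 1.5-sigma value; CUSUM catches it because the small excess over k accumulates every step.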
STL decomposition + residual detection:
from statsmodels.tsa.seasonal import STL

def stl_anomaly_detection(series, period=24, threshold=3.5):
    """
    Decompose into trend + seasonality + residual
    Anomaly = a large residual
    """
    stl = STL(series, period=period, robust=True)
    result = stl.fit()
    residuals = result.resid
    # MAD on the residuals
    mad = median_abs_deviation(residuals)
    modified_z = np.abs(0.6745 * (residuals - np.median(residuals)) / mad)
    return modified_z > threshold, result
ML methods
Isolation Forest:
from sklearn.ensemble import IsolationForest

def isolation_forest_detector(series, contamination=0.05, window=10):
    """
    Isolation Forest: effective on multivariate data
    contamination: expected fraction of anomalies
    window: sliding-window size for feature construction
    """
    # Build windowed features
    features = []
    for i in range(window, len(series)):
        window_data = series[i-window:i]
        features.append([
            window_data.mean(),
            window_data.std(),
            window_data.max() - window_data.min(),
            window_data[-1] - window_data.mean(),  # current deviation
            np.corrcoef(np.arange(window), window_data)[0, 1]  # trend
        ])
    features = np.array(features)
    iso_forest = IsolationForest(contamination=contamination, random_state=42)
    predictions = iso_forest.fit_predict(features)
    # -1 = anomaly, 1 = normal
    return predictions == -1
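A compact standalone run of the same idea (fewer window features than above, synthetic data) shows how feature-row indices map back to series positions:

```python
import numpy as np
from sklearn.ensemble import IsolationForest

rng = np.random.default_rng(5)
series = np.sin(np.linspace(0, 20, 500)) + rng.normal(0, 0.1, 500)
series[250:255] += 3.0  # anomalous burst

window = 10
# One feature row per sliding window over the series
feats = np.array([
    [w.mean(), w.std(), w.max() - w.min(), w[-1] - w.mean()]
    for w in (series[i - window:i] for i in range(window, len(series)))
])
pred = IsolationForest(contamination=0.05, random_state=42).fit_predict(feats)
anomaly_idx = np.where(pred == -1)[0] + window  # feature row i describes series index i + window
```

Note that with `contamination=0.05` the model always flags about 5% of windows, so the parameter should reflect a realistic anomaly rate.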
LSTM Autoencoder:
import torch
import torch.nn as nn

class LSTMAutoencoder(nn.Module):
    def __init__(self, input_size, hidden_size=64, num_layers=2):
        super().__init__()
        # Encoder
        self.encoder = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        # Decoder
        self.decoder = nn.LSTM(hidden_size, input_size, num_layers, batch_first=True)

    def forward(self, x):
        # Encode
        _, (h_n, c_n) = self.encoder(x)
        # Decode: repeat the last hidden state along the time axis
        decoder_input = h_n[-1].unsqueeze(1).repeat(1, x.size(1), 1)
        reconstruction, _ = self.decoder(decoder_input)
        return reconstruction

def detect_autoencoder_anomalies(model, series, threshold_quantile=0.95):
    """
    Reconstruction error as the anomaly score
    High error = the model cannot reconstruct the pattern = anomaly
    """
    with torch.no_grad():
        reconstruction = model(series)
        re = torch.mean((series - reconstruction) ** 2, dim=[1, 2])
    threshold = torch.quantile(re, threshold_quantile)
    return re > threshold
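The autoencoder must first be trained on windows of normal data so that only anomalous windows reconstruct poorly. A minimal training sketch (the class mirrors the one above, shrunk to hidden_size=16 and one layer; the data is synthetic):

```python
import torch
import torch.nn as nn

torch.manual_seed(0)

class LSTMAutoencoder(nn.Module):
    def __init__(self, input_size, hidden_size=16, num_layers=1):
        super().__init__()
        self.encoder = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.decoder = nn.LSTM(hidden_size, input_size, num_layers, batch_first=True)

    def forward(self, x):
        _, (h_n, _) = self.encoder(x)
        dec_in = h_n[-1].unsqueeze(1).repeat(1, x.size(1), 1)
        out, _ = self.decoder(dec_in)
        return out

# Train on normal windows only: 64 windows of length 20, one feature
windows = torch.sin(torch.linspace(0, 50, 64 * 20)).reshape(64, 20, 1)
model = LSTMAutoencoder(input_size=1)
opt = torch.optim.Adam(model.parameters(), lr=1e-2)

with torch.no_grad():
    initial = nn.functional.mse_loss(model(windows), windows).item()

for _ in range(50):
    opt.zero_grad()
    loss = nn.functional.mse_loss(model(windows), windows)
    loss.backward()
    opt.step()
final = loss.item()
```

The key discipline is that the training set must contain only (or almost only) normal windows; otherwise the model learns to reconstruct the anomalies too.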
Online (streaming) detection
Streaming Anomaly Detection:
import numpy as np

class OnlineAnomalyDetector:
    """
    Fully online: no history is stored in memory
    Running mean and variance are updated with every new point (Welford's algorithm)
    """
    def __init__(self, threshold=3.5, warmup=30):
        self.threshold = threshold
        self.warmup = warmup
        self.n = 0
        self.mean = 0.0
        self.M2 = 0.0  # Welford's algorithm for the online variance

    def update(self, value):
        self.n += 1
        # Welford's online mean and variance
        delta = value - self.mean
        self.mean += delta / self.n
        delta2 = value - self.mean
        self.M2 += delta * delta2
        variance = self.M2 / (self.n - 1) if self.n > 1 else 0.0
        std = np.sqrt(variance)
        if std > 0 and self.n > self.warmup:  # skip the warmup period
            z_score = abs(value - self.mean) / std
            return z_score > self.threshold
        return False
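Feeding a stream point by point looks like this (the Welford updates are inlined here so the snippet runs standalone; the spike position is arbitrary):

```python
import numpy as np

rng = np.random.default_rng(4)
stream = rng.normal(0, 1, 1000)
stream[700] = 15.0  # inject one spike into the stream

# Same update rule as the detector above, applied per point
n, mean, M2 = 0, 0.0, 0.0
alerts = []
for t, x in enumerate(stream):
    n += 1
    delta = x - mean
    mean += delta / n
    M2 += delta * (x - mean)
    std = np.sqrt(M2 / (n - 1)) if n > 1 else 0.0
    if std > 0 and n > 30 and abs(x - mean) / std > 3.5:
        alerts.append(t)

print(alerts)
```

One caveat of the plain Welford approach: statistics never forget, so a single large spike permanently inflates the variance a little; for drifting streams an exponentially weighted variant is a common alternative.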
Detector quality assessment
Metrics when labels are available:
from sklearn.metrics import precision_score, recall_score, f1_score, average_precision_score

def evaluate_detector(y_true, y_pred, y_scores=None):
    metrics = {
        'precision': precision_score(y_true, y_pred),
        'recall': recall_score(y_true, y_pred),
        'f1': f1_score(y_true, y_pred),
    }
    if y_scores is not None:
        metrics['average_precision'] = average_precision_score(y_true, y_scores)
    return metrics
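A toy run of the underlying metrics, with labels chosen purely for illustration:

```python
from sklearn.metrics import precision_score, recall_score

y_true = [0, 0, 1, 1, 0, 1]  # 1 = labeled anomaly
y_pred = [0, 1, 1, 0, 0, 1]  # detector output

precision = precision_score(y_true, y_pred)  # 2 of 3 flagged points are real: 2/3
recall = recall_score(y_true, y_pred)        # 2 of 3 real anomalies caught: 2/3
```

For anomaly detection, precision maps directly to alert noise and recall to missed incidents, which is why both are tracked rather than accuracy.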
Without labels, evaluation relies on proxy signals:
- False positive rate: the fraction of time the system reports an "anomaly" during known-normal operation
- Alert fatigue: if alerts fire more than 5% of the time, the system is too sensitive
- Operator feedback: engineers mark alerts as true/false positives, and the model is improved continuously
Practical scenarios
Infrastructure metrics: Prometheus metrics → STL decomposition + Isolation Forest. Main issue: deployments create false anomalies. Solution: suppress detection in a window of ±10 minutes around each deployment.
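The suppression rule is a simple filter over alert timestamps (a sketch; the function name and minute-based timestamps are illustrative, not any real Prometheus API):

```python
import numpy as np

def suppress_near_deploys(alert_times, deploy_times, window=10):
    """Drop alerts within +/- window minutes of any deployment."""
    deploy_times = np.asarray(deploy_times)
    return [t for t in alert_times
            if not np.any(np.abs(deploy_times - t) <= window)]

# Alerts at minutes 5, 100, 105, 300; a deployment at minute 100
print(suppress_near_deploys([5, 100, 105, 300], [100], window=10))  # [5, 300]
```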
Financial transactions: severe class imbalance (anomalies < 0.1%). An LSTM autoencoder or Isolation Forest usually works better than supervised methods here.
Industrial sensors: often have physically determined limits → a hybrid of hard thresholds and statistical detection.
Timeline: STL + Isolation Forest + online z-score + a basic dashboard takes 3-4 weeks; an LSTM autoencoder, streaming detection, a feedback loop for retraining, and multi-sensor fusion take 2-3 months.