Time Series Anomaly Detection Implementation


Detecting anomalies in time series is a problem with many approaches and no universal solution. The choice of algorithm depends on the anomaly type (point, contextual, or collective), the availability of labels, and computational constraints. A practical system combines several methods to minimize both missed detections and false positives.
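For instance, two cheap statistical tests can vote; a minimal sketch (NumPy only, thresholds are illustrative) that flags a point when either a z-score or a robust MAD test fires:

```python
import numpy as np

def combined_anomalies(series, z_thresh=3.0, mad_thresh=3.5):
    """Union of two simple detectors: flag a point if either test fires.

    A union (OR) reduces missed detections; requiring both votes
    (AND) would instead reduce false positives.
    """
    z = np.abs((series - series.mean()) / series.std())
    med = np.median(series)
    mad = np.median(np.abs(series - med))           # robust spread estimate
    mod_z = np.abs(0.6745 * (series - med) / mad)
    return (z > z_thresh) | (mod_z > mad_thresh)

rng = np.random.default_rng(0)
data = rng.normal(50, 2, 300)
data[42] = 200.0   # inject an obvious point anomaly
mask = combined_anomalies(data)
```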

Typology of anomalies

Point anomalies: a single value deviates sharply from the norm. Example: a temperature sensor reading of 200°C when the norm is 50°C.

Contextual anomalies: the value is normal in itself, but abnormal in its context. Example: a temperature of 35°C in January (normal in summer, anomalous in winter).

Collective anomalies: each value in a sequence is normal individually, but the sequence is anomalous as a whole. Example: several ordinary transactions that together form a fraudulent pattern.
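To make the distinction concrete, here is a small synthetic illustration (all values invented) of why a contextual anomaly needs context-aware detection: a global z-score misses it, while a deviation from the seasonal profile catches it:

```python
import numpy as np

# A contextual anomaly: the raw value lies within the global range,
# but is wrong for its position in the seasonal cycle.
t = np.arange(24 * 30)                       # 30 days of hourly data
series = 10 * np.sin(2 * np.pi * t / 24)     # daily cycle, range [-10, 10]
series[500] = 8.0                            # hour 20, where ~-8.7 is expected

# A global z-score misses it (the value is unremarkable overall):
z = abs(series[500] - series.mean()) / series.std()   # well below 3

# Deviation from the seasonal profile (mean per hour of day) catches it:
hour = t % 24
profile = np.array([series[hour == h].mean() for h in range(24)])
residual = series - profile[hour]
mask = np.abs(residual) > 3.0
```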

Statistical methods

Z-Score and MAD:

import numpy as np
from scipy.stats import median_abs_deviation

def zscore_anomalies(series, threshold=3.0):
    """
    Z-score: works well for normally distributed data
    """
    z_scores = np.abs((series - series.mean()) / series.std())
    return z_scores > threshold

def mad_anomalies(series, threshold=3.5):
    """
    MAD (Median Absolute Deviation): robust to outliers in the training data.
    Preferable to the z-score for data containing artifacts.
    """
    median = np.median(series)
    mad = median_abs_deviation(series)
    modified_z = 0.6745 * (series - median) / mad
    return np.abs(modified_z) > threshold
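A quick sanity check of the robustness claim in mad_anomalies: with enough contaminating artifacts, the z-score's own mean and std inflate and a moderate anomaly slips through, while MAD still flags it (synthetic data, illustrative thresholds):

```python
import numpy as np
from scipy.stats import median_abs_deviation

rng = np.random.default_rng(0)
data = rng.normal(0, 1, 500)
data[:20] = 50.0     # heavy contamination: 4% of points are artifacts
data[100] = 12.0     # a genuine, moderate anomaly

# Plain z-score: the artifacts inflate mean and std, so 12.0 is missed
z = np.abs((data - data.mean()) / data.std())
z_mask = z > 3.0

# MAD: median-based statistics barely move, so 12.0 is still flagged
mad = median_abs_deviation(data)
mod_z = np.abs(0.6745 * (data - np.median(data)) / mad)
mad_mask = mod_z > 3.5
```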

CUSUM for gradual changes:

def cusum_detector(series, k=0.5, h=5.0):
    """
    CUSUM: accumulates deviations → detects a shift in the mean
    k: reference value (sensitivity)
    h: decision threshold
    """
    mean = series[:50].mean()  # baseline from the start of the series
    std = series[:50].std()

    S_pos = np.zeros(len(series))
    S_neg = np.zeros(len(series))

    for t in range(1, len(series)):
        xi = (series[t] - mean) / std
        S_pos[t] = max(0, S_pos[t-1] + xi - k)
        S_neg[t] = max(0, S_neg[t-1] - xi - k)

    return (S_pos > h) | (S_neg > h)
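A quick synthetic run of the same one-sided accumulation logic (data and seed are invented for illustration):

```python
import numpy as np

# Simulated metric: stable around 0 for 150 steps, then the mean shifts to 3
rng = np.random.default_rng(42)
series = np.concatenate([rng.normal(0, 1, 150), rng.normal(3, 1, 100)])

mean, std = series[:50].mean(), series[:50].std()   # baseline, as above
k, h = 0.5, 5.0                                     # same defaults as cusum_detector
s_pos = 0.0
alarms = np.zeros(len(series), dtype=bool)
for t in range(1, len(series)):
    xi = (series[t] - mean) / std
    s_pos = max(0.0, s_pos + xi - k)   # one-sided: upward shifts only
    alarms[t] = s_pos > h
```

With an average standardized deviation of about 3 after the shift, the statistic gains roughly 2.5 per step, so the alarm typically fires within a few steps of t = 150.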

STL decomposition + residual detection:

from statsmodels.tsa.seasonal import STL

def stl_anomaly_detection(series, period=24, threshold=3.5):
    """
    Decompose the series into trend + seasonality + residual.
    An anomaly shows up as a large residual.
    """
    stl = STL(series, period=period, robust=True)
    result = stl.fit()
    residuals = result.resid

    # MAD on the residuals
    mad = median_abs_deviation(residuals)
    modified_z = np.abs(0.6745 * (residuals - np.median(residuals)) / mad)
    return modified_z > threshold, result

ML methods

Isolation Forest:

from sklearn.ensemble import IsolationForest

def isolation_forest_detector(series, contamination=0.05, window=10):
    """
    Isolation Forest: effective on multivariate data.
    contamination: expected fraction of anomalies
    window: sliding-window size for feature extraction
    Expects a 1-D NumPy array; the returned mask is aligned to
    series[window:], i.e. it is len(series) - window entries long.
    """
    # Build sliding-window features
    features = []
    for i in range(window, len(series)):
        window_data = series[i-window:i]
        features.append([
            window_data.mean(),
            window_data.std(),
            window_data.max() - window_data.min(),
            window_data[-1] - window_data.mean(),  # deviation of the latest point
            np.corrcoef(np.arange(window), window_data)[0, 1]  # local trend
        ])

    features = np.array(features)
    iso_forest = IsolationForest(contamination=contamination, random_state=42)
    predictions = iso_forest.fit_predict(features)
    # -1 = anomaly, 1 = normal
    return predictions == -1

LSTM Autoencoder:

import torch
import torch.nn as nn

class LSTMAutoencoder(nn.Module):
    def __init__(self, input_size, hidden_size=64, num_layers=2):
        super().__init__()
        # Encoder
        self.encoder = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        # Decoder
        self.decoder = nn.LSTM(hidden_size, input_size, num_layers, batch_first=True)

    def forward(self, x):
        # Encode
        _, (h_n, c_n) = self.encoder(x)

        # Decode: repeat hidden state for decoder input
        decoder_input = h_n[-1].unsqueeze(1).repeat(1, x.size(1), 1)
        reconstruction, _ = self.decoder(decoder_input)
        return reconstruction

def detect_autoencoder_anomalies(model, series, threshold_quantile=0.95):
    """
    Reconstruction error (RE) as the anomaly score:
    a high RE means the model cannot reconstruct the pattern → anomaly.
    """
    with torch.no_grad():
        reconstruction = model(series)
        re = torch.mean((series - reconstruction)**2, dim=[1, 2])

    threshold = torch.quantile(re, threshold_quantile)
    return re > threshold

Online (streaming) detection

Streaming Anomaly Detection:

import numpy as np

class OnlineAnomalyDetector:
    """
    Fully online: keeps only running statistics, no history buffer in memory.
    Updates the mean and variance with every new point (Welford's algorithm).
    """
    def __init__(self, threshold=3.5, warmup=30):
        self.threshold = threshold
        self.warmup = warmup  # minimum number of points before alerting
        self.n = 0
        self.mean = 0.0
        self.M2 = 0.0  # Welford's algorithm for the online variance

    def update(self, value):
        self.n += 1

        # Welford's online mean and variance
        delta = value - self.mean
        self.mean += delta / self.n
        delta2 = value - self.mean
        self.M2 += delta * delta2

        variance = self.M2 / (self.n - 1) if self.n > 1 else 0.0
        std = np.sqrt(variance)

        if std > 0 and self.n > self.warmup:  # warmup period
            z_score = abs(value - self.mean) / std
            return z_score > self.threshold

        return False

Detector quality assessment

Metrics when labels are available:

from sklearn.metrics import precision_score, recall_score, f1_score, average_precision_score

def evaluate_detector(y_true, y_pred, y_scores=None):
    metrics = {
        'precision': precision_score(y_true, y_pred),
        'recall': recall_score(y_true, y_pred),
        'f1': f1_score(y_true, y_pred),
    }
    if y_scores is not None:
        metrics['average_precision'] = average_precision_score(y_true, y_scores)
    return metrics

Without labels, evaluate the detector indirectly:

  • False positive rate: the fraction of time the system spends in the "anomaly" state during known-normal operation
  • Alert fatigue: if alerts fire more than ~5% of the time, the system is too sensitive
  • Operational feedback: engineers mark alerts as true/false positives → the model improves continuously
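The alert-rate rule of thumb is cheap to monitor; a minimal sketch with invented numbers:

```python
import numpy as np

def alert_rate(alarm_mask):
    """Fraction of time steps the detector spends in the 'anomaly' state."""
    return np.asarray(alarm_mask, dtype=bool).mean()

# Illustrative check: 80 alerts over 1000 observations is an 8% alert
# rate, above the ~5% rule of thumb, so thresholds should be loosened.
alarms = np.zeros(1000, dtype=bool)
alarms[:80] = True
rate = alert_rate(alarms)
too_sensitive = rate > 0.05
```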

Practical scenarios

Infrastructure metrics: Prometheus metrics → STL decomposition + Isolation Forest. Main issue: deployments create false anomalies. Solution: suppress detection in a window of ±10 minutes around each deployment.
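The suppression can be a simple mask over alert timestamps; a sketch assuming NumPy datetime64 inputs, with the ±10 minute window from the text (timestamps are invented):

```python
import numpy as np

def suppress_near_deploys(alert_times, deploy_times, window_minutes=10):
    """Drop alerts that fall within ±window_minutes of any deployment."""
    alert_times = np.asarray(alert_times, dtype='datetime64[s]')
    deploy_times = np.asarray(deploy_times, dtype='datetime64[s]')
    window = np.timedelta64(window_minutes * 60, 's')
    keep = np.ones(len(alert_times), dtype=bool)
    for d in deploy_times:
        keep &= np.abs(alert_times - d) > window
    return alert_times[keep]

alerts = np.array(['2024-01-01T12:05', '2024-01-01T13:00'], dtype='datetime64[s]')
deploys = np.array(['2024-01-01T12:00'], dtype='datetime64[s]')
kept = suppress_near_deploys(alerts, deploys)   # the 12:05 alert is dropped
```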

Financial transactions: High class imbalance (anomalies < 0.1%). LSTM Autoencoder or Isolation Forest are better than supervised methods.

Industrial sensors: readings are often subject to physically determined limits → a hybrid of hard thresholds and statistical detection works best.
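Such a hybrid can be as simple as OR-ing a hard physical limit with a robust statistical test; a sketch with illustrative sensor limits (assumes SciPy):

```python
import numpy as np
from scipy.stats import median_abs_deviation

def hybrid_sensor_anomalies(series, low, high, mad_threshold=3.5):
    """Flag values outside the physical limits OR statistically extreme ones."""
    series = np.asarray(series, dtype=float)
    hard = (series < low) | (series > high)   # physically impossible readings
    med = np.median(series)
    mad = median_abs_deviation(series)
    mod_z = np.abs(0.6745 * (series - med) / mad)
    return hard | (mod_z > mad_threshold)

rng = np.random.default_rng(1)
temps = rng.normal(50, 2, 300)
temps[10] = 200.0   # sensor fault: physically impossible
temps[20] = 62.0    # physically possible, but statistically extreme
mask = hybrid_sensor_anomalies(temps, low=-40.0, high=150.0)
```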

Timeframe: STL + Isolation Forest + online z-score + a basic dashboard takes 3-4 weeks; adding an LSTM autoencoder, streaming detection, a feedback loop for retraining, and multi-sensor fusion takes 2-3 months.