AI Telecom Network Anomaly Detection System Development

We design and deploy artificial intelligence systems, from prototypes to production-ready solutions. Our team combines expertise in machine learning, data engineering, and MLOps to make AI work in real business, not just in the lab.

Development of an AI system for detecting anomalies in a telecom network

A telecom network generates millions of metrics per minute. Traditional static thresholds—80% CPU, packet loss > 1%—fail to capture subtle anomalies such as slow drift, correlated degradations across multiple KPIs, and atypical traffic patterns. ML detection operates without preset thresholds, adapting to the normal behavior of each element.

Multivariate anomaly at the network level

Why static thresholds are not sufficient:

  • Router CPU at 75% during peak hours = normal (not an anomaly)
  • CPU at 50% at 3 a.m. on a Saturday = anomaly (possible attack or memory leak)
  • Simultaneous degradation of 5 KPIs on one element = anomaly, even though each KPI individually is within its normal range
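The third bullet — several individually-normal KPIs that are jointly anomalous — can be illustrated with a Mahalanobis distance over correlated KPIs (synthetic numbers, for illustration only):

```python
import numpy as np

rng = np.random.default_rng(0)
# Two strongly correlated KPIs, e.g. CPU load and traffic volume
cov = np.array([[1.0, 0.9],
                [0.9, 1.0]])
normal = rng.multivariate_normal([0.0, 0.0], cov, size=5000)

mean = normal.mean(axis=0)
inv_cov = np.linalg.inv(np.cov(normal, rowvar=False))

def mahalanobis(x: np.ndarray) -> float:
    """Distance that accounts for the correlation structure of the KPIs."""
    d = x - mean
    return float(np.sqrt(d @ inv_cov @ d))

# Each coordinate is only ~1.5 sigma, so per-KPI thresholds stay silent,
# but the combination (CPU up while traffic is down) breaks the usual
# correlation pattern and yields a large multivariate distance.
joint_anomaly = np.array([1.5, -1.5])   # anomalous combination
consistent    = np.array([1.5,  1.5])   # same magnitudes, normal pattern
```

Here `mahalanobis(joint_anomaly)` comes out several times larger than `mahalanobis(consistent)`, which is exactly the effect Isolation Forest exploits at scale in the section below.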

Context-dependent thresholds:

import pandas as pd
import numpy as np
from prophet import Prophet

class ContextualAnomalyDetector:
    def __init__(self, kpi_name: str):
        self.kpi_name = kpi_name
        self.prophet_model = Prophet(
            daily_seasonality=True,
            weekly_seasonality=True,
            interval_width=0.99
        )
        self.fitted = False

    def fit(self, historical_data: pd.DataFrame):
        """
        historical_data: DataFrame с колонками ds (datetime), y (значение KPI)
        Минимум 4 недели истории для корректной сезонности.
        """
        self.prophet_model.fit(historical_data)
        self.fitted = True

    def detect(self, current_value: float, current_time: pd.Timestamp) -> dict:
        future = pd.DataFrame({'ds': [current_time]})
        forecast = self.prophet_model.predict(future)

        yhat = forecast['yhat'].values[0]
        yhat_lower = forecast['yhat_lower'].values[0]
        yhat_upper = forecast['yhat_upper'].values[0]

        is_anomaly = current_value < yhat_lower or current_value > yhat_upper
        deviation = (current_value - yhat) / (abs(yhat) + 1e-9)

        return {
            'kpi': self.kpi_name,
            'value': current_value,
            'expected': yhat,
            'bounds': (yhat_lower, yhat_upper),
            'anomaly': is_anomaly,
            'relative_deviation': deviation
        }

Multivariate Anomaly Detection

Isolation Forest on a node metric vector:

import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler

class NetworkElementAnomalyDetector:
    """
    Каждый сетевой элемент имеет свою модель Isolation Forest.
    Обучение: 30 дней нормальной работы.
    Инференс: каждые 5 минут на текущем векторе KPI.
    """
    def __init__(self, element_id: str, contamination=0.01):
        self.element_id = element_id
        self.scaler = StandardScaler()
        self.model = IsolationForest(
            contamination=contamination,
            n_estimators=100,
            random_state=42
        )

    def fit(self, normal_kpi_matrix: np.ndarray):
        """
        normal_kpi_matrix: (N_samples × N_kpis)
        """
        X = self.scaler.fit_transform(normal_kpi_matrix)
        self.model.fit(X)
        # Calibrate the decision threshold on normal data
        scores = self.model.score_samples(X)
        self.threshold = np.percentile(scores, 1)  # ~1% false positive rate

    def score(self, kpi_vector: np.ndarray) -> dict:
        X = self.scaler.transform(kpi_vector.reshape(1, -1))
        raw_score = self.model.score_samples(X)[0]
        anomaly_score = -raw_score  # higher = more anomalous

        return {
            'element_id': self.element_id,
            'anomaly_score': float(anomaly_score),
            'is_anomaly': raw_score < self.threshold,
            'severity': self._score_to_severity(anomaly_score)
        }

    def _score_to_severity(self, score):
        if score > 0.7: return 'critical'
        if score > 0.5: return 'major'
        if score > 0.3: return 'minor'
        return 'normal'

Traffic Pattern Anomaly

Detection of atypical traffic (DDoS, BGP hijack):

def detect_traffic_anomaly(traffic_matrix: pd.DataFrame,
                            baseline_stats: dict) -> list:
    """
    traffic_matrix: src_ip × dst_ip × bytes за 5 минут (NetFlow/IPFIX)
    Аномалии трафика: объёмные (DDoS), структурные (BGP hijack), протокольные
    """
    anomalies = []

    # 1. Volumetric anomaly: sharp rise in inbound traffic
    current_total = traffic_matrix['bytes'].sum()
    baseline_total = baseline_stats['total_bytes_mean']
    baseline_std = baseline_stats['total_bytes_std']

    volume_z_score = (current_total - baseline_total) / (baseline_std + 1e-9)
    if volume_z_score > 5:
        anomalies.append({
            'type': 'volumetric_spike',
            'severity': 'critical',
            'z_score': volume_z_score,
            'possible_cause': 'DDoS attack or flash crowd'
        })

    # 2. New sources: IPs not present in the baseline
    current_sources = set(traffic_matrix['src_ip'].unique())
    known_sources = baseline_stats.get('known_src_ips', set())
    new_sources = current_sources - known_sources
    if len(new_sources) > baseline_stats.get('new_ip_threshold', 1000):
        anomalies.append({
            'type': 'new_source_flood',
            'severity': 'major',
            'new_ips_count': len(new_sources)
        })

    # 3. Protocol anomaly: ICMP or UDP share spikes (flood)
    protocol_ratios = traffic_matrix.groupby('protocol')['bytes'].sum() / current_total
    for proto in ['ICMP', 'UDP']:
        if protocol_ratios.get(proto, 0) > 0.5:
            anomalies.append({
                'type': f'{proto}_flood',
                'severity': 'major',
                'ratio': protocol_ratios[proto]
            })

    return anomalies
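The `baseline_stats` dictionary consumed above has to be built from historical flow records. A minimal sketch, assuming the history DataFrame carries `timestamp`, `src_ip`, and `bytes` columns matching the NetFlow export used above:

```python
import pandas as pd

def build_traffic_baseline(history: pd.DataFrame,
                           window_minutes: int = 5,
                           new_ip_threshold: int = 1000) -> dict:
    """Aggregate historical NetFlow records into per-window byte totals
    and collect the set of source IPs seen during the baseline period."""
    per_window = history.resample(f'{window_minutes}min', on='timestamp')['bytes'].sum()
    return {
        'total_bytes_mean': per_window.mean(),
        'total_bytes_std': per_window.std(),
        'known_src_ips': set(history['src_ip'].unique()),
        'new_ip_threshold': new_ip_threshold,
    }
```

In production the baseline window would be rolled forward (e.g. the trailing 7 days) so that legitimate traffic growth does not keep tripping the z-score check.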

BGP and routing

BGP anomaly detection:

def analyze_bgp_events(bgp_updates: pd.DataFrame,
                       known_origins: dict) -> dict:
    """
    BGP hijack: a new origin AS suddenly appears for a known prefix.
    BGP leak: routes learned from one provider are re-advertised to another.
    Route flap: frequent updates for a prefix = link instability.

    known_origins: prefix -> expected origin AS, built from historical
    announcements or IRR/RPKI data.
    """
    # Route flapping: prefixes updated more than 10 times in the window
    prefix_update_counts = bgp_updates.groupby('prefix').size()
    flapping_prefixes = prefix_update_counts[prefix_update_counts > 10].index.tolist()

    # Hijack candidates: known prefixes announced with an unexpected origin AS
    hijack_candidates = []
    for _, row in bgp_updates.iterrows():
        if row['prefix'] in known_origins:
            if row['origin_as'] != known_origins[row['prefix']]:
                hijack_candidates.append({
                    'prefix': row['prefix'],
                    'expected_as': known_origins[row['prefix']],
                    'detected_as': row['origin_as']
                })

    return {
        'flapping_prefixes': flapping_prefixes,
        'hijack_candidates': hijack_candidates,
        'route_instability': len(flapping_prefixes) > 5
    }

Alert Correlation and Noise Suppression

Anomaly Correlation Graph: a failure on a router uplink triggers hundreds of downstream anomalies. Algorithm: build a dependency graph from the CMDB topology → trace each alert to the most upstream alerting element → group the downstream alerts into a single incident.
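The grouping step can be sketched with a plain adjacency map (a hypothetical topology format; in production this would come from the CMDB), walking each alerting element up the dependency chain to its topmost alerting ancestor:

```python
def correlate_alerts(alerting_elements: set, upstream_of: dict) -> dict:
    """Group alerts into incidents keyed by the most upstream alerting
    element. upstream_of maps element -> its single upstream parent
    (None for roots); a tree topology is assumed for simplicity."""
    incidents = {}
    for element in alerting_elements:
        # Walk up the dependency chain while the parent is also alerting
        root = element
        while upstream_of.get(root) in alerting_elements:
            root = upstream_of[root]
        incidents.setdefault(root, []).append(element)
    return incidents
```

With a real mesh topology the walk becomes a graph traversal (e.g. via networkx), but the principle is the same: one incident per upstream root, everything below it suppressed into that incident.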

Timeframe: Prophet-based contextual anomalies + per-node Isolation Forest + traffic anomaly detection — 3-4 weeks. BGP anomaly detection, the alert correlation graph, automatic RCA, and NOC integration — 2-3 months.