AI Automated Data Anomaly Detection System

We design and deploy artificial intelligence systems: from prototype to production-ready solutions. Our team combines expertise in machine learning, data engineering and MLOps to make AI work not in the lab, but in real business.
Showing 1 of 1 services. All 1566 services.
AI Automated Data Anomaly Detection System
Medium
~2-4 weeks
FAQ
AI Development Areas
AI Solution Development Stages
Latest works
  • image_website-b2b-advance_0.png
    B2B ADVANCE company website development
    1215
  • image_web-applications_feedme_466_0.webp
    Development of a web application for FEEDME
    1161
  • image_websites_belfingroup_462_0.webp
    Website development for BELFINGROUP
    852
  • image_ecommerce_furnoro_435_0.webp
    Development of an online store for the company FURNORO
    1041
  • image_logo-advance_0.png
    B2B Advance company logo design
    561
  • image_crm_enviok_479_0.webp
    Development of a web application for Enviok
    823

AI-based system for automatic data anomaly detection

Data anomalies are not only suspicious values but also data quality issues: technical glitches, ETL errors, schema drift, and changes in source behavior. The Data Quality Monitoring system detects them automatically, without requiring analysts to report them.

Typology of data anomalies

Classification by nature:

# Taxonomy of data anomalies, keyed by anomaly kind.
# Values are human-readable descriptions (runtime strings, kept verbatim).
data_anomaly_types = {
    # Statistical anomalies of individual values
    'point_anomaly': 'одно значение резко выбивается из ряда',
    'contextual_anomaly': 'значение нормально в другом контексте (лето-зима)',
    'collective_anomaly': 'группа нормальных значений образует ненормальный паттерн',

    # Data-quality anomalies (pipeline / source issues)
    'schema_drift': 'новый столбец появился, старый исчез, тип изменился',
    'distribution_drift': 'распределение признака сдвинулось (feature drift)',
    'cardinality_anomaly': 'резкий рост уникальных значений в категориальном поле',
    'null_spike': 'процент NULL вырос с 0% до 40% за сутки',
    'volume_anomaly': 'количество записей за период аномально мало или велико',
    'freshness_anomaly': 'данные не обновлялись дольше ожидаемого'
}

Data Quality Monitoring

Automatic quality checks:

import pandas as pd
import numpy as np
from scipy.stats import ks_2samp

class DataQualityMonitor:
    """Automated data-quality checks for one table against a precomputed baseline.

    The baseline dict (see ``build_data_baseline``) may contain:
    ``row_count_mean`` / ``row_count_std``, per-column ``{col}_null_pct``,
    and per-column numeric samples ``{col}_sample`` for drift testing.
    Missing keys simply disable the corresponding check.
    """

    def __init__(self, table_name: str, baseline_stats: dict):
        self.table_name = table_name
        self.baseline = baseline_stats

    def run_quality_checks(self, current_df: pd.DataFrame) -> dict:
        """Run volume, null-ratio and distribution-drift checks.

        Returns a dict with 'table', 'checks', 'issues' (list of findings)
        and a 'passed' flag that is True when no issues were detected.
        """
        results = {'table': self.table_name, 'checks': [], 'issues': []}

        # 1. Volume check: z-score of the current row count vs. the baseline.
        row_count = len(current_df)
        baseline_rows = self.baseline.get('row_count_mean', row_count)
        # Fallback std (10% of the current count) keeps the check usable
        # when the baseline has no recorded std; +1e-9 avoids zero division.
        baseline_rows_std = self.baseline.get('row_count_std', row_count * 0.1)

        volume_z = (row_count - baseline_rows) / (baseline_rows_std + 1e-9)
        if abs(volume_z) > 3:
            results['issues'].append({
                'check': 'volume',
                'severity': 'critical' if abs(volume_z) > 5 else 'warning',
                'current': row_count,
                'expected': int(baseline_rows),
                'z_score': round(volume_z, 2)
            })

        # 2. NULL ratio per column, compared against the baseline percentage.
        for col in current_df.columns:
            null_pct = current_df[col].isnull().mean() * 100
            baseline_null = self.baseline.get(f'{col}_null_pct', 0)

            if null_pct > baseline_null + 10:  # more than 10 pp of growth
                results['issues'].append({
                    'check': 'null_spike',
                    'column': col,
                    'severity': 'major' if null_pct > 50 else 'warning',
                    'current_null_pct': round(null_pct, 1),
                    'baseline_null_pct': round(baseline_null, 1)
                })

        # 3. Distribution drift on numeric columns (two-sample KS test).
        for col in current_df.select_dtypes(include=[np.number]).columns:
            if f'{col}_sample' in self.baseline:
                current_values = current_df[col].dropna().values
                # ks_2samp raises ValueError on empty input; an all-NULL
                # column was already reported by the null_spike check above.
                if len(current_values) == 0:
                    continue
                stat, p_value = ks_2samp(
                    self.baseline[f'{col}_sample'],
                    current_values
                )
                if p_value < 0.001:
                    results['issues'].append({
                        'check': 'distribution_drift',
                        'column': col,
                        'severity': 'warning',
                        'ks_statistic': round(stat, 3),
                        'p_value': round(p_value, 5)
                    })

        results['passed'] = len(results['issues']) == 0
        return results

Automatic profiling and baseline

Building a baseline from historical data:

def build_data_baseline(historical_batches: list[pd.DataFrame]) -> dict:
    """Build baseline statistics from historical daily batches.

    Baseline = statistics over the last ~30 days (refreshed weekly):
    row-count mean/std/min/max over all batches, plus per-column stats
    (mean, std, p5/p95, null %, a 500-row sample for KS drift tests,
    and cardinality / top values for categorical columns) computed over
    the last 7 batches.

    Returns an empty dict when no history is available — the monitor's
    ``dict.get`` fallbacks then disable the threshold checks gracefully.
    """
    if not historical_batches:
        # Avoid np.mean([]) producing NaN with a RuntimeWarning.
        return {}

    row_counts = [len(df) for df in historical_batches]

    baseline = {
        'row_count_mean': np.mean(row_counts),
        'row_count_std': np.std(row_counts),
        'row_count_min': np.min(row_counts),
        'row_count_max': np.max(row_counts)
    }

    sample_df = pd.concat(historical_batches[-7:])  # last week of data

    for col in sample_df.select_dtypes(include=[np.number]).columns:
        col_data = sample_df[col].dropna()
        baseline[f'{col}_mean'] = col_data.mean()
        baseline[f'{col}_std'] = col_data.std()
        baseline[f'{col}_p5'] = col_data.quantile(0.05)
        baseline[f'{col}_p95'] = col_data.quantile(0.95)
        baseline[f'{col}_null_pct'] = sample_df[col].isnull().mean() * 100
        # Keep up to 500 samples for the KS test; fixed seed makes the
        # baseline reproducible across weekly rebuilds.
        baseline[f'{col}_sample'] = col_data.sample(
            min(500, len(col_data)), random_state=42
        ).values

    for col in sample_df.select_dtypes(include=['object', 'category']).columns:
        baseline[f'{col}_cardinality'] = sample_df[col].nunique()
        baseline[f'{col}_null_pct'] = sample_df[col].isnull().mean() * 100
        baseline[f'{col}_top_values'] = sample_df[col].value_counts().head(20).to_dict()

    return baseline

Anomaly detection in multivariate data

Isolation Forest for Tabular Data:

from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler, LabelEncoder

def detect_row_level_anomalies(df: pd.DataFrame,
                               contamination: float = 0.02) -> pd.DataFrame:
    """Detect anomalous records (whole rows), not just individual values.

    Useful for transactional data, logs, and CRM records.

    Args:
        df: input frame; object columns are label-encoded, NaNs imputed
            with a sentinel. The input frame is NOT modified.
        contamination: expected fraction of anomalous rows.

    Returns:
        Only the rows flagged as anomalies, with added boolean
        'is_anomaly' and float 'anomaly_score' columns, sorted by
        descending anomaly score.
    """
    # Work on copies: the original implementation wrote the result
    # columns straight into the caller's DataFrame (shared-arg mutation).
    annotated = df.copy()
    if annotated.empty:
        # IsolationForest cannot fit zero samples; return an empty result
        # with the same shape contract.
        annotated['is_anomaly'] = pd.Series(dtype=bool)
        annotated['anomaly_score'] = pd.Series(dtype=float)
        return annotated

    encoded = df.copy()
    for col in encoded.select_dtypes(include=['object']).columns:
        # astype(str) also maps NaN to the literal 'nan' so encoding never fails.
        encoded[col] = LabelEncoder().fit_transform(encoded[col].astype(str))

    # Sentinel imputation: an out-of-range constant marks "missing" for the forest.
    features = encoded.select_dtypes(include=[np.number]).fillna(-999)

    X_scaled = StandardScaler().fit_transform(features)

    model = IsolationForest(contamination=contamination, random_state=42)
    anomaly_labels = model.fit_predict(X_scaled)
    # score_samples: higher = more normal; negate so higher = more anomalous.
    anomaly_scores = -model.score_samples(X_scaled)

    annotated['is_anomaly'] = anomaly_labels == -1
    annotated['anomaly_score'] = anomaly_scores

    # Return only the flagged rows, most anomalous first.
    df_anomalies = annotated[annotated['is_anomaly']].copy()
    return df_anomalies.sort_values('anomaly_score', ascending=False)

Schema Drift Detection

Monitoring data schema changes:

def detect_schema_drift(current_schema: dict, baseline_schema: dict) -> dict:
    """Compare the current data schema against the baseline schema.

    Critical for ETL pipelines: an upstream source change breaks
    downstream consumers. Reports dropped columns (critical), new
    columns (info) and type changes (major).
    """
    baseline_cols = set(baseline_schema)
    current_cols = set(current_schema)
    issues = []

    # Columns that disappeared from the source.
    issues.extend(
        {
            'type': 'column_dropped',
            'column': col,
            'severity': 'critical',
            'action': 'check_upstream_source'
        }
        for col in baseline_cols - current_cols
    )

    # Columns that are new relative to the baseline.
    issues.extend(
        {
            'type': 'column_added',
            'column': col,
            'severity': 'info',
            'action': 'review_and_update_documentation'
        }
        for col in current_cols - baseline_cols
    )

    # Columns whose declared type changed.
    for col in baseline_cols & current_cols:
        old_type, new_type = baseline_schema[col], current_schema[col]
        if old_type != new_type:
            issues.append({
                'type': 'type_changed',
                'column': col,
                'from': old_type,
                'to': new_type,
                'severity': 'major',
                'action': 'validate_downstream_compatibility'
            })

    return {
        'schema_drift_detected': bool(issues),
        'critical_issues': [issue for issue in issues if issue['severity'] == 'critical'],
        'all_issues': issues
    }

Integration with Data Platform: Great Expectations for declarative tests, dbt tests for transformations, Apache Atlas / Datahub for data lineage. Alerts in Slack, PagerDuty, and email for severity >= 'major'. Dashboard in Grafana with historical quality scores for each table.

Timeline: Quality checks (volume, nulls, schema) plus a basic dashboard — 2-3 weeks. Distribution drift (KS test), row-level Isolation Forest, Great Expectations integration, and lineage tracking — 2-3 months.