AI-based system for automatic data anomaly detection
Data anomalies are not only suspicious values but also data quality issues: technical glitches, ETL errors, schema drift, and changes in source behavior. The Data Quality Monitoring system detects them automatically, without requiring analysts to report them.
Typology of data anomalies
Classification by nature:
data_anomaly_types = {
    # Statistical value anomalies
    'point_anomaly': 'a single value deviates sharply from the series',
    'contextual_anomaly': 'a value that is normal only in a different context (summer vs. winter)',
    'collective_anomaly': 'a group of individually normal values forms an abnormal pattern',
    # Data quality anomalies
    'schema_drift': 'a new column appeared, an old one disappeared, or a type changed',
    'distribution_drift': 'the distribution of a feature has shifted (feature drift)',
    'cardinality_anomaly': 'a sharp rise in unique values in a categorical field',
    'null_spike': 'the share of NULLs jumped from 0% to 40% within a day',
    'volume_anomaly': 'the record count for a period is abnormally low or high',
    'freshness_anomaly': 'data has not been refreshed for longer than expected'
}
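The typology lists freshness_anomaly, which the monitor in the next section does not cover. A minimal sketch of such a check, assuming the table's last load timestamp is available as a timezone-aware UTC datetime (the function and parameter names are illustrative):

from datetime import datetime, timezone, timedelta

def check_freshness(last_updated_at: datetime,
                    expected_interval: timedelta) -> dict:
    # Freshness anomaly: the table has not been refreshed for longer than expected.
    # last_updated_at is assumed to be a timezone-aware UTC timestamp.
    lag = datetime.now(timezone.utc) - last_updated_at
    return {
        'check': 'freshness',
        'lag_hours': round(lag.total_seconds() / 3600, 1),
        'is_stale': lag > expected_interval
    }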
Data Quality Monitoring
Automatic quality checks:
import pandas as pd
import numpy as np
from scipy.stats import ks_2samp
class DataQualityMonitor:
    def __init__(self, table_name: str, baseline_stats: dict):
        self.table_name = table_name
        self.baseline = baseline_stats

    def run_quality_checks(self, current_df: pd.DataFrame) -> dict:
        results = {'table': self.table_name, 'checks': [], 'issues': []}

        # 1. Volume check: z-score of the row count against the baseline
        row_count = len(current_df)
        baseline_rows = self.baseline.get('row_count_mean', row_count)
        baseline_rows_std = self.baseline.get('row_count_std', row_count * 0.1)
        volume_z = (row_count - baseline_rows) / (baseline_rows_std + 1e-9)
        if abs(volume_z) > 3:
            results['issues'].append({
                'check': 'volume',
                'severity': 'critical' if abs(volume_z) > 5 else 'warning',
                'current': row_count,
                'expected': int(baseline_rows),
                'z_score': round(volume_z, 2)
            })

        # 2. NULL ratio per column
        for col in current_df.columns:
            null_pct = current_df[col].isnull().mean() * 100
            baseline_null = self.baseline.get(f'{col}_null_pct', 0)
            if null_pct > baseline_null + 10:  # more than 10 percentage points of growth
                results['issues'].append({
                    'check': 'null_spike',
                    'column': col,
                    'severity': 'major' if null_pct > 50 else 'warning',
                    'current_null_pct': round(null_pct, 1),
                    'baseline_null_pct': round(baseline_null, 1)
                })

        # 3. Distribution drift (two-sample Kolmogorov-Smirnov test)
        for col in current_df.select_dtypes(include=[np.number]).columns:
            if f'{col}_sample' in self.baseline:
                stat, p_value = ks_2samp(
                    self.baseline[f'{col}_sample'],
                    current_df[col].dropna().values
                )
                if p_value < 0.001:
                    results['issues'].append({
                        'check': 'distribution_drift',
                        'column': col,
                        'severity': 'warning',
                        'ks_statistic': round(stat, 3),
                        'p_value': round(p_value, 5)
                    })

        results['passed'] = len(results['issues']) == 0
        return results
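A minimal usage sketch. The table name, baseline numbers, and batch are all illustrative stubs; in practice the baseline comes from build_data_baseline in the next section:

baseline = {
    'row_count_mean': 10_000, 'row_count_std': 500,
    'amount_null_pct': 0.5,
    'amount_sample': np.random.normal(100, 15, 500)  # stub reference sample for the KS test
}
current_df = pd.DataFrame({'amount': np.random.normal(100, 15, 4_000)})  # suspiciously small batch
monitor = DataQualityMonitor('payments', baseline)
report = monitor.run_quality_checks(current_df)
for issue in report['issues']:
    print(issue['check'], issue.get('column', '-'), issue['severity'])  # expect a 'volume' issue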
Automatic profiling and baseline
Building a baseline from historical data:
def build_data_baseline(historical_batches: list[pd.DataFrame]) -> dict:
    """
    Baseline = statistics over the last 30 days (refreshed weekly).
    """
    row_counts = [len(df) for df in historical_batches]
    baseline = {
        'row_count_mean': np.mean(row_counts),
        'row_count_std': np.std(row_counts),
        'row_count_min': np.min(row_counts),
        'row_count_max': np.max(row_counts)
    }
    if historical_batches:
        sample_df = pd.concat(historical_batches[-7:])  # last week of batches
        for col in sample_df.select_dtypes(include=[np.number]).columns:
            col_data = sample_df[col].dropna()
            baseline[f'{col}_mean'] = col_data.mean()
            baseline[f'{col}_std'] = col_data.std()
            baseline[f'{col}_p5'] = col_data.quantile(0.05)
            baseline[f'{col}_p95'] = col_data.quantile(0.95)
            baseline[f'{col}_null_pct'] = sample_df[col].isnull().mean() * 100
            # Keep up to 500 sampled values for the KS test
            baseline[f'{col}_sample'] = col_data.sample(min(500, len(col_data))).values
        for col in sample_df.select_dtypes(include=['object', 'category']).columns:
            baseline[f'{col}_cardinality'] = sample_df[col].nunique()
            baseline[f'{col}_null_pct'] = sample_df[col].isnull().mean() * 100
            baseline[f'{col}_top_values'] = sample_df[col].value_counts().head(20).to_dict()
    return baseline
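The baseline stores {col}_cardinality and {col}_top_values, but the monitor above does not yet use them. A sketch of the corresponding cardinality_anomaly check on top of those keys; the ratio_threshold value is an assumption, not part of the system above:

def check_cardinality(current_df: pd.DataFrame, baseline: dict,
                      ratio_threshold: float = 2.0) -> list[dict]:
    # Flag categorical columns whose unique-value count grew sharply vs. the baseline.
    issues = []
    for col in current_df.select_dtypes(include=['object', 'category']).columns:
        baseline_card = baseline.get(f'{col}_cardinality')
        if baseline_card:
            current_card = current_df[col].nunique()
            if current_card > baseline_card * ratio_threshold:
                issues.append({
                    'check': 'cardinality_anomaly',
                    'column': col,
                    'severity': 'warning',
                    'current': current_card,
                    'baseline': baseline_card
                })
    return issues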
Anomaly detection in multivariate data
Isolation Forest for tabular data:
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler, LabelEncoder
def detect_row_level_anomalies(df: pd.DataFrame,
                               contamination: float = 0.02) -> pd.DataFrame:
    """
    Detect anomalous records (not just individual values).
    Useful for transactional data, logs, and CRM records.
    """
    # Preprocessing: encode categoricals, fill missing numerics with a sentinel
    df_processed = df.copy()
    for col in df_processed.select_dtypes(include=['object']).columns:
        le = LabelEncoder()
        df_processed[col] = le.fit_transform(df_processed[col].astype(str))
    df_numeric = df_processed.select_dtypes(include=[np.number]).fillna(-999)

    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(df_numeric)

    model = IsolationForest(contamination=contamination, random_state=42)
    anomaly_labels = model.fit_predict(X_scaled)
    anomaly_scores = -model.score_samples(X_scaled)  # higher = more anomalous

    df = df.copy()  # avoid mutating the caller's DataFrame
    df['is_anomaly'] = anomaly_labels == -1
    df['anomaly_score'] = anomaly_scores

    # Return only the anomalous rows, most anomalous first
    df_anomalies = df[df['is_anomaly']].copy()
    return df_anomalies.sort_values('anomaly_score', ascending=False)
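A usage sketch on synthetic data; column names are illustrative:

rng = np.random.default_rng(0)
txns = pd.DataFrame({
    'amount': rng.normal(100, 15, 1_000),
    'channel': rng.choice(['web', 'mobile', 'api'], 1_000)
})
txns.loc[0, 'amount'] = 10_000  # inject an obvious outlier
top_anomalies = detect_row_level_anomalies(txns, contamination=0.01)
print(top_anomalies.head())  # the injected row should rank at or near the top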
Schema Drift Detection
Monitoring data schema changes:
def detect_schema_drift(current_schema: dict, baseline_schema: dict) -> dict:
    """
    Compare the current data schema against the baseline schema.
    Critical for ETL pipelines: a change in the upstream source breaks downstream consumers.
    """
    issues = []

    # Dropped columns
    missing_cols = set(baseline_schema.keys()) - set(current_schema.keys())
    for col in missing_cols:
        issues.append({
            'type': 'column_dropped',
            'column': col,
            'severity': 'critical',
            'action': 'check_upstream_source'
        })

    # New columns
    new_cols = set(current_schema.keys()) - set(baseline_schema.keys())
    for col in new_cols:
        issues.append({
            'type': 'column_added',
            'column': col,
            'severity': 'info',
            'action': 'review_and_update_documentation'
        })

    # Type changes
    for col in set(baseline_schema.keys()) & set(current_schema.keys()):
        if baseline_schema[col] != current_schema[col]:
            issues.append({
                'type': 'type_changed',
                'column': col,
                'from': baseline_schema[col],
                'to': current_schema[col],
                'severity': 'major',
                'action': 'validate_downstream_compatibility'
            })

    return {
        'schema_drift_detected': len(issues) > 0,
        'critical_issues': [i for i in issues if i['severity'] == 'critical'],
        'all_issues': issues
    }
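A usage sketch. The schema dicts map column names to dtype strings and can be built from a DataFrame as dict(df.dtypes.astype(str)); the column names here are hypothetical:

baseline_schema = {'user_id': 'int64', 'amount': 'float64', 'created_at': 'datetime64[ns]'}
current_schema = {'user_id': 'int64', 'amount': 'object', 'utm_source': 'object'}
report = detect_schema_drift(current_schema, baseline_schema)
# Expect: created_at dropped (critical), amount type changed (major), utm_source added (info)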
Integration with the data platform: Great Expectations for declarative tests, dbt tests for transformations, Apache Atlas / DataHub for data lineage. Alerts go to Slack, PagerDuty, and email for severity >= 'major'. A Grafana dashboard tracks historical quality scores for each table.
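A sketch of that alerting rule, assuming a Slack incoming webhook; the severity ordering and webhook plumbing are assumptions, not part of the system above:

import requests

SEVERITY_RANK = {'info': 0, 'warning': 1, 'major': 2, 'critical': 3}

def route_alerts(issues: list[dict], slack_webhook_url: str) -> None:
    # Forward only issues with severity >= 'major', per the rule above.
    for issue in issues:
        if SEVERITY_RANK.get(issue['severity'], 0) >= SEVERITY_RANK['major']:
            requests.post(slack_webhook_url, json={'text': str(issue)}, timeout=5)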
Timeline: quality checks (volume, NULLs, schema) plus a basic dashboard take 2-3 weeks; distribution drift (KS test), row-level Isolation Forest, Great Expectations integration, and lineage tracking take 2-3 months.