AI-based anomaly detection system for IoT sensor data
IoT sensors generate continuous data streams—temperature, pressure, humidity, gases, vibration. An anomaly could indicate a sensor malfunction, a change in a physical process, or a real hazard. The system must distinguish between these situations and respond appropriately.
Streaming architecture
Pipeline from sensor to alert:
MQTT (sensor) → Kafka → Flink / Spark Streaming → ML inference → AlertManager
↓
InfluxDB / TimescaleDB
↓
Grafana Dashboard
Kafka Processing Scheme:
from kafka import KafkaConsumer, KafkaProducer
import json
import numpy as np
from collections import defaultdict, deque
class IoTAnomalyProcessor:
    """Consume raw IoT sensor readings from Kafka, keep a per-sensor sliding
    window, and publish z-score anomalies to the 'iot-anomalies' topic.
    """

    def __init__(self, bootstrap_servers='kafka:9092',
                 window_size=60):  # keep the 60 most recent values per sensor
        self.consumer = KafkaConsumer(
            'iot-sensor-raw',
            bootstrap_servers=bootstrap_servers,
            value_deserializer=lambda m: json.loads(m.decode()),
            group_id='anomaly-detection'
        )
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode()
        )
        # One bounded deque per sensor id; append drops the oldest value in O(1).
        self.sensor_windows = defaultdict(lambda: deque(maxlen=window_size))
        self.sensor_stats = {}  # EWMA mean/std per sensor (currently unused)

    def process(self):
        """Main consume loop: update each sensor's window and emit anomalies.

        Blocks forever on the Kafka consumer.  Assumes each message value is
        a dict with 'sensor_id' and 'value' keys (see the deserializer above).
        """
        for message in self.consumer:
            reading = message.value
            sensor_id = reading['sensor_id']
            value = reading['value']
            # Update the sliding window for this sensor.
            self.sensor_windows[sensor_id].append(value)
            window = list(self.sensor_windows[sensor_id])
            # Require a minimum amount of history before scoring.
            # NOTE: the window already contains the current value, so the
            # z-score is computed against statistics that include it.
            if len(window) >= 30:
                result = self.detect_anomaly(sensor_id, value, window)
                if result['anomaly']:
                    self.producer.send('iot-anomalies', result)

    def detect_anomaly(self, sensor_id, current_value, window):
        """Score current_value against the window's mean/std (z-score).

        |z| > 3.5 flags an anomaly; |z| > 5 escalates severity to 'critical'.
        Returns a dict built from native Python types only, so the producer's
        json.dumps value_serializer can encode it.
        """
        # Cast numpy scalars to native floats: json.dumps() raises TypeError
        # on np.float64, which would crash the producer's serializer.
        mean = float(np.mean(window))
        std = float(np.std(window))
        # Epsilon guards against division by zero on a constant window.
        z_score = (current_value - mean) / (std + 1e-9)
        is_anomaly = abs(z_score) > 3.5
        return {
            'sensor_id': sensor_id,
            'value': current_value,
            'z_score': round(float(z_score), 2),
            'anomaly': bool(is_anomaly),
            'window_mean': round(mean, 3),
            'window_std': round(std, 3),
            'severity': 'critical' if abs(z_score) > 5 else 'warning'
        }
Contextual anomaly
A temperature reading can be normal in general — yet anomalous at 3 a.m.:
import pandas as pd  # required by this snippet; not imported earlier in the file


def contextual_anomaly_detection(sensor_id: str,
                                 current_value: float,
                                 timestamp: pd.Timestamp,
                                 historical_data: pd.DataFrame) -> dict:
    """Detect an anomaly relative to the sensor's own context.

    The normal range depends on:
    - hour of day
    - day of week
    - season (month)

    A separate baseline is built for each context, so a value that is normal
    at noon can still be flagged at 3 a.m.

    Expects `historical_data` to carry 'sensor_id', 'hour', 'day_of_week'
    and 'value' columns (an optional 'month' column narrows the context
    further).  Returns a status dict when fewer than 10 historical samples
    share the current context.
    """
    # Context of the current moment.
    context = {
        'hour': timestamp.hour,
        'day_of_week': timestamp.dayofweek,
        'month': timestamp.month
    }
    # Historical values observed in the same context.  The month filter was
    # previously missing even though the docstring lists season as part of
    # the context; it is applied only when the column exists, so callers
    # without a 'month' column keep the old behavior.
    mask = (
        (historical_data['sensor_id'] == sensor_id) &
        (historical_data['hour'] == context['hour']) &
        (historical_data['day_of_week'] == context['day_of_week'])
    )
    if 'month' in historical_data.columns:
        mask &= (historical_data['month'] == context['month'])
    context_data = historical_data[mask]['value']
    if len(context_data) < 10:
        return {'status': 'insufficient_context_data'}
    # Cast pandas scalars to native floats so the result is JSON-friendly.
    context_mean = float(context_data.mean())
    context_std = float(context_data.std())
    # Epsilon avoids division by zero for a perfectly constant baseline.
    context_z = (current_value - context_mean) / (context_std + 1e-9)
    return {
        'sensor_id': sensor_id,
        'value': current_value,
        'context': context,
        'context_mean': round(context_mean, 3),
        'context_z_score': round(context_z, 2),
        'contextual_anomaly': abs(context_z) > 3,
        'context_samples': len(context_data)
    }
Detection of a faulty sensor
Distinguishing between physical anomaly and sensor anomaly:
def distinguish_sensor_vs_process_anomaly(sensor_group: dict,
                                          anomalous_sensor_id: str) -> dict:
    """Classify an anomaly as a sensor fault or a real process anomaly.

    If only one sensor out of a group is anomalous, the sensor itself is
    most likely broken; if all or most sensors agree, the physical process
    is anomalous.  Applies when several sensors measure the same physical
    zone.

    `sensor_group` maps sensor_id -> detection result dict (carrying an
    'anomaly' bool).  Returns a dict with a conclusion, the anomaly ratio,
    a recommended action, and whether to raise a process-level alert.
    """
    # Guard: an empty group would otherwise divide by zero.
    if not sensor_group:
        return {
            'conclusion': 'no_data',
            'anomaly_ratio': 0.0,
            'action': 'manual_investigation',
            'process_alert': False
        }
    anomaly_count = sum(1 for result in sensor_group.values()
                        if result.get('anomaly', False))
    group_anomaly_ratio = anomaly_count / len(sensor_group)
    if group_anomaly_ratio <= 0.25:
        # Isolated disagreement: suspect the sensor, not the process.
        return {
            'conclusion': 'sensor_fault',
            'sensor_id': anomalous_sensor_id,
            'anomaly_ratio': group_anomaly_ratio,
            'action': 'replace_or_recalibrate_sensor',
            'process_alert': False
        }
    elif group_anomaly_ratio >= 0.6:
        # Broad agreement: the measured process itself is anomalous.
        return {
            'conclusion': 'process_anomaly',
            'anomaly_ratio': group_anomaly_ratio,
            'action': 'investigate_physical_process',
            'process_alert': True
        }
    else:
        return {
            'conclusion': 'uncertain',
            'anomaly_ratio': group_anomaly_ratio,
            'action': 'manual_investigation',
            'process_alert': True  # err on the side of caution
        }
Edge inference on a microcontroller
Optimized model for ESP32 / Raspberry Pi:
import onnxruntime as ort
import numpy as np
class EdgeAnomalyDetector:
    """Run anomaly inference locally on an edge device (ESP32 / Raspberry Pi).

    The ONNX model is deployed on the device itself; inference requires no
    cloud round-trip — critical for industrial networks with limited
    connectivity.
    """

    def __init__(self, model_path: str, window_size: int = 30):
        self.session = ort.InferenceSession(model_path)
        self.window_size = window_size
        # deque(maxlen=...) drops the oldest value automatically in O(1),
        # replacing the previous list + pop(0) (O(n)) bookkeeping.
        self.buffer = deque(maxlen=window_size)
        self.threshold = 0.5

    def infer(self, new_value: float) -> dict:
        """Append one reading and score the current window.

        Returns {'status': 'warming_up'} until the window is full, otherwise
        a dict with the anomaly flag and rounded score.
        """
        self.buffer.append(new_value)
        if len(self.buffer) < self.window_size:
            return {'status': 'warming_up'}
        # Z-normalize the window; epsilon avoids division by zero on a
        # constant signal.
        arr = np.array(self.buffer, dtype=np.float32)
        arr = (arr - arr.mean()) / (arr.std() + 1e-9)
        # Model expects shape (batch, window, features) — TODO confirm
        # against the exported ONNX graph.
        input_tensor = arr.reshape(1, self.window_size, 1)
        result = self.session.run(None, {'input': input_tensor})[0]
        anomaly_score = float(result[0][0])
        return {
            'anomaly': anomaly_score > self.threshold,
            'score': round(anomaly_score, 3),
            'latency_ms': 'local'  # no network latency
        }
Platform integrations: AWS IoT Core, Azure IoT Hub, Yandex IoT Core, EdgeX Foundry. MQTT brokers: Eclipse Mosquitto, EMQX. Grafana + InfluxDB for storage and visualization.
Timeframe: MQTT pipeline + z-score anomaly + InfluxDB + Grafana — 2-3 weeks. Contextual anomaly, sensor/process differentiation, ONNX edge deployment — 6-8 weeks.







