AI IoT Sensor Anomaly Detection System

We design and deploy artificial intelligence systems, from prototype to production-ready solution. Our team combines expertise in machine learning, data engineering, and MLOps to make AI work not just in the lab but in real business.

AI-based anomaly detection on IoT sensor data

IoT sensors generate continuous data streams—temperature, pressure, humidity, gases, vibration. An anomaly could indicate a sensor malfunction, a change in a physical process, or a real hazard. The system must distinguish between these situations and respond appropriately.

Streaming architecture

Pipeline from sensor to alert:

MQTT (sensor) → Kafka → Flink / Spark Streaming → ML inference → AlertManager
                              ↓
                         InfluxDB / TimescaleDB
                              ↓
                         Grafana Dashboard
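
The first hop of this pipeline, from the MQTT broker into Kafka, can be bridged with paho-mqtt and kafka-python. A sketch, where the topic layout (`factory/<zone>/<sensor_id>`) and the bare numeric payload format are assumptions:

```python
import json
import time

def to_kafka_record(mqtt_topic: str, payload: bytes) -> dict:
    """Normalize an MQTT message into the JSON schema the downstream
    Kafka consumers expect. Assumes topics shaped like
    'factory/<zone>/<sensor_id>' carrying a bare numeric payload."""
    parts = mqtt_topic.split('/')
    return {
        'sensor_id': parts[-1],
        'zone': parts[1] if len(parts) > 2 else 'unknown',
        'value': float(payload.decode()),
        'ingest_ts': time.time(),
    }

def run_bridge(mqtt_host: str = 'mosquitto',
               kafka_servers: str = 'kafka:9092') -> None:
    """Wiring sketch; requires running Mosquitto and Kafka instances."""
    import paho.mqtt.client as mqtt
    from kafka import KafkaProducer

    producer = KafkaProducer(
        bootstrap_servers=kafka_servers,
        value_serializer=lambda v: json.dumps(v).encode())

    def on_message(client, userdata, msg):
        producer.send('iot-sensor-raw',
                      to_kafka_record(msg.topic, msg.payload))

    client = mqtt.Client()
    client.on_message = on_message
    client.connect(mqtt_host)
    client.subscribe('factory/#')
    client.loop_forever()
```

Keeping the payload-to-record conversion in a separate pure function makes it unit-testable without a broker.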

Kafka processing stage:

from kafka import KafkaConsumer, KafkaProducer
import json
import numpy as np
from collections import defaultdict, deque

class IoTAnomalyProcessor:
    def __init__(self, bootstrap_servers='kafka:9092',
                 window_size=60):  # keep the 60 most recent readings
        self.consumer = KafkaConsumer(
            'iot-sensor-raw',
            bootstrap_servers=bootstrap_servers,
            value_deserializer=lambda m: json.loads(m.decode()),
            group_id='anomaly-detection'
        )
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode()
        )
        self.sensor_windows = defaultdict(lambda: deque(maxlen=window_size))
        self.sensor_stats = {}  # EWMA mean/std per sensor

    def process(self):
        for message in self.consumer:
            reading = message.value
            sensor_id = reading['sensor_id']
            value = reading['value']

            # Update the sliding window
            self.sensor_windows[sensor_id].append(value)
            window = list(self.sensor_windows[sensor_id])

            # Run anomaly detection once the window has enough samples
            if len(window) >= 30:
                result = self.detect_anomaly(sensor_id, value, window)
                if result['anomaly']:
                    self.producer.send('iot-anomalies', result)

    def detect_anomaly(self, sensor_id, current_value, window):
        mean = np.mean(window)
        std = np.std(window)

        z_score = (current_value - mean) / (std + 1e-9)
        is_anomaly = abs(z_score) > 3.5

        return {
            'sensor_id': sensor_id,
            'value': current_value,
            'z_score': round(z_score, 2),
            'anomaly': bool(is_anomaly),
            'window_mean': round(mean, 3),
            'window_std': round(std, 3),
            'severity': 'critical' if abs(z_score) > 5 else 'warning'
        }
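
The class above reserves `sensor_stats` for EWMA mean/std but never populates it. One way to implement that idea, sketched below, uses constant memory per sensor and keeps adapting to slow baseline drift; the `alpha` and threshold values are assumptions:

```python
class EWMAAnomalyDetector:
    """Exponentially weighted mean/variance: O(1) memory per sensor and,
    unlike a fixed-size window, it keeps adapting to slow baseline drift."""
    def __init__(self, alpha: float = 0.05, z_threshold: float = 3.5):
        self.alpha = alpha
        self.z_threshold = z_threshold
        self.mean = None
        self.var = 0.0

    def update(self, value: float) -> dict:
        if self.mean is None:  # the first reading just seeds the baseline
            self.mean = value
            return {'anomaly': False, 'z_score': 0.0}
        # Score against the statistics *before* updating them,
        # so an outlier cannot mask itself.
        z = (value - self.mean) / (self.var ** 0.5 + 1e-9)
        # Exponentially weighted update of mean and variance
        diff = value - self.mean
        self.mean += self.alpha * diff
        self.var = (1 - self.alpha) * (self.var + self.alpha * diff * diff)
        return {'anomaly': abs(z) > self.z_threshold, 'z_score': round(z, 2)}
```

One detector instance would be kept per `sensor_id`, e.g. in a `defaultdict(EWMAAnomalyDetector)`.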

Contextual anomaly

A temperature that is normal during the day may be anomalous at 3 a.m.:

import pandas as pd

def contextual_anomaly_detection(sensor_id: str,
                                 current_value: float,
                                 timestamp: pd.Timestamp,
                                 historical_data: pd.DataFrame) -> dict:
    """
    Нормальный диапазон зависит от:
    - Время суток (час)
    - День недели
    - Сезон (месяц)
    Baseline строится отдельно для каждого контекста.
    """
    # Контекст текущего момента
    context = {
        'hour': timestamp.hour,
        'day_of_week': timestamp.dayofweek,
        'month': timestamp.month
    }

    # Historical readings from the same context
    context_data = historical_data[
        (historical_data['sensor_id'] == sensor_id) &
        (historical_data['hour'] == context['hour']) &
        (historical_data['day_of_week'] == context['day_of_week'])
    ]['value']

    if len(context_data) < 10:
        return {'status': 'insufficient_context_data'}

    context_mean = context_data.mean()
    context_std = context_data.std()
    context_z = (current_value - context_mean) / (context_std + 1e-9)

    return {
        'sensor_id': sensor_id,
        'value': current_value,
        'context': context,
        'context_mean': round(context_mean, 3),
        'context_z_score': round(context_z, 2),
        'contextual_anomaly': abs(context_z) > 3,
        'context_samples': len(context_data)
    }
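
To see why the context-conditioned baseline matters, compare a global z-score with a 3 a.m. z-score on synthetic data with a daily temperature cycle. This is fully self-contained; the sensor behaviour and all numbers are invented:

```python
import numpy as np
import pandas as pd

# One year of synthetic hourly readings with a daily temperature cycle:
# ~15 °C at the 3 a.m. trough, ~25 °C at the afternoon peak.
rng = np.random.default_rng(42)
ts = pd.date_range('2024-01-01', periods=24 * 365, freq='h')
hist = pd.DataFrame({
    'hour': ts.hour,
    'value': 20 + 5 * np.sin(2 * np.pi * (ts.hour - 9) / 24)
             + rng.normal(0, 0.5, len(ts)),
})

value = 24.0  # a reading observed at 3 a.m.

# Against the global distribution the reading looks ordinary...
global_z = (value - hist['value'].mean()) / hist['value'].std()

# ...but against the 3 a.m. baseline it is far outside the normal range
night = hist.loc[hist['hour'] == 3, 'value']
context_z = (value - night.mean()) / night.std()
```

Here `global_z` stays near 1 while `context_z` lands well above any reasonable threshold, which is exactly the case a plain rolling z-score misses.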

Detection of a faulty sensor

Distinguishing between physical anomaly and sensor anomaly:

def distinguish_sensor_vs_process_anomaly(sensor_group: dict,
                                           anomalous_sensor_id: str) -> dict:
    """
    Если только один датчик из группы аномальный → скорее всего датчик сломан.
    Если все/большинство датчиков аномальны → процесс аномален.
    Применяется когда несколько датчиков измеряют одну физическую зону.
    """
    anomaly_count = sum(1 for s_id, result in sensor_group.items()
                        if result.get('anomaly', False))
    total = len(sensor_group)

    group_anomaly_ratio = anomaly_count / total

    if group_anomaly_ratio <= 0.25:
        return {
            'conclusion': 'sensor_fault',
            'sensor_id': anomalous_sensor_id,
            'anomaly_ratio': group_anomaly_ratio,
            'action': 'replace_or_recalibrate_sensor',
            'process_alert': False
        }
    elif group_anomaly_ratio >= 0.6:
        return {
            'conclusion': 'process_anomaly',
            'anomaly_ratio': group_anomaly_ratio,
            'action': 'investigate_physical_process',
            'process_alert': True
        }
    else:
        return {
            'conclusion': 'uncertain',
            'anomaly_ratio': group_anomaly_ratio,
            'action': 'manual_investigation',
            'process_alert': True  # err on the side of caution
        }
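
The same voting rule can be exercised end to end with a compact self-contained version; the thresholds mirror those above, while the zone layout is invented:

```python
def classify_group(results: dict, fault_max: float = 0.25,
                   process_min: float = 0.6) -> str:
    """Majority vote over co-located sensors: a lone outlier points at
    a faulty sensor, a majority of outliers at the physical process."""
    ratio = sum(r['anomaly'] for r in results.values()) / len(results)
    if ratio <= fault_max:
        return 'sensor_fault'
    if ratio >= process_min:
        return 'process_anomaly'
    return 'uncertain'

# One of five furnace-zone sensors flags an anomaly -> suspect the sensor
lone = {f'temp-{i}': {'anomaly': i == 3} for i in range(5)}

# Four of five flag an anomaly -> suspect the process
majority = {f'temp-{i}': {'anomaly': i != 3} for i in range(5)}
```

With five sensors the ratios land at 0.2 and 0.8, on opposite sides of both thresholds.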

Edge inference on the device

An optimized ONNX model for Raspberry Pi-class edge devices (MCU-class boards such as the ESP32 cannot run onnxruntime and need a lighter runtime, e.g. TensorFlow Lite Micro):

import onnxruntime as ort
import numpy as np

class EdgeAnomalyDetector:
    """
    ONNX модель развёртывается на edge устройстве.
    Инференс без облака: критично для промышленных сетей с ограниченной связью.
    """
    def __init__(self, model_path: str, window_size: int = 30):
        self.session = ort.InferenceSession(model_path)
        self.window_size = window_size
        self.buffer = []
        self.threshold = 0.5

    def infer(self, new_value: float) -> dict:
        self.buffer.append(new_value)
        if len(self.buffer) > self.window_size:
            self.buffer.pop(0)

        if len(self.buffer) < self.window_size:
            return {'status': 'warming_up'}

        # Normalize the window
        arr = np.array(self.buffer, dtype=np.float32)
        arr = (arr - arr.mean()) / (arr.std() + 1e-9)
        input_tensor = arr.reshape(1, self.window_size, 1)

        result = self.session.run(None, {'input': input_tensor})[0]
        anomaly_score = float(result[0][0])

        return {
            'anomaly': anomaly_score > self.threshold,
            'score': round(anomaly_score, 3),
            'latency_ms': 'local'  # no network round-trip
        }
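
Since onnxruntime does not run on MCU-class hardware like the ESP32, a dependency-free rolling z-score in plain (Micro)Python is a common fallback there. A sketch; the window size and threshold are assumptions:

```python
class MicroZScoreDetector:
    """Rolling z-score detector with no third-party dependencies,
    small enough for MicroPython on an ESP32-class board."""
    def __init__(self, window_size: int = 30, z_threshold: float = 3.5):
        self.window_size = window_size
        self.z_threshold = z_threshold
        self.buffer = []

    def infer(self, value: float) -> dict:
        if len(self.buffer) < self.window_size:
            self.buffer.append(value)
            return {'status': 'warming_up'}
        # Score the new value against the current window
        n = len(self.buffer)
        mean = sum(self.buffer) / n
        var = sum((x - mean) ** 2 for x in self.buffer) / n
        z = (value - mean) / (var ** 0.5 + 1e-9)
        # Slide the window forward
        self.buffer.pop(0)
        self.buffer.append(value)
        return {'anomaly': abs(z) > self.z_threshold, 'z_score': z}
```

It trades the learned ONNX model for a purely statistical rule, which is often acceptable as a first line of defence on the sensor node itself.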

Platform integrations: AWS IoT Core, Azure IoT Hub, Yandex IoT Core, EdgeX Foundry. MQTT brokers: Eclipse Mosquitto, EMQ X. Grafana + InfluxDB for storage and visualization.

Timeframe: an MQTT pipeline with z-score anomaly detection, InfluxDB, and Grafana takes 2-3 weeks; adding contextual anomaly detection, sensor/process differentiation, and ONNX edge deployment extends this to 6-8 weeks.