Development of an AI system for automatic metering of water, gas, heat, electricity consumption
Advanced Metering Infrastructure (AMI) is the foundation for billing, network balancing, and loss detection. The ML layer above AMI solves problems beyond the capabilities of simple telemetry: identifying faulty meters, detecting theft, and forecasting consumption for planning.
AMI system architecture
Data collection chain:
Счётчик (Modbus/DLMS-COSEM/M-Bus)
↓ PLC (PowerLine Communication) / NB-IoT / LoRaWAN
↓ Концентратор данных (DCU)
↓ MDMS (Meter Data Management System)
↓ ML-аналитика + Биллинг
Protocols by resource type:
metering_protocols = {
'electricity': {
'protocol': 'DLMS/COSEM (IEC 62056)',
'communication': 'PLC G3, NB-IoT, GPRS',
'interval': '30 минут (профиль нагрузки)'
},
'water': {
'protocol': 'Modbus RTU / M-Bus',
'communication': 'LoRaWAN, NB-IoT',
'interval': '60 минут'
},
'heat': {
'protocol': 'M-Bus (EN 13757)',
'communication': 'LoRa, GPRS',
'interval': '60 минут (теплосчётчик Kamstrup, Danfoss)'
},
'gas': {
'protocol': 'Modbus / GSM',
'communication': 'GSM/GPRS, NB-IoT',
'interval': '60 минут'
}
}
Validation of readings
Detection of suspicious readings:
import pandas as pd
import numpy as np
from scipy import stats
def validate_meter_readings(meter_id: str,
current_reading: float,
history: pd.DataFrame) -> dict:
"""
Три типа аномалий:
1. Нулевое потребление — счётчик неисправен или намеренно отключён
2. Аномально высокое — прорыв, хищение, неисправность
3. Отрицательный прирост — счётчик заменён или переполнен
"""
issues = []
# Отрицательный прирост
if len(history) > 0:
prev_reading = history['reading'].iloc[-1]
delta = current_reading - prev_reading
if delta < 0:
issues.append({
'type': 'negative_increment',
'delta': delta,
'severity': 'warning',
'action': 'check_meter_replacement'
})
elif delta == 0 and history['reading'].diff().tail(3).sum() == 0:
issues.append({
'type': 'zero_consumption_extended',
'zero_periods': 3,
'severity': 'major',
'action': 'check_meter_communication'
})
# Статистическая аномалия
if len(history) >= 30:
typical_deltas = history['reading'].diff().dropna()
z_score = stats.zscore([delta])[0]
if abs(z_score) > 4:
issues.append({
'type': 'statistical_outlier',
'z_score': round(z_score, 2),
'severity': 'major' if z_score > 4 else 'critical',
'action': 'field_verification'
})
return {
'meter_id': meter_id,
'current_reading': current_reading,
'issues': issues,
'valid': len(issues) == 0
}
Detection of theft and unauthorized consumption
Network Balance - Loss Method:
def detect_losses_by_balance(supply_reading: float,
consumer_readings: pd.DataFrame,
technical_loss_pct: float = 0.03) -> dict:
"""
Баланс участка: подача - Σ(потребители) = потери
Нормативные потери зависят от типа сети и длины.
Превышение нормы = коммерческие потери (хищение или неисправный счётчик).
"""
total_consumed = consumer_readings['delta_kwh'].sum()
expected_technical = supply_reading * technical_loss_pct
actual_losses = supply_reading - total_consumed
commercial_losses = actual_losses - expected_technical
loss_rate = commercial_losses / (supply_reading + 1e-9)
return {
'supply_kwh': supply_reading,
'total_consumed_kwh': total_consumed,
'technical_losses_kwh': round(expected_technical, 2),
'commercial_losses_kwh': round(commercial_losses, 2),
'loss_rate_pct': round(loss_rate * 100, 2),
'anomaly': loss_rate > 0.08,
'action': 'audit_consumers_on_feeder' if loss_rate > 0.15 else None
}
Search for suspicious subscribers:
from sklearn.ensemble import IsolationForest
def detect_theft_suspects(consumer_features: pd.DataFrame) -> pd.DataFrame:
"""
Признаки хищения электроэнергии:
- Потребление ниже типичного для аналогичных объектов
- Потребление только ночью (обходит одноставочный тариф)
- Нет корреляции с погодой (если загородный дом)
- Резкое снижение после поверки счётчика
"""
features = [
'monthly_kwh', 'night_ratio', 'weather_correlation',
'year_over_year_change', 'peer_group_deviation'
]
model = IsolationForest(contamination=0.05, random_state=42)
consumer_features['theft_score'] = -model.fit_predict(consumer_features[features])
suspects = consumer_features[
consumer_features['theft_score'] == -1 # outlier
].sort_values('monthly_kwh')
suspects['priority'] = suspects['peer_group_deviation'].abs().rank(ascending=False)
return suspects[['consumer_id', 'address', 'monthly_kwh',
'peer_group_deviation', 'priority']]
Consumption forecast for balancing
Short-term load forecast:
from lightgbm import LGBMRegressor
def train_consumption_forecaster(meter_history: pd.DataFrame,
weather_data: pd.DataFrame) -> LGBMRegressor:
"""
Прогноз на 24 часа для конкретного счётчика / группы абонентов.
Используется для балансировки нагрузки и планирования закупок.
"""
features = [
'hour', 'day_of_week', 'month', 'is_weekend', 'is_holiday',
'temperature', 'temperature_forecast',
'consumption_lag_24h', 'consumption_lag_168h',
'consumption_rolling_mean_7d'
]
combined = meter_history.merge(weather_data, on='timestamp', how='left')
combined['consumption_lag_24h'] = combined['consumption'].shift(96) # 15-мин данные
combined['consumption_lag_168h'] = combined['consumption'].shift(672)
combined['consumption_rolling_mean_7d'] = combined['consumption'].rolling(672).mean()
train = combined.dropna(subset=features)
model = LGBMRegressor(n_estimators=200, learning_rate=0.05)
model.fit(train[features], train['consumption'])
return model
Billing integration: MDMS platforms: Itron EE, Landis+Gyr Gridstream, OpenWay Riva. Export to SAP IS-U, 1C: Housing and Utilities, and Billing Center. API for the consumer account – consumption history, anomalies, and savings tips.
Deadlines: AMI connector + data validation + base loss balance — 3-4 weeks. ML theft detection, load forecast, SAP IS-U integration, and personal account — 2-3 months.







