Development of an AI system for predicting accidents in housing and communal services pipelines and heating networks
The housing and utilities pipeline infrastructure is aging: the average age of a heating network in Russia is 25 years, while the standard service life is 20. Accidents during the heating season aren't just financial losses; they also have social consequences. A predictive system prioritizes pipe sections for replacement over a 3-12-month horizon.
Data and sources
Information layers for analysis:
# Catalogue of the input data layers used for the analysis. Each entry lists
# the attributes the layer provides and the system it is sourced from; the
# telemetry layer additionally records its sampling frequency.
data_layers = dict(
    # Static description of every pipe segment.
    pipe_registry=dict(
        attributes=[
            'material',
            'diameter_mm',
            'installation_year',
            'insulation_type',
            'soil_type',
            'depth_m',
        ],
        source='GIS ТГК / ЕАСУП ЖКХ',
    ),
    # Past failures and their repair outcome, keyed by segment.
    accident_history=dict(
        attributes=[
            'accident_date',
            'pipe_segment_id',
            'failure_type',
            'repair_type',
            'repair_cost',
            'outage_hours',
        ],
        source='Аварийно-диспетчерская служба АДС',
    ),
    # Operating-regime measurements from field sensors.
    pressure_telemetry=dict(
        attributes=['pressure_bar', 'temperature_c', 'flow_m3h'],
        frequency='10 минут (ПТК SCADA)',
        source='датчики ИТП, ЦТП, насосные',
    ),
    # Ground conditions around the pipe.
    soil_data=dict(
        attributes=[
            'soil_corrosivity',
            'groundwater_level',
            'freeze_depth_m',
            'clay_content',
        ],
        source='геологические изыскания + ГИС',
    ),
    # Historical weather observations.
    weather_history=dict(
        attributes=['temperature', 'precipitation', 'freeze_thaw_cycles'],
        source='Росгидромет API',
    ),
)
Pipeline risk model
Feature Engineering at the segment level:
import pandas as pd
import numpy as np
def build_pipe_risk_features(pipe_registry: pd.DataFrame,
                             accident_history: pd.DataFrame,
                             pressure_stats: pd.DataFrame) -> pd.DataFrame:
    """Build per-segment risk features for the accident model.

    Unit of analysis: a pipe segment between manholes (50-200 m).

    Args:
        pipe_registry: One row per segment. Columns read here:
            ``pipe_segment_id``, ``installation_year``,
            ``design_life_years``, ``soil_corrosivity``, ``pipe_material``.
        accident_history: One row per accident, with ``pipe_segment_id``
            and ``accident_date`` (anything ``pd.to_datetime`` can parse).
        pressure_stats: Pressure observations with ``segment_id`` and
            ``pressure_bar``.

    Returns:
        A copy of ``pipe_registry`` extended with ``age_years``,
        ``age_ratio``, ``accidents_total``, ``accidents_5yr``,
        ``last_accident_days``, ``pressure_mean``, ``pressure_max``,
        ``pressure_std``, ``soil_corrosivity_score`` and ``material_risk``.

    Raises:
        KeyError: If a column listed above is missing from its frame.
    """
    features = pipe_registry.copy()

    # A single reference instant keeps every age/recency feature consistent
    # (the clock is not re-read for each group).
    now = pd.Timestamp.today()

    # Age and wear. A missing design life falls back to 25 years.
    features['age_years'] = now.year - features['installation_year']
    features['age_ratio'] = (
        features['age_years'] / features['design_life_years'].fillna(25)
    )

    # Accident history of each segment, computed without per-group lambdas.
    accidents = accident_history[['pipe_segment_id', 'accident_date']].copy()
    accidents['accident_date'] = pd.to_datetime(accidents['accident_date'])
    # "Recent" means within 5 * 365 days of the reference instant.
    accidents['is_recent'] = (now - accidents['accident_date']).dt.days < 5 * 365
    accident_stats = accidents.groupby('pipe_segment_id').agg(
        accidents_total=('accident_date', 'count'),
        accidents_5yr=('is_recent', 'sum'),
        last_accident_date=('accident_date', 'max'),
    )
    accident_stats['last_accident_days'] = (
        now - accident_stats['last_accident_date']
    ).dt.days
    accident_stats = (
        accident_stats.drop(columns='last_accident_date').reset_index()
    )
    features = features.merge(accident_stats, on='pipe_segment_id', how='left')

    # Segments with no recorded accident come out of the left merge as NaN.
    # Give them explicit values: zero counts and the 9999-day sentinel for
    # "never failed".
    features['accidents_total'] = features['accidents_total'].fillna(0)
    features['accidents_5yr'] = features['accidents_5yr'].fillna(0)
    features['last_accident_days'] = features['last_accident_days'].fillna(9999)

    # Pressure (operating regime). The prefix yields pressure_mean /
    # pressure_max / pressure_std, the names the model's feature list uses.
    pressure_summary = (
        pressure_stats.groupby('segment_id')['pressure_bar']
        .agg(['mean', 'max', 'std'])
        .add_prefix('pressure_')
    )
    features = features.merge(
        pressure_summary, left_on='pipe_segment_id', right_index=True, how='left'
    )

    # Soil corrosivity; unknown or missing categories default to 2.
    corrosion_encoding = {'low': 1, 'medium': 2, 'high': 3, 'very_high': 4}
    features['soil_corrosivity_score'] = (
        features['soil_corrosivity'].map(corrosion_encoding).fillna(2)
    )

    # Pipe material encoded as a risk level; unknown materials default to 3.
    material_risk = {
        'steel': 3, 'cast_iron': 4, 'asbestos_cement': 5, 'hdpe': 1, 'ppu': 2,
    }
    features['material_risk'] = (
        features['pipe_material'].map(material_risk).fillna(3)
    )
    return features
XGBoost accident probability model:
from xgboost import XGBClassifier
from sklearn.model_selection import TimeSeriesSplit
def train_accident_probability_model(features_df: pd.DataFrame) -> XGBClassifier:
    """Fit the gradient-boosted accident-probability classifier.

    Target variable: ``accident_in_12m`` (an accident on the segment within
    12 months).

    The classifier is fitted on every row of ``features_df``. The
    ``eval_set`` handed to ``fit`` is that same training data, so the
    recorded ``aucpr`` is an in-sample figure; no year-based split is
    performed inside this function.

    Args:
        features_df: Segment-level feature table containing every predictor
            column listed below plus the ``accident_in_12m`` target.

    Returns:
        The fitted ``XGBClassifier``.
    """
    predictors = [
        # Age and geometry.
        'age_years',
        'age_ratio',
        'diameter_mm',
        # Accident history.
        'accidents_total',
        'accidents_5yr',
        'last_accident_days',
        # Operating pressure.
        'pressure_mean',
        'pressure_max',
        'pressure_std',
        # Environment and material.
        'soil_corrosivity_score',
        'material_risk',
        'freeze_thaw_cycles_annual',
        'groundwater_level',
        'is_main_pipeline',  # trunk main vs. distribution line
        'operating_mode',    # heating vs. domestic hot water vs. cold water
    ]
    hyperparams = dict(
        n_estimators=300,
        max_depth=5,
        learning_rate=0.05,
        scale_pos_weight=10,  # accidents are rare (~5-8% per year)
        eval_metric='aucpr',
        random_state=42,
    )
    classifier = XGBClassifier(**hyperparams)

    # Missing feature values are replaced with the sentinel -1.
    design_matrix = features_df[predictors].fillna(-1)
    target = features_df['accident_in_12m']

    classifier.fit(
        design_matrix,
        target,
        eval_set=[(design_matrix, target)],
        verbose=False,
    )
    return classifier
Prioritizing pipe replacement
Priority Index Calculation:
def prioritize_pipe_replacements(risk_scores: pd.DataFrame,
                                 budget_rub: float,
                                 *,
                                 social_cost_rub_per_apartment_hour: float = 500) -> pd.DataFrame:
    """Select the highest-risk segments whose replacement fits the budget.

    Risk index = P(accident) x consequences, where consequences are the
    emergency repair cost plus the social damage of the outage
    (connected apartments x outage hours x cost per apartment-hour).

    Segments are ranked by descending risk index (ties keep their input
    order). Selection is a strict priority prefix: a segment is kept while
    the cumulative replacement cost of it and every higher-ranked segment
    is within ``budget_rub``. Cheaper lower-ranked segments are not pulled
    forward to use up leftover budget.

    Args:
        risk_scores: One row per segment with ``pipe_segment_id``,
            ``failure_probability_12m``, ``emergency_repair_cost_rub``,
            ``connected_apartments``, ``expected_outage_hours``,
            ``replacement_cost_rub`` and ``age_years``. Not modified.
        budget_rub: Replacement budget in rubles.
        social_cost_rub_per_apartment_hour: Social damage attributed to one
            apartment being disconnected for one hour (default 500 rub).

    Returns:
        The selected segments, highest risk first, with columns
        ``pipe_segment_id``, ``risk_index``, ``failure_probability_12m``,
        ``age_years``, ``replacement_cost_rub`` and ``risk_rank``
        (1 = highest risk among all input segments).

    Raises:
        KeyError: If a required column is missing from ``risk_scores``.
    """
    # Work on a copy so the caller's frame does not gain helper columns.
    scored = risk_scores.copy()

    social_damage = (
        scored['connected_apartments']
        * social_cost_rub_per_apartment_hour
        * scored['expected_outage_hours']
    )
    scored['failure_consequence'] = (
        scored['emergency_repair_cost_rub'] + social_damage
    )
    scored['risk_index'] = (
        scored['failure_probability_12m'] * scored['failure_consequence']
    )

    # A stable sort keeps the ranking deterministic when risk indices tie.
    ranked = scored.sort_values('risk_index', ascending=False, kind='mergesort')
    ranked['risk_rank'] = range(1, len(ranked) + 1)

    # Top segments by risk index that fit within the budget.
    ranked['cumulative_cost'] = ranked['replacement_cost_rub'].cumsum()
    within_budget = ranked[ranked['cumulative_cost'] <= budget_rub]

    return within_budget[['pipe_segment_id', 'risk_index', 'failure_probability_12m',
                          'age_years', 'replacement_cost_rub', 'risk_rank']]
Pressure drop accident detection
Real-time pipeline rupture detection:
def detect_pipe_rupture(pressure_readings: pd.Series,
                        flow_readings: pd.Series,
                        baseline: dict,
                        *,
                        sigma_threshold: float = 3.0,
                        flow_increase_threshold_pct: float = 20.0) -> dict:
    """Flag a probable pipe rupture from the most recent telemetry sample.

    Rupture signature: a sharp pressure drop together with a rise in flow
    at the upstream node (water / heat carrier is being lost).

    Only the last element of each series is compared with the baseline.

    Args:
        pressure_readings: Pressure samples; the last one is the current value.
        flow_readings: Flow samples; the last one is the current value.
        baseline: Mapping with ``pressure_mean``, ``pressure_std`` and
            ``flow_mean`` describing normal operation.
        sigma_threshold: Pressure drop, in baseline standard deviations,
            that must be exceeded for a detection (default 3).
        flow_increase_threshold_pct: Flow rise over baseline, in percent,
            that must be exceeded for a detection (default 20).

    Returns:
        A dict with ``pressure_drop_sigma``, ``flow_increase_pct``,
        ``rupture_score``, ``rupture_detected`` (a plain ``bool``) and
        ``estimated_location`` (whatever
        ``localize_leak_by_pressure_gradient`` returns for
        ``pressure_readings``).

    Raises:
        IndexError: If either series is empty.
        KeyError: If ``baseline`` lacks one of the required keys.
    """
    epsilon = 1e-9  # guards the divisions against a zero baseline

    # Plain floats keep the result free of NumPy scalar types.
    current_pressure = float(pressure_readings.iloc[-1])
    current_flow = float(flow_readings.iloc[-1])

    # Positive value = pressure below baseline, in standard deviations.
    pressure_drop_sigma = (
        (baseline['pressure_mean'] - current_pressure)
        / (baseline['pressure_std'] + epsilon)
    )

    # Simultaneous rise in flow, relative to baseline.
    flow_increase_pct = (
        (current_flow - baseline['flow_mean'])
        / (baseline['flow_mean'] + epsilon) * 100
    )

    # Weighted blend: 60% pressure drop, 40% flow rise (each 20% of extra
    # flow counts as one unit; a flow decrease contributes nothing).
    rupture_score = pressure_drop_sigma * 0.6 + max(0, flow_increase_pct / 20) * 0.4

    # bool() so the flag is JSON-serialisable even if a NumPy scalar
    # slipped in through ``baseline``.
    rupture_detected = bool(
        pressure_drop_sigma > sigma_threshold
        and flow_increase_pct > flow_increase_threshold_pct
    )

    return {
        'pressure_drop_sigma': round(pressure_drop_sigma, 1),
        'flow_increase_pct': round(flow_increase_pct, 1),
        'rupture_score': round(rupture_score, 2),
        'rupture_detected': rupture_detected,
        'estimated_location': localize_leak_by_pressure_gradient(pressure_readings)
    }
Integration with the Unified Automated System for Housing and Public Utilities Management and GIS: Export of prioritized areas to GIS (QGIS, ArcGIS) for visualization. Integration with the ADS system for automatic ticket generation. Mobile app for emergency crews with an incident map.
Timeframe: Risk model based on historical data + GIS visualization – 4-5 weeks. Real-time rupture detection, replacement budget optimization, ADS integration, mobile client – 3-4 months.







