Development of ML Model Degradation Monitoring System
ML models for trading degrade for several reasons: market regimes change, patterns that the model used get arbitraged away, correlations between assets change. Degradation monitoring allows detection of the problem before it causes financial damage.
Types of Degradation
Concept drift: relationships between features and target have changed. The model predicts correctly for the "old" market.
Data drift (feature drift): input data started distributing differently than during training. The model receives atypical inputs.
Label drift: the distribution of the target variable itself has changed (e.g., market shifted from trending to ranging).
Performance degradation: prediction quality declined even without explicit drift.
Monitoring Metrics
import numpy as np
import pandas as pd
from scipy import stats
from collections import deque
class ModelDegradationMonitor:
def __init__(self, model_id, baseline_metrics, alert_thresholds):
self.model_id = model_id
self.baseline = baseline_metrics
self.thresholds = alert_thresholds
# Rolling windows for metrics
self.predictions_buffer = deque(maxlen=500)
self.actuals_buffer = deque(maxlen=500)
self.features_buffer = deque(maxlen=1000)
def log_prediction(self, features, prediction, confidence):
self.predictions_buffer.append({
'prediction': prediction,
'confidence': confidence,
'timestamp': datetime.utcnow()
})
self.features_buffer.append(features)
def log_actual(self, actual_return):
self.actuals_buffer.append(actual_return)
def calculate_performance_metrics(self, window=100):
if len(self.predictions_buffer) < window:
return None
recent_preds = [p['prediction'] for p in list(self.predictions_buffer)[-window:]]
recent_actuals = list(self.actuals_buffer)[-window:]
if len(recent_actuals) < window:
return None
# Directional accuracy
dir_accuracy = np.mean(
np.sign(recent_preds) == np.sign(recent_actuals)
)
# Confidence calibration: high confidence should yield high accuracy
high_conf_preds = [
(p['prediction'], a)
for p, a in zip(list(self.predictions_buffer)[-window:], recent_actuals)
if p['confidence'] > 0.65
]
if high_conf_preds:
high_conf_accuracy = np.mean([
np.sign(pred) == np.sign(actual)
for pred, actual in high_conf_preds
])
else:
high_conf_accuracy = None
return {
'directional_accuracy': dir_accuracy,
'high_conf_accuracy': high_conf_accuracy,
'degradation': dir_accuracy - self.baseline.get('directional_accuracy', 0.55),
'n_predictions': window
}
def calculate_psi(self, train_distribution, current_values, n_bins=10):
"""Population Stability Index for feature drift"""
bins = np.percentile(train_distribution, np.linspace(0, 100, n_bins + 1))
bins[0] -= 1e-8
train_pct = np.ones(n_bins) / n_bins # uniform by quantiles
current_hist = np.histogram(current_values, bins=bins)[0]
current_pct = np.clip(current_hist / current_hist.sum(), 1e-8, None)
psi = np.sum((current_pct - train_pct) * np.log(current_pct / train_pct))
return psi
def detect_concept_drift(self, method='ks_test', alpha=0.05):
"""KS-test to compare recent and historical prediction distributions"""
if len(self.predictions_buffer) < 200:
return False, 1.0
preds = [p['prediction'] for p in self.predictions_buffer]
old_preds = preds[:100]
new_preds = preds[-100:]
if method == 'ks_test':
ks_stat, p_value = stats.ks_2samp(old_preds, new_preds)
return p_value < alpha, p_value
return False, 1.0
def check_all_alerts(self):
alerts = []
# 1. Performance degradation
perf = self.calculate_performance_metrics()
if perf and perf['degradation'] < -self.thresholds.get('max_accuracy_drop', 0.05):
alerts.append({
'type': 'performance_degradation',
'severity': 'HIGH',
'detail': f"Accuracy dropped {perf['degradation']:.3f} from baseline"
})
# 2. Feature drift
recent_features = list(self.features_buffer)[-100:]
if recent_features and self.baseline.get('feature_distributions'):
for feature_name in self.baseline['feature_distributions']:
current_vals = [f.get(feature_name) for f in recent_features if f.get(feature_name) is not None]
if current_vals:
psi = self.calculate_psi(
self.baseline['feature_distributions'][feature_name],
current_vals
)
if psi > 0.25:
alerts.append({
'type': 'feature_drift',
'severity': 'MEDIUM',
'feature': feature_name,
'psi': psi
})
# 3. Concept drift
drifted, p_val = self.detect_concept_drift()
if drifted:
alerts.append({
'type': 'concept_drift',
'severity': 'MEDIUM',
'p_value': p_val
})
return alerts
Grafana Dashboard
Key panels:
-
Rolling Accuracy Chart: 100-point rolling average of directional accuracy. Baseline line for comparison.
-
PSI Heatmap: PSI matrix by features over time. Color: green (< 0.1), yellow (0.1–0.2), red (> 0.2).
-
Confidence Distribution: histogram of confidence scores. Shift towards 0.5 (uncertainty) is a sign of degradation.
-
Alert Timeline: feed of alerts with severity and type.
Alert System
ALERT_CHANNELS = {
'HIGH': ['telegram', 'email', 'pagerduty'],
'MEDIUM': ['telegram', 'email'],
'LOW': ['telegram']
}
async def send_degradation_alert(alert, model_id):
message = f"""
⚠️ ML Model Degradation Alert
Model: {model_id}
Type: {alert['type']}
Severity: {alert['severity']}
Detail: {alert.get('detail', '')}
Time: {datetime.utcnow().strftime('%Y-%m-%d %H:%M UTC')}
Action recommended: Check model retraining system
"""
channels = ALERT_CHANNELS.get(alert['severity'], ['telegram'])
for channel in channels:
await send_notification(channel, message)
Developing a degradation monitoring system with PSI feature drift detection, performance tracking, concept drift testing, Grafana dashboard and multilevel alert system.







