ML model degradation monitoring system


Development of ML Model Degradation Monitoring System

ML models for trading degrade for several reasons: market regimes change, the patterns a model relied on get arbitraged away, and correlations between assets shift. Degradation monitoring detects the problem before it causes financial damage.

Types of Degradation

Concept drift: the relationship between features and target has changed. The model still predicts correctly, but only for the "old" market.

Data drift (feature drift): input data are distributed differently than during training. The model receives atypical inputs.

Label drift: the distribution of the target variable itself has changed (e.g., the market shifted from trending to ranging).

Performance degradation: prediction quality has declined even though no explicit drift is detected.
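
The difference between data drift and concept drift can be illustrated with a toy example. The sketch below is not part of the monitoring system: the one-feature "market", the `simulate` helper and the fixed `model` rule are all assumptions made for illustration. It shows that data drift is visible in the inputs while accuracy may stay intact, whereas concept drift leaves the inputs unchanged and shows up only in accuracy.

```python
import random
import statistics


def simulate(n, feature_mean, flip_relationship, seed):
    """Generate (features, labels) for a toy one-feature regime (hypothetical)."""
    rng = random.Random(seed)
    xs = [rng.gauss(feature_mean, 1.0) for _ in range(n)]
    direction = -1 if flip_relationship else 1
    # The label follows the sign of the feature, inverted after a regime flip
    ys = [direction * (1 if x > 0 else -1) for x in xs]
    return xs, ys


def model(x):
    """Fixed 'trained' rule: a positive feature means a positive return."""
    return 1 if x > 0 else -1


def describe(xs, ys):
    hits = sum(model(x) == y for x, y in zip(xs, ys))
    return {
        'feature_mean': statistics.fmean(xs),
        'accuracy': hits / len(xs),
    }


summary = {
    # Same inputs and same relationship as during training
    'training': describe(*simulate(2000, 0.0, False, seed=1)),
    # Inputs shifted, relationship unchanged
    'data_drift': describe(*simulate(2000, 1.5, False, seed=2)),
    # Inputs unchanged, relationship inverted
    'concept_drift': describe(*simulate(2000, 0.0, True, seed=3)),
}

for name, row in summary.items():
    print(f"{name:14s} mean={row['feature_mean']:+.2f} acc={row['accuracy']:.2f}")
```

This is why input distributions and realized accuracy are tracked separately: neither signal alone covers both cases.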

Monitoring Metrics

import numpy as np
from scipy import stats
from collections import deque
from datetime import datetime, timezone

class ModelDegradationMonitor:
    def __init__(self, model_id, baseline_metrics, alert_thresholds):
        self.model_id = model_id
        self.baseline = baseline_metrics
        self.thresholds = alert_thresholds

        # Rolling windows for metrics
        self.predictions_buffer = deque(maxlen=500)
        self.actuals_buffer = deque(maxlen=500)
        self.features_buffer = deque(maxlen=1000)

        # Lifetime counters, used to align predictions with late-arriving actuals
        self.n_predictions = 0
        self.n_actuals = 0

    def log_prediction(self, features, prediction, confidence):
        self.predictions_buffer.append({
            'prediction': prediction,
            'confidence': confidence,
            'timestamp': datetime.now(timezone.utc)
        })
        self.features_buffer.append(features)
        self.n_predictions += 1

    def log_actual(self, actual_return):
        # Actuals are expected in the same order as the predictions they resolve
        self.actuals_buffer.append(actual_return)
        self.n_actuals += 1

    def calculate_performance_metrics(self, window=100):
        # Drop predictions whose outcome is not known yet, so that both
        # lists end on the same observation before the window is cut
        pending = max(self.n_predictions - self.n_actuals, 0)
        resolved = list(self.predictions_buffer)
        resolved = resolved[:max(len(resolved) - pending, 0)]

        recent = resolved[-window:]
        recent_actuals = list(self.actuals_buffer)[-window:]

        if len(recent) < window or len(recent_actuals) < window:
            return None

        recent_preds = [p['prediction'] for p in recent]

        # Directional accuracy: share of predictions with the correct sign
        hits = np.sign(recent_preds) == np.sign(recent_actuals)
        dir_accuracy = float(np.mean(hits))

        # Confidence calibration: high confidence should yield high accuracy
        high_conf_hits = [
            hit for p, hit in zip(recent, hits) if p['confidence'] > 0.65
        ]
        high_conf_accuracy = float(np.mean(high_conf_hits)) if high_conf_hits else None

        return {
            'directional_accuracy': dir_accuracy,
            'high_conf_accuracy': high_conf_accuracy,
            # Negative values mean the model is below its baseline
            'degradation': dir_accuracy - self.baseline.get('directional_accuracy', 0.55),
            'n_predictions': window
        }
    
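    def calculate_psi(self, train_distribution, current_values, n_bins=10):
        """Population Stability Index for feature drift"""
        bins = np.percentile(train_distribution, np.linspace(0, 100, n_bins + 1))

        # Values outside the training range are the clearest sign of drift:
        # clip them into the outer bins instead of letting np.histogram drop them
        current_values = np.clip(current_values, bins[0], bins[-1])

        # Uniform by quantiles (holds when the training feature has few repeated values)
        train_pct = np.ones(n_bins) / n_bins
        current_hist = np.histogram(current_values, bins=bins)[0]
        current_pct = np.clip(current_hist / current_hist.sum(), 1e-8, None)

        psi = np.sum((current_pct - train_pct) * np.log(current_pct / train_pct))
        return float(psi)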
    
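    def detect_concept_drift(self, method='ks_test', alpha=0.05):
        """KS test on the oldest vs. newest 100 buffered predictions.

        A shift in the model's output distribution is used as a proxy for
        concept drift; realized outcomes are not examined here."""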
        if len(self.predictions_buffer) < 200:
            return False, 1.0
        
        preds = [p['prediction'] for p in self.predictions_buffer]
        old_preds = preds[:100]
        new_preds = preds[-100:]
        
        if method == 'ks_test':
            ks_stat, p_value = stats.ks_2samp(old_preds, new_preds)
            return p_value < alpha, p_value
        
        return False, 1.0
    
    def check_all_alerts(self):
        alerts = []
        
        # 1. Performance degradation
        perf = self.calculate_performance_metrics()
        if perf and perf['degradation'] < -self.thresholds.get('max_accuracy_drop', 0.05):
            alerts.append({
                'type': 'performance_degradation',
                'severity': 'HIGH',
                'detail': f"Accuracy dropped {-perf['degradation']:.3f} below baseline"
            })
        
        # 2. Feature drift
        recent_features = list(self.features_buffer)[-100:]
        if recent_features and self.baseline.get('feature_distributions'):
            for feature_name in self.baseline['feature_distributions']:
                current_vals = [f.get(feature_name) for f in recent_features if f.get(feature_name) is not None]
                if current_vals:
                    psi = self.calculate_psi(
                        self.baseline['feature_distributions'][feature_name],
                        current_vals
                    )
                    if psi > 0.25:
                        alerts.append({
                            'type': 'feature_drift',
                            'severity': 'MEDIUM',
                            'feature': feature_name,
                            'psi': psi
                        })
        
        # 3. Concept drift
        drifted, p_val = self.detect_concept_drift()
        if drifted:
            alerts.append({
                'type': 'concept_drift',
                'severity': 'MEDIUM',
                'p_value': p_val
            })
        
        return alerts
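
To get a feel for the PSI values that trigger the feature drift alert, the calculation can be tried on synthetic data. The sketch below is a standalone approximation of the same idea that uses only the standard library: the function name `psi`, the sample sizes and the Gaussian data are assumptions for illustration. `statistics.quantiles` places the decile cut points slightly differently from `np.percentile`, so the numbers will not match the class method exactly.

```python
import bisect
import math
import random
import statistics


def psi(train_values, current_values, n_bins=10, eps=1e-8):
    """Population Stability Index against quantile bins of the training data."""
    # n_bins - 1 interior cut points; the two outer bins are open-ended,
    # so out-of-range values are counted rather than dropped
    cuts = statistics.quantiles(train_values, n=n_bins)
    expected = 1.0 / n_bins  # each quantile bin holds an equal training share

    counts = [0] * n_bins
    for value in current_values:
        counts[bisect.bisect_right(cuts, value)] += 1

    total = len(current_values)
    score = 0.0
    for count in counts:
        actual = max(count / total, eps)  # avoid log(0) for empty bins
        score += (actual - expected) * math.log(actual / expected)
    return score


rng = random.Random(42)
train = [rng.gauss(0.0, 1.0) for _ in range(5000)]
same_regime = [rng.gauss(0.0, 1.0) for _ in range(1000)]
shifted_regime = [rng.gauss(1.0, 1.0) for _ in range(1000)]

psi_stable = psi(train, same_regime)
psi_shifted = psi(train, shifted_regime)
print(f"stable: {psi_stable:.3f}, shifted: {psi_shifted:.3f}")
```

A sample from the training distribution stays well under 0.1, while a one-standard-deviation shift in the mean lands far above the 0.25 alert threshold used in `check_all_alerts`.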

Grafana Dashboard

Key panels:

  • Rolling Accuracy Chart: 100-point rolling average of directional accuracy. Baseline line for comparison.

  • PSI Heatmap: PSI matrix by features over time. Color: green (< 0.1), yellow (0.1–0.2), red (> 0.2).

  • Confidence Distribution: histogram of confidence scores. A shift towards 0.5 (uncertainty) is a sign of degradation.

  • Alert Timeline: feed of alerts with severity and type.
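
The series behind the first two panels can be prepared before they are exported to whatever data source Grafana reads from. The following is a minimal sketch under stated assumptions: `rolling_accuracy` and `psi_color` are hypothetical helper names, and the export step itself is left out because it depends on the storage backend.

```python
from collections import deque


def rolling_accuracy(hits, window=100):
    """Return the mean of the last `window` hit flags, one value per full window."""
    buf = deque(maxlen=window)
    running = 0
    series = []
    for hit in hits:
        flag = int(bool(hit))
        if len(buf) == window:
            running -= buf[0]  # value about to be evicted by the append below
        buf.append(flag)
        running += flag
        if len(buf) == window:
            series.append(running / window)
    return series


def psi_color(psi):
    """Map a PSI value to the heatmap colors listed for the PSI panel."""
    if psi < 0.1:
        return 'green'
    if psi <= 0.2:
        return 'yellow'
    return 'red'


# 100 correct predictions followed by 50 wrong ones
accuracy_series = rolling_accuracy([1] * 100 + [0] * 50)
print(accuracy_series[0], accuracy_series[-1])
print([psi_color(v) for v in (0.05, 0.15, 0.3)])
```

The running sum keeps each update constant-time, which matters little at 100 points but keeps the helper usable for longer windows.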

Alert System

from datetime import datetime, timezone

ALERT_CHANNELS = {
    'HIGH': ['telegram', 'email', 'pagerduty'],
    'MEDIUM': ['telegram', 'email'],
    'LOW': ['telegram']
}

async def send_degradation_alert(alert, model_id):
    # send_notification(channel, message) is assumed to be an async
    # transport helper defined elsewhere in the project
    timestamp = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')
    message = f"""
⚠️ ML Model Degradation Alert
Model: {model_id}
Type: {alert['type']}
Severity: {alert['severity']}
Detail: {alert.get('detail', '')}
Time: {timestamp}

Action recommended: Check model retraining system
"""
    # Unknown severities fall back to the lowest-priority channel
    channels = ALERT_CHANNELS.get(alert['severity'], ['telegram'])
    for channel in channels:
        await send_notification(channel, message)
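
The routing by severity can be exercised without real transports. In the sketch below `send_notification` is a hypothetical in-memory stub (the text does not define it), `dispatch` is an assumed name, and `asyncio.gather` is used to send to all channels concurrently instead of one after another; that is a design variation, not the behavior of the function above.

```python
import asyncio

ALERT_CHANNELS = {
    'HIGH': ['telegram', 'email', 'pagerduty'],
    'MEDIUM': ['telegram', 'email'],
    'LOW': ['telegram'],
}

sent = []  # (channel, message) pairs captured by the stub


async def send_notification(channel, message):
    """Hypothetical stand-in for a real transport (bot API, SMTP, paging service)."""
    await asyncio.sleep(0)  # yield control, as a network call would
    sent.append((channel, message))


async def dispatch(alert, model_id):
    """Send one alert to every channel configured for its severity."""
    channels = ALERT_CHANNELS.get(alert['severity'], ['telegram'])
    message = f"[{alert['severity']}] {model_id}: {alert['type']}"
    # Channels are independent, so a slow one should not delay the others
    await asyncio.gather(*(send_notification(c, message) for c in channels))
    return channels


routed = asyncio.run(
    dispatch({'type': 'feature_drift', 'severity': 'MEDIUM'}, 'model-1')
)
print(routed, len(sent))
```

With real transports it is usually worth passing `return_exceptions=True` to `asyncio.gather`, so that a failure in one channel does not prevent the others from being reported.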

The resulting degradation monitoring system combines PSI-based feature drift detection, performance tracking, concept drift testing, a Grafana dashboard and a multilevel alert system.