Development of MLOps Infrastructure for Trading Models
MLOps applies DevOps practices to machine learning. For trading models this matters more than usual: a delayed rollout of a new model version or downtime of the inference server costs real money. An MLOps infrastructure provides reliable, reproducible, and automated management of the full ML model lifecycle.
MLOps Components for Trading
Data Layer          │ ML Layer           │ Serving Layer       │ Monitoring
────────────────────┼────────────────────┼─────────────────────┼─────────────
ClickHouse (ticks)  │ MLflow (tracking)  │ FastAPI (inference) │ Prometheus
PostgreSQL (trade)  │ DVC (data version) │ Redis (cache)       │ Grafana
S3/MinIO (raw data) │ Prefect (pipeline) │ Docker/K8s          │ Alertmanager
Feature Store       │ Optuna (HPO)       │ Load Balancer       │ PagerDuty
Experiment Tracking with MLflow
import mlflow
import mlflow.sklearn
from mlflow.models.signature import infer_signature

# train_model, evaluate_model and plot_feature_importance are project helpers
# defined elsewhere; X_train, X_val and X_test are expected to be pandas DataFrames.
def train_with_mlflow_tracking(experiment_name, config, X_train, y_train,
                               X_val, y_val, X_test, y_test):
    mlflow.set_experiment(experiment_name)
    run_name = f"{config['model_type']}_{config['version']}"
    with mlflow.start_run(run_name=run_name) as run:
        # Log parameters
        mlflow.log_params({
            'model_type': config['model_type'],
            'n_features': X_train.shape[1],
            'train_size': len(X_train),
            'val_size': len(X_val),
            **config.get('hyperparams', {})
        })

        # Train
        model = train_model(config, X_train, y_train, X_val, y_val)

        # Metrics, prefixed so validation and test values stay distinguishable
        val_metrics = evaluate_model(model, X_val, y_val)
        test_metrics = evaluate_model(model, X_test, y_test)
        mlflow.log_metrics({f'val_{k}': v for k, v in val_metrics.items()})
        mlflow.log_metrics({f'test_{k}': v for k, v in test_metrics.items()})

        # Save model with signature and register it in the Model Registry
        signature = infer_signature(X_train[:10], model.predict_proba(X_train[:10]))
        mlflow.sklearn.log_model(
            model,
            'model',
            signature=signature,
            registered_model_name=f"crypto_{config['symbol']}_predictor"
        )

        # Artifact: feature importance plot (a confusion matrix can be logged the same way)
        fig = plot_feature_importance(model, X_train.columns)
        mlflow.log_figure(fig, 'feature_importance.png')

        run_id = run.info.run_id
    return run_id, test_metrics
Data Versioning with DVC
# dvc.yaml: pipeline definition
stages:
  fetch_data:
    cmd: python src/data/fetch_ohlcv.py --symbol BTC --days 730
    deps:
      - src/data/fetch_ohlcv.py
    outs:
      - data/raw/btc_ohlcv.parquet
  feature_engineering:
    cmd: python src/features/engineer.py
    deps:
      - src/features/engineer.py
      - data/raw/btc_ohlcv.parquet
    outs:
      - data/features/btc_features.parquet
    params:
      - params.yaml:
          - feature_engineering
  train:
    cmd: python src/train.py
    deps:
      - src/train.py
      - data/features/btc_features.parquet
    outs:
      - models/btc_predictor.pkl
    metrics:
      - metrics/train_metrics.json
    params:
      - params.yaml:
          - training
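The `train` stage above declares `metrics/train_metrics.json` as a metrics file, so the training script has to write it. The source does not show `src/train.py`; the following is a minimal sketch of that one step, assuming a flat dictionary of numeric metrics. The helper name `write_metrics` and the metric keys are hypothetical.

```python
import json
from pathlib import Path


def write_metrics(metrics, path="metrics/train_metrics.json"):
    """Write a flat dict of numeric metrics to the file the DVC stage declares."""
    out = Path(path)
    # DVC does not create the output directory for the script.
    out.parent.mkdir(parents=True, exist_ok=True)
    # Sorted keys keep the file stable between runs, which makes diffs readable.
    out.write_text(json.dumps(metrics, indent=2, sort_keys=True))
    return out
```

Once the file is tracked as a stage metric, `dvc metrics show` and `dvc metrics diff` can compare values between commits or experiments.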
CI/CD for ML with GitHub Actions
# .github/workflows/ml_pipeline.yml
name: ML Training Pipeline
on:
  schedule:
    - cron: '0 1 * * 0'  # Every Sunday at 01:00 UTC
  workflow_dispatch:
    inputs:
      symbol:
        description: 'Trading symbol'
        default: 'BTC'
jobs:
  train:
    runs-on: [self-hosted, gpu]
    steps:
      - uses: actions/checkout@v3
      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: pip install -r requirements.txt
      - name: Pull data with DVC
        run: dvc pull data/
        env:
          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_KEY }}
          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET }}
      - name: Run training pipeline
        run: dvc repro
        env:
          MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_URI }}
      - name: Validate model
        run: python src/validate_model.py --min-accuracy 0.54 --min-sharpe 1.0
      - name: Deploy to production
        if: success()
        run: python src/deploy_model.py
        env:
          TRADING_API_KEY: ${{ secrets.TRADING_API }}
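The `Validate model` step is the gate in this workflow: if the script exits with a non-zero code, the job fails and `Deploy to production` never runs. The source does not show `src/validate_model.py`, so the following is only a sketch of what such a gate might compute. It assumes the script has predicted directions and realized per-period returns for a hold-out period, evaluates a simple long/short strategy that follows the predictions, and annualizes with 365 periods because crypto markets trade every day. All function names and the data layout are hypothetical.

```python
import math
import statistics


def directional_accuracy(predicted_up, realized_returns):
    """Share of periods where the predicted direction matched the realized one."""
    hits = sum(
        1 for up, r in zip(predicted_up, realized_returns) if up == (r > 0)
    )
    return hits / len(realized_returns)


def annualized_sharpe(strategy_returns, periods_per_year=365):
    """Mean over stdev of per-period returns, scaled to a year (risk-free rate taken as 0)."""
    stdev = statistics.stdev(strategy_returns)
    if stdev == 0:
        return 0.0
    return statistics.mean(strategy_returns) / stdev * math.sqrt(periods_per_year)


def validate(predicted_up, realized_returns, min_accuracy, min_sharpe):
    """Return (passed, metrics) for a strategy that goes long on 'up' and short otherwise."""
    strategy_returns = [
        r if up else -r for up, r in zip(predicted_up, realized_returns)
    ]
    metrics = {
        "accuracy": directional_accuracy(predicted_up, realized_returns),
        "sharpe": annualized_sharpe(strategy_returns),
    }
    passed = metrics["accuracy"] >= min_accuracy and metrics["sharpe"] >= min_sharpe
    return passed, metrics
```

In a CI script the result would be turned into the exit code, for example `sys.exit(0 if passed else 1)`, with the thresholds read from `--min-accuracy` and `--min-sharpe`.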
Kubernetes Deployment for Inference
# k8s/ml-inference-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: crypto-ml-inference
spec:
  replicas: 3
  selector:
    matchLabels:
      app: ml-inference
  template:
    metadata:
      labels:
        app: ml-inference  # must match spec.selector.matchLabels
    spec:
      containers:
        - name: inference
          image: crypto-ml-inference:latest
          ports:
            - containerPort: 8000  # port used by the probes below
          resources:
            requests:
              cpu: "500m"
              memory: "1Gi"
            limits:
              cpu: "2000m"
              memory: "4Gi"
          env:
            - name: MLFLOW_TRACKING_URI
              valueFrom:
                secretKeyRef:
                  name: ml-secrets
                  key: mlflow_uri
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 8000
            initialDelaySeconds: 10
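The two probes in this manifest answer different questions: `/health` tells Kubernetes whether the process is alive (a failure restarts the container), while `/ready` tells it whether the pod should receive traffic (a failure only removes it from the load balancer). The source does not show how the inference server implements them. Below is a minimal sketch that uses Python's standard `http.server` to stay dependency-free, rather than the FastAPI server the serving layer is built on; the same logic would map onto two FastAPI routes. The names `model_ready` and `probe_response` are hypothetical.

```python
import json
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer

# Set once the model has been loaded into memory; readiness depends on it.
model_ready = threading.Event()


def probe_response(path, ready):
    """Map a probe path to (HTTP status, JSON payload)."""
    if path == "/health":
        # Liveness: the process is up and able to answer HTTP at all.
        return 200, {"status": "alive"}
    if path == "/ready":
        # Readiness: accept traffic only once the model is loaded.
        if ready:
            return 200, {"status": "ready"}
        return 503, {"status": "loading"}
    return 404, {"error": "not found"}


class ProbeHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        code, payload = probe_response(self.path, model_ready.is_set())
        body = json.dumps(payload).encode()
        self.send_response(code)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        pass  # keep periodic probe traffic out of the logs


def start_probe_server(port=8000):
    """Serve the probe endpoints from a background thread and return the server."""
    server = HTTPServer(("0.0.0.0", port), ProbeHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server
```

The service would call `model_ready.set()` after the model has been loaded. Until then the readiness probe returns 503 and the pod receives no requests, while the liveness probe already passes.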
Prometheus Metrics for ML
from fastapi import FastAPI
from prometheus_client import Counter, Histogram, Gauge, make_asgi_app

# PredictionRequest (a pydantic model) and run_inference are assumed to be
# defined elsewhere in the inference service.
app = FastAPI()
app.mount("/metrics", make_asgi_app())  # endpoint scraped by Prometheus

# Inference server metrics
prediction_counter = Counter('predictions_total', 'Total predictions', ['model', 'symbol'])
prediction_latency = Histogram('prediction_latency_seconds', 'Prediction latency',
                               buckets=[0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0])
model_accuracy_gauge = Gauge('model_directional_accuracy', 'Rolling accuracy',
                             ['model', 'symbol', 'window'])
feature_drift_gauge = Gauge('feature_psi', 'PSI for feature drift', ['model', 'feature'])

@app.post("/predict")
async def predict(request: PredictionRequest):
    prediction_counter.labels(model=request.model_id, symbol=request.symbol).inc()
    # Time the whole inference call, including any awaited I/O
    with prediction_latency.time():
        result = await run_inference(request)
    return result
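The `feature_psi` gauge reports the Population Stability Index (PSI) of a feature, but the source does not show how the value is computed. PSI compares the binned distribution of a feature in live traffic with a reference sample, usually the training data: PSI = Σ (aᵢ − eᵢ) · ln(aᵢ / eᵢ), where eᵢ and aᵢ are the shares of reference and live observations in bin i. The sketch below is one common way to compute it, assuming bins taken from the reference quantiles and a small epsilon for empty bins; the function name and defaults are hypothetical.

```python
import bisect
import math
import statistics


def population_stability_index(reference, live, bins=10, eps=1e-6):
    """PSI between a reference (training) sample and a live sample of one feature."""
    # Interior bin edges are the reference quantiles, so each bin holds
    # roughly the same share of the reference data.
    edges = statistics.quantiles(reference, n=bins)

    def bin_shares(values):
        counts = [0] * bins
        for v in values:
            counts[bisect.bisect_right(edges, v)] += 1
        # eps keeps empty bins from producing log(0) or a division by zero.
        return [max(c / len(values), eps) for c in counts]

    expected = bin_shares(reference)
    actual = bin_shares(live)
    return sum((a - e) * math.log(a / e) for e, a in zip(expected, actual))
```

The result would then be published with something like `feature_drift_gauge.labels(model=model_id, feature='rsi_14').set(psi)`. A commonly quoted rule of thumb treats values below 0.1 as stable and values above 0.25 as a significant shift, though the right alert threshold depends on the feature.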
Feature Store with Feast
from datetime import timedelta

from feast import Entity, FeatureStore, FeatureView, Field, FileSource
from feast.types import Float32

# Define entities and features
crypto_entity = Entity(name='symbol', join_keys=['symbol'])

# Assumption: the features come from the Parquet file produced by the DVC
# pipeline and carry an 'event_timestamp' column; adjust to the real source.
ohlcv_source = FileSource(
    path='data/features/btc_features.parquet',
    timestamp_field='event_timestamp',
)

ohlcv_features = FeatureView(
    name='crypto_ohlcv_features',
    entities=[crypto_entity],
    ttl=timedelta(hours=2),  # online values older than this are treated as stale
    schema=[
        Field(name='return_24h', dtype=Float32),
        Field(name='rsi_14', dtype=Float32),
        Field(name='bb_pos_20', dtype=Float32),
        Field(name='vol_ratio_24', dtype=Float32),
    ],
    source=ohlcv_source,
)

# Realtime serving
store = FeatureStore(repo_path='./feature_repo')

def get_realtime_features(symbol):
    features = store.get_online_features(
        features=['crypto_ohlcv_features:return_24h',
                  'crypto_ohlcv_features:rsi_14'],
        entity_rows=[{'symbol': symbol}]
    )
    return features.to_dict()
Observability
Distributed tracing with OpenTelemetry: trace each request from incoming market data to the outgoing trading signal to identify bottlenecks in the pipeline.
Structured logging: emit logs as JSON and aggregate them with the ELK stack.
Cost monitoring: track GPU compute cost per training run and set budget alerts.
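For the structured-logging point, a minimal sketch with Python's standard `logging` module is shown below. It writes one JSON object per line, which log shippers for the ELK stack can ingest without custom parsing. The field names (`ts`, `level`, `logger`, `message`) and the `context` convention for extra fields are assumptions made for illustration, not a format prescribed by the source.

```python
import json
import logging
from datetime import datetime, timezone


class JsonFormatter(logging.Formatter):
    """Render each log record as a single-line JSON object."""

    def format(self, record):
        payload = {
            "ts": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Fields passed via extra={"context": {...}} are merged into the object.
        payload.update(getattr(record, "context", {}))
        return json.dumps(payload)


def get_json_logger(name):
    logger = logging.getLogger(name)
    if not logger.handlers:  # avoid duplicate handlers on repeated calls
        handler = logging.StreamHandler()
        handler.setFormatter(JsonFormatter())
        logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    return logger


log = get_json_logger("inference")
log.info("prediction served", extra={"context": {"symbol": "BTC", "latency_ms": 4.2}})
```

Adding the current trace ID to `context` would let a log line be joined with the corresponding OpenTelemetry trace.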
A complete MLOps infrastructure for trading models combines MLflow and DVC, a CI/CD pipeline, Kubernetes deployment, Prometheus/Grafana monitoring, a Feast feature store, and OpenTelemetry tracing.







