MLOps infrastructure for trading models

We design and develop full-cycle blockchain solutions: from smart contract architecture to launching DeFi protocols, NFT marketplaces and crypto exchanges. Security audits, tokenomics, integration with existing infrastructure.
Showing 1 of 1 servicesAll 1306 services
MLOps infrastructure for trading models
Complex
from 2 weeks to 3 months
FAQ
Blockchain Development Services
Blockchain Development Stages
Latest works
  • image_website-b2b-advance_0.png
    B2B ADVANCE company website development
    1238
  • image_web-applications_feedme_466_0.webp
    Development of a web application for FEEDME
    1167
  • image_websites_belfingroup_462_0.webp
    Website development for BELFINGROUP
    867
  • image_ecommerce_furnoro_435_0.webp
    Development of an online store for the company FURNORO
    1080
  • image_logo-advance_0.png
    B2B Advance company logo design
    563
  • image_crm_enviok_479_0.webp
    Development of a web application for Enviok
    829

Development of MLOps Infrastructure for Trading Models

MLOps is DevOps for machine learning. For trading models this is especially critical: delay in deploying a new version or inference server downtime costs real money. MLOps infrastructure ensures reliable, reproducible, and automated management of the full ML model lifecycle.

MLOps Components for Trading

Data Layer          │  ML Layer           │  Serving Layer       │  Monitoring
                    │                     │                       │
ClickHouse (ticks)  │  MLflow (tracking)  │  FastAPI (inference)  │  Prometheus
PostgreSQL (trade)  │  DVC (data version) │  Redis (cache)        │  Grafana
S3/MinIO (raw data) │  Prefect (pipeline) │  Docker/K8s           │  Alertmanager
Feature Store       │  Optuna (HPO)       │  Load Balancer        │  PagerDuty

Experiment Tracking with MLflow

import mlflow
import mlflow.sklearn
import mlflow.pytorch
from mlflow.models.signature import infer_signature

def train_with_mlflow_tracking(experiment_name, config, X_train, y_train, 
                                X_val, y_val, X_test, y_test):
    mlflow.set_experiment(experiment_name)
    
    with mlflow.start_run(run_name=f"{config['model_type']}_{config['version']}"):
        # Log parameters
        mlflow.log_params({
            'model_type': config['model_type'],
            'n_features': X_train.shape[1],
            'train_size': len(X_train),
            'val_size': len(X_val),
            **config.get('hyperparams', {})
        })
        
        # Train
        model = train_model(config, X_train, y_train, X_val, y_val)
        
        # Metrics
        val_metrics = evaluate_model(model, X_val, y_val)
        test_metrics = evaluate_model(model, X_test, y_test)
        
        mlflow.log_metrics({
            f'val_{k}': v for k, v in val_metrics.items()
        })
        mlflow.log_metrics({
            f'test_{k}': v for k, v in test_metrics.items()
        })
        
        # Save model with signature
        signature = infer_signature(X_train[:10], model.predict_proba(X_train[:10]))
        mlflow.sklearn.log_model(
            model,
            'model',
            signature=signature,
            registered_model_name=f"crypto_{config['symbol']}_predictor"
        )
        
        # Artifacts: confusion matrix, feature importance plot
        import matplotlib.pyplot as plt
        fig = plot_feature_importance(model, X_train.columns)
        mlflow.log_figure(fig, 'feature_importance.png')
        
        run_id = mlflow.active_run().info.run_id
    
    return run_id, test_metrics

Data Versioning with DVC

# dvc.yaml — pipeline definition
stages:
  fetch_data:
    cmd: python src/data/fetch_ohlcv.py --symbol BTC --days 730
    deps:
      - src/data/fetch_ohlcv.py
    outs:
      - data/raw/btc_ohlcv.parquet

  feature_engineering:
    cmd: python src/features/engineer.py
    deps:
      - src/features/engineer.py
      - data/raw/btc_ohlcv.parquet
    outs:
      - data/features/btc_features.parquet
    params:
      - params.yaml:
          - feature_engineering

  train:
    cmd: python src/train.py
    deps:
      - src/train.py
      - data/features/btc_features.parquet
    outs:
      - models/btc_predictor.pkl
    metrics:
      - metrics/train_metrics.json
    params:
      - params.yaml:
          - training

CI/CD for ML with GitHub Actions

# .github/workflows/ml_pipeline.yml
name: ML Training Pipeline

on:
  schedule:
    - cron: '0 1 * * 0'  # Every Sunday at 01:00 UTC
  workflow_dispatch:
    inputs:
      symbol:
        description: 'Trading symbol'
        default: 'BTC'

jobs:
  train:
    runs-on: [self-hosted, gpu]
    steps:
      - uses: actions/checkout@v3
      
      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      
      - name: Install dependencies
        run: pip install -r requirements.txt
      
      - name: Pull data with DVC
        run: dvc pull data/
        env:
          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_KEY }}
          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET }}
      
      - name: Run training pipeline
        run: dvc repro
        env:
          MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_URI }}
      
      - name: Validate model
        run: python src/validate_model.py --min-accuracy 0.54 --min-sharpe 1.0
      
      - name: Deploy to production
        if: success()
        run: python src/deploy_model.py
        env:
          TRADING_API_KEY: ${{ secrets.TRADING_API }}

Kubernetes Deployment for Inference

# k8s/ml-inference-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: crypto-ml-inference
spec:
  replicas: 3
  selector:
    matchLabels:
      app: ml-inference
  template:
    spec:
      containers:
      - name: inference
        image: crypto-ml-inference:latest
        resources:
          requests:
            cpu: "500m"
            memory: "1Gi"
          limits:
            cpu: "2000m"
            memory: "4Gi"
        env:
        - name: MLFLOW_TRACKING_URI
          valueFrom:
            secretKeyRef:
              name: ml-secrets
              key: mlflow_uri
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8000
          initialDelaySeconds: 10

Prometheus Metrics for ML

from prometheus_client import Counter, Histogram, Gauge, start_http_server

# Inference server metrics
prediction_counter = Counter('predictions_total', 'Total predictions', ['model', 'symbol'])
prediction_latency = Histogram('prediction_latency_seconds', 'Prediction latency',
                               buckets=[0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0])
model_accuracy_gauge = Gauge('model_directional_accuracy', 'Rolling accuracy', 
                              ['model', 'symbol', 'window'])
feature_drift_gauge = Gauge('feature_psi', 'PSI for feature drift', ['model', 'feature'])

@app.post("/predict")
async def predict(request: PredictionRequest):
    prediction_counter.labels(model=request.model_id, symbol=request.symbol).inc()
    
    with prediction_latency.time():
        result = await run_inference(request)
    
    return result

Feature Store with Feast

from feast import FeatureStore, Entity, FeatureView, Field
from feast.types import Float32, Int64

# Define entities and features
crypto_entity = Entity(name='symbol', join_keys=['symbol'])

ohlcv_features = FeatureView(
    name='crypto_ohlcv_features',
    entities=[crypto_entity],
    ttl=timedelta(hours=2),
    schema=[
        Field(name='return_24h', dtype=Float32),
        Field(name='rsi_14', dtype=Float32),
        Field(name='bb_pos_20', dtype=Float32),
        Field(name='vol_ratio_24', dtype=Float32),
    ]
)

# Realtime serving
store = FeatureStore(repo_path='./feature_repo')

def get_realtime_features(symbol):
    features = store.get_online_features(
        features=['crypto_ohlcv_features:return_24h', 
                  'crypto_ohlcv_features:rsi_14'],
        entity_rows=[{'symbol': symbol}]
    )
    return features.to_dict()

Observability

Distributed tracing with OpenTelemetry: trace request from incoming market data to outgoing trading signal. Identify bottlenecks in the pipeline.

Structured logging with structured JSON format, ELK stack for aggregation.

Cost monitoring: track GPU compute cost for each training. Budget alerts.

Developing a complete MLOps infrastructure: MLflow + DVC, CI/CD pipeline, Kubernetes deployment, Prometheus/Grafana monitoring, Feast Feature Store and OpenTelemetry tracing.