Monitoring ML Models in Production
Modern ML systems rarely fail spectacularly. They fail silently. Performance degrades, data distribution drifts, latency creeps up, and by the time someone notices, you are debugging a production incident with very little visibility.
This post is a practical guide to monitoring ML models in production, covering concrete patterns for classic tabular models, RAG pipelines, and privacy-sensitive NLP.
What makes ML monitoring different
Traditional application monitoring cares about availability, latency, error rates, and resource usage. ML systems need all of that, plus one critical dimension: quality of predictions over time.
Three properties make ML monitoring harder than standard application monitoring:
- Data dependence - Model performance depends on input data distributions, which change over time.
- Delayed ground truth - Labels often arrive late, or never at all (e.g. RAG answers in a chatbot).
- Non-determinism - With LLMs and RAG systems especially, the same input may produce different outputs.
The core pillars of ML monitoring
I structure ML monitoring around five pillars:
- Operational health
- Data quality
- Drift detection
- Prediction quality
- User feedback & business impact
You will rarely have all five from day one. But you want to design your system so you can iteratively add them.
1. Operational health: metrics every model needs
Operational monitoring is the easiest to start with and integrates well with standard tooling like Prometheus, Grafana, Datadog or OpenTelemetry.
At a minimum, you want:
- Request rate (QPS / RPS)
- Latency percentiles (p50, p90, p99)
- Error rates (4xx, 5xx, model errors)
- Resource usage (CPU, GPU, memory)
If you already have basic HTTP metrics from a FastAPI + Docker deployment, extending them to ML-specific endpoints is straightforward.
Example: FastAPI + Prometheus metrics
from fastapi import FastAPI, HTTPException, Response
from prometheus_client import CONTENT_TYPE_LATEST, Counter, Histogram, generate_latest
import time

app = FastAPI()

REQUEST_COUNT = Counter(
    "ml_inference_requests_total",
    "Total number of inference requests",
    ["model_name", "status"],
)

REQUEST_LATENCY = Histogram(
    "ml_inference_latency_seconds",
    "Inference latency in seconds",
    ["model_name"],
)

MODEL_NAME = "credit_risk_v1"

@app.get("/metrics")
async def metrics():
    # generate_latest() returns bytes; set the Prometheus content type explicitly
    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)

@app.post("/predict")
async def predict(payload: dict):
    start_time = time.time()
    try:
        # parse input, run model, etc
        prediction = run_model(payload)
        REQUEST_COUNT.labels(model_name=MODEL_NAME, status="success").inc()
        return {"prediction": prediction}
    except Exception as e:
        REQUEST_COUNT.labels(model_name=MODEL_NAME, status="error").inc()
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        elapsed = time.time() - start_time
        REQUEST_LATENCY.labels(model_name=MODEL_NAME).observe(elapsed)
You can extend this pattern to log model time vs embedding time in RAG systems or GPU vs CPU processing.
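For per-stage timing, a small context manager keeps the instrumentation out of the business logic. This is a stdlib-only sketch with an in-memory store standing in for a metrics backend; in production you would observe these durations on a labeled Prometheus Histogram instead, and the stage names here are illustrative:

```python
import time
from collections import defaultdict
from contextlib import contextmanager

# In-memory stand-in for a metrics backend; swap for a labeled Histogram
stage_timings = defaultdict(list)

@contextmanager
def timed_stage(stage: str):
    start = time.perf_counter()
    try:
        yield
    finally:
        stage_timings[stage].append(time.perf_counter() - start)

with timed_stage("embedding"):
    time.sleep(0.01)  # stand-in for the embedding call
with timed_stage("generation"):
    time.sleep(0.02)  # stand-in for the LLM call
```

Because the timing lives in `finally`, a stage that raises still records its duration.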
2. Data quality: catching garbage before it hits the model
Most model incidents start as a data problem: a schema change, a bad upstream job, a feature suddenly becoming constant.
Data validation should happen before inference whenever possible.
What to monitor for
- Missing values ratio per feature
- Constant or near-constant values (e.g. 99% zeros)
- Out-of-range values (based on training statistics)
- Schema violations (missing columns, wrong types)
You can implement this with tools like Great Expectations, custom checks, or even simple sanity thresholds.
Example: lightweight feature checks
import numpy as np

class DataQualityMonitor:
    def __init__(self, feature_stats):
        # feature_stats: {"feature_name": {"min": .., "max": ..}}
        self.feature_stats = feature_stats
        self.missing_ratio_threshold = 0.1
        self.out_of_range_ratio_threshold = 0.05

    def check_batch(self, X):
        report = {}
        n = len(X)
        for feature, stats in self.feature_stats.items():
            values = np.array([row.get(feature) for row in X], dtype=float)
            missing = np.isnan(values).sum() / n
            out_of_range = (
                ((values < stats["min"]) | (values > stats["max"]))
                & ~np.isnan(values)
            ).sum() / n
            report[feature] = {
                "missing_ratio": missing,
                "out_of_range_ratio": out_of_range,
                "missing_alert": missing > self.missing_ratio_threshold,
                "range_alert": out_of_range > self.out_of_range_ratio_threshold,
            }
        return report
You can expose aggregate values as metrics and trigger alerts if they cross thresholds.
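For example, a small helper can turn such a per-feature report into a flat list of alerts to forward to your alerting channel. The field names follow the report structure above; the alert strings themselves are illustrative:

```python
def report_to_alerts(report):
    # Flatten a per-feature quality report (as produced by a batch check)
    # into human-readable alert strings; wording is illustrative
    alerts = []
    for feature, checks in report.items():
        if checks.get("missing_alert"):
            alerts.append(f"{feature}: missing ratio {checks['missing_ratio']:.2%} over threshold")
        if checks.get("range_alert"):
            alerts.append(f"{feature}: out-of-range ratio {checks['out_of_range_ratio']:.2%} over threshold")
    return alerts

sample_report = {
    "income": {"missing_ratio": 0.25, "out_of_range_ratio": 0.0,
               "missing_alert": True, "range_alert": False},
    "age": {"missing_ratio": 0.0, "out_of_range_ratio": 0.0,
            "missing_alert": False, "range_alert": False},
}
alerts = report_to_alerts(sample_report)
```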
In RAG systems, data quality also involves your vector database: chunk size, indexing lag, and corrupted embeddings all impact relevance. Monitoring ingestion lag and document counts per collection catches these problems early.
3. Drift detection: are we still in distribution?
Models are trained on historical data, but production data keeps evolving. Drift detection helps you answer two questions:
- Has the input distribution changed significantly? (data drift)
- Has the relationship between inputs and outputs changed? (concept drift)
You do not need fancy algorithms at first. Start simple and iterate.
Simple feature drift metrics
For each numeric feature, you can monitor:
- Mean and standard deviation
- Quantiles (p10, p50, p90)
- Population Stability Index (PSI)
Example PSI for binned features:
import numpy as np

def population_stability_index(ref, curr, n_bins=10):
    # ref, curr are 1D numpy arrays
    # deduplicate edges so np.histogram never sees repeated quantiles
    quantiles = np.unique(np.quantile(ref, np.linspace(0, 1, n_bins + 1)))
    ref_hist, _ = np.histogram(ref, bins=quantiles)
    curr_hist, _ = np.histogram(curr, bins=quantiles)
    ref_pct = ref_hist / len(ref)
    curr_pct = curr_hist / len(curr)
    # drop empty bins to avoid division by zero (an epsilon floor also works)
    mask = (ref_pct > 0) & (curr_pct > 0)
    psi = np.sum((curr_pct[mask] - ref_pct[mask]) * np.log(curr_pct[mask] / ref_pct[mask]))
    return float(psi)
You can compute PSI daily per feature and alert if it exceeds a threshold (e.g. 0.2 moderate, 0.3 high).
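A sketch of what such a daily check looks like end to end. The severity buckets follow the common rule-of-thumb thresholds mentioned above, and the simulated 0.5-sigma shift is synthetic data for illustration:

```python
import numpy as np

def psi_severity(psi: float) -> str:
    # Common rule-of-thumb buckets; tune per feature in practice
    if psi < 0.1:
        return "stable"
    if psi < 0.2:
        return "moderate"
    return "high"

rng = np.random.default_rng(0)
ref = rng.normal(0.0, 1.0, 10_000)    # training-time reference sample
curr = rng.normal(0.5, 1.0, 10_000)   # shifted "production" sample

# Binned PSI, inlined so this sketch is self-contained
edges = np.unique(np.quantile(ref, np.linspace(0, 1, 11)))
ref_pct = np.histogram(ref, bins=edges)[0] / len(ref)
curr_pct = np.histogram(curr, bins=edges)[0] / len(curr)
mask = (ref_pct > 0) & (curr_pct > 0)
psi = float(np.sum((curr_pct[mask] - ref_pct[mask]) * np.log(curr_pct[mask] / ref_pct[mask])))
severity = psi_severity(psi)
```

A half-sigma mean shift on a standard normal feature lands well above the 0.1 "stable" cutoff, which is exactly the kind of slow drift a daily job catches before users do.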
Embedding and RAG-specific drift
In RAG systems, I monitor:
- Distribution of embedding norms
- Average similarity of new queries to historical queries
- Percentage of queries with low top-1 similarity (potential out-of-domain inputs)
Example: monitoring query embedding drift with cosine similarity:
import numpy as np

class EmbeddingDriftMonitor:
    def __init__(self, ref_query_embeddings):
        self.ref_centroid = ref_query_embeddings.mean(axis=0)

    def avg_similarity_to_ref(self, batch_embeddings):
        def cosine(a, b):
            return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
        sims = [cosine(e, self.ref_centroid) for e in batch_embeddings]
        return float(np.mean(sims))
Low similarity over time tells you your model is seeing qualitatively new queries.
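The low top-1 similarity metric from the list above can be sketched the same way. The 0.3 threshold is an assumption for illustration; calibrate it on your own historical traffic:

```python
import numpy as np

def low_similarity_rate(query_embs, doc_embs, threshold=0.3):
    # Fraction of queries whose best (top-1) cosine similarity against the
    # indexed documents is below `threshold` -- a cheap out-of-domain signal
    q = query_embs / np.linalg.norm(query_embs, axis=1, keepdims=True)
    d = doc_embs / np.linalg.norm(doc_embs, axis=1, keepdims=True)
    top1 = (q @ d.T).max(axis=1)  # best match per query
    return float((top1 < threshold).mean())

docs = np.array([[1.0, 0.0, 0.0], [0.0, 1.0, 0.0]])
queries = np.array([[1.0, 0.0, 0.0],    # in-domain: matches the first doc
                    [0.0, 0.0, 1.0]])   # out-of-domain: orthogonal to all docs
rate = low_similarity_rate(queries, docs)
```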
4. Prediction quality: online, offline, and without labels
Ultimately, you want to monitor how well your model performs on its task. There are three modes:
- Online with immediate labels - e.g. click-through rate for recommendations.
- Delayed labels - e.g. credit default after 6 months.
- No labels - common in RAG and chatbots.
Online metrics with immediate labels
The easiest case: you log predictions and outcomes, then compute metrics in near real time.
Pattern:
- At inference time: log model_version, features_hash, prediction_id, prediction, timestamp.
- When the label arrives: log prediction_id, label.
- Join the logs in a batch job and compute metrics per model version, segment, and time window.
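A minimal sketch of the join step, using the field names from the logging pattern above; accuracy here is a stand-in for whatever task metric you actually track:

```python
from collections import defaultdict

def join_and_score(prediction_logs, label_logs):
    # Join on prediction_id and compute accuracy per model_version
    labels = {rec["prediction_id"]: rec["label"] for rec in label_logs}
    hits, totals = defaultdict(int), defaultdict(int)
    for rec in prediction_logs:
        label = labels.get(rec["prediction_id"])
        if label is None:
            continue  # label has not arrived yet
        totals[rec["model_version"]] += 1
        hits[rec["model_version"]] += int(rec["prediction"] == label)
    return {version: hits[version] / totals[version] for version in totals}

preds = [
    {"prediction_id": "a", "model_version": "v1", "prediction": 1},
    {"prediction_id": "b", "model_version": "v1", "prediction": 0},
    {"prediction_id": "c", "model_version": "v1", "prediction": 1},  # label pending
]
labels = [{"prediction_id": "a", "label": 1}, {"prediction_id": "b", "label": 1}]
accuracy = join_and_score(preds, labels)
```

Skipping unlabeled predictions rather than failing on them is deliberate: with delayed labels, any given batch run will always have some predictions still waiting.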
The same join-and-compute pattern applies to RAG evaluation, though the metrics differ.
Delayed labels: monitoring proxies
When labels arrive late, you cannot wait months to detect issues. Use a mix of:
- Proxy metrics: e.g. approval rates, distribution of predicted scores.
- Backtesting: evaluate new data once labels mature, compare to historic baselines.
- Champion vs challenger: run multiple models in parallel and compare via A/B tests.
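For instance, a labels-free proxy check might compare predicted-score quantiles against a baseline window. The quantile choice and the toy data below are illustrative:

```python
import numpy as np

def score_quantile_shift(baseline_scores, current_scores, qs=(0.1, 0.5, 0.9)):
    # Shift in predicted-score quantiles vs a baseline window; a labels-free
    # early-warning proxy until real labels mature
    base = np.quantile(baseline_scores, qs)
    curr = np.quantile(current_scores, qs)
    return {f"p{int(round(q * 100))}": float(c - b) for q, b, c in zip(qs, base, curr)}

baseline = np.linspace(0.0, 1.0, 101)  # last month's predicted scores (toy data)
current = baseline + 0.1               # today's scores, uniformly shifted up
shifts = score_quantile_shift(baseline, current)
```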
No labels: RAG and LLM systems
LLM-based systems rarely have ground truth labels at scale. Yet we still need to monitor quality.
Tactics that work well in practice:
- Structured quality rubrics evaluated by another model (e.g. faithfulness, relevance).
- Response length and structure: unusually short or long answers often signal issues.
- Citation coverage: percentage of answers that reference retrieved documents.
- User behavior: repeats, rephrases, manual overrides.
For example, you can log a simple synthetic quality score per answer using a cheap LLM evaluator:
from typing import List

# pseudo-code, assuming you have an LLM client
def evaluate_rag_answer(question: str, answer: str, contexts: List[str]) -> float:
    prompt = f"""
Question: {question}
Answer: {answer}
Contexts: {contexts}
Rate the answer's faithfulness to the contexts on a scale from 1 to 5.
Respond with a single number.
""".strip()
    rating_str = llm_client.generate(prompt)
    try:
        rating = float(rating_str.strip())
    except ValueError:
        rating = 0.0
    return rating
You can sample a subset of traffic for such evaluations and log the scores to monitor for regressions.
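A minimal sampling wrapper might look like the following. The 5% rate is an illustrative default, and the evaluator is passed in as a generic callable so the wrapper stays independent of any particular LLM client:

```python
import random

def maybe_evaluate(evaluator, *args, sample_rate=0.05, rng=random):
    # Run the (expensive) evaluator on a random sample of traffic only;
    # returns None for requests that were not sampled
    if rng.random() >= sample_rate:
        return None
    return evaluator(*args)
```

Passing `rng` explicitly makes the sampling decision testable and lets you pin a seed when replaying traffic.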
5. Logging: what to record, and how to protect privacy
Monitoring depends on logging, and logging is where privacy risk grows fast, especially for NLP. Treat inference logs as sensitive data.
What to log
At a minimum, per request:
- Timestamps
- Request ID or correlation ID
- Model name and version
- Input schema version
- Prediction or response metadata
- Latency and resource tags
- Optional: user segment (coarsely bucketed, privacy-aware)
Avoid logging raw PII. For text inputs, consider:
- Hashing identifiers
- Masking obvious PII (emails, phone numbers) using NER
- Storing reduced representations (e.g. embedding norms, output length) instead of full text for monitoring-only use cases
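Before an NER model is in place, even regex masking catches the obvious cases. This is a deliberately simple sketch; the patterns are illustrative and will not cover every email or phone format:

```python
import re

# Illustrative patterns; they catch common formats, not every one
EMAIL_RE = re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+")
PHONE_RE = re.compile(r"\+?\d[\d\s().-]{7,}\d")

def mask_pii(text: str) -> str:
    # Mask emails first so the phone pattern never matches inside a masked span
    text = EMAIL_RE.sub("[EMAIL]", text)
    text = PHONE_RE.sub("[PHONE]", text)
    return text

masked = mask_pii("Contact jane.doe@example.com or +1 555 123 4567 for help")
```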
Example: structured logging for inference
import json
import logging
import time
import uuid

logger = logging.getLogger("ml_inference")
logger.setLevel(logging.INFO)

class JsonFormatter(logging.Formatter):
    def format(self, record):
        return json.dumps(record.msg)

handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logger.addHandler(handler)

def log_inference(input_meta, prediction, latency, model_version):
    log_record = {
        "event": "inference",
        "timestamp": time.time(),
        "request_id": str(uuid.uuid4()),
        "model_version": model_version,
        "input_meta": input_meta,  # already sanitized
        "prediction_meta": {
            "score": prediction.get("score"),
            "class": prediction.get("class"),
        },
        "latency_ms": latency * 1000,
    }
    logger.info(log_record)
Downstream, you can feed these logs into a warehouse or OLAP store for aggregations.
Alerting and SLOs: when to wake someone up
Not every metric deviation deserves a pager. You want a small set of Service Level Objectives (SLOs) that map to real user or business impact.
Common ML SLOs:
- 99th percentile latency < 1 second.
- Error rate < 0.5%.
- Daily mean model quality metric (e.g. accuracy, CTR) within X% of the 30-day baseline.
- Data quality metrics below thresholds (missing values, schema violations).
For RAG or LLM systems:
- Average synthetic quality score above threshold.
- Percentage of requests with at least one high similarity retrieval above threshold.
Use multi-window alerting to avoid flapping: e.g. trigger only if the metric is breached for three consecutive 5-minute windows.
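A sketch of that consecutive-breach logic; the threshold direction assumes "higher is worse", as with an error rate:

```python
from collections import deque

class MultiWindowAlert:
    # Fire only after n_windows consecutive breached evaluation windows,
    # which suppresses one-off spikes
    def __init__(self, threshold: float, n_windows: int = 3):
        self.threshold = threshold
        self.breaches = deque(maxlen=n_windows)

    def observe(self, value: float) -> bool:
        self.breaches.append(value > self.threshold)
        return len(self.breaches) == self.breaches.maxlen and all(self.breaches)

alert = MultiWindowAlert(threshold=0.01, n_windows=3)  # e.g. a 1% error-rate SLO
fired = [alert.observe(v) for v in (0.02, 0.02, 0.02, 0.005)]
```

The bounded deque means a single healthy window resets the streak, so the alert only fires on sustained breaches.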
Building a minimal monitoring stack
If you are starting from scratch, a pragmatic stack looks like this:
- Metrics & dashboards - Prometheus + Grafana or a SaaS equivalent.
- Logs - Structured JSON logs shipped to an ELK stack or cloud logging.
- Batch evaluation jobs - Python scripts or Airflow jobs that:
- Aggregate logs
- Compute drift and quality metrics
- Push aggregated metrics back to the time-series DB
- Alerts - Defined on top of the aggregated metrics.
You can incrementally grow into a more sophisticated MLOps platform, but this basic setup already gives you:
- Early detection of regressions
- Visibility into data drift
- Evidence to guide retraining or model rollback decisions
You can integrate these batch jobs into your CI/CD pipeline so every new model ships with a pre-defined monitoring config.
Monitoring model versions and rollouts
One pattern I strongly recommend is version-aware monitoring:
- Tag every prediction with model_version.
- Maintain separate metrics per version.
- During rollouts, send a percentage of traffic to a new version and compare.
In many of my systems, I use a simple feature flag or routing layer:
import random

MODEL_VERSIONS = {
    "v1": load_model("model_v1.pkl"),
    "v2": load_model("model_v2.pkl"),  # candidate
}

TRAFFIC_SPLIT = {"v1": 0.9, "v2": 0.1}

def choose_model_version():
    r = random.random()
    cumulative = 0.0
    for version, p in TRAFFIC_SPLIT.items():
        cumulative += p
        if r < cumulative:
            return version
    return "v1"

def predict_with_routing(x):
    version = choose_model_version()
    model = MODEL_VERSIONS[version]
    y = model.predict(x)
    log_inference({"version": version}, {"class": y}, latency=0.01, model_version=version)
    return y
With versioned metrics, you can quickly detect if v2 underperforms compared to v1 and roll back.
The same principle applies to multimodal AI systems, where each model component (vision encoder, language model, fusion layer) should have its own metrics so you can reason about its individual contribution.
Making monitoring part of your engineering culture
Tools will not save you if the team does not act on what they show. Practices that help:
- Include monitoring checks in your definition of done for ML features.
- Review dashboards in weekly team meetings, not only during incidents.
- Perform post-mortems when monitoring fails to catch an issue.
- Document the expected ranges for key metrics next to model documentation.
Once monitoring is in place, it doubles as an experimentation tool. You can ship more aggressive ideas, knowing that you will see if they misbehave.
Key Takeaways
- ML monitoring extends traditional monitoring with data and prediction quality.
- Start with operational metrics (latency, errors, resource usage), then layer in data quality, drift, and prediction quality.
- Validate input data before inference to catch schema changes, missing values, and out-of-range features early.
- Use simple drift metrics like PSI and feature statistics before adopting complex methods, especially for tabular models.
- For RAG and LLM systems, rely on synthetic evaluators, retrieval metrics, and user behavior as proxies for quality.
- Log structured, privacy-aware records that include model version and metadata. Avoid raw PII or full text when not needed.
- Define clear SLOs tied to user impact and configure alerts with sensible thresholds and windows to avoid noise.
- Version-aware monitoring and gradual rollouts make it easy to compare models and safely deploy improvements.
- Integrate monitoring into your ML development lifecycle and team rituals.