Building Real-Time ML Inference Pipelines
Real-time ML is where models stop being research artifacts and start becoming products. Users don't care about your validation accuracy. They care about one thing: how fast and how reliably your system answers.
When we built low-latency RAG APIs at Ailog, the architecture work mattered as much as the model work. A well-designed retrieval-augmented generation service can feel instant, even when it hides vector search, ranking, and LLM calls under the hood. A poorly designed one feels sluggish, even with great models.
This post is a practical guide to building real-time ML inference pipelines that stay fast, robust, and maintainable.
What "real-time" really means
"Real-time" is often abused. For ML inference pipelines, I use three broad categories:
- Soft real-time: user-facing APIs like RAG chatbots, semantic search, recommendations.
  - Latency: 100 ms to 2 seconds.
- Near real-time: streaming analytics, fraud scoring on events, alerting systems.
  - Latency: seconds to a few minutes.
- Hard real-time: embedded, robotics, safety-critical. Out of scope here.
Most web ML products live in the soft or near real-time world. You care about:
- Tail latency: p95 / p99, not just average.
- Throughput: how many requests per second (RPS) you can handle.
- Freshness: how quickly new data is reflected.
Latency, throughput, and quality must be tracked together. A fast but wrong system is useless. A very accurate but slow system will not be used.
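Tail percentiles are easy to compute from raw latency samples. A minimal nearest-rank sketch (the sample values are made up) shows why averages hide the tail:

```python
import math

def percentile(samples, p):
    """Nearest-rank percentile: smallest value covering at least p% of samples."""
    ranked = sorted(samples)
    k = max(0, math.ceil(p / 100 * len(ranked)) - 1)
    return ranked[k]

# 100 made-up request latencies (ms): mostly fast, with a slow tail
latencies = [50] * 90 + [400] * 9 + [2000]
print(percentile(latencies, 50))  # 50  -- the median looks great
print(percentile(latencies, 95))  # 400 -- the tail your users notice
```

The mean of this sample is about 101 ms, which hides the 400 ms p95 entirely; that gap is exactly why you track p95/p99 rather than averages.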
Core architecture patterns
At a high level, a real-time inference pipeline usually looks like this:
- Request comes in (HTTP / gRPC / WebSocket).
- Request is validated, enriched, and possibly queued.
- One or more model services run inference.
- Results are post-processed, ranked, and returned.
- Some data is logged for monitoring and offline training.
Sync vs async APIs
- Synchronous API: the client sends a request and waits for the answer.
  - Great for chatbots, semantic search, classic RAG.
- Asynchronous API: the client sends a job and polls or receives a callback.
  - Useful for heavier models, long-running agents, large batch jobs.
For user-facing LLM or RAG, you often want a synchronous response with streaming output, where tokens are streamed to the user as they are generated.
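As a sketch of what streaming buys you, an async generator can hand tokens to the client as they arrive instead of waiting for the full answer. Here `stream_tokens` is a toy stand-in for a real LLM client:

```python
import asyncio

async def stream_tokens(answer: str):
    # Toy stand-in: a real LLM client would yield tokens as they are generated
    for token in answer.split():
        await asyncio.sleep(0)  # yield control, as a real network read would
        yield token + " "

async def main():
    chunks = []
    async for chunk in stream_tokens("retrieval augmented generation"):
        chunks.append(chunk)  # in a real API, each chunk is flushed to the client
    return "".join(chunks)

print(asyncio.run(main()))
```

The client starts rendering after the first chunk, so perceived latency is the time to first token, not the time to the full answer.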
Microservice boundaries
Common decomposition:
- Gateway / API service: authentication, rate limiting, request shaping.
- Feature or retrieval service: vector search, hybrid search, feature joins.
- Model service: runs LLM or smaller models.
- Orchestrator: coordinates multi-step flows, like agentic RAG.
If you have a RAG system, you might separate:
- Indexing / ingestion pipeline (offline or near real-time).
- Query-time inference pipeline (online, real-time).
This post focuses on the query-time side.
Designing for latency budgets
You should start with a latency budget and work backwards.
Example: A RAG chat API with a 1 second p95 budget for first token.
Rough budget:
- Network, auth, routing: 100 ms
- Retrieval (vector DB + ranking): 150 ms
- Orchestration, pre/post-processing: 100 ms
- LLM initial token: 500-600 ms
Once you assign budgets, you can:
- Choose model sizes (small reranker vs giant LLM).
- Decide where to cache.
- Decide when to precompute features.
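One way to keep the budget honest is to encode it as data and check measured stage timings against it. The stage names and numbers below are illustrative, taken from the rough budget above:

```python
# Illustrative p95 stage budgets (ms) for a 1 s first-token target
BUDGET_MS = {
    "network_auth_routing": 100,
    "retrieval": 150,
    "orchestration": 100,
    "llm_first_token": 550,
}
TOTAL_MS = 1000
assert sum(BUDGET_MS.values()) <= TOTAL_MS  # stage budgets must fit the target

def over_budget(measured: dict) -> list:
    """Return stages whose measured p95 exceeded their budget."""
    return [stage for stage, ms in measured.items() if ms > BUDGET_MS[stage]]

print(over_budget({"network_auth_routing": 80, "retrieval": 210,
                   "orchestration": 90, "llm_first_token": 500}))  # ['retrieval']
```

Running this check against per-stage timings in CI or monitoring tells you which component to optimize first.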
Practical pipeline components
1. Fast API layer
I usually reach for FastAPI for Python-based inference services. It plays nicely with async patterns and type hints.
Simple skeleton:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import time

app = FastAPI()

class Query(BaseModel):
    text: str

class Response(BaseModel):
    answer: str
    latency_ms: int

@app.post("/infer", response_model=Response)
async def infer(query: Query):
    start = time.perf_counter()
    # TODO: call retrieval, model, post-process
    answer = f"Echo: {query.text}"
    latency_ms = int((time.perf_counter() - start) * 1000)
    if latency_ms > 1000:
        # Hook to send slow logs elsewhere
        pass
    return Response(answer=answer, latency_ms=latency_ms)
Use this layer for:
- Validation & sanitization.
- Lightweight routing (A/B, canary).
- Rate limiting and auth (via middleware).
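Rate limiting can start as a simple in-process token bucket before you reach for gateway-level tooling. A minimal sketch (per-key state, single process, no locking; timestamps are injected for clarity):

```python
import time

class TokenBucket:
    """Per-key token bucket: `rate` tokens/second, bursts up to `capacity`."""
    def __init__(self, rate: float, capacity: int):
        self.rate, self.capacity = rate, capacity
        self.state = {}  # key -> (tokens_remaining, last_seen_timestamp)

    def allow(self, key: str, now=None) -> bool:
        now = time.monotonic() if now is None else now
        tokens, last = self.state.get(key, (self.capacity, now))
        # Refill proportionally to elapsed time, capped at capacity
        tokens = min(self.capacity, tokens + (now - last) * self.rate)
        allowed = tokens >= 1
        self.state[key] = (tokens - 1 if allowed else tokens, now)
        return allowed

bucket = TokenBucket(rate=1.0, capacity=2)
# Two requests pass, the third is throttled, refill lets a later one through
print([bucket.allow("user-1", now=t) for t in (0.0, 0.0, 0.0, 1.5)])
```

For multiple replicas you would move this state into Redis or enforce limits at the load balancer, but the shape of the logic stays the same.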
2. Queues and backpressure
For soft real-time, you want to absorb spikes without collapsing. A message queue or streaming system like Kafka, NATS, or Redis Streams is useful here.
Pattern:
- API service puts work items in a queue if backend is saturated.
- Worker pool consumes and runs ML inference.
- Optional: use a max queue size to fail fast instead of degrading indefinitely.
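The fail-fast pattern is a few lines with a bounded `asyncio.Queue`: reject new work immediately when the queue is full, rather than letting latency grow without bound. `MAX_QUEUE` is deliberately tiny here for illustration:

```python
import asyncio

MAX_QUEUE = 2  # deliberately tiny; size from your latency budget in practice

def enqueue_or_reject(queue: asyncio.Queue, item) -> bool:
    """Fail fast: reject instead of letting the queue (and latency) grow unboundedly."""
    try:
        queue.put_nowait(item)
        return True
    except asyncio.QueueFull:
        return False  # caller maps this to HTTP 429 or a degraded answer

async def main():
    q = asyncio.Queue(maxsize=MAX_QUEUE)
    return [enqueue_or_reject(q, i) for i in range(4)]

print(asyncio.run(main()))  # [True, True, False, False]
```

A rejected request costs the user a retry; an unbounded queue costs every user seconds of added latency.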
Using Redis and rq for a simple async pipeline:
# worker.py
import os

from redis import Redis
from rq import Connection, Queue, Worker

redis_url = os.getenv("REDIS_URL", "redis://localhost:6379/0")
listen = ["default"]

if __name__ == "__main__":
    with Connection(Redis.from_url(redis_url)):
        worker = Worker(list(map(Queue, listen)))
        worker.work()
And enqueue from your FastAPI service:
# app.py
from redis import Redis
from rq import Queue

redis_conn = Redis.from_url("redis://localhost:6379/0")
queue = Queue("default", connection=redis_conn)

def heavy_inference_job(text: str) -> str:
    # Plain (non-async) function: rq workers call jobs synchronously,
    # and it must live in a module the worker can import.
    return f"Processed: {text}"

@app.post("/async-infer")
async def async_infer(query: Query):
    job = queue.enqueue(heavy_inference_job, query.text)
    return {"job_id": job.id}
This turns your pipeline into a near real-time system where clients can poll or receive callbacks.
3. Efficient model serving
The core of the pipeline is the model service. Key practices:
- Model warmup: run a few dummy inferences on startup.
- Batching: group multiple requests into a single forward pass if you are GPU-bound.
- Concurrency control: limit simultaneous requests per model instance.
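Warmup and concurrency control can be combined in a small wrapper. `EchoModel` is a toy stand-in for a real model, and the semaphore caps in-flight requests per instance:

```python
import asyncio

MAX_CONCURRENT = 4  # per model instance; tune against memory and latency

class ModelService:
    def __init__(self, model, warmup_inputs):
        self.model = model
        self.semaphore = asyncio.Semaphore(MAX_CONCURRENT)
        for x in warmup_inputs:
            # Warmup: pay lazy-initialization costs before real traffic arrives
            self.model.predict([x])

    async def infer(self, text: str):
        async with self.semaphore:  # cap simultaneous requests to this instance
            return self.model.predict([text])[0]

class EchoModel:
    """Toy stand-in for a real model with a batch predict() method."""
    def predict(self, batch):
        return [f"out:{x}" for x in batch]

async def main():
    svc = ModelService(EchoModel(), warmup_inputs=["warmup"])
    return await asyncio.gather(*(svc.infer(t) for t in ["a", "b"]))

print(asyncio.run(main()))  # ['out:a', 'out:b']
```

Requests beyond the semaphore limit simply queue inside the service, which bounds GPU memory pressure at the cost of some added wait time.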
A minimal batching loop with asyncio:
import asyncio
from typing import List, Tuple

BATCH_SIZE = 8
BATCH_TIMEOUT = 0.005  # 5 ms

class Batcher:
    def __init__(self, model):
        self.model = model
        self.queue: List[Tuple[asyncio.Future, str]] = []
        self.lock = asyncio.Lock()
        self.task = asyncio.create_task(self._batch_loop())

    async def submit(self, text: str) -> str:
        fut: asyncio.Future = asyncio.get_running_loop().create_future()
        async with self.lock:
            self.queue.append((fut, text))
        return await fut

    async def _batch_loop(self):
        while True:
            await asyncio.sleep(BATCH_TIMEOUT)
            async with self.lock:
                if not self.queue:
                    continue
                batch = self.queue[:BATCH_SIZE]
                self.queue = self.queue[BATCH_SIZE:]
            futures, texts = zip(*batch)
            # Single batched model call, run outside the lock
            outputs = self.model.predict(list(texts))
            for fut, out in zip(futures, outputs):
                if not fut.done():
                    fut.set_result(out)
Tie this into a FastAPI endpoint:
batcher = Batcher(model)  # create on startup, once an event loop is running

@app.post("/model-infer")
async def model_infer(query: Query):
    output = await batcher.submit(query.text)
    return {"output": output}
With a tight timeout, you keep single-request latency acceptable while still benefiting from batching under load.
4. Pre-processing and feature pipelines
Real-time pipelines often contain a surprising amount of non-ML work:
- Tokenization and normalization.
- Feature joins with user or product metadata.
- Security and privacy filters.
Guidelines:
- Keep per-request CPU-heavy preprocessing small.
- Push heavy feature engineering to offline batches where possible.
- For text, apply simple deterministic functions at request time.
Example: a small, fast sanitizer and feature builder for a RAG query.
import re
from typing import Dict

PRIVATE_PATTERNS = [
    re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),  # SSN-like
]

def sanitize_text(text: str) -> str:
    text = text.strip()
    for pat in PRIVATE_PATTERNS:
        text = pat.sub("[SENSITIVE]", text)
    return text

async def build_features(user_id: str, query: str) -> Dict:
    # In a real system, load user profile from cache or fast DB
    user_tier = "pro" if user_id.startswith("P") else "free"
    return {
        "user_id": user_id,
        "user_tier": user_tier,
        "query": sanitize_text(query),
    }
5. Retrieval and ranking in RAG pipelines
If your pipeline is RAG-based, a big chunk of your latency is in retrieval and ranking.
Components:
- Vector search (dense retrieval over an embedding index).
- Hybrid search (dense + sparse) for better recall.
- Reranking model to pick the best contexts.
Practical tips:
- Use smaller embedding models for queries if possible.
- Keep top-k small for the final reranker (e.g. 10-50).
- Profile your vector DB under realistic load before committing to a provider.
Minimal retrieval module example using a generic client:
class Retriever:
    def __init__(self, vector_client, embed_model):
        self.client = vector_client
        self.embed_model = embed_model

    async def retrieve(self, query: str, k: int = 10):
        q_emb = self.embed_model.encode([query])[0]
        # Assume async vector search client
        hits = await self.client.search(q_emb, top_k=k)
        return [hit["text"] for hit in hits]
This would be used inside your FastAPI endpoint, with careful time budgeting.
Observability and feedback loops
Real-time pipelines must be observable. Without that, you are blind when latency spikes or quality drops.
Essential metric categories:
- Request metrics: latency histogram, throughput, error rates.
- Model metrics: per-model latency, GPU utilization, queue depth.
- Quality metrics: user feedback, click-through, human evals. Evaluating RAG system performance is a good starting point if your pipeline serves retrieval-augmented answers.
In code, you can add simple counters and histograms with Prometheus:
from prometheus_client import Counter, Histogram

REQUESTS = Counter("inference_requests_total", "Total inference requests")
LATENCY = Histogram("inference_latency_seconds", "Inference latency in seconds")

@app.post("/infer", response_model=Response)
async def infer(query: Query):
    REQUESTS.inc()
    with LATENCY.time():
        ...  # retrieval, model call, post-processing
Hook these metrics into alerts: p95 latency above 800 ms for 5 minutes is usually worth a page.
You also need logging of inputs and outputs (with anonymization) so you can debug failures and rebuild datasets for fine-tuning.
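A minimal sketch of pseudonymized logging: hash the user id with a salt before it ever reaches a log line. The salt handling here is illustrative; keep real salts in secret storage, not source control:

```python
import hashlib
import json

LOG_SALT = "rotate-me"  # illustrative; load from secret storage and rotate

def log_record(user_id: str, query: str, answer: str) -> str:
    # Pseudonymize the identifier; never write raw user ids to logs
    pseudo = hashlib.sha256((LOG_SALT + user_id).encode()).hexdigest()[:12]
    return json.dumps({"user": pseudo, "query": query, "answer": answer})

record = json.loads(log_record("U123", "what is RAG?", "Retrieval-augmented..."))
print(record["user"])  # a 12-hex-char pseudonym, not the raw id
```

The pseudonym is stable per user, so you can still group log lines for debugging and dataset rebuilding without storing identities.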
Scaling and reliability
Once the basic pipeline works, scaling it is largely an engineering problem.
Horizontal scaling
Containerize your API and model services, then use orchestration for autoscaling:
- Build Docker images for your API and model services.
- Use Kubernetes or similar for autoscaling.
- Put a load balancer (NGINX, Envoy) in front of your services.
A solid CI/CD pipeline helps here, so that model updates and code changes flow through automated tests before reaching production.
Caching
Useful caches in a real-time pipeline:
- Query result cache for repeated queries.
- Embedding cache for repeated texts.
- Feature cache for user or item metadata.
You can implement a simple in-memory LRU cache first, then move to Redis.
from functools import lru_cache

@lru_cache(maxsize=10_000)
def embed_text_cached(text: str):
    return embed_model.encode([text])[0]
Be careful with memory and invalidation rules.
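`lru_cache` has no expiry, so stale entries live until size-based eviction. A small TTL-aware LRU sketch (timestamps injected for clarity) shows one invalidation rule you might start with before moving to Redis:

```python
import time
from collections import OrderedDict

class TTLCache:
    """LRU cache with per-entry expiry, so stale embeddings or features age out."""
    def __init__(self, maxsize: int, ttl: float):
        self.maxsize, self.ttl = maxsize, ttl
        self.data = OrderedDict()  # key -> (value, expiry_timestamp)

    def get(self, key, now=None):
        now = time.monotonic() if now is None else now
        if key in self.data:
            value, expires = self.data[key]
            if now < expires:
                self.data.move_to_end(key)  # mark as recently used
                return value
            del self.data[key]              # expired: invalidate
        return None

    def put(self, key, value, now=None):
        now = time.monotonic() if now is None else now
        self.data[key] = (value, now + self.ttl)
        self.data.move_to_end(key)
        if len(self.data) > self.maxsize:
            self.data.popitem(last=False)   # evict least recently used

cache = TTLCache(maxsize=2, ttl=60.0)
cache.put("hello", [0.1, 0.2], now=0.0)
print(cache.get("hello", now=30.0))  # fresh hit
print(cache.get("hello", now=90.0))  # expired -> None
```

Redis gives you the same semantics across replicas via `SETEX`-style expiry, but the sizing and TTL decisions are identical.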
Graceful degradation
When dependencies are slow or failing, your pipeline should degrade gracefully instead of fully breaking.
Examples:
- If reranker service fails, fall back to raw vector search results.
- If LLM is overloaded, fall back to a distilled smaller model.
- If retrieval times out, answer with a generic, low-risk fallback.
Implement timeouts and fallbacks explicitly:
import asyncio

async def safe_retrieve(retriever, query: str, timeout: float = 0.2):
    try:
        return await asyncio.wait_for(retriever.retrieve(query), timeout=timeout)
    except asyncio.TimeoutError:
        # Log and return empty context list
        return []
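The same timeout-and-fallback shape works for the LLM itself: try the large model under a deadline, degrade to the smaller one on timeout or error. `slow_llm` and `small_llm` are toy stand-ins, and the short timeout is only for illustration:

```python
import asyncio

async def generate_with_fallback(primary, fallback, prompt: str, timeout: float = 0.05):
    """Try the large model under a deadline; degrade to the small one on failure."""
    try:
        return await asyncio.wait_for(primary(prompt), timeout=timeout)
    except (asyncio.TimeoutError, RuntimeError):  # widen to your client's error types
        return await fallback(prompt)

async def slow_llm(prompt: str) -> str:
    # Toy stand-in for an overloaded large model
    await asyncio.sleep(1.0)
    return "big: " + prompt

async def small_llm(prompt: str) -> str:
    # Toy stand-in for a distilled fallback model
    return "small: " + prompt

print(asyncio.run(generate_with_fallback(slow_llm, small_llm, "hi")))  # small: hi
```

Counting how often the fallback fires is itself a useful overload metric to alert on.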
Putting it together: a minimal RAG inference flow
To make it concrete, here is a simplified architecture for a real-time RAG chat endpoint:
- FastAPI receives a /chat request.
- Request is validated and basic features are built.
- Retriever service does vector search + hybrid search.
- Top-k contexts are truncated and passed to LLM service.
- LLM streams answer back to client.
- Metrics and logs are recorded.
Sketch of the main endpoint:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.post("/chat")
async def chat(query: Query):
    features = await build_features(user_id="U123", query=query.text)
    contexts = await safe_retrieve(retriever, features["query"])

    async def token_stream():
        async for token in llm_client.stream_generate(features["query"], contexts):
            yield token

    return StreamingResponse(token_stream(), media_type="text/plain")
Here you get:
- Low latency first token via streaming.
- Structured pipeline pieces you can optimize independently.
- Clear places to plug in monitoring and privacy-preserving steps.
Key Takeaways
- Define a concrete latency budget and design your pipeline to meet p95 and p99, not just average latency.
- Separate concerns: API gateway, retrieval / feature services, and model services should be loosely coupled and independently scalable.
- Use async processing, queues, and small batching windows to balance throughput and latency under load.
- Keep preprocessing lightweight at request time and push heavy feature engineering to offline or near real-time jobs.
- For RAG pipelines, optimize retrieval and ranking carefully since they dominate latency alongside the LLM.
- Instrument everything: latency, throughput, errors, and quality metrics are essential for operating real-time systems.
- Apply caching, horizontal scaling, and graceful degradation strategies to handle spikes and partial failures without breaking the user experience.
- Start with a minimal, observable pipeline, then iterate on models and infrastructure together as you scale.