Hélain Zimmermann

Building Real-Time ML Inference Pipelines

Real-time ML is where models stop being research artifacts and start becoming products. Users don't care about your validation accuracy. They care about how fast and how reliably your system answers.

When we built low-latency RAG APIs at Ailog, the architecture work mattered as much as the model work. A well-designed retrieval-augmented generation service can feel instant, even when it hides vector search, ranking, and LLM calls under the hood. A poorly designed one feels sluggish, even with great models.

This post is a practical guide to building real-time ML inference pipelines that stay fast, robust, and maintainable.

What "real-time" really means

"Real-time" is often abused. For ML inference pipelines, I use three broad categories:

  • Soft real-time: user-facing APIs like RAG chatbots, semantic search, recommendations.
    • Latency: 100 ms to 2 seconds
  • Near real-time: streaming analytics, fraud scoring on events, alerting systems.
    • Latency: seconds to a few minutes
  • Hard real-time: embedded, robotics, safety-critical. Out of scope here.

Most web ML products live in the soft or near real-time world. You care about:

  • Tail latency: p95 / p99, not just average.
  • Throughput: how many requests per second (RPS) you can handle.
  • Freshness: how quickly new data is reflected.

Latency, throughput, and quality must be tracked together. A fast but wrong system is useless. A very accurate but slow system will not be used.
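The distinction between average and tail latency matters in practice: a mean can look healthy while p95 is terrible. A quick way to see this is a nearest-rank percentile over recorded latency samples (the numbers below are illustrative, not from a real system):

```python
import math

def percentile(samples: list[float], pct: float) -> float:
    """Nearest-rank percentile of latency samples; pct in (0, 100]."""
    if not samples:
        raise ValueError("no samples")
    ordered = sorted(samples)
    rank = math.ceil(pct / 100 * len(ordered))  # 1-based nearest rank
    return ordered[min(rank, len(ordered)) - 1]

# Ten sample latencies in ms; the tail dominates p95 even though most requests are fast
latencies_ms = [120, 135, 140, 150, 155, 160, 170, 400, 950, 1800]
print("p50:", percentile(latencies_ms, 50))  # 155
print("p95:", percentile(latencies_ms, 95))  # 1800
```

Here the median looks fine at 155 ms while p95 is 1800 ms, which is exactly the kind of gap that averages hide.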

Core architecture patterns

At a high level, a real-time inference pipeline usually looks like this:

  1. Request comes in (HTTP / gRPC / WebSocket).
  2. Request is validated, enriched, and possibly queued.
  3. One or more model services run inference.
  4. Results are post-processed, ranked, and returned.
  5. Some data is logged for monitoring and offline training.

Sync vs async APIs

  • Synchronous API: client sends a request and waits for the answer.
    • Great for chatbots, semantic search, classic RAG.
  • Asynchronous API: client sends a job and polls or receives a callback.
    • Useful for heavier models, long-running agents, large batch jobs.

For user-facing LLM or RAG, you often want a synchronous response with streaming output, where tokens are streamed to the user as they are generated, so the first token arrives long before the full answer is done.

Microservice boundaries

Common decomposition:

  • Gateway / API service: authentication, rate limiting, request shaping.
  • Feature or retrieval service: vector search, hybrid search, feature joins.
  • Model service: runs LLM or smaller models.
  • Orchestrator: coordinates multi-step flows, like agentic RAG.

If you have a RAG system, you might separate:

  • Indexing / ingestion pipeline (offline or near real-time).
  • Query-time inference pipeline (online, real-time).

This post focuses on the query-time side.

Designing for latency budgets

You should start with a latency budget and work backwards.

Example: A RAG chat API with a 1 second p95 budget for first token.

Rough budget:

  • Network, auth, routing: 100 ms
  • Retrieval (vector DB + ranking): 150 ms
  • Orchestration, pre/post-processing: 100 ms
  • LLM initial token: 500-600 ms

Once you assign budgets, you can:

  • Choose model sizes (small reranker vs giant LLM).
  • Decide where to cache.
  • Decide when to precompute features.
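One way to make the budget concrete in code is a small deadline object threaded through the pipeline, so each stage carves its timeout out of whatever budget remains. `Deadline` here is a hypothetical helper, not a library API:

```python
import time

class Deadline:
    """Tracks a per-request latency budget so each stage can check remaining time."""

    def __init__(self, budget_s: float):
        self.expires_at = time.monotonic() + budget_s

    def remaining(self) -> float:
        return max(0.0, self.expires_at - time.monotonic())

    def exceeded(self) -> bool:
        return self.remaining() == 0.0

# Usage sketch: carve per-stage timeouts out of one 1-second request budget
deadline = Deadline(budget_s=1.0)
retrieval_timeout = min(0.15, deadline.remaining())  # retrieval gets at most 150 ms
llm_timeout = deadline.remaining()                   # the LLM gets whatever is left
```

Passing a deadline instead of fixed per-stage timeouts means a slow retrieval step automatically shrinks the time the LLM call is allowed to take, keeping the end-to-end budget honest.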

Practical pipeline components

1. Fast API layer

I usually reach for FastAPI for Python-based inference services. It plays nicely with async patterns and type hints.

Simple skeleton:

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import time

app = FastAPI()

class Query(BaseModel):
    text: str

class Response(BaseModel):
    answer: str
    latency_ms: int

@app.post("/infer", response_model=Response)
async def infer(query: Query):
    start = time.perf_counter()

    # TODO: call retrieval, model, post-process
    answer = f"Echo: {query.text}"

    latency_ms = int((time.perf_counter() - start) * 1000)
    if latency_ms > 1000:
        # Hook to send slow logs elsewhere
        pass

    return Response(answer=answer, latency_ms=latency_ms)

Use this layer for:

  • Validation & sanitization.
  • Lightweight routing (A/B, canary).
  • Rate limiting and auth (via middleware).

2. Queues and backpressure

For soft real-time, you want to absorb spikes without collapsing. A message queue or streaming system like Kafka, NATS, or Redis Streams is useful.

Pattern:

  • API service puts work items in a queue if backend is saturated.
  • Worker pool consumes and runs ML inference.
  • Optional: use a max queue size to fail fast instead of degrading indefinitely.
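The fail-fast part of that pattern can be sketched with a bounded in-process `asyncio.Queue`; a real deployment would use Kafka, NATS, or Redis as above, and the names here are illustrative:

```python
import asyncio

MAX_QUEUE = 100  # bounded: beyond this, prefer a fast 503 over unbounded latency
work_queue: asyncio.Queue = asyncio.Queue(maxsize=MAX_QUEUE)

def try_enqueue(item) -> bool:
    """Return False when the queue is full so the API layer can fail fast."""
    try:
        work_queue.put_nowait(item)
        return True
    except asyncio.QueueFull:
        return False  # caller should respond with 503 / "try again later"

async def worker_loop(run_inference):
    # Worker pool coroutine: drain the queue and run inference per item
    while True:
        item = await work_queue.get()
        await run_inference(item)
        work_queue.task_done()
```

When `try_enqueue` returns False, the client gets an immediate error it can retry, instead of a request that sits in a growing queue and times out anyway.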

Using Redis and rq for a simple async pipeline:

# worker.py
import os

from redis import Redis
from rq import Queue, Worker

redis_url = os.getenv("REDIS_URL", "redis://localhost:6379/0")
conn = Redis.from_url(redis_url)

listen = ["default"]

if __name__ == "__main__":
    queues = [Queue(name, connection=conn) for name in listen]
    worker = Worker(queues, connection=conn)
    worker.work()

And enqueue from your FastAPI service:

# app.py
from redis import Redis
from rq import Queue

redis_conn = Redis.from_url("redis://localhost:6379/0")
queue = Queue("default", connection=redis_conn)

def heavy_inference_job(text: str) -> str:
    # Plain (non-async) function: rq workers cannot await coroutines,
    # and the function must be importable by the worker process
    return f"Processed: {text}"

@app.post("/async-infer")
async def async_infer(query: Query):
    job = queue.enqueue(heavy_inference_job, query.text)
    return {"job_id": job.id}

This turns your pipeline into a near real-time system where clients can poll or receive callbacks.

3. Efficient model serving

The core of the pipeline is the model service. Key practices:

  • Model warmup: run a few dummy inferences on startup.
  • Batching: group multiple requests into a single forward pass if you are GPU-bound.
  • Concurrency control: limit simultaneous requests per model instance.

A minimal batching loop with asyncio:

import asyncio
from typing import List, Tuple

BATCH_SIZE = 8
BATCH_TIMEOUT = 0.005  # 5 ms

class Batcher:
    def __init__(self, model):
        # Must be constructed inside a running event loop (e.g. a startup hook),
        # since create_task requires one
        self.model = model
        self.queue: List[Tuple[asyncio.Future, str]] = []
        self.lock = asyncio.Lock()
        self.task = asyncio.create_task(self._batch_loop())

    async def submit(self, text: str) -> str:
        fut: asyncio.Future = asyncio.get_running_loop().create_future()
        async with self.lock:
            self.queue.append((fut, text))
        return await fut

    async def _batch_loop(self):
        while True:
            await asyncio.sleep(BATCH_TIMEOUT)
            async with self.lock:
                if not self.queue:
                    continue
                batch = self.queue[:BATCH_SIZE]
                self.queue = self.queue[BATCH_SIZE:]

            futures, texts = zip(*batch)
            try:
                # Single batched model call
                outputs = self.model.predict(list(texts))
            except Exception as exc:
                # Propagate failures instead of leaving callers hanging forever
                for fut in futures:
                    if not fut.done():
                        fut.set_exception(exc)
                continue

            for fut, out in zip(futures, outputs):
                if not fut.done():
                    fut.set_result(out)

Tie this into a FastAPI endpoint. Create the batcher once the event loop is running, for example in a startup hook:

@app.on_event("startup")
async def init_batcher():
    global batcher
    batcher = Batcher(model)  # model: your already-loaded inference model

@app.post("/model-infer")
async def model_infer(query: Query):
    output = await batcher.submit(query.text)
    return {"output": output}

With a tight timeout, you keep single-request latency acceptable while still benefiting from batching under load.
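The third practice from the list above, concurrency control, can be sketched with an `asyncio.Semaphore` that caps in-flight requests per model instance. `fake_model` below is a stand-in for a real model call:

```python
import asyncio

MAX_CONCURRENT = 4  # simultaneous forward passes allowed per model instance
model_slots = asyncio.Semaphore(MAX_CONCURRENT)

async def bounded_infer(run_model, text: str):
    """Queue excess requests at the semaphore instead of oversubscribing the model."""
    async with model_slots:
        return await run_model(text)

async def main():
    async def fake_model(text: str) -> str:
        await asyncio.sleep(0.01)  # stand-in for a real model call
        return text.upper()

    # All three submissions are safe: at most MAX_CONCURRENT run at once
    return await asyncio.gather(*(bounded_infer(fake_model, t) for t in ["a", "b", "c"]))
```

Excess requests simply wait at the semaphore, which trades a little queueing delay for predictable per-instance load, and pairs well with the bounded queue from earlier to cap that waiting.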

4. Pre-processing and feature pipelines

Real-time pipelines often contain a surprising amount of non-ML work:

  • Tokenization and normalization.
  • Feature joins with user or product metadata.
  • Security and privacy filters.

Guidelines:

  • Keep per-request CPU-heavy preprocessing small.
  • Push heavy feature engineering to offline batches where possible.
  • For text, apply simple deterministic functions at request time.

Example: a small, fast sanitizer and feature builder for a RAG query.

import re
from typing import Dict

PRIVATE_PATTERNS = [
    re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),  # SSN-like
]

def sanitize_text(text: str) -> str:
    text = text.strip()
    for pat in PRIVATE_PATTERNS:
        text = pat.sub("[SENSITIVE]", text)
    return text

async def build_features(user_id: str, query: str) -> Dict:
    # In a real system, load user profile from cache or fast DB
    user_tier = "pro" if user_id.startswith("P") else "free"
    return {
        "user_id": user_id,
        "user_tier": user_tier,
        "query": sanitize_text(query),
    }

5. Retrieval and ranking in RAG pipelines

If your pipeline is RAG-based, a big chunk of your latency is in retrieval and ranking.

Components:

  • Vector search (dense retrieval over an embedding index).
  • Hybrid search (dense + sparse) for better recall.
  • Reranking model to pick the best contexts.

Practical tips:

  • Use smaller embedding models for queries if possible.
  • Keep top-k small for the final reranker (e.g. 10-50).
  • Profile your vector DB under realistic load before committing to a provider.

Minimal retrieval module example using a generic client:

class Retriever:
    def __init__(self, vector_client, embed_model):
        self.client = vector_client
        self.embed_model = embed_model

    async def retrieve(self, query: str, k: int = 10):
        q_emb = self.embed_model.encode([query])[0]
        # Assume async vector search client
        hits = await self.client.search(q_emb, top_k=k)
        return [hit["text"] for hit in hits]

This would be used inside your FastAPI endpoint, with careful time budgeting.
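When you combine dense and sparse results for hybrid search, you need a way to merge the two ranked lists. A common, model-free option is reciprocal rank fusion (RRF); this sketch assumes each retriever returns an ordered list of document ids:

```python
from collections import defaultdict

def reciprocal_rank_fusion(result_lists, k: int = 60):
    """Merge ranked doc-id lists: score(d) = sum over lists of 1 / (k + rank)."""
    scores = defaultdict(float)
    for results in result_lists:
        for rank, doc_id in enumerate(results, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    # Highest fused score first
    return sorted(scores, key=scores.get, reverse=True)

dense = ["d1", "d2", "d3"]   # from vector search
sparse = ["d3", "d1", "d4"]  # from BM25 / sparse search
fused = reciprocal_rank_fusion([dense, sparse])  # docs ranked well in both lists rise
```

RRF needs no score normalization between the two retrievers, which makes it a cheap default before you invest in a learned reranker.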

Observability and feedback loops

Real-time pipelines must be observable. Without that, you are blind when latency spikes or quality drops.

Essential metric categories:

  • Request metrics: latency histogram, throughput, error rates.
  • Model metrics: per-model latency, GPU utilization, queue depth.
  • Quality metrics: user feedback, click-through, human evals. Evaluating RAG system performance is a good starting point if your pipeline serves retrieval-augmented answers.

In code, you can add simple counters and histograms with Prometheus:

from prometheus_client import Counter, Histogram

REQUESTS = Counter("inference_requests_total", "Total inference requests")
LATENCY = Histogram("inference_latency_seconds", "Inference latency in seconds")

@app.post("/infer", response_model=Response)
@LATENCY.time()  # prometheus_client timers also support async functions
async def infer(query: Query):
    REQUESTS.inc()
    # ... retrieval, model call, post-processing ...
    return Response(answer="...", latency_ms=0)
Hook these metrics into alerts: p95 latency above 800 ms for 5 minutes is usually worth a page.

You also need logging of inputs and outputs (with anonymization) so you can debug failures and rebuild datasets for fine-tuning.

Scaling and reliability

Once the basic pipeline works, scaling it is largely an engineering problem.

Horizontal scaling

Containerize your API and model services, then use orchestration for autoscaling:

  • Build Docker images for your API and model services.
  • Use Kubernetes or similar for autoscaling.
  • Put a load balancer (NGINX, Envoy) in front of your services.

A solid CI/CD pipeline helps here, so that model updates and code changes flow through automated tests before reaching production.

Caching

Useful caches in a real-time pipeline:

  • Query result cache for repeated queries.
  • Embedding cache for repeated texts.
  • Feature cache for user or item metadata.

You can implement a simple in-memory LRU cache first, then move to Redis.

from functools import lru_cache

@lru_cache(maxsize=10_000)
def embed_text_cached(text: str):
    return embed_model.encode([text])[0]

Be careful with memory and invalidation rules.
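Since `lru_cache` never expires entries, a query result cache usually needs a TTL so stale answers age out. A minimal in-memory sketch (a Redis cache with `SETEX` achieves the same thing shared across instances):

```python
import time

class TTLCache:
    """Tiny TTL cache: entries expire so stale results are not served forever."""

    def __init__(self, ttl_s: float, maxsize: int = 10_000):
        self.ttl_s = ttl_s
        self.maxsize = maxsize
        self._store: dict = {}

    def get(self, key):
        entry = self._store.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if time.monotonic() > expires_at:
            del self._store[key]  # lazy expiry on read
            return None
        return value

    def set(self, key, value):
        if len(self._store) >= self.maxsize:
            # Crude bound on memory: evict the oldest insertion
            self._store.pop(next(iter(self._store)))
        self._store[key] = (value, time.monotonic() + self.ttl_s)
```

The TTL is your invalidation rule: pick it from how quickly the underlying data changes, not from how much traffic you want to absorb.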

Graceful degradation

When dependencies are slow or failing, your pipeline should degrade gracefully instead of fully breaking.

Examples:

  • If reranker service fails, fall back to raw vector search results.
  • If LLM is overloaded, fall back to a distilled smaller model.
  • If retrieval times out, answer with a generic, low-risk fallback.

Implement timeouts and fallbacks explicitly:

import asyncio

async def safe_retrieve(retriever, query: str, timeout: float = 0.2):
    try:
        return await asyncio.wait_for(retriever.retrieve(query), timeout=timeout)
    except asyncio.TimeoutError:
        # Log and return empty context list
        return []
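The same pattern works for the model call itself: give the primary LLM a strict budget and degrade to a smaller model on timeout. `primary` and `fallback` are placeholders for your model clients:

```python
import asyncio

async def generate_with_fallback(primary, fallback, prompt: str, timeout: float = 0.8):
    """Try the primary model within its budget; on timeout, use the distilled model."""
    try:
        return await asyncio.wait_for(primary(prompt), timeout=timeout)
    except asyncio.TimeoutError:
        # Primary model too slow or overloaded: serve the smaller model's answer
        return await fallback(prompt)
```

A degraded answer from a smaller model in time usually beats a perfect answer the user gave up waiting for, which is the whole point of graceful degradation.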

Putting it together: a minimal RAG inference flow

To make it concrete, here is a simplified architecture for a real-time RAG chat endpoint:

  1. FastAPI receives /chat request.
  2. Request is validated and basic features built.
  3. Retriever service does vector search + hybrid search.
  4. Top-k contexts are truncated and passed to LLM service.
  5. LLM streams answer back to client.
  6. Metrics and logs are recorded.

Sketch of the main endpoint:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.post("/chat")
async def chat(query: Query):
    features = await build_features(user_id="U123", query=query.text)

    contexts = await safe_retrieve(retriever, features["query"])

    async def token_stream():
        async for token in llm_client.stream_generate(features["query"], contexts):
            yield token

    return StreamingResponse(token_stream(), media_type="text/plain")

Here you get:

  • Low latency first token via streaming.
  • Structured pipeline pieces you can optimize independently.
  • Clear places to plug in monitoring and privacy-preserving steps.

Key Takeaways

  • Define a concrete latency budget and design your pipeline to meet p95 and p99, not just average latency.
  • Separate concerns: API gateway, retrieval / feature services, and model services should be loosely coupled and independently scalable.
  • Use async processing, queues, and small batching windows to balance throughput and latency under load.
  • Keep preprocessing lightweight at request time and push heavy feature engineering to offline or near real-time jobs.
  • For RAG pipelines, optimize retrieval and ranking carefully since they dominate latency alongside the LLM.
  • Instrument everything: latency, throughput, errors, and quality metrics are essential for operating real-time systems.
  • Apply caching, horizontal scaling, and graceful degradation strategies to handle spikes and partial failures without breaking the user experience.
  • Start with a minimal, observable pipeline, then iterate on models and infrastructure together as you scale.
