Hélain Zimmermann

Zero-Trust for AI Agents: Securing Non-Human Identities

At Ailog, we recently audited a mid-size fintech company's identity infrastructure. They had 340 human employees. They also had over 47,000 non-human identities: service accounts, API keys, bot tokens, CI/CD runners, and, increasingly, autonomous AI agents. The ratio was not 100-to-1; it was 138-to-1. And their IAM system was designed entirely around the assumption that identities belong to people.

This is the new reality. As multi-agent systems see explosive adoption, the most significant shift in enterprise attack surface is not phishing or ransomware. It is machine-to-machine interactions between autonomous agents that hold credentials, make decisions, and call external tools with no human in the loop. Traditional identity and access management was never built for this.

Why Traditional IAM Fails for AI Agents

Identity and Access Management (IAM) systems were designed around a set of assumptions that simply do not hold for AI agents.

Session duration. Human users log in, work for hours, and log out. AI agents operate continuously, often running for weeks or months without restarting. Long-lived sessions with static credentials are a gift to attackers. A compromised agent token that is valid for 90 days gives an attacker three months of unrestricted access.

Credential rotation. Organizations rotate human passwords every 60 or 90 days and enforce MFA. Agent credentials, typically API keys or service account tokens, are often set once and forgotten. In our audit of the fintech company, 68% of agent service account keys had not been rotated in over six months. Twelve of them had never been rotated at all.

Behavioral baselines. IAM systems can flag anomalous human behavior: a login from a new country, access at 3 AM, a sudden spike in resource requests. But what is "normal" for an AI agent? An agent that processes customer inquiries might legitimately make 10,000 API calls per hour, access multiple databases, and invoke external tools in rapid succession. Traditional anomaly detection treats this as a DDoS attack.

Role granularity. Human roles map reasonably well to job functions: "engineer," "analyst," "admin." Agent roles are far more granular and dynamic. A coding agent might need read access to a repository, write access to a specific branch, execute permissions on a CI runner, and temporary access to a secrets vault, all within a single task that lasts 15 minutes. Coarse-grained RBAC cannot model this.

Identity federation. In multi-agent architectures, agents call other agents. Agent A might delegate a subtask to Agent B, which calls Agent C to access an external API. Traditional IAM has no concept of transitive delegation chains where the original requester is three hops away.
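
To make the delegation problem concrete, here is a minimal sketch of how a delegation chain could be represented and validated so that the final hop can verify the full path back to the original requester. The agent names and the pairwise trust policy are hypothetical:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Delegation:
    delegator: str   # SPIFFE ID of the agent handing off work
    delegate: str    # SPIFFE ID of the agent receiving it


def validate_chain(chain: list[Delegation], trusted_pairs: set[tuple[str, str]]) -> bool:
    """Every hop must be explicitly trusted and contiguous with the previous one."""
    for i, hop in enumerate(chain):
        if (hop.delegator, hop.delegate) not in trusted_pairs:
            return False
        if i > 0 and chain[i - 1].delegate != hop.delegator:
            return False  # broken chain: this hop's delegator never received the task
    return True


trusted = {
    ("spiffe://ailog.dev/agent/a", "spiffe://ailog.dev/agent/b"),
    ("spiffe://ailog.dev/agent/b", "spiffe://ailog.dev/agent/c"),
}
chain = [
    Delegation("spiffe://ailog.dev/agent/a", "spiffe://ailog.dev/agent/b"),
    Delegation("spiffe://ailog.dev/agent/b", "spiffe://ailog.dev/agent/c"),
]
print(validate_chain(chain, trusted))  # True
```

Agent C can now reject a request whose chain skips a hop or contains an untrusted pair, even though Agent A is two hops away.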

SPIFFE for Non-Human Identities

The SPIFFE (Secure Production Identity Framework For Everyone) standard, originally designed for microservices, is the closest thing we have to a purpose-built identity layer for non-human workloads. It provides three critical primitives that map well to AI agent infrastructure.

SPIFFE IDs are URI-based identifiers (e.g., spiffe://ailog.dev/agent/trading-bot/v2) that uniquely name a workload without relying on network location or static secrets. Each agent gets a cryptographically verifiable identity that is independent of the host it runs on.
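
The URI structure is easy to validate even without a SPIFFE library. A minimal parsing sketch (hand-rolled for illustration; a real deployment should rely on py-spiffe's own SpiffeId type):

```python
from urllib.parse import urlparse


def parse_spiffe_id(uri: str) -> tuple[str, str]:
    """Split a SPIFFE ID into (trust_domain, workload_path)."""
    parsed = urlparse(uri)
    if parsed.scheme != "spiffe" or not parsed.netloc or not parsed.path:
        raise ValueError(f"Not a valid SPIFFE ID: {uri}")
    return parsed.netloc, parsed.path


domain, path = parse_spiffe_id("spiffe://ailog.dev/agent/trading-bot/v2")
print(domain, path)  # ailog.dev /agent/trading-bot/v2
```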

SVIDs (SPIFFE Verifiable Identity Documents) are short-lived X.509 certificates or JWT tokens issued to workloads. Unlike long-lived API keys, SVIDs expire quickly (typically 1 hour or less) and are automatically rotated.

The SPIRE server acts as an attestation authority, verifying that a workload is what it claims to be based on platform-level evidence (Kubernetes pod identity, AWS instance metadata, process attributes) before issuing an SVID.
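
In practice, issuance is gated by registration entries on the SPIRE server. A registration for the trading agent might look like the following (the parent ID and Kubernetes selectors are illustrative and depend on your attestor configuration):

```shell
# Register the agent workload; SPIRE will only issue this SVID to a
# workload matching these selectors under the given parent agent.
spire-server entry create \
    -parentID spiffe://ailog.dev/spire/agent/k8s_psat/prod-cluster \
    -spiffeID spiffe://ailog.dev/agent/trading-bot/v2 \
    -selector k8s:ns:trading \
    -selector k8s:sa:trading-bot
```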

Here is a sketch of agent identity verification built on the py-spiffe library (helper names such as TlsOptions and MutualTlsMode vary across library versions, so treat the TLS wiring as illustrative):

from spiffe import SpiffeId, WorkloadApiClient, X509Source
from spiffe import TlsOptions, MutualTlsMode
import ssl
import httpx


class AgentIdentityManager:
    """Manages SPIFFE-based identity for an AI agent."""

    def __init__(self, spiffe_socket: str = "unix:///tmp/spire-agent/public/api.sock"):
        self.workload_client = WorkloadApiClient(spiffe_socket)
        self.x509_source = X509Source(self.workload_client)

    def get_agent_svid(self):
        """Fetch the current short-lived SVID for this agent."""
        svid = self.x509_source.get_x509_svid()
        return {
            "spiffe_id": str(svid.spiffe_id),
            "cert_chain": svid.cert_chain,
            "private_key": svid.private_key,  # never log or persist this
            "expiry": svid.expiry,
        }

    def create_mtls_context(self, authorized_ids: list[str]) -> ssl.SSLContext:
        """Create an mTLS context that only trusts specific SPIFFE IDs.

        This ensures the agent will only communicate with
        explicitly authorized peer agents or services.
        """
        allowed = [SpiffeId.parse(sid) for sid in authorized_ids]
        tls_options = TlsOptions(
            mutual_tls_mode=MutualTlsMode.STRICT,
            authorized_spiffe_ids=allowed,
        )
        return self.x509_source.get_tls_context(tls_options)

    async def call_peer_agent(self, url: str, payload: dict, peer_id: str):
        """Make an authenticated request to a peer agent."""
        ctx = self.create_mtls_context(authorized_ids=[peer_id])
        async with httpx.AsyncClient(verify=ctx) as client:
            response = await client.post(url, json=payload)
            response.raise_for_status()
            return response.json()


# Usage
identity = AgentIdentityManager()
svid_info = identity.get_agent_svid()
print(f"Agent ID: {svid_info['spiffe_id']}")
print(f"SVID expires: {svid_info['expiry']}")

The key insight: every agent-to-agent call uses mutual TLS with SPIFFE IDs. Both sides prove their identity. No shared secrets, no API keys stored in config files. Certificates rotate automatically, typically every hour, so a compromised credential is useful for minutes, not months.

Least-Privilege for Agent Tool Access

The principle of least privilege is well understood in security, but applying it to AI agents requires rethinking what "privilege" means. An agent's capabilities are defined not just by database access or file permissions, but by which tools it can invoke, which parameters it can pass, and under what conditions.

I recommend a declarative tool allowlist pattern. Instead of granting an agent broad access and hoping it behaves, you define exactly which tools it may call, with parameter constraints and rate limits:

from dataclasses import dataclass, field
from typing import Any, Callable
import time
import hashlib
import json


@dataclass
class ToolPermission:
    tool_name: str
    allowed_params: dict[str, list[Any]] | None = None  # None means any value
    max_calls_per_minute: int = 60
    requires_human_approval: bool = False
    audit_level: str = "standard"  # "standard", "detailed", "full"


@dataclass
class AgentPolicy:
    agent_id: str
    permissions: list[ToolPermission] = field(default_factory=list)
    _call_log: list[dict] = field(default_factory=list, repr=False)

    def is_allowed(self, tool_name: str, params: dict) -> tuple[bool, str]:
        """Check if an agent is allowed to call a tool with given params."""
        permission = next(
            (p for p in self.permissions if p.tool_name == tool_name), None
        )

        if permission is None:
            return False, f"Tool '{tool_name}' not in allowlist for {self.agent_id}"

        # Check parameter constraints
        if permission.allowed_params:
            for key, allowed_values in permission.allowed_params.items():
                if key in params and params[key] not in allowed_values:
                    return False, (
                        f"Parameter '{key}={params[key]}' "
                        f"not in allowed values {allowed_values}"
                    )

        # Check rate limits
        now = time.time()
        recent = [
            c for c in self._call_log
            if c["tool"] == tool_name and now - c["time"] < 60
        ]
        if len(recent) >= permission.max_calls_per_minute:
            return False, f"Rate limit exceeded for '{tool_name}'"

        # Human-approval gate: the caller must attach an approval token
        # obtained out of band (the token mechanism here is illustrative)
        if permission.requires_human_approval and not params.get("approval_token"):
            return False, f"'{tool_name}' requires human approval"

        return True, "allowed"

    def log_call(self, tool_name: str, params: dict, result_hash: str):
        """Record a tool call for auditing and rate limiting."""
        self._call_log.append({
            "tool": tool_name,
            "params_hash": hashlib.sha256(
                json.dumps(params, sort_keys=True).encode()
            ).hexdigest(),
            "result_hash": result_hash,
            "time": time.time(),
            "agent_id": self.agent_id,
        })


def enforce_policy(policy: AgentPolicy):
    """Decorator that enforces tool access policy."""
    def decorator(func: Callable):
        async def wrapper(tool_name: str, params: dict, **kwargs):
            allowed, reason = policy.is_allowed(tool_name, params)
            if not allowed:
                raise PermissionError(
                    f"Policy violation for agent '{policy.agent_id}': {reason}"
                )

            result = await func(tool_name, params, **kwargs)

            result_hash = hashlib.sha256(
                json.dumps(result, sort_keys=True, default=str).encode()
            ).hexdigest()
            policy.log_call(tool_name, params, result_hash)
            return result
        return wrapper
    return decorator


# Define a policy for a trading analysis agent
trading_agent_policy = AgentPolicy(
    agent_id="spiffe://ailog.dev/agent/trading-analyzer",
    permissions=[
        ToolPermission(
            tool_name="query_market_data",
            allowed_params={"exchange": ["NYSE", "NASDAQ", "LSE"]},
            max_calls_per_minute=120,
        ),
        ToolPermission(
            tool_name="run_backtest",
            max_calls_per_minute=10,
            audit_level="detailed",
        ),
        ToolPermission(
            tool_name="execute_trade",
            max_calls_per_minute=5,
            requires_human_approval=True,
            audit_level="full",
        ),
        # Note: no permission for "access_customer_data" or "modify_config"
        # so those calls will be denied by default
    ],
)

This pattern is particularly important in agentic systems that access financial data, where an agent that should only read market prices must never be able to execute trades without explicit human approval.
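
To show the deny-by-default behavior end to end, here is a condensed, standalone version of the same idea: a dispatcher (names hypothetical) that executes nothing outside its registered allowlist:

```python
import asyncio

# Condensed allowlist: tool name -> parameter constraints (None = any value)
ALLOWLIST = {
    "query_market_data": {"exchange": {"NYSE", "NASDAQ", "LSE"}},
    "run_backtest": None,
}


async def dispatch(tool: str, params: dict) -> dict:
    """Execute a tool call only if it passes the allowlist; deny by default."""
    if tool not in ALLOWLIST:
        raise PermissionError(f"Tool '{tool}' not in allowlist")
    constraints = ALLOWLIST[tool]
    if constraints:
        for key, allowed in constraints.items():
            if key in params and params[key] not in allowed:
                raise PermissionError(f"{key}={params[key]!r} not permitted")
    return {"tool": tool, "status": "executed"}


async def main():
    ok = await dispatch("query_market_data", {"exchange": "NYSE"})
    print(ok["status"])  # executed
    try:
        await dispatch("execute_trade", {})  # never registered: denied
    except PermissionError as e:
        print(e)


asyncio.run(main())
```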

Differential Privacy for Agent Memory

AI agents accumulate context over time. A customer support agent remembers past interactions. A coding agent retains project knowledge. A research agent builds a corpus of findings. This persistent memory is valuable, but it also concentrates sensitive information in a single, attackable location.

Applying differential privacy to agent memory adds a mathematical guarantee: even if an attacker gains full access to the memory store, their ability to infer whether any specific record was part of the original data is provably bounded.

import numpy as np
from dataclasses import dataclass


@dataclass
class DPMemoryConfig:
    epsilon: float = 1.0       # Privacy budget; lower means more private
    delta: float = 1e-5        # Probability of privacy breach
    noise_mechanism: str = "gaussian"  # "gaussian" or "laplace"
    max_sensitivity: float = 1.0


class DPAgentMemory:
    """Agent memory store with differential privacy guarantees."""

    def __init__(self, config: DPMemoryConfig):
        self.config = config
        self._raw_store: dict[str, np.ndarray] = {}
        self._access_count: dict[str, int] = {}
        self._privacy_budget_spent = 0.0

    def _compute_noise_scale(self) -> float:
        """Compute noise scale based on privacy parameters."""
        if self.config.noise_mechanism == "gaussian":
            # Gaussian mechanism: sigma = sensitivity * sqrt(2 * ln(1.25/delta)) / epsilon
            return (
                self.config.max_sensitivity
                * np.sqrt(2 * np.log(1.25 / self.config.delta))
                / self.config.epsilon
            )
        else:
            # Laplace mechanism: scale = sensitivity / epsilon
            return self.config.max_sensitivity / self.config.epsilon

    def store_embedding(self, key: str, embedding: np.ndarray):
        """Store an embedding, clipping to bound sensitivity."""
        norm = np.linalg.norm(embedding)
        if norm > self.config.max_sensitivity:
            embedding = embedding * (self.config.max_sensitivity / norm)
        self._raw_store[key] = embedding
        self._access_count[key] = 0

    def retrieve_embedding(self, key: str) -> np.ndarray | None:
        """Retrieve an embedding with calibrated noise."""
        if key not in self._raw_store:
            return None

        self._access_count[key] += 1
        scale = self._compute_noise_scale()

        # Add noise calibrated to the privacy parameters
        embedding = self._raw_store[key].copy()
        if self.config.noise_mechanism == "gaussian":
            noise = np.random.normal(0, scale, size=embedding.shape)
        else:
            noise = np.random.laplace(0, scale, size=embedding.shape)

        self._privacy_budget_spent += self.config.epsilon
        return embedding + noise

    def remaining_budget(self) -> float:
        """Track cumulative privacy spend under composition."""
        return max(0, 10.0 - self._privacy_budget_spent)  # Example total budget of 10.0

The tradeoff is real: noise reduces retrieval quality. In practice, I use an epsilon of 1.0 to 3.0 for agent memory, which adds enough noise to provide meaningful privacy without destroying the utility of the embeddings for similarity search.
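
To gauge that tradeoff empirically before committing to a budget, it is worth measuring how far a Gaussian-noised embedding drifts from the original at your chosen parameters. A quick sketch (dimension, epsilon, and seed are illustrative):

```python
import numpy as np

rng = np.random.default_rng(42)
epsilon, delta, sensitivity = 2.0, 1e-5, 1.0

# Same Gaussian-mechanism scale as DPMemoryConfig above
sigma = sensitivity * np.sqrt(2 * np.log(1.25 / delta)) / epsilon

v = rng.normal(size=384)
v /= np.linalg.norm(v)  # clip to the sensitivity bound

noisy = v + rng.normal(0, sigma, size=v.shape)
cos = float(v @ noisy / (np.linalg.norm(v) * np.linalg.norm(noisy)))
print(f"sigma={sigma:.3f}, cosine similarity after noise: {cos:.3f}")
```

Run this against your real embedding dimension and retrieval benchmark; if similarity search degrades too much, raise epsilon or restrict DP protection to the most sensitive memory partitions.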

Sandboxing Tool Calls

When an agent invokes an external tool (running code, calling an API, querying a database), that tool call should execute in an isolated sandbox. The principle is containment: if a tool call is malicious or produces unexpected side effects, the blast radius is limited.

import subprocess
import tempfile
import json
import os
from dataclasses import dataclass


@dataclass
class SandboxConfig:
    timeout_seconds: int = 30
    max_memory_mb: int = 512
    network_access: bool = False
    allowed_env_vars: list[str] | None = None


class ToolSandbox:
    """Execute agent tool calls in isolated sandboxes."""

    def __init__(self, config: SandboxConfig):
        self.config = config

    def execute_code(self, code: str, language: str = "python") -> dict:
        """Run code in a sandboxed subprocess with resource limits."""
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=f".{language}", delete=False
        ) as f:
            f.write(code)
            script_path = f.name

        try:
            env = {}
            if self.config.allowed_env_vars:
                env = {
                    k: os.environ[k]
                    for k in self.config.allowed_env_vars
                    if k in os.environ
                }

            # Build command with resource limits using ulimit
            # (only Python execution is wired up in this sketch)
            memory_kb = self.config.max_memory_mb * 1024
            cmd = (
                f"ulimit -v {memory_kb} && "
                f"python3 {script_path}"
            )

            if not self.config.network_access:
                # Create an empty network namespace with unshare
                # (Linux only; requires root or CAP_SYS_ADMIN)
                cmd = f"unshare --net {cmd}"

            result = subprocess.run(
                ["sh", "-c", cmd],
                capture_output=True,
                text=True,
                timeout=self.config.timeout_seconds,
                env=env if env else None,
            )

            return {
                "success": result.returncode == 0,
                "stdout": result.stdout[:10_000],  # Truncate large outputs
                "stderr": result.stderr[:5_000],
                "return_code": result.returncode,
            }
        except subprocess.TimeoutExpired:
            return {
                "success": False,
                "stdout": "",
                "stderr": "Execution timed out",
                "return_code": -1,
            }
        finally:
            os.unlink(script_path)

    def validate_api_call(self, url: str, method: str, headers: dict) -> bool:
        """Validate an outbound API call against security policy."""
        allowed_domains = [
            "api.openai.com",
            "api.anthropic.com",
            "api.internal.ailog.dev",
        ]

        from urllib.parse import urlparse
        parsed = urlparse(url)

        if parsed.hostname not in allowed_domains:
            return False

        # Block credential exfiltration via headers
        sensitive_patterns = ["authorization", "x-api-key", "cookie"]
        for header_name in headers:
            if header_name.lower() in sensitive_patterns:
                if "Bearer" in str(headers[header_name]):
                    # Only allow if token matches expected format
                    token = headers[header_name].replace("Bearer ", "")
                    if len(token) > 200 or not token.startswith("sk-"):
                        return False

        return True

This is especially relevant as agents become capable of dynamic tool discovery. When an agent can find and invoke tools at runtime, the attack surface grows with every new tool it discovers. Sandboxing ensures that even a malicious tool cannot escape its execution boundary.

Monitoring Agent-to-Agent Communication

In multi-agent systems, individual agent security is necessary but not sufficient. You also need visibility into the communication fabric: which agents are talking to which, what data is flowing between them, and whether the interaction patterns match expected behavior.

import time
import json
import hashlib
from dataclasses import dataclass, field
from collections import defaultdict


@dataclass
class AgentInteraction:
    source_agent: str
    target_agent: str
    action: str
    payload_hash: str
    timestamp: float
    latency_ms: float
    status: str  # "success", "denied", "error"


class AgentCommunicationMonitor:
    """Monitor and analyze agent-to-agent communication patterns."""

    def __init__(self):
        self.interactions: list[AgentInteraction] = []
        self.expected_patterns: dict[str, set[str]] = {}  # source -> {allowed targets}
        self.baseline_rates: dict[str, float] = {}  # agent_pair -> expected calls/min

    def register_expected_pattern(self, source: str, targets: list[str], rate: float):
        """Define expected communication patterns for anomaly detection."""
        self.expected_patterns[source] = set(targets)
        for target in targets:
            key = f"{source}->{target}"
            self.baseline_rates[key] = rate

    def record_interaction(self, interaction: AgentInteraction):
        """Record and analyze an agent interaction."""
        self.interactions.append(interaction)

        # Check 1: Is this an expected communication path?
        if interaction.source_agent in self.expected_patterns:
            allowed = self.expected_patterns[interaction.source_agent]
            if interaction.target_agent not in allowed:
                self._alert(
                    "UNEXPECTED_PATH",
                    f"{interaction.source_agent} communicated with "
                    f"unauthorized target {interaction.target_agent}",
                    severity="high",
                )

        # Check 2: Rate anomaly detection
        key = f"{interaction.source_agent}->{interaction.target_agent}"
        if key in self.baseline_rates:
            recent_count = sum(
                1 for i in self.interactions
                if (
                    f"{i.source_agent}->{i.target_agent}" == key
                    and time.time() - i.timestamp < 60
                )
            )
            expected = self.baseline_rates[key]
            if recent_count > expected * 3:  # 3x threshold
                self._alert(
                    "RATE_ANOMALY",
                    f"{key}: {recent_count} calls/min vs {expected} expected",
                    severity="medium",
                )

        # Check 3: Repeated failures may indicate probing
        recent_failures = [
            i for i in self.interactions[-100:]
            if i.source_agent == interaction.source_agent and i.status == "denied"
        ]
        if len(recent_failures) > 10:
            self._alert(
                "REPEATED_DENIALS",
                f"{interaction.source_agent} has {len(recent_failures)} "
                f"recent denied requests (possible probing)",
                severity="high",
            )

    def _alert(self, alert_type: str, message: str, severity: str):
        """Emit a security alert."""
        alert = {
            "type": alert_type,
            "message": message,
            "severity": severity,
            "timestamp": time.time(),
        }
        # In production: send to SIEM, PagerDuty, or security dashboard
        print(f"[SECURITY ALERT] [{severity.upper()}] {alert_type}: {message}")

    def get_communication_graph(self) -> dict:
        """Generate a graph of agent communication for visualization."""
        graph = defaultdict(lambda: {"count": 0, "errors": 0, "avg_latency": 0.0})
        for interaction in self.interactions:
            key = f"{interaction.source_agent}->{interaction.target_agent}"
            entry = graph[key]
            entry["count"] += 1
            if interaction.status == "error":
                entry["errors"] += 1
            entry["avg_latency"] = (
                (entry["avg_latency"] * (entry["count"] - 1) + interaction.latency_ms)
                / entry["count"]
            )
        return dict(graph)

The communication graph is one of the most valuable security artifacts you can produce. When you visualize which agents talk to which, unexpected edges in the graph are immediately suspicious. An analytics agent that suddenly starts calling the trade execution agent? That warrants investigation.
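
Rendering that graph is straightforward. For example, converting the dict returned by get_communication_graph into Graphviz DOT (the "src->dst" key format matches the monitor above):

```python
def to_dot(graph: dict) -> str:
    """Emit a Graphviz DOT digraph; edges with errors are drawn in red."""
    lines = ["digraph agents {"]
    for edge, stats in graph.items():
        src, dst = edge.split("->")
        color = "red" if stats["errors"] else "black"
        lines.append(f'  "{src}" -> "{dst}" [label="{stats["count"]}", color={color}];')
    lines.append("}")
    return "\n".join(lines)


print(to_dot({"analyzer->executor": {"count": 12, "errors": 1, "avg_latency": 40.0}}))
```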

Putting It Together: A Zero-Trust Agent Architecture

The components above are not isolated techniques. They form layers of a defense-in-depth architecture:

  1. Identity layer (SPIFFE): Every agent has a cryptographically verifiable, short-lived identity. No static API keys. No shared secrets.

  2. Policy layer (tool allowlists): Each agent can only invoke explicitly permitted tools with constrained parameters. Denied by default.

  3. Privacy layer (differential privacy): Agent memory stores are protected so that data exposure from a breach is mathematically bounded.

  4. Isolation layer (sandboxes): Tool executions run in resource-limited, network-restricted environments. A compromised tool cannot pivot to the host.

  5. Observability layer (communication monitoring): All agent interactions are logged, graphed, and checked against behavioral baselines. Anomalies trigger alerts.

The overarching principle is the same as zero-trust networking: never trust, always verify. Every agent call, every tool invocation, every memory access is authenticated, authorized, and audited. The identity of the caller is verified at every hop, not just at the perimeter.

This matters more as agent autonomy increases. A system where agents can see, hear, and act across modalities has a correspondingly larger attack surface. An agent that can read images, process audio, and invoke web APIs needs proportionally stronger guardrails than one that only processes text.

Practical Deployment Recommendations

Start with identity. If you do nothing else, replace static API keys with short-lived, automatically rotated credentials. SPIFFE/SPIRE is production-ready and integrates with Kubernetes, AWS, GCP, and Azure. The migration is incremental: you can run SPIRE alongside existing IAM and convert agents one at a time.

Inventory your non-human identities. Most organizations do not know how many service accounts, bot tokens, and agent credentials exist across their infrastructure. You cannot secure what you cannot see. Run an audit. The number will surprise you.

Define tool policies before deployment. Writing an allowlist after an agent is in production is painful because you are reverse-engineering what it "needs" from observed behavior. Define the policy first, deploy the agent with it, and loosen constraints only when you have evidence that a tool call is both necessary and safe.

Separate agent memory by sensitivity. Not all agent context needs the same protection level. A support agent's memory of general product FAQs does not need differential privacy. Its memory of customer PII does. Partition memory stores by data classification and apply protection accordingly.

Treat agent-to-agent calls like API calls. Mutual TLS, request signing, rate limiting, circuit breakers. Every pattern you use for microservice communication applies to agent communication, with the additional requirement of identity attestation at every hop.
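
As a concrete example of the request-signing piece, here is a minimal HMAC sketch. In a SPIFFE deployment you would sign with the agent's SVID private key; a shared secret is used here only to keep the example short:

```python
import hashlib
import hmac
import json
import time


def sign_request(secret: bytes, method: str, path: str, body: dict) -> dict:
    """Sign a canonical representation of the request, including a timestamp."""
    ts = str(int(time.time()))
    canonical = "\n".join([method, path, ts, json.dumps(body, sort_keys=True)])
    sig = hmac.new(secret, canonical.encode(), hashlib.sha256).hexdigest()
    return {"X-Timestamp": ts, "X-Signature": sig}


def verify_request(secret: bytes, method: str, path: str, body: dict,
                   headers: dict, max_skew: int = 300) -> bool:
    """Recompute the signature and reject stale or tampered requests."""
    if abs(time.time() - int(headers["X-Timestamp"])) > max_skew:
        return False  # replayed or stale request
    canonical = "\n".join([method, path, headers["X-Timestamp"],
                           json.dumps(body, sort_keys=True)])
    expected = hmac.new(secret, canonical.encode(), hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, headers["X-Signature"])


secret = b"shared-dev-secret"  # placeholder; never hard-code in production
hdrs = sign_request(secret, "POST", "/v1/subtask", {"task": "backtest"})
print(verify_request(secret, "POST", "/v1/subtask", {"task": "backtest"}, hdrs))  # True
```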

Key Takeaways

  • Non-human identities (service accounts, agent credentials, bot tokens) outnumber human users by 100-to-1 or more in typical enterprise environments, and traditional IAM was not designed for them.
  • SPIFFE provides cryptographically verifiable, short-lived identities for agents, eliminating the risk of long-lived static API keys.
  • Declarative tool allowlists enforce least-privilege at the tool call level, not just the network or database level, and should be defined before deployment.
  • Differential privacy applied to agent memory stores provides mathematical guarantees against data exposure, even if the store is fully compromised.
  • Sandboxing tool executions with resource limits, network isolation, and output truncation contains the blast radius of malicious or malfunctioning tools.
  • Agent-to-agent communication monitoring, including graph analysis and behavioral baselining, is essential for detecting lateral movement and unauthorized access patterns in multi-agent systems.
  • Zero-trust for agents is not a single technology; it is a layered architecture combining identity, policy, privacy, isolation, and observability.
  • Start with the identity layer. Replacing static credentials with auto-rotating, cryptographically verified identities has the highest security impact for the lowest integration effort.
