Zero-Trust for AI Agents: Securing Non-Human Identities
At Ailog, we recently audited a mid-size fintech company's identity infrastructure. They had 340 human employees. They also had over 47,000 non-human identities: service accounts, API keys, bot tokens, CI/CD runners, and, increasingly, autonomous AI agents. The ratio was not 100-to-1; it was 138-to-1. And their IAM system was designed entirely around the assumption that identities belong to people.
This is the new reality. As multi-agent systems see explosive adoption, the most significant shift in enterprise attack surface is not phishing or ransomware. It is machine-to-machine interactions between autonomous agents that hold credentials, make decisions, and call external tools with no human in the loop. Traditional identity and access management was never built for this.
Why Traditional IAM Fails for AI Agents
Identity and Access Management (IAM) systems were designed around a set of assumptions that simply do not hold for AI agents.
Session duration. Human users log in, work for hours, and log out. AI agents operate continuously, often running for weeks or months without restarting. Long-lived sessions with static credentials are a gift to attackers. A compromised agent token that is valid for 90 days gives an attacker three months of unrestricted access.
Credential rotation. Organizations rotate human passwords every 60 or 90 days and enforce MFA. Agent credentials, typically API keys or service account tokens, are often set once and forgotten. In our audit of the fintech company, 68% of agent service account keys had not been rotated in over six months. Twelve of them had never been rotated at all.
Behavioral baselines. IAM systems can flag anomalous human behavior: a login from a new country, access at 3 AM, a sudden spike in resource requests. But what is "normal" for an AI agent? An agent that processes customer inquiries might legitimately make 10,000 API calls per hour, access multiple databases, and invoke external tools in rapid succession. Traditional anomaly detection treats this as a DDoS attack.
Role granularity. Human roles map reasonably well to job functions: "engineer," "analyst," "admin." Agent roles are far more granular and dynamic. A coding agent might need read access to a repository, write access to a specific branch, execute permissions on a CI runner, and temporary access to a secrets vault, all within a single task that lasts 15 minutes. Coarse-grained RBAC cannot model this.
Identity federation. In multi-agent architectures, agents call other agents. Agent A might delegate a subtask to Agent B, which calls Agent C to access an external API. Traditional IAM has no concept of transitive delegation chains where the original requester is three hops away.
SPIFFE for Non-Human Identities
The SPIFFE (Secure Production Identity Framework For Everyone) standard, originally designed for microservices, is the closest thing we have to a purpose-built identity layer for non-human workloads. It provides three critical primitives that map well to AI agent infrastructure.
SPIFFE IDs are URI-based identifiers (e.g., spiffe://ailog.dev/agent/trading-bot/v2) that uniquely name a workload without relying on network location or static secrets. Each agent gets a cryptographically verifiable identity that is independent of the host it runs on.
SVIDs (SPIFFE Verifiable Identity Documents) are short-lived X.509 certificates or JWT tokens issued to workloads. Unlike long-lived API keys, SVIDs expire quickly (typically 1 hour or less) and are automatically rotated.
The SPIRE server acts as an attestation authority, verifying that a workload is what it claims to be based on platform-level evidence (Kubernetes pod identity, AWS instance metadata, process attributes) before issuing an SVID.
Here is a practical implementation of agent identity verification using SPIFFE with the py-spiffe library:
```python
from spiffe import SpiffeId, WorkloadApiClient, X509Source
from spiffe import TlsOptions, MutualTlsMode
import ssl
import httpx


class AgentIdentityManager:
    """Manages SPIFFE-based identity for an AI agent."""

    def __init__(self, spiffe_socket: str = "unix:///tmp/spire-agent/public/api.sock"):
        self.workload_client = WorkloadApiClient(spiffe_socket)
        self.x509_source = X509Source(self.workload_client)

    def get_agent_svid(self):
        """Fetch the current short-lived SVID for this agent."""
        svid = self.x509_source.get_x509_svid()
        return {
            "spiffe_id": str(svid.spiffe_id),
            "cert_chain": svid.cert_chain,
            "private_key": svid.private_key,
            "expiry": svid.expiry,
        }

    def create_mtls_context(self, authorized_ids: list[str]) -> ssl.SSLContext:
        """Create an mTLS context that only trusts specific SPIFFE IDs.

        This ensures the agent will only communicate with
        explicitly authorized peer agents or services.
        """
        allowed = [SpiffeId.parse(sid) for sid in authorized_ids]
        tls_options = TlsOptions(
            mutual_tls_mode=MutualTlsMode.STRICT,
            authorized_spiffe_ids=allowed,
        )
        return self.x509_source.get_tls_context(tls_options)

    async def call_peer_agent(self, url: str, payload: dict, peer_id: str):
        """Make an authenticated request to a peer agent."""
        ctx = self.create_mtls_context(authorized_ids=[peer_id])
        async with httpx.AsyncClient(verify=ctx) as client:
            response = await client.post(url, json=payload)
            response.raise_for_status()
            return response.json()


# Usage
identity = AgentIdentityManager()
svid_info = identity.get_agent_svid()
print(f"Agent ID: {svid_info['spiffe_id']}")
print(f"SVID expires: {svid_info['expiry']}")
```
The key insight: every agent-to-agent call uses mutual TLS with SPIFFE IDs. Both sides prove their identity. No shared secrets, no API keys stored in config files. Certificates rotate automatically, typically every hour, so a compromised credential is useful for minutes, not months.
Least-Privilege for Agent Tool Access
The principle of least privilege is well understood in security, but applying it to AI agents requires rethinking what "privilege" means. An agent's capabilities are defined not just by database access or file permissions, but by which tools it can invoke, which parameters it can pass, and under what conditions.
I recommend a declarative tool allowlist pattern. Instead of granting an agent broad access and hoping it behaves, you define exactly which tools it may call, with parameter constraints and rate limits:
```python
from dataclasses import dataclass, field
from typing import Any, Callable
import time
import hashlib
import json


@dataclass
class ToolPermission:
    tool_name: str
    allowed_params: dict[str, list[Any]] | None = None  # None means any value
    max_calls_per_minute: int = 60
    requires_human_approval: bool = False
    audit_level: str = "standard"  # "standard", "detailed", "full"


@dataclass
class AgentPolicy:
    agent_id: str
    permissions: list[ToolPermission] = field(default_factory=list)
    _call_log: list[dict] = field(default_factory=list, repr=False)

    def is_allowed(self, tool_name: str, params: dict) -> tuple[bool, str]:
        """Check if an agent is allowed to call a tool with given params."""
        permission = next(
            (p for p in self.permissions if p.tool_name == tool_name), None
        )
        if permission is None:
            return False, f"Tool '{tool_name}' not in allowlist for {self.agent_id}"

        # Check the human-approval requirement (the flag name is illustrative;
        # wire this to your actual approval workflow)
        if permission.requires_human_approval and not params.get("_human_approved"):
            return False, f"Tool '{tool_name}' requires human approval"

        # Check parameter constraints
        if permission.allowed_params:
            for key, allowed_values in permission.allowed_params.items():
                if key in params and params[key] not in allowed_values:
                    return False, (
                        f"Parameter '{key}={params[key]}' "
                        f"not in allowed values {allowed_values}"
                    )

        # Check rate limits
        now = time.time()
        recent = [
            c for c in self._call_log
            if c["tool"] == tool_name and now - c["time"] < 60
        ]
        if len(recent) >= permission.max_calls_per_minute:
            return False, f"Rate limit exceeded for '{tool_name}'"

        return True, "allowed"

    def log_call(self, tool_name: str, params: dict, result_hash: str):
        """Record a tool call for auditing and rate limiting."""
        self._call_log.append({
            "tool": tool_name,
            "params_hash": hashlib.sha256(
                json.dumps(params, sort_keys=True).encode()
            ).hexdigest(),
            "result_hash": result_hash,
            "time": time.time(),
            "agent_id": self.agent_id,
        })


def enforce_policy(policy: AgentPolicy):
    """Decorator that enforces tool access policy."""
    def decorator(func: Callable):
        async def wrapper(tool_name: str, params: dict, **kwargs):
            allowed, reason = policy.is_allowed(tool_name, params)
            if not allowed:
                raise PermissionError(
                    f"Policy violation for agent '{policy.agent_id}': {reason}"
                )
            result = await func(tool_name, params, **kwargs)
            result_hash = hashlib.sha256(
                json.dumps(result, sort_keys=True, default=str).encode()
            ).hexdigest()
            policy.log_call(tool_name, params, result_hash)
            return result
        return wrapper
    return decorator


# Define a policy for a trading analysis agent
trading_agent_policy = AgentPolicy(
    agent_id="spiffe://ailog.dev/agent/trading-analyzer",
    permissions=[
        ToolPermission(
            tool_name="query_market_data",
            allowed_params={"exchange": ["NYSE", "NASDAQ", "LSE"]},
            max_calls_per_minute=120,
        ),
        ToolPermission(
            tool_name="run_backtest",
            max_calls_per_minute=10,
            audit_level="detailed",
        ),
        ToolPermission(
            tool_name="execute_trade",
            max_calls_per_minute=5,
            requires_human_approval=True,
            audit_level="full",
        ),
        # Note: no permission for "access_customer_data" or "modify_config",
        # so those calls will be denied by default
    ],
)
```
This pattern is particularly important in agentic systems that access financial data, where an agent that should only read market prices must never be able to execute trades without explicit human approval.
Differential Privacy for Agent Memory
AI agents accumulate context over time. A customer support agent remembers past interactions. A coding agent retains project knowledge. A research agent builds a corpus of findings. This persistent memory is valuable, but it also concentrates sensitive information in a single, attackable location.
Applying differential privacy to agent memory adds a mathematical guarantee: even if an attacker gains access to the memory store, they cannot determine whether any specific piece of information was part of the original data.
```python
import numpy as np
from dataclasses import dataclass


@dataclass
class DPMemoryConfig:
    epsilon: float = 1.0               # Privacy budget; lower means more private
    delta: float = 1e-5                # Probability of privacy breach
    noise_mechanism: str = "gaussian"  # "gaussian" or "laplace"
    max_sensitivity: float = 1.0


class DPAgentMemory:
    """Agent memory store with differential privacy guarantees."""

    def __init__(self, config: DPMemoryConfig):
        self.config = config
        self._raw_store: dict[str, np.ndarray] = {}
        self._access_count: dict[str, int] = {}
        self._privacy_budget_spent = 0.0

    def _compute_noise_scale(self) -> float:
        """Compute noise scale based on privacy parameters."""
        if self.config.noise_mechanism == "gaussian":
            # Gaussian mechanism: sigma = sensitivity * sqrt(2 * ln(1.25/delta)) / epsilon
            return (
                self.config.max_sensitivity
                * np.sqrt(2 * np.log(1.25 / self.config.delta))
                / self.config.epsilon
            )
        else:
            # Laplace mechanism: scale = sensitivity / epsilon
            return self.config.max_sensitivity / self.config.epsilon

    def store_embedding(self, key: str, embedding: np.ndarray):
        """Store an embedding, clipping to bound sensitivity."""
        norm = np.linalg.norm(embedding)
        if norm > self.config.max_sensitivity:
            embedding = embedding * (self.config.max_sensitivity / norm)
        self._raw_store[key] = embedding
        self._access_count[key] = 0

    def retrieve_embedding(self, key: str) -> np.ndarray | None:
        """Retrieve an embedding with calibrated noise."""
        if key not in self._raw_store:
            return None
        self._access_count[key] += 1
        scale = self._compute_noise_scale()
        # Add noise calibrated to the privacy parameters
        embedding = self._raw_store[key].copy()
        if self.config.noise_mechanism == "gaussian":
            noise = np.random.normal(0, scale, size=embedding.shape)
        else:
            noise = np.random.laplace(0, scale, size=embedding.shape)
        self._privacy_budget_spent += self.config.epsilon
        return embedding + noise

    def remaining_budget(self) -> float:
        """Track cumulative privacy spend under composition."""
        return max(0, 10.0 - self._privacy_budget_spent)  # Example total budget of 10.0
```
The tradeoff is real: noise reduces retrieval quality. In practice, I use an epsilon of 1.0 to 3.0 for agent memory, which adds enough noise to provide meaningful privacy without destroying the utility of the embeddings for similarity search.
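The noise formula from `_compute_noise_scale` can be evaluated directly to see what that epsilon range buys you, assuming the Gaussian mechanism with delta = 1e-5 and sensitivity 1.0:

```python
import numpy as np


def gaussian_noise_scale(epsilon: float, delta: float = 1e-5, sensitivity: float = 1.0) -> float:
    """sigma = sensitivity * sqrt(2 * ln(1.25/delta)) / epsilon"""
    return sensitivity * np.sqrt(2 * np.log(1.25 / delta)) / epsilon


# Noise scale shrinks linearly as epsilon grows (less privacy, more utility)
for eps in (0.5, 1.0, 3.0):
    print(f"epsilon={eps}: sigma={gaussian_noise_scale(eps):.3f}")
```

At epsilon 1.0 the per-coordinate noise standard deviation is roughly 4.8; at epsilon 3.0 it drops to about a third of that, which is typically small enough to preserve nearest-neighbor rankings for unit-norm embeddings.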
Sandboxing Tool Calls
When an agent invokes an external tool (running code, calling an API, querying a database), that tool call should execute in an isolated sandbox. The principle is containment: if a tool call is malicious or produces unexpected side effects, the blast radius is limited.
```python
import subprocess
import tempfile
import os
from dataclasses import dataclass


@dataclass
class SandboxConfig:
    timeout_seconds: int = 30
    max_memory_mb: int = 512
    network_access: bool = False
    allowed_env_vars: list[str] | None = None


class ToolSandbox:
    """Execute agent tool calls in isolated sandboxes."""

    def __init__(self, config: SandboxConfig):
        self.config = config

    def execute_code(self, code: str, language: str = "python") -> dict:
        """Run code in a sandboxed subprocess with resource limits.

        (This sketch only executes Python; the suffix is informational.)
        """
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=f".{language}", delete=False
        ) as f:
            f.write(code)
            script_path = f.name
        try:
            env = {}
            if self.config.allowed_env_vars:
                env = {
                    k: os.environ[k]
                    for k in self.config.allowed_env_vars
                    if k in os.environ
                }
            # Build command with resource limits using ulimit
            memory_kb = self.config.max_memory_mb * 1024
            cmd = (
                f"ulimit -v {memory_kb} && "
                f"python3 {script_path}"
            )
            if not self.config.network_access:
                # Use unshare to create a network namespace (Linux)
                cmd = f"unshare --net {cmd}"
            result = subprocess.run(
                ["sh", "-c", cmd],
                capture_output=True,
                text=True,
                timeout=self.config.timeout_seconds,
                # Always pass env explicitly: an empty dict gives the child a
                # clean environment; passing None would inherit the parent's
                # environment, including any secrets it holds
                env=env,
            )
            return {
                "success": result.returncode == 0,
                "stdout": result.stdout[:10_000],  # Truncate large outputs
                "stderr": result.stderr[:5_000],
                "return_code": result.returncode,
            }
        except subprocess.TimeoutExpired:
            return {
                "success": False,
                "stdout": "",
                "stderr": "Execution timed out",
                "return_code": -1,
            }
        finally:
            os.unlink(script_path)

    def validate_api_call(self, url: str, method: str, headers: dict) -> bool:
        """Validate an outbound API call against security policy."""
        from urllib.parse import urlparse

        allowed_domains = [
            "api.openai.com",
            "api.anthropic.com",
            "api.internal.ailog.dev",
        ]
        parsed = urlparse(url)
        if parsed.hostname not in allowed_domains:
            return False
        # Block credential exfiltration via headers
        sensitive_patterns = ["authorization", "x-api-key", "cookie"]
        for header_name in headers:
            if header_name.lower() in sensitive_patterns:
                if "Bearer" in str(headers[header_name]):
                    # Only allow if token matches expected format
                    token = headers[header_name].replace("Bearer ", "")
                    if len(token) > 200 or not token.startswith("sk-"):
                        return False
        return True
```
This is especially relevant as agents become capable of dynamic tool discovery. When an agent can find and invoke tools at runtime, the attack surface grows with every new tool it discovers. Sandboxing ensures that even a malicious tool cannot escape its execution boundary.
Monitoring Agent-to-Agent Communication
In multi-agent systems, individual agent security is necessary but not sufficient. You also need visibility into the communication fabric: which agents are talking to which, what data is flowing between them, and whether the interaction patterns match expected behavior.
```python
import time
from dataclasses import dataclass, field
from collections import defaultdict


@dataclass
class AgentInteraction:
    source_agent: str
    target_agent: str
    action: str
    payload_hash: str
    timestamp: float
    latency_ms: float
    status: str  # "success", "denied", "error"


class AgentCommunicationMonitor:
    """Monitor and analyze agent-to-agent communication patterns."""

    def __init__(self):
        self.interactions: list[AgentInteraction] = []
        self.expected_patterns: dict[str, set[str]] = {}  # source -> {allowed targets}
        self.baseline_rates: dict[str, float] = {}  # agent pair -> expected calls/min

    def register_expected_pattern(self, source: str, targets: list[str], rate: float):
        """Define expected communication patterns for anomaly detection."""
        self.expected_patterns[source] = set(targets)
        for target in targets:
            key = f"{source}->{target}"
            self.baseline_rates[key] = rate

    def record_interaction(self, interaction: AgentInteraction):
        """Record and analyze an agent interaction."""
        self.interactions.append(interaction)

        # Check 1: Is this an expected communication path?
        if interaction.source_agent in self.expected_patterns:
            allowed = self.expected_patterns[interaction.source_agent]
            if interaction.target_agent not in allowed:
                self._alert(
                    "UNEXPECTED_PATH",
                    f"{interaction.source_agent} communicated with "
                    f"unauthorized target {interaction.target_agent}",
                    severity="high",
                )

        # Check 2: Rate anomaly detection
        key = f"{interaction.source_agent}->{interaction.target_agent}"
        if key in self.baseline_rates:
            recent_count = sum(
                1 for i in self.interactions
                if (
                    f"{i.source_agent}->{i.target_agent}" == key
                    and time.time() - i.timestamp < 60
                )
            )
            expected = self.baseline_rates[key]
            if recent_count > expected * 3:  # 3x threshold
                self._alert(
                    "RATE_ANOMALY",
                    f"{key}: {recent_count} calls/min vs {expected} expected",
                    severity="medium",
                )

        # Check 3: Repeated failures may indicate probing
        recent_failures = [
            i for i in self.interactions[-100:]
            if i.source_agent == interaction.source_agent and i.status == "denied"
        ]
        if len(recent_failures) > 10:
            self._alert(
                "REPEATED_DENIALS",
                f"{interaction.source_agent} has {len(recent_failures)} "
                f"recent denied requests (possible probing)",
                severity="high",
            )

    def _alert(self, alert_type: str, message: str, severity: str):
        """Emit a security alert."""
        alert = {
            "type": alert_type,
            "message": message,
            "severity": severity,
            "timestamp": time.time(),
        }
        # In production: send `alert` to a SIEM, PagerDuty, or security dashboard
        print(f"[SECURITY ALERT] [{severity.upper()}] {alert_type}: {message}")

    def get_communication_graph(self) -> dict:
        """Generate a graph of agent communication for visualization."""
        graph = defaultdict(lambda: {"count": 0, "errors": 0, "avg_latency": 0.0})
        for interaction in self.interactions:
            key = f"{interaction.source_agent}->{interaction.target_agent}"
            entry = graph[key]
            entry["count"] += 1
            if interaction.status == "error":
                entry["errors"] += 1
            entry["avg_latency"] = (
                entry["avg_latency"] * (entry["count"] - 1) + interaction.latency_ms
            ) / entry["count"]
        return dict(graph)
```
The communication graph is one of the most valuable security artifacts you can produce. When you visualize which agents talk to which, unexpected edges in the graph are immediately suspicious. An analytics agent that suddenly starts calling the trade execution agent? That warrants investigation.
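As a minimal sketch of that idea, comparing the observed edge set against the expected one surfaces suspicious edges directly; the agent names here are hypothetical:

```python
# Expected communication edges (source -> target), from registered patterns
expected_edges = {
    ("analytics-agent", "market-data-agent"),
    ("analytics-agent", "report-agent"),
    ("report-agent", "notification-agent"),
}

# Edges observed in the last monitoring window
observed_edges = {
    ("analytics-agent", "market-data-agent"),
    ("report-agent", "notification-agent"),
    ("analytics-agent", "trade-execution-agent"),  # not in the expected set
}

# Set difference: any edge never seen in the baseline is worth an alert
unexpected = observed_edges - expected_edges
for src, dst in sorted(unexpected):
    print(f"SUSPICIOUS EDGE: {src} -> {dst}")
```

In a real deployment the expected set would come from the registered patterns and the observed set from the interaction log, but the core check is exactly this set difference.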
Putting It Together: A Zero-Trust Agent Architecture
The components above are not isolated techniques. They form layers of a defense-in-depth architecture:
- Identity layer (SPIFFE): Every agent has a cryptographically verifiable, short-lived identity. No static API keys. No shared secrets.
- Policy layer (tool allowlists): Each agent can only invoke explicitly permitted tools with constrained parameters. Denied by default.
- Privacy layer (differential privacy): Agent memory stores are protected so that data exposure from a breach is mathematically bounded.
- Isolation layer (sandboxes): Tool executions run in resource-limited, network-restricted environments. A compromised tool cannot pivot to the host.
- Observability layer (communication monitoring): All agent interactions are logged, graphed, and checked against behavioral baselines. Anomalies trigger alerts.
The overarching principle is the same as zero-trust networking: never trust, always verify. Every agent call, every tool invocation, every memory access is authenticated, authorized, and audited. The identity of the caller is verified at every hop, not just at the perimeter.
This matters more as agent autonomy increases. A system where agents can see, hear, and act across modalities has a correspondingly larger attack surface. An agent that can read images, process audio, and invoke web APIs needs proportionally stronger guardrails than one that only processes text.
Practical Deployment Recommendations
Start with identity. If you do nothing else, replace static API keys with short-lived, automatically rotated credentials. SPIFFE/SPIRE is production-ready and integrates with Kubernetes, AWS, GCP, and Azure. The migration is incremental: you can run SPIRE alongside existing IAM and convert agents one at a time.
Inventory your non-human identities. Most organizations do not know how many service accounts, bot tokens, and agent credentials exist across their infrastructure. You cannot secure what you cannot see. Run an audit. The number will surprise you.
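A first pass at that audit can be as simple as flagging credentials by rotation age. This sketch assumes a credential inventory already exported as dicts (for example from a cloud provider's IAM API); the field names and the 180-day threshold are illustrative:

```python
from datetime import datetime, timedelta, timezone

# Hypothetical inventory of non-human credentials
now = datetime(2025, 6, 1, tzinfo=timezone.utc)
credentials = [
    {"id": "svc-etl",     "kind": "service_account_key", "last_rotated": now - timedelta(days=12)},
    {"id": "bot-deploy",  "kind": "bot_token",           "last_rotated": now - timedelta(days=240)},
    {"id": "agent-trade", "kind": "agent_api_key",       "last_rotated": None},  # never rotated
]

STALE_AFTER = timedelta(days=180)


def audit_stale(creds: list[dict]) -> list[str]:
    """Return IDs of credentials never rotated or stale beyond the threshold."""
    stale = []
    for c in creds:
        if c["last_rotated"] is None or now - c["last_rotated"] > STALE_AFTER:
            stale.append(c["id"])
    return stale


print(audit_stale(credentials))  # ['bot-deploy', 'agent-trade']
```

Even this crude report gives you a prioritized remediation list: never-rotated keys first, then the oldest.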
Define tool policies before deployment. Writing an allowlist after an agent is in production is painful because you are reverse-engineering what it "needs" from observed behavior. Define the policy first, deploy the agent with it, and loosen constraints only when you have evidence that a tool call is both necessary and safe.
Separate agent memory by sensitivity. Not all agent context needs the same protection level. A support agent's memory of general product FAQs does not need differential privacy. Its memory of customer PII does. Partition memory stores by data classification and apply protection accordingly.
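One way to implement that partitioning is a classification-to-protection mapping consulted on every memory write, so only the sensitive partition pays the differential-privacy noise cost. The classes and settings below are illustrative:

```python
from enum import Enum


class DataClass(Enum):
    PUBLIC = "public"        # e.g. product FAQs
    INTERNAL = "internal"    # e.g. project context
    SENSITIVE = "sensitive"  # e.g. customer PII


# Hypothetical mapping from data classification to memory protection
PROTECTION = {
    DataClass.PUBLIC: {"dp": False, "encrypted_at_rest": True},
    DataClass.INTERNAL: {"dp": False, "encrypted_at_rest": True},
    DataClass.SENSITIVE: {"dp": True, "encrypted_at_rest": True, "epsilon": 1.0},
}


def partition_for(record_classification: DataClass) -> dict:
    """Route a memory write to the partition matching its classification."""
    return PROTECTION[record_classification]


print(partition_for(DataClass.SENSITIVE)["dp"])  # True
print(partition_for(DataClass.PUBLIC)["dp"])     # False
```

The classification itself should come from your data-governance tooling, not from the agent; an agent must not be able to downgrade its own memory's protection level.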
Treat agent-to-agent calls like API calls. Mutual TLS, request signing, rate limiting, circuit breakers. Every pattern you use for microservice communication applies to agent communication, with the additional requirement of identity attestation at every hop.
Key Takeaways
- Non-human identities (service accounts, agent credentials, bot tokens) outnumber human users by 100-to-1 or more in typical enterprise environments, and traditional IAM was not designed for them.
- SPIFFE provides cryptographically verifiable, short-lived identities for agents, eliminating the risk of long-lived static API keys.
- Declarative tool allowlists enforce least-privilege at the tool call level, not just the network or database level, and should be defined before deployment.
- Differential privacy applied to agent memory stores provides mathematical guarantees against data exposure, even if the store is fully compromised.
- Sandboxing tool executions with resource limits, network isolation, and output truncation contains the blast radius of malicious or malfunctioning tools.
- Agent-to-agent communication monitoring, including graph analysis and behavioral baselining, is essential for detecting lateral movement and unauthorized access patterns in multi-agent systems.
- Zero-trust for agents is not a single technology; it is a layered architecture combining identity, policy, privacy, isolation, and observability.
- Start with the identity layer. Replacing static credentials with auto-rotating, cryptographically verified identities has the highest security impact for the lowest integration effort.