Building an Agent Framework: Production Patterns
March 31, 2026 · 11 min read
Part 1 covered the architecture, Elasticsearch-backed quad-core memory, hybrid search, and streaming SSE. Part 2 covered MaaS — FastMCP proxy aggregation, role-based tool filtering, and helper tools. This post closes the series with the patterns that make the framework production-ready: privacy redaction, context budget management, structured logging, distributed tracing, and usage metrics.
None of these features make the agent smarter. They make it safe, observable, and cost-efficient — which matters more once the system handles real user data at scale.
Production Patterns at a Glance
The same stack from earlier posts — PydanticAI on the agent side, MaaS for tools, Elasticsearch for memory and metrics — gains a cross-cutting layer:
- History processors — Run before every model call: redact PII, then trim or summarize so context stays within budget.
- Structured logging — JSON to stdout (or console for local dev) with request-scoped context, separate from model-facing redaction.
- Tracing — OpenTelemetry instruments FastAPI when an OTLP endpoint is configured; otherwise tracing is a no-op.
- Usage metrics — Each execution writes a document to an Elasticsearch data stream for cost and performance analysis.
- Graceful degradation — Memory, hybrid search, summarization, OTLP export, and metrics indexing fail soft so the agent still answers.
The history pipeline looks like this:
flowchart TD
H["Conversation history"]
PR["privacy_history_processor\n(PII → redacted)"]
CB["context_budget_processor\n(trim / summarize)"]
M["Model call"]
H --> PR --> CB --> M
Privacy-Preserving History Processors
PydanticAI supports history processors — async functions that transform the conversation history before every model call. The framework chains two processors: one for privacy, one for context budget.
Redacting PII
Every message in the conversation history passes through privacy_history_processor before the model sees it. The processor scans text content for patterns that look like PII and replaces them with [REDACTED]:
_PII_PATTERNS = [
re.compile(r"\b[\w\.-]+@[\w\.-]+\.[A-Za-z]{2,}\b"), # Email addresses
re.compile(r"\b\d{3}-\d{2}-\d{4}\b"), # SSN format
re.compile(r"\b(?:\d[ -]*?){13,19}\b"), # Credit card numbers
]
def _redact_text(text: str) -> str:
if not text:
return text
redacted = text
for pattern in _PII_PATTERNS:
redacted = pattern.sub("[REDACTED]", redacted)
return redacted
The processor itself walks every part of every message — user prompts, system prompts, text parts, and tool return values:
async def privacy_history_processor(
_ctx: RunContext[AgentDeps],
history: list[ModelRequest | ModelResponse],
) -> list[ModelRequest | ModelResponse]:
"""Redact sensitive data in history before it reaches the model."""
redacted_history: list[ModelRequest | ModelResponse] = []
for message in history:
new_parts: list[Any] = []
for part in message.parts:
if isinstance(part, (TextPart, UserPromptPart, SystemPromptPart)):
new_parts.append(
_update_part_content(part, _redact_text(str(part.content)))
)
elif isinstance(part, ToolReturnPart) and isinstance(part.content, str):
new_parts.append(
_update_part_content(part, _redact_text(part.content))
)
else:
new_parts.append(part)
redacted_history.append(_copy_message_with_parts(message, new_parts))
return redacted_history
A few design points worth calling out:
Immutable copies. The processor never mutates the original message objects. _update_part_content uses Pydantic’s model_copy (or dataclasses.replace as a fallback) to create new part instances. _copy_message_with_parts does the same for the message envelope. This means the original history is preserved for logging and debugging — only the model sees the redacted version.
Tool returns are redacted too. If a tool fetches data that happens to contain an email address or SSN, the redaction catches it before the model processes the result in subsequent turns. This is important because tool outputs are unpredictable — you cannot know ahead of time what external data will contain.
It runs on every model call. History processors fire before each run_stream() invocation, not just at the beginning of a conversation. As the conversation grows and includes more tool results, the redaction stays current.
Managing the Context Budget
Long conversations accumulate history fast. Every user message, every model response, every tool call and return — they all stack up. Eventually the history exceeds the model’s context window, or you start burning tokens on stale context that does not help the current task.
The context_budget_processor solves this with a three-part strategy: keep system prompts, keep recent turns, summarize the middle.
flowchart LR
SYS["System prompts\nalways kept"]
MID["Middle zone\nsummarized when over budget"]
REC["Last N turns\nalways kept"]
SYS -.->|"older"| MID -.->|"newer"| REC
HISTORY_TOKEN_BUDGET = 2000
HISTORY_KEEP_LAST = 10
HISTORY_SUMMARY_CHAR_LIMIT = 1200
async def context_budget_processor(
ctx: RunContext[AgentDeps],
history: list[ModelRequest | ModelResponse],
) -> list[ModelRequest | ModelResponse]:
"""Trim or summarize history to keep within the token budget."""
if _history_token_count(history) <= HISTORY_TOKEN_BUDGET:
return history # Under budget, pass through unchanged
last_start = max(len(history) - HISTORY_KEEP_LAST, 0)
# Always keep system prompts and the last N turns
system_indexes = [
i for i, msg in enumerate(history)
if _has_system_prompt(msg) and i < last_start
]
keep_indexes = set(system_indexes + list(range(last_start, len(history))))
# Everything else gets summarized
middle_messages = [
msg for i, msg in enumerate(history) if i not in keep_indexes
]
if not middle_messages:
return [history[i] for i in sorted(keep_indexes)]
# Summarize via Elasticsearch inference or truncate as fallback
middle_text = "\n".join(_message_text(msg) for msg in middle_messages)
summary = None
if ctx.deps.memory:
summary = await ctx.deps.memory.summarize_text(middle_text)
if not summary:
summary = middle_text[:HISTORY_SUMMARY_CHAR_LIMIT]
summary_part = SystemPromptPart(
content=f"Summary of earlier context: {summary}"
)
summary_message = ModelRequest(parts=[summary_part])
# Reconstruct: system prompts → summary block → messages from last_start onward
new_history: list[ModelRequest | ModelResponse] = []
for index, message in enumerate(history):
if index in system_indexes:
new_history.append(message)
if index == last_start:
new_history.append(summary_message)
if index >= last_start:
new_history.append(message)
return new_history
The reconstruction loop walks the original message list once: it appends each preserved system message when encountered, injects the single summary ModelRequest at the start of the recent window, then appends every message from last_start through the end (so the last N turns appear in order after the summary).
The logic breaks history into three zones:
- System prompts (indices before the recent window) — Kept explicitly so core instructions are not folded into the summary block.
- Last
HISTORY_KEEP_LASTturns — The recent window; kept verbatim because it is most likely relevant to the current task. - Middle zone — Messages that are neither in the kept system set nor in the recent window; collapsed into one summary
ModelRequestwhen over budget.
The summarization path is interesting. When Elasticsearch memory is available, the processor calls summarize_text(), which uses a configured completion model (Gemini Flash via Elasticsearch’s inference API) to produce a concise summary. When memory is not available, it falls back to simple character truncation. Either way, the middle zone collapses to a fraction of its original size.
The result is a token-bounded history that preserves the agent’s instructions, keeps recent context fresh, and retains a compressed version of older interactions — all without the model needing to know any of this happened.
Structured Logging
Every log entry in the framework is structured JSON, written to stdout for consumption by container orchestrators and log aggregation systems.
The logging configuration uses structlog with context variables for cross-request correlation:
def configure_logging() -> structlog.BoundLogger:
shared_processors = [
structlog.contextvars.merge_contextvars, # Inject request context
structlog.stdlib.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.UnicodeDecoder(),
_add_service_context, # Service name + version
]
if settings.log_format == "console":
processors = [*shared_processors, structlog.dev.ConsoleRenderer(colors=True)]
else:
processors = [
*shared_processors,
structlog.processors.format_exc_info,
structlog.processors.JSONRenderer(),
]
structlog.configure(
processors=processors,
wrapper_class=structlog.stdlib.BoundLogger,
context_class=dict,
logger_factory=structlog.PrintLoggerFactory(file=sys.stdout),
cache_logger_on_first_use=True,
)
Two format modes: json for production (machine-parseable, Elasticsearch-ready) and console for local development (colored, human-readable). Same processors, different renderers.
Log Sanitization
Log entries go through their own sanitization layer, separate from the PII redaction in history processors. This catches sensitive values that appear in tool arguments, API responses, and configuration:
_REDACT_PATTERN = re.compile(
r"(password|secret|token|api_key|authorization|bearer|credential)",
re.IGNORECASE,
)
MAX_ARG_LENGTH = 500
MAX_RESPONSE_LENGTH = 1000
def sanitize_for_logging(value: Any, max_length: int = MAX_ARG_LENGTH) -> Any:
if isinstance(value, str):
redacted = "[REDACTED]" if _REDACT_PATTERN.search(value) else value
if len(redacted) > max_length:
return redacted[:max_length - 3] + "..."
return redacted
elif isinstance(value, dict):
return {
k: "[REDACTED]" if _REDACT_PATTERN.search(str(k))
else sanitize_for_logging(v, max_length)
for k, v in value.items()
}
elif isinstance(value, (list, tuple)):
return [sanitize_for_logging(item, max_length) for item in value[:20]]
return value
This is recursive — dictionaries have their keys checked against the redaction pattern, and nested structures are walked. Tool arguments get truncated at 500 characters, tool responses at 1000. These limits prevent a single verbose tool return from bloating log storage.
The sanitization functions (sanitize_tool_args, sanitize_response) are used throughout the SSE streaming layer from Part 1 — every tool_call and tool_return event passes through them before reaching the client or the logs.
OpenTelemetry & Request Tracing
The framework uses OpenTelemetry for distributed tracing with automatic FastAPI instrumentation. Spans, structured logs, and usage documents (below) all attach to the same request lifecycle — complementary to Part 1’s SSE stream, where tool_call and tool_return payloads are sanitized before they reach the client.
flowchart TD
REQ["HTTP request"]
SPAN["Spans\n(FastAPIInstrumentor)"]
LOG["Structured logs\n(structlog + request_id)"]
MET["Usage document\n(Elasticsearch)"]
REQ --> SPAN
REQ --> LOG
REQ --> MET
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
resource = Resource.create({"service.name": settings.otel_service_name})
provider = TracerProvider(resource=resource)
if settings.otel_exporter_otlp_endpoint:
exporter = OTLPSpanExporter(endpoint=settings.otel_exporter_otlp_endpoint)
processor = BatchSpanProcessor(exporter)
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
FastAPIInstrumentor.instrument_app(app)
FastAPIInstrumentor wraps every HTTP handler with a span, capturing method, path, status code, and latency automatically. The OTLP exporter is only configured when a backend endpoint is set — in development, tracing is a no-op.
Request Correlation Middleware
A custom middleware bridges OpenTelemetry traces with structlog context:
@app.middleware("http")
async def request_logging_middleware(request: Request, call_next):
request_id = request.headers.get("X-Request-ID") or str(uuid.uuid4())
start_time = time.time()
bind_contextvars(
request_id=request_id,
method=request.method,
path=request.url.path,
)
logger.info("request_started", category="http_request", phase="start")
try:
response = await call_next(request)
latency_ms = (time.time() - start_time) * 1000
logger.info(
"request_completed",
category="http_request",
phase="end",
status_code=response.status_code,
latency_ms=round(latency_ms, 2),
)
response.headers["X-Request-ID"] = request_id
return response
except Exception as exc:
latency_ms = (time.time() - start_time) * 1000
logger.error(
"request_failed",
category="http_request",
phase="error",
error=str(exc),
latency_ms=round(latency_ms, 2),
)
raise
finally:
clear_contextvars()
bind_contextvars from structlog injects request_id, method, and path into every log entry emitted during the request — including logs from deep in the memory layer or MCP client. When the request completes, clear_contextvars cleans up. The X-Request-ID header is propagated back to the client, so frontend logs can be correlated with backend traces.
This means a single agent execution — which might involve memory queries, multiple MCP tool calls, and streaming responses — produces a coherent log trail tied to one request ID.
Usage Metrics
Every agent execution writes a usage document to Elasticsearch’s agent-usage-metrics data stream:
async def write_usage(
self, *, agent_id, session_id, model, mcp_tools, mcp_resources,
input_tokens, output_tokens, total_tokens,
cache_read_tokens, cache_write_tokens,
requests, tool_calls, latency_ms, context_window_size,
status, error_message=None, wait=False,
) -> None:
document = {
"@timestamp": now,
"metric_id": str(uuid4()),
"agent_id": agent_id,
"session_id": session_id,
"model": model,
"mcp_tools": mcp_tools,
"mcp_resources": mcp_resources,
"input_tokens": int(input_tokens),
"output_tokens": int(output_tokens),
"total_tokens": int(total_tokens),
"cache_read_tokens": int(cache_read_tokens),
"cache_write_tokens": int(cache_write_tokens),
"requests": int(requests),
"tool_calls": int(tool_calls),
"latency_ms": float(latency_ms),
"context_window_size": int(context_window_size),
"status": status,
"error_message": error_message,
}
# Fire-and-forget indexing to avoid blocking the response
_ = asyncio.create_task(
self._index_document(settings.memory_usage_index, document)
)
This captures everything needed for cost attribution and performance analysis: which model ran, how many tokens were consumed (including cache hits), which tools were invoked, total latency, and whether the execution succeeded or failed.
The wait parameter controls whether indexing is synchronous (for tests) or fire-and-forget (in production). Agent responses should never block on metrics persistence.
Because usage documents land in Elasticsearch with @timestamp, they slot into standard Kibana dashboards for time-series analysis. You can query token consumption per model, average latency per agent, tool call frequency, error rates — all with standard Elasticsearch aggregations. This data stream shares the same ILM policy as the memory indices, so old metrics age out automatically.
Graceful Degradation
A recurring pattern across the framework: optional subsystems never block core execution.
- If Elasticsearch is unreachable, memory operations log a warning and return empty results. The agent executes without memory context.
- If the hybrid search query fails (inference model not deployed), it falls back to a plain
matchquery for text search. - If summarization fails in the context budget processor, it falls back to character truncation.
- If the OTLP endpoint is not configured, tracing becomes a no-op.
- If usage metric indexing fails, it fails silently in a background task.
This is a deliberate design choice. An agent that cannot answer because the metrics backend is down is worse than an agent that answers without recording metrics. Each subsystem degrades independently, and the core request-response path has the fewest hard dependencies possible.
Series Wrap-Up
Across three posts, this series covered:
- Part 1 — Architecture overview, Elasticsearch-backed quad-core memory (episodic, semantic, procedural, state), hybrid search with RRF and reranking, streaming SSE
- Part 2 — MaaS (MCP as a Service), FastMCP proxy aggregation, role-based tool filtering with transforms, helper tools bridging MCP primitives
- Part 3 — Privacy redaction, context budget management, structured logging, OpenTelemetry tracing, usage metrics, graceful degradation
The framework is built on a few core convictions: agents need memory beyond the context window, tool access should be centralized and access-controlled, every execution should be observable, and production systems must degrade gracefully. PydanticAI, FastMCP, and Elasticsearch each handle their part cleanly — the value is in how they compose.