Call-Spec Discipline¶
AF is built on a simple principle: separate declaration from execution.
The Core Insight¶
In agent systems, mixing "what to do" with "when to do it" creates cognitive load for both humans and LLMs. When a function call immediately triggers execution, you can't read the code to understand when side effects occur.
AF addresses this by treating agent calls as specifications that only execute when explicitly awaited.
The Problem: Why Call-Spec Matters¶
Pure SDK + ChatKit: The Boilerplate Problem¶
Writing complex multi-agent flows with the Pure Agents SDK requires significant boilerplate:
Pure SDK — ~126 lines of ceremony
"""Pure Agents SDK - Complex multi-agent with ChatKit streaming."""
import asyncio
from typing import Any
from agents import Agent, ModelSettings, Runner
from agents.extensions.chatkit import (
AgentContext,
close_workflow,
emit_phase_label,
stream_agent_response,
)
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from openai.types.shared.reasoning import Reasoning
app = FastAPI()
def create_agent(name: str, instructions: str) -> Agent:
return Agent(
name=name,
instructions=instructions,
model="gpt-5.2",
model_settings=ModelSettings(
store=True,
reasoning=Reasoning(effort="medium", summary="auto"),
),
)
classifier = create_agent("classifier", "Classify as SIMPLE or COMPLEX.")
researcher = create_agent("researcher", "Research the topic.")
reviewer = create_agent("reviewer", "Review. Reply APPROVED or REJECTED.")
refiner = create_agent("refiner", "Refine based on feedback.")
responder = create_agent("responder", "Give final response.")
def to_messages(text: str) -> list[dict[str, Any]]:
return [{"role": "user", "content": [{"type": "input_text", "text": text}]}]
@app.post("/chatkit")
async def chatkit_endpoint(request: Request):
body = await request.json()
messages = body.get("messages", [])
user_input = messages[-1]["content"][0]["text"]
agent_context = AgentContext()
event_queue: asyncio.Queue = asyncio.Queue()
async def flow_logic():
try:
# Phase 1: Classification
emit_phase_label(agent_context, "Classification")
result = Runner.run_streamed(classifier, to_messages(user_input), context=agent_context)
async for event in stream_agent_response(agent_context, result):
await event_queue.put(event)
classification = result.final_output
await close_workflow(agent_context)
# Phase 2: Research (conditional)
if "COMPLEX" in classification.upper():
emit_phase_label(agent_context, "Research")
result = Runner.run_streamed(
researcher, to_messages(user_input), context=agent_context
)
async for event in stream_agent_response(agent_context, result):
await event_queue.put(event)
draft = result.final_output
await close_workflow(agent_context)
else:
draft = user_input
# Phase 3: Review loop
for attempt in range(3):
emit_phase_label(agent_context, f"Review (attempt {attempt + 1})")
result = Runner.run_streamed(
reviewer, to_messages(f"Review:\n{draft}"), context=agent_context
)
async for event in stream_agent_response(agent_context, result):
await event_queue.put(event)
review = result.final_output
await close_workflow(agent_context)
if "APPROVED" in review.upper():
break
emit_phase_label(agent_context, f"Refinement (attempt {attempt + 1})")
result = Runner.run_streamed(
refiner,
to_messages(f"Draft:\n{draft}\n\nFeedback:\n{review}"),
context=agent_context,
)
async for event in stream_agent_response(agent_context, result):
await event_queue.put(event)
draft = result.final_output
await close_workflow(agent_context)
# Phase 4: Final response
emit_phase_label(agent_context, "Final Response")
result = Runner.run_streamed(
responder, to_messages(f"Based on:\n{draft}"), context=agent_context
)
async for event in stream_agent_response(agent_context, result):
await event_queue.put(event)
await close_workflow(agent_context)
except Exception:
try:
await close_workflow(agent_context)
except Exception:
pass
raise
finally:
await event_queue.put(None)
async def event_generator():
asyncio.create_task(flow_logic())
while True:
event = await event_queue.get()
if event is None:
break
yield f"data: {event.model_dump_json()}\n\n"
return StreamingResponse(event_generator(), media_type="text/event-stream")
What's wrong with this?

- Manual `emit_phase_label()` + `close_workflow()` for every phase
- `async for event in stream_agent_response()` repeated everywhere
- `try/finally` blocks to ensure `close_workflow()` on errors
- Event queue management boilerplate
- Business logic buried in infrastructure code
The Solution: AF¶
The same workflow in AF:
"""AgenticFlow - Same complex flow, clean code."""
import agentic_flow as af
classifier = af.Agent(
name="classifier",
instructions="Classify as SIMPLE or COMPLEX.",
model="gpt-5.2",
model_settings=af.reasoning("medium"),
)
researcher = af.Agent(name="researcher", instructions="Research.", model="gpt-5.2")
reviewer = af.Agent(name="reviewer", instructions="Review. APPROVED or REJECTED.", model="gpt-5.2")
refiner = af.Agent(name="refiner", instructions="Refine based on feedback.", model="gpt-5.2")
responder = af.Agent(name="responder", instructions="Give final response.", model="gpt-5.2")
async def my_flow(user_message: str) -> str:
# Internal classification - not saved to session
async with af.phase("Classification"):
classification = await classifier(user_message).stream()
if "COMPLEX" in classification.upper():
# Internal research - not saved to session
async with af.phase("Research"):
draft = await researcher(user_message).stream()
else:
draft = user_message
for attempt in range(3):
# Internal review - not saved to session
async with af.phase(f"Review (attempt {attempt + 1})"):
review = await reviewer(f"Review:\n{draft}").stream()
if "APPROVED" in review.upper():
break
# Internal refinement - not saved to session
async with af.phase(f"Refinement (attempt {attempt + 1})"):
draft = await refiner(f"Draft:\n{draft}\n\nFeedback:\n{review}").stream()
# persist=True saves the final response to session
async with af.phase("Final Response", persist=True):
return await responder(f"Based on:\n{draft}").stream()
"""AgenticFlow ChatKit Server - Minimal boilerplate."""
from agents import SQLiteSession
from chatkit.server import ChatKitServer
import agentic_flow as af
from .agenticflow_flow import my_flow
class MyServer(ChatKitServer):
async def respond(self, thread, item, context):
user_message = item.content[0].text if item else ""
session = SQLiteSession(session_id=thread.id, db_path="chat.db")
runner = af.Runner(flow=my_flow, session=session)
async for event in af.chatkit.run_with_chatkit_context(
runner, thread, self.store, context, user_message
):
yield event
"""AgenticFlow CLI - Simple streaming handler."""
from agents import SQLiteSession
import agentic_flow as af
from .agenticflow_flow import my_flow
def cli_handler(event):
if hasattr(event, "data") and hasattr(event.data, "delta"):
print(event.data.delta, end="", flush=True)
runner = af.Runner(flow=my_flow, session=SQLiteSession("chat.db"), handler=cli_handler)
result = runner.run_sync("Explain quantum computing")
Side-by-Side Comparison¶
Comparison Table¶
| Aspect | Pure SDK | AF |
|---|---|---|
| Lines of code | ~126 | ~43 |
| Phase management | Manual `emit_phase_label` + `close_workflow` | Automatic `async with af.phase()` |
| Streaming | `async for event in stream_agent_response()` | `.stream()` |
| Error handling | Manual `try/finally` | Automatic cleanup |
| Event queue | Manual management | Handled internally |
| Business logic | Buried in boilerplate | Clear and visible |
| Adding streaming | Structural rewrite | Add `.stream()` |
Declaration vs Execution¶
```python
import agentic_flow as af

assistant = af.Agent(name="assistant", instructions="Help the user.", model="gpt-5.2")

# Declaration — creates a specification
spec = assistant("What is Python?")

# Execution — runs the agent
result = await spec
```
The specification (`ExecutionSpec`) captures:

- Which agent to run
- What prompt to send
- How to run it (streaming, silent, isolated, snapshot)

Execution happens only when you `await` the specification.
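The call-then-await split can be sketched in plain Python: an object that merely records the call and implements `__await__`, so nothing runs until the spec is awaited. `SpecSketch` is an illustrative stand-in, not AF's actual `ExecutionSpec` implementation.

```python
import asyncio
from dataclasses import dataclass


@dataclass(frozen=True)
class SpecSketch:
    """Hypothetical stand-in for ExecutionSpec: captures the call, runs nothing."""
    prompt: str

    def __await__(self):
        # The single execution trigger: only `await spec` reaches this code.
        return self._execute().__await__()

    async def _execute(self) -> str:
        return f"answer to: {self.prompt}"


async def main() -> str:
    spec = SpecSketch("What is Python?")  # declaration: nothing runs yet
    return await spec                     # execution happens here, and only here


print(asyncio.run(main()))  # answer to: What is Python?
```

Because `SpecSketch.__await__` delegates to a coroutine, the spec behaves like any awaitable: it composes with `asyncio.gather`, timeouts, and task groups for free.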
The Five Axes¶
AF separates concerns into five orthogonal axes:
```mermaid
graph LR
    subgraph WHAT["WHAT (Capabilities)"]
        A(prompt)
        B(instructions)
        C(tools)
    end
    subgraph WHERE["WHERE (Boundaries)"]
        D(session)
        E(phase)
        F(isolated)
        F2(snapshot)
    end
    subgraph HOW["HOW (Display)"]
        G(streaming)
        H(silent)
        I(handler)
    end
    subgraph LIMITS["LIMITS (Constraints)"]
        J(max_turns)
        K(guardrails)
    end
    subgraph WHEN["WHEN (Lifecycle)"]
        L(hooks)
        M(events)
    end
```
| Axis | Controls | Specified At |
|---|---|---|
| WHAT | Agent capabilities | `af.Agent(...)`, `agent(prompt)` |
| WHERE | Data flow boundaries | `phase()`, `.isolated()`, `.snapshot()`, `af.Runner(session=...)` |
| HOW | Display and observation | `.stream()`, `.silent()`, `af.Runner(handler=...)` |
| LIMITS | Execution constraints | `.max_turns()`, `.run_config()` |
| WHEN | Lifecycle observation | `af.Agent(hooks=...)`, events |
WHAT — Agent Capabilities¶
Defines what the agent can do:
- `prompt` — The input message
- `instructions` — System prompt
- `tools` — Available tools
- `output_type` — Structured output schema
- `model`, `model_settings` — Model configuration
- `handoffs` — Delegation targets
WHERE — Data Flow Boundaries¶
Controls where data flows:
- `Session` — Global conversation history
- `PhaseSession` — Local thinking space
- `.snapshot()` — Read-only context (concurrent-safe)
- `.isolated()` — No context (stateless)
- `Context` — Dependency injection (SDK pass-through via `.context()`)
These are NOT hidden. You can explicitly access them:
```python
from agentic_flow.agent import current_session, current_handler, current_phase_session


async def my_flow(user_message: str):
    # Access current Session
    session = current_session.get()
    if session:
        history = await session.get_items()
        print(f"History: {len(history)} messages")

    # Access current Handler
    handler = current_handler.get()

    # Access current PhaseSession (if inside phase)
    phase_ctx = current_phase_session.get()

    result = await agent(user_message).stream()
    return result
```
AgenticFlow uses Python's `contextvars` to inject these dependencies. They are publicly accessible and documented.
For details, see Context Resolution
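The injection mechanism itself is plain `contextvars`. A minimal, self-contained sketch (the names `current_session_var` and `run_with_session` are made up for illustration; AF's real variables live in `agentic_flow.agent`):

```python
import asyncio
import contextvars
from typing import Optional

# Hypothetical context variable mirroring the current_session pattern.
current_session_var: contextvars.ContextVar[Optional[dict]] = contextvars.ContextVar(
    "current_session_var", default=None
)


async def inner_flow() -> int:
    # The flow body reads the session without receiving it as an argument.
    session = current_session_var.get()
    return len(session["items"]) if session else 0


async def run_with_session(session: dict) -> int:
    # A runner injects the session for everything it awaits, then cleans up.
    token = current_session_var.set(session)
    try:
        return await inner_flow()
    finally:
        current_session_var.reset(token)


print(asyncio.run(run_with_session({"items": ["hi", "hello"]})))  # 2
```

Because `contextvars` values are scoped per task, concurrent runners each see their own session without any global state leaking between them.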
HOW — Display and Observation¶
Controls how execution appears:
- `.stream()` — Internal streaming execution (display is full-text)
- `.silent()` — Suppress UI
- `handler` — Custom event processing
- `tracing` — Execution observation (SDK pass-through via `.run_config()`)
LIMITS — Execution Constraints¶
Controls execution boundaries:
- `.max_turns(n)` — Limit agent turns
- `guardrails` — Input/output validation (SDK pass-through)
- `tool_use_behavior` — Tool execution control (SDK pass-through)
WHEN — Lifecycle Observation¶
Observe execution lifecycle:
- `PhaseStarted`, `PhaseEnded` — AF events
- `AgentHooks` — Agent lifecycle (SDK pass-through)
- `RunHooks` — Execution lifecycle (SDK pass-through)
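The observation-only role of lifecycle events can be sketched with plain dataclasses standing in for AF's event types (the fields shown are assumptions, not AF's actual event schema):

```python
from dataclasses import dataclass


# Hypothetical event classes echoing the PhaseStarted / PhaseEnded names.
@dataclass
class PhaseStarted:
    name: str


@dataclass
class PhaseEnded:
    name: str


def logging_handler(event: object, log: list) -> None:
    # A handler observes lifecycle events without influencing execution.
    if isinstance(event, PhaseStarted):
        log.append(f"start:{event.name}")
    elif isinstance(event, PhaseEnded):
        log.append(f"end:{event.name}")


log: list = []
for event in (PhaseStarted("Research"), PhaseEnded("Research")):
    logging_handler(event, log)
print(log)  # ['start:Research', 'end:Research']
```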
These axes don't mix

You can't pass `stream=True` to `agent()`. You can't pass `handler` to `phase()`. This separation is enforced by the API.
The Single Execution Trigger¶
There is exactly one way to execute an agent: await.
```python
# These don't execute:
spec = agent("prompt")
spec = agent("prompt").stream()
spec = agent("prompt").silent().isolated()
spec = agent("prompt").snapshot().stream()

# This executes:
result = await spec
```
This makes execution points visible in your code. You can read a flow and know exactly where agents run.
Modifiers Are Declarative¶
Modifiers configure execution without triggering it:
spec = assistant("Hello") # ExecutionSpec
spec = spec.stream() # Still ExecutionSpec (with streaming flag)
spec = spec.silent() # Still ExecutionSpec (with silent flag)
result = await spec # Now it executes
Modifier order doesn't matter
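Why order doesn't matter can be shown with a toy immutable spec: each modifier copies the spec and sets an orthogonal flag, so any chaining order produces the same value. `Spec` and its `streaming`/`quiet` fields are illustrative names, not AF's internals.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Spec:
    """Hypothetical spec: each modifier sets an independent flag on a copy."""
    prompt: str
    streaming: bool = False
    quiet: bool = False

    def stream(self) -> "Spec":
        return replace(self, streaming=True)

    def silent(self) -> "Spec":
        return replace(self, quiet=True)


a = Spec("Hello").stream().silent()
b = Spec("Hello").silent().stream()
assert a == b  # order doesn't matter: the flags are orthogonal
print(a)  # Spec(prompt='Hello', streaming=True, quiet=True)
```

The frozen dataclass also means a spec can be stored, passed around, or awaited later without anything mutating it underneath you.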
Boundaries Are Explicit¶
phase() creates explicit execution boundaries:
async with af.phase("Research"):
# Boundary start is visible here
result = await researcher(query).stream()
# Boundary end is guaranteed (even on exception)
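The close-on-exception guarantee is the standard async context manager contract. A self-contained sketch, with a plain list standing in for emitted phase events (`phase_sketch` is hypothetical, not AF's real `phase()`):

```python
import asyncio
from contextlib import asynccontextmanager

events: list = []  # stands in for emitted PhaseStarted / PhaseEnded events


@asynccontextmanager
async def phase_sketch(name: str):
    events.append(f"started:{name}")
    try:
        yield
    finally:
        # The boundary always closes, even when the body raises.
        events.append(f"ended:{name}")


async def main() -> None:
    try:
        async with phase_sketch("Research"):
            raise RuntimeError("agent failed")
    except RuntimeError:
        pass


asyncio.run(main())
print(events)  # ['started:Research', 'ended:Research']
```

Even though the body raised, the `finally` block ran, so the boundary-end event was still recorded; this is the invariant the Pure SDK example had to enforce by hand with `try/finally` around `close_workflow()`.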
Why This Matters¶
- For Humans
    - Readable: Execution points visible by scanning for `await`
    - Debuggable: Set breakpoints at the single execution trigger
    - Maintainable: Adding streaming is one modifier, not a rewrite
- For LLMs
    - Predictable: No hidden state transitions to track
    - Verifiable: Invariants can be checked (boundaries always close)
    - Autonomous: Clearer code is easier for LLMs to understand
Summary¶
| Principle | Implementation |
|---|---|
| Call ≠ Execute | `agent(prompt)` returns `ExecutionSpec`, not a result |
| Single trigger | Only `await` executes |
| Modifiers are flags | `.stream()`, `.silent()`, `.isolated()`, `.snapshot()`, `.max_turns()` don't execute |
| Boundaries are visible | `async with af.phase()` marks start/end |
| Axes are separate | WHAT / WHERE / HOW / LIMITS / WHEN don't mix |
Next: ExecutionSpec