Call-Spec Discipline

AF is built on a simple principle: separate declaration from execution.

The Core Insight

In agent systems, mixing "what to do" with "when to do it" creates cognitive load for both humans and LLMs. When a function call immediately triggers execution, you can't read the code to understand when side effects occur.

AF addresses this by treating agent calls as specifications that only execute when explicitly awaited.


The Problem: Why Call-Spec Matters

Pure SDK + ChatKit: The Boilerplate Problem

Writing complex multi-agent flows with the Pure Agents SDK requires significant boilerplate:

Pure SDK — ~126 lines of ceremony
"""Pure Agents SDK - Complex multi-agent with ChatKit streaming."""

import asyncio
from typing import Any

from agents import Agent, ModelSettings, Runner
from agents.extensions.chatkit import (
    AgentContext,
    close_workflow,
    emit_phase_label,
    stream_agent_response,
)
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from openai.types.shared.reasoning import Reasoning

app = FastAPI()


def create_agent(name: str, instructions: str) -> Agent:
    return Agent(
        name=name,
        instructions=instructions,
        model="gpt-5.2",
        model_settings=ModelSettings(
            store=True,
            reasoning=Reasoning(effort="medium", summary="auto"),
        ),
    )


classifier = create_agent("classifier", "Classify as SIMPLE or COMPLEX.")
researcher = create_agent("researcher", "Research the topic.")
reviewer = create_agent("reviewer", "Review. Reply APPROVED or REJECTED.")
refiner = create_agent("refiner", "Refine based on feedback.")
responder = create_agent("responder", "Give final response.")


def to_messages(text: str) -> list[dict[str, Any]]:
    return [{"role": "user", "content": [{"type": "input_text", "text": text}]}]


@app.post("/chatkit")
async def chatkit_endpoint(request: Request):
    body = await request.json()
    messages = body.get("messages", [])
    user_input = messages[-1]["content"][0]["text"]

    agent_context = AgentContext()
    event_queue: asyncio.Queue = asyncio.Queue()

    async def flow_logic():
        try:
            # Phase 1: Classification
            emit_phase_label(agent_context, "Classification")
            result = Runner.run_streamed(classifier, to_messages(user_input), context=agent_context)
            async for event in stream_agent_response(agent_context, result):
                await event_queue.put(event)
            classification = result.final_output
            await close_workflow(agent_context)

            # Phase 2: Research (conditional)
            if "COMPLEX" in classification.upper():
                emit_phase_label(agent_context, "Research")
                result = Runner.run_streamed(
                    researcher, to_messages(user_input), context=agent_context
                )
                async for event in stream_agent_response(agent_context, result):
                    await event_queue.put(event)
                draft = result.final_output
                await close_workflow(agent_context)
            else:
                draft = user_input

            # Phase 3: Review loop
            for attempt in range(3):
                emit_phase_label(agent_context, f"Review (attempt {attempt + 1})")
                result = Runner.run_streamed(
                    reviewer, to_messages(f"Review:\n{draft}"), context=agent_context
                )
                async for event in stream_agent_response(agent_context, result):
                    await event_queue.put(event)
                review = result.final_output
                await close_workflow(agent_context)

                if "APPROVED" in review.upper():
                    break

                emit_phase_label(agent_context, f"Refinement (attempt {attempt + 1})")
                result = Runner.run_streamed(
                    refiner,
                    to_messages(f"Draft:\n{draft}\n\nFeedback:\n{review}"),
                    context=agent_context,
                )
                async for event in stream_agent_response(agent_context, result):
                    await event_queue.put(event)
                draft = result.final_output
                await close_workflow(agent_context)

            # Phase 4: Final response
            emit_phase_label(agent_context, "Final Response")
            result = Runner.run_streamed(
                responder, to_messages(f"Based on:\n{draft}"), context=agent_context
            )
            async for event in stream_agent_response(agent_context, result):
                await event_queue.put(event)
            await close_workflow(agent_context)

        except Exception:
            try:
                await close_workflow(agent_context)
            except Exception:
                pass
            raise
        finally:
            await event_queue.put(None)

    async def event_generator():
        task = asyncio.create_task(flow_logic())  # keep a reference so the task isn't garbage-collected
        while True:
            event = await event_queue.get()
            if event is None:
                break
            yield f"data: {event.model_dump_json()}\n\n"

    return StreamingResponse(event_generator(), media_type="text/event-stream")

What's wrong with this?

  • Manual emit_phase_label() + close_workflow() for every phase
  • async for event in stream_agent_response() repeated everywhere
  • try/finally blocks to ensure close_workflow() on errors
  • Event queue management boilerplate
  • Business logic buried in infrastructure code

The Solution: AF

The same workflow in AF:

"""AgenticFlow - Same complex flow, clean code."""

import agentic_flow as af

classifier = af.Agent(
    name="classifier",
    instructions="Classify as SIMPLE or COMPLEX.",
    model="gpt-5.2",
    model_settings=af.reasoning("medium"),
)
researcher = af.Agent(name="researcher", instructions="Research.", model="gpt-5.2")
reviewer = af.Agent(name="reviewer", instructions="Review. APPROVED or REJECTED.", model="gpt-5.2")
refiner = af.Agent(name="refiner", instructions="Refine based on feedback.", model="gpt-5.2")
responder = af.Agent(name="responder", instructions="Give final response.", model="gpt-5.2")


async def my_flow(user_message: str) -> str:
    # Internal classification - not saved to session
    async with af.phase("Classification"):
        classification = await classifier(user_message).stream()

    if "COMPLEX" in classification.upper():
        # Internal research - not saved to session
        async with af.phase("Research"):
            draft = await researcher(user_message).stream()
    else:
        draft = user_message

    for attempt in range(3):
        # Internal review - not saved to session
        async with af.phase(f"Review (attempt {attempt + 1})"):
            review = await reviewer(f"Review:\n{draft}").stream()

        if "APPROVED" in review.upper():
            break

        # Internal refinement - not saved to session
        async with af.phase(f"Refinement (attempt {attempt + 1})"):
            draft = await refiner(f"Draft:\n{draft}\n\nFeedback:\n{review}").stream()

    # persist=True saves the final response to session
    async with af.phase("Final Response", persist=True):
        return await responder(f"Based on:\n{draft}").stream()

ChatKit server:

"""AgenticFlow ChatKit Server - Minimal boilerplate."""

from agents import SQLiteSession
from chatkit.server import ChatKitServer

import agentic_flow as af

from .agenticflow_flow import my_flow


class MyServer(ChatKitServer):
    async def respond(self, thread, item, context):
        user_message = item.content[0].text if item else ""
        session = SQLiteSession(session_id=thread.id, db_path="chat.db")
        runner = af.Runner(flow=my_flow, session=session)

        async for event in af.chatkit.run_with_chatkit_context(
            runner, thread, self.store, context, user_message
        ):
            yield event

CLI handler:

"""AgenticFlow CLI - Simple streaming handler."""

from agents import SQLiteSession

import agentic_flow as af

from .agenticflow_flow import my_flow


def cli_handler(event):
    if hasattr(event, "data") and hasattr(event.data, "delta"):
        print(event.data.delta, end="", flush=True)


runner = af.Runner(flow=my_flow, session=SQLiteSession("chat.db"), handler=cli_handler)
result = runner.run_sync("Explain quantum computing")

Side-by-Side Comparison

# Phase management (Pure SDK)
emit_phase_label(ctx, "Research")
result = Runner.run_streamed(agent, msgs, context=ctx)
async for event in stream_agent_response(ctx, result):
    await queue.put(event)
output = result.final_output
await close_workflow(ctx)

# Phase management (AF)
async with af.phase("Research"):
    output = await agent(msg).stream()

# Error handling (Pure SDK)
try:
    emit_phase_label(ctx, "Work")
    # ... agent execution ...
    await close_workflow(ctx)
except Exception:
    try:
        await close_workflow(ctx)
    except Exception:
        pass
    raise

# Error handling (AF)
async with af.phase("Work"):
    result = await agent(msg).stream()
# Cleanup is automatic

Comparison Table

| Aspect           | Pure SDK                                    | AF                              |
|------------------|---------------------------------------------|---------------------------------|
| Lines of code    | ~126                                        | ~43                             |
| Phase management | Manual emit_phase_label + close_workflow    | Automatic async with af.phase() |
| Streaming        | async for event in stream_agent_response()  | .stream()                       |
| Error handling   | Manual try/finally                          | Automatic cleanup               |
| Event queue      | Manual management                           | Handled internally              |
| Business logic   | Buried in boilerplate                       | Clear and visible               |
| Adding streaming | Structural rewrite                          | Add .stream()                   |

Declaration vs Execution

import agentic_flow as af

assistant = af.Agent(name="assistant", instructions="Help the user.", model="gpt-5.2")

# Declaration — creates a specification
spec = assistant("What is Python?")

# Execution — runs the agent
result = await spec

The specification (ExecutionSpec) captures:

  • Which agent to run
  • What prompt to send
  • How to run it (streaming, silent, isolated, snapshot)

Execution happens only when:

  • You await the specification
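The await-only trigger can be sketched with a plain awaitable class. Everything below (the Spec class and its flags) is a hypothetical illustration of the pattern, not AF's actual ExecutionSpec implementation:

```python
import asyncio


class Spec:
    """Hypothetical stand-in for an ExecutionSpec (illustration only)."""

    def __init__(self, prompt: str):
        self.prompt = prompt
        self.streaming = False
        self.executed = False

    def stream(self) -> "Spec":
        # A modifier only records configuration; nothing runs here.
        self.streaming = True
        return self

    async def _run(self) -> str:
        # The only code path that performs work.
        self.executed = True
        return f"ran: {self.prompt}"

    def __await__(self):
        # `await spec` is the single execution trigger.
        return self._run().__await__()


async def main() -> str:
    spec = Spec("What is Python?").stream()
    assert not spec.executed  # declared and configured, nothing executed yet
    return await spec         # execution happens exactly here


result = asyncio.run(main())
print(result)  # ran: What is Python?
```

Because configuration and execution are separate methods, a reader can scan for `await` and find every point where work actually happens.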

The Five Axes

AF separates concerns into five orthogonal axes:

graph LR
    subgraph WHAT["WHAT (Capabilities)"]
        A(prompt)
        B(instructions)
        C(tools)
    end

    subgraph WHERE["WHERE (Boundaries)"]
        D(session)
        E(phase)
        F(isolated)
        F2(snapshot)
    end

    subgraph HOW["HOW (Display)"]
        G(streaming)
        H(silent)
        I(handler)
    end

    subgraph LIMITS["LIMITS (Constraints)"]
        J(max_turns)
        K(guardrails)
    end

    subgraph WHEN["WHEN (Lifecycle)"]
        L(hooks)
        M(events)
    end

| Axis   | Controls                | Specified At                                              |
|--------|-------------------------|-----------------------------------------------------------|
| WHAT   | Agent capabilities      | af.Agent(...), agent(prompt)                              |
| WHERE  | Data flow boundaries    | phase(), .isolated(), .snapshot(), af.Runner(session=...) |
| HOW    | Display and observation | .stream(), .silent(), af.Runner(handler=...)              |
| LIMITS | Execution constraints   | .max_turns(), .run_config()                               |
| WHEN   | Lifecycle observation   | af.Agent(hooks=...), events                               |

WHAT — Agent Capabilities

Defines what the agent can do:

  • prompt — The input message
  • instructions — System prompt
  • tools — Available tools
  • output_type — Structured output schema
  • model, model_settings — Model configuration
  • handoffs — Delegation targets

WHERE — Data Flow Boundaries

Controls where data flows:

  • Session — Global conversation history
  • PhaseSession — Local thinking space
  • .snapshot() — Read-only context (concurrent-safe)
  • .isolated() — No context (stateless)
  • Context — Dependency injection (SDK pass-through via .context())
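The intent behind .snapshot() versus .isolated() can be illustrated with a toy session. This is a conceptual sketch of the WHERE axis; AF's real Session API may differ:

```python
import copy


class ToySession:
    """Hypothetical sketch of the WHERE axis, not AF's real Session."""

    def __init__(self, items=None):
        self.items = list(items or [])

    def snapshot(self) -> "ToySession":
        # Read-only copy: the agent sees the history, but its writes
        # never reach the shared session (safe for concurrent calls).
        return ToySession(copy.deepcopy(self.items))

    def isolated(self) -> "ToySession":
        # No context at all: the agent runs statelessly.
        return ToySession()


base = ToySession(["user: hi", "assistant: hello"])
snap = base.snapshot()
iso = base.isolated()

snap.items.append("draft")  # mutating the copy...
assert base.items == ["user: hi", "assistant: hello"]  # ...leaves shared history untouched
assert iso.items == []      # isolated call starts with nothing
```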

These are NOT hidden. You can explicitly access them:

from agentic_flow.agent import current_session, current_handler, current_phase_session

async def my_flow(user_message: str):
    # Access current Session
    session = current_session.get()
    if session:
        history = await session.get_items()
        print(f"History: {len(history)} messages")

    # Access current Handler
    handler = current_handler.get()

    # Access current PhaseSession (if inside phase)
    phase_ctx = current_phase_session.get()

    result = await agent(user_message).stream()
    return result

AgenticFlow uses Python's contextvars to inject these dependencies. They are publicly accessible and documented.
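The injection mechanism itself is standard-library contextvars. A minimal standalone demo of the pattern (the names here are illustrative, not AF's):

```python
import contextvars

# A context variable plays the role of current_session.
current_session = contextvars.ContextVar("current_session", default=None)


def history_length() -> int:
    # Any code on this call path can read the injected value
    # without it being passed as an argument.
    session = current_session.get()
    return len(session) if session is not None else 0


token = current_session.set(["user: hi", "assistant: hello"])
try:
    n = history_length()  # sees the injected session
finally:
    current_session.reset(token)  # restore the previous value

print(n, current_session.get())  # 2 None
```

contextvars (unlike module globals) are also safe across concurrent asyncio tasks: each task sees the value set in its own context.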

For details, see Context Resolution

HOW — Display and Observation

Controls how execution appears:

  • .stream() — Internal streaming execution (display is full-text)
  • .silent() — Suppress UI
  • handler — Custom event processing
  • tracing — Execution observation (SDK pass-through via .run_config())

LIMITS — Execution Constraints

Controls execution boundaries:

  • .max_turns(n) — Limit agent turns
  • guardrails — Input/output validation (SDK pass-through)
  • tool_use_behavior — Tool execution control (SDK pass-through)

WHEN — Lifecycle Observation

Observe execution lifecycle:

  • PhaseStarted, PhaseEnded — AF events
  • AgentHooks — Agent lifecycle (SDK pass-through)
  • RunHooks — Execution lifecycle (SDK pass-through)

These axes don't mix

You can't pass stream=True to agent(). You can't pass handler to phase(). This separation is enforced by the API.


The Single Execution Trigger

There is exactly one way to execute an agent: await.

# These don't execute:
spec = agent("prompt")
spec = agent("prompt").stream()
spec = agent("prompt").silent().isolated()
spec = agent("prompt").snapshot().stream()

# This executes:
result = await spec

This makes execution points visible in your code. You can read a flow and know exactly where agents run.


Modifiers Are Declarative

Modifiers configure execution without triggering it:

spec = assistant("Hello")          # ExecutionSpec
spec = spec.stream()               # Still ExecutionSpec (with streaming flag)
spec = spec.silent()               # Still ExecutionSpec (with silent flag)
result = await spec                # Now it executes

Modifier order doesn't matter

await agent("prompt").stream().silent()    # Same as
await agent("prompt").silent().stream()    # This
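Order-independence falls out naturally when modifiers just accumulate flags. A hypothetical sketch of that design (not AF's real ExecutionSpec):

```python
class FlagSpec:
    """Hypothetical sketch: each modifier records a flag and returns the spec."""

    def __init__(self):
        self.flags = set()

    def stream(self) -> "FlagSpec":
        self.flags.add("stream")
        return self

    def silent(self) -> "FlagSpec":
        self.flags.add("silent")
        return self


a = FlagSpec().stream().silent()
b = FlagSpec().silent().stream()
assert a.flags == b.flags == {"stream", "silent"}  # same configuration either way
```

Since no modifier executes anything, there is no ordering of side effects for the chain to depend on.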

Boundaries Are Explicit

phase() creates explicit execution boundaries:

async with af.phase("Research"):
    # Boundary start is visible here
    result = await researcher(query).stream()
    # Boundary end is guaranteed (even on exception)
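The "guaranteed even on exception" property is the standard async context-manager contract. A self-contained sketch (the phase here is a toy, not af.phase's actual implementation):

```python
import asyncio
from contextlib import asynccontextmanager

events = []


@asynccontextmanager
async def phase(label: str):
    """Toy phase boundary: start/end events bracket the body."""
    events.append(f"start:{label}")
    try:
        yield
    finally:
        # Runs on normal exit AND on exception: the boundary always closes.
        events.append(f"end:{label}")


async def main():
    try:
        async with phase("Work"):
            raise RuntimeError("agent failed mid-phase")
    except RuntimeError:
        pass


asyncio.run(main())
print(events)  # ['start:Work', 'end:Work']
```

This is why the AF version needs no manual try/finally around close_workflow: the boundary cleanup is structural, not something each flow author must remember.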

Why This Matters

For Humans:

  • Readable: Execution points visible by scanning for await
  • Debuggable: Set breakpoints at the single execution trigger
  • Maintainable: Adding streaming is one modifier, not a rewrite

For LLMs:

  • Predictable: No hidden state transitions to track
  • Verifiable: Invariants can be checked (boundaries always close)
  • Autonomous: Clearer code is easier for LLMs to understand

Summary

| Principle              | Implementation                                                             |
|------------------------|----------------------------------------------------------------------------|
| Call ≠ Execute         | agent(prompt) returns ExecutionSpec, not result                            |
| Single trigger         | Only await executes                                                        |
| Modifiers are flags    | .stream(), .silent(), .isolated(), .snapshot(), .max_turns() don't execute |
| Boundaries are visible | async with af.phase() marks start/end                                      |
| Axes are separate      | WHAT / WHERE / HOW / LIMITS / WHEN don't mix                               |

Next: ExecutionSpec