AF - Agentic Flow Framework¶
Call-Spec Discipline for LLM Agent Workflows
See the Difference¶
Writing multi-agent flows with the Pure SDK requires significant boilerplate. The agentic flow pattern eliminates it.
Pure SDK + ChatKit — ~126 lines of ceremony
"""Pure Agents SDK - Complex multi-agent with ChatKit streaming."""
import asyncio
from typing import Any
from agents import Agent, ModelSettings, Runner
from agents.extensions.chatkit import (
AgentContext,
close_workflow,
emit_phase_label,
stream_agent_response,
)
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from openai.types.shared.reasoning import Reasoning
app = FastAPI()
def create_agent(name: str, instructions: str) -> Agent:
return Agent(
name=name,
instructions=instructions,
model="gpt-5.2",
model_settings=ModelSettings(
store=True,
reasoning=Reasoning(effort="medium", summary="auto"),
),
)
classifier = create_agent("classifier", "Classify as SIMPLE or COMPLEX.")
researcher = create_agent("researcher", "Research the topic.")
reviewer = create_agent("reviewer", "Review. Reply APPROVED or REJECTED.")
refiner = create_agent("refiner", "Refine based on feedback.")
responder = create_agent("responder", "Give final response.")
def to_messages(text: str) -> list[dict[str, Any]]:
return [{"role": "user", "content": [{"type": "input_text", "text": text}]}]
@app.post("/chatkit")
async def chatkit_endpoint(request: Request):
body = await request.json()
messages = body.get("messages", [])
user_input = messages[-1]["content"][0]["text"]
agent_context = AgentContext()
event_queue: asyncio.Queue = asyncio.Queue()
async def flow_logic():
try:
# Phase 1: Classification
emit_phase_label(agent_context, "Classification")
result = Runner.run_streamed(classifier, to_messages(user_input), context=agent_context)
async for event in stream_agent_response(agent_context, result):
await event_queue.put(event)
classification = result.final_output
await close_workflow(agent_context)
# Phase 2: Research (conditional)
if "COMPLEX" in classification.upper():
emit_phase_label(agent_context, "Research")
result = Runner.run_streamed(
researcher, to_messages(user_input), context=agent_context
)
async for event in stream_agent_response(agent_context, result):
await event_queue.put(event)
draft = result.final_output
await close_workflow(agent_context)
else:
draft = user_input
# Phase 3: Review loop
for attempt in range(3):
emit_phase_label(agent_context, f"Review (attempt {attempt + 1})")
result = Runner.run_streamed(
reviewer, to_messages(f"Review:\n{draft}"), context=agent_context
)
async for event in stream_agent_response(agent_context, result):
await event_queue.put(event)
review = result.final_output
await close_workflow(agent_context)
if "APPROVED" in review.upper():
break
emit_phase_label(agent_context, f"Refinement (attempt {attempt + 1})")
result = Runner.run_streamed(
refiner,
to_messages(f"Draft:\n{draft}\n\nFeedback:\n{review}"),
context=agent_context,
)
async for event in stream_agent_response(agent_context, result):
await event_queue.put(event)
draft = result.final_output
await close_workflow(agent_context)
# Phase 4: Final response
emit_phase_label(agent_context, "Final Response")
result = Runner.run_streamed(
responder, to_messages(f"Based on:\n{draft}"), context=agent_context
)
async for event in stream_agent_response(agent_context, result):
await event_queue.put(event)
await close_workflow(agent_context)
except Exception:
try:
await close_workflow(agent_context)
except Exception:
pass
raise
finally:
await event_queue.put(None)
async def event_generator():
asyncio.create_task(flow_logic())
while True:
event = await event_queue.get()
if event is None:
break
yield f"data: {event.model_dump_json()}\n\n"
return StreamingResponse(event_generator(), media_type="text/event-stream")
Problems:
- Manual
emit_phase_label()+close_workflow()for every phase -
async for event in stream_agent_response()repeated everywhere -
try/finallyblocks to ensureclose_workflow()on errors - Event queue management boilerplate
- Business logic buried in infrastructure code
The same workflow using the agentic flow approach:
"""AgenticFlow - Same complex flow, clean code."""
import agentic_flow as af
classifier = af.Agent(
name="classifier",
instructions="Classify as SIMPLE or COMPLEX.",
model="gpt-5.2",
model_settings=af.reasoning("medium"),
)
researcher = af.Agent(name="researcher", instructions="Research.", model="gpt-5.2")
reviewer = af.Agent(name="reviewer", instructions="Review. APPROVED or REJECTED.", model="gpt-5.2")
refiner = af.Agent(name="refiner", instructions="Refine based on feedback.", model="gpt-5.2")
responder = af.Agent(name="responder", instructions="Give final response.", model="gpt-5.2")
async def my_flow(user_message: str) -> str:
# Internal classification - not saved to session
async with af.phase("Classification"):
classification = await classifier(user_message).stream()
if "COMPLEX" in classification.upper():
# Internal research - not saved to session
async with af.phase("Research"):
draft = await researcher(user_message).stream()
else:
draft = user_message
for attempt in range(3):
# Internal review - not saved to session
async with af.phase(f"Review (attempt {attempt + 1})"):
review = await reviewer(f"Review:\n{draft}").stream()
if "APPROVED" in review.upper():
break
# Internal refinement - not saved to session
async with af.phase(f"Refinement (attempt {attempt + 1})"):
draft = await refiner(f"Draft:\n{draft}\n\nFeedback:\n{review}").stream()
# persist=True saves the final response to session
async with af.phase("Final Response", persist=True):
return await responder(f"Based on:\n{draft}").stream()
"""AgenticFlow ChatKit Server - Minimal boilerplate."""
from agents import SQLiteSession
from chatkit.server import ChatKitServer
import agentic_flow as af
from .agenticflow_flow import my_flow
class MyServer(ChatKitServer):
async def respond(self, thread, item, context):
user_message = item.content[0].text if item else ""
session = SQLiteSession(session_id=thread.id, db_path="chat.db")
runner = af.Runner(flow=my_flow, session=session)
async for event in af.chatkit.run_with_chatkit_context(
runner, thread, self.store, context, user_message
):
yield event
"""AgenticFlow CLI - Simple streaming handler."""
from agents import SQLiteSession
import agentic_flow as af
from .agenticflow_flow import my_flow
def cli_handler(event):
if hasattr(event, "data") and hasattr(event.data, "delta"):
print(event.data.delta, end="", flush=True)
runner = af.Runner(flow=my_flow, session=SQLiteSession("chat.db"), handler=cli_handler)
result = runner.run_sync("Explain quantum computing")
| Aspect | Pure SDK | AF |
|---|---|---|
| Lines of code | ~126 | ~43 |
| Phase management | Manual | Automatic |
| Error handling | try/finally | Automatic |
| Adding streaming | Rewrite | .stream() |
How It Works: Call-Spec Discipline¶
The secret is simple: separate declaration from execution.
import agentic_flow as af
assistant = af.Agent(name="assistant", instructions="Help the user.", model="gpt-5.2")
# Declaration — creates a specification, NOT executed
spec = assistant("Hello")
# Execution — happens here, and ONLY here
result = await spec
| Expression | What it does | Executes? |
|---|---|---|
agent(prompt) |
Creates ExecutionSpec[T] |
No |
.stream() / .silent() / .isolated() |
Adds modifiers | No |
await spec |
Runs the agent | Yes |
This makes your code:
- Readable — Execution points visible by scanning for
await - Debuggable — Set breakpoints at the single execution trigger
- Maintainable — Adding streaming is one modifier, not a structural rewrite
Learn more about Call-Spec Discipline
Quick Start¶
import agentic_flow as af
from agents import SQLiteSession
researcher = af.Agent(name="researcher", instructions="Research topics.", model="gpt-5.2")
responder = af.Agent(name="responder", instructions="Respond to user.", model="gpt-5.2")
async def my_flow(user_message: str) -> str:
# Internal thinking - not saved to session
async with af.phase("Research"):
research = await researcher(user_message).stream()
# persist=True saves the final response to session
async with af.phase("Response", persist=True):
return await responder(f"Based on: {research}").stream()
runner = af.Runner(flow=my_flow, session=SQLiteSession("chat.db"))
result = await runner("What is Python?")
Installation¶
Next Steps¶
-
Getting Started
Install and run your first agent in 5 minutes
-
Concepts
Deep dive into Call-Spec discipline
-
Examples
Multi-agent workflows, review loops, and more
-
API Reference
Complete API documentation