Skip to content

AF - Agentic Flow Framework

Call-Spec Discipline for LLM Agent Workflows


See the Difference

Writing multi-agent flows with the Pure SDK requires significant boilerplate. The agentic flow pattern eliminates it.

Pure SDK + ChatKit — ~126 lines of ceremony
"""Pure Agents SDK - Complex multi-agent with ChatKit streaming."""

import asyncio
from typing import Any

from agents import Agent, ModelSettings, Runner
from agents.extensions.chatkit import (
    AgentContext,
    close_workflow,
    emit_phase_label,
    stream_agent_response,
)
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from openai.types.shared.reasoning import Reasoning

app = FastAPI()


def create_agent(name: str, instructions: str) -> Agent:
    """Build an agent that uses the shared model configuration.

    Every agent in this example runs on the "gpt-5.2" model with
    ``store=True`` and medium-effort reasoning (``summary="auto"``); only
    the name and the instructions differ between agents.

    Args:
        name: Name given to the agent.
        instructions: Instructions (system prompt) given to the agent.

    Returns:
        The configured ``Agent``.
    """
    shared_settings = ModelSettings(
        store=True,
        reasoning=Reasoning(effort="medium", summary="auto"),
    )
    return Agent(
        name=name,
        instructions=instructions,
        model="gpt-5.2",
        model_settings=shared_settings,
    )


# One agent per step of the workflow; they differ only in name and instructions.
# The flow below matches on the literal keywords these instructions ask for
# ("COMPLEX" from the classifier, "APPROVED" from the reviewer).
classifier = create_agent("classifier", "Classify as SIMPLE or COMPLEX.")
researcher = create_agent("researcher", "Research the topic.")
reviewer = create_agent("reviewer", "Review. Reply APPROVED or REJECTED.")
refiner = create_agent("refiner", "Refine based on feedback.")
responder = create_agent("responder", "Give final response.")


def to_messages(text: str) -> list[dict[str, Any]]:
    """Wrap plain text as a one-element message list.

    The single message has role ``"user"`` and one content part of type
    ``"input_text"`` carrying ``text`` unchanged.
    """
    text_part = {"type": "input_text", "text": text}
    user_message = {"role": "user", "content": [text_part]}
    return [user_message]


@app.post("/chatkit")
async def chatkit_endpoint(request: Request):
    """Run the multi-agent flow and stream its events as server-sent events.

    The request body is read as JSON, and the text of the first content part
    of the last entry in ``messages`` is used as the user input. The flow
    runs in a background task that pushes events onto a queue; the response
    generator drains that queue until it receives the ``None`` sentinel.

    Note: an empty ``messages`` list, or a last message without the
    ``content[0]["text"]`` shape, raises here (e.g. ``IndexError`` or
    ``KeyError``) instead of producing a validation response.

    Returns:
        A ``StreamingResponse`` with media type ``text/event-stream``.
    """
    body = await request.json()
    messages = body.get("messages", [])
    user_input = messages[-1]["content"][0]["text"]

    agent_context = AgentContext()
    event_queue: asyncio.Queue = asyncio.Queue()

    async def flow_logic():
        """Run all phases in order, forwarding every streamed event to the queue."""
        try:
            # Phase 1: Classification
            emit_phase_label(agent_context, "Classification")
            result = Runner.run_streamed(classifier, to_messages(user_input), context=agent_context)
            async for event in stream_agent_response(agent_context, result):
                await event_queue.put(event)
            classification = result.final_output
            await close_workflow(agent_context)

            # Phase 2: Research (conditional)
            if "COMPLEX" in classification.upper():
                emit_phase_label(agent_context, "Research")
                result = Runner.run_streamed(
                    researcher, to_messages(user_input), context=agent_context
                )
                async for event in stream_agent_response(agent_context, result):
                    await event_queue.put(event)
                draft = result.final_output
                await close_workflow(agent_context)
            else:
                # Simple requests skip research; the raw input is reviewed as-is.
                draft = user_input

            # Phase 3: Review loop (at most 3 review rounds)
            for attempt in range(3):
                emit_phase_label(agent_context, f"Review (attempt {attempt + 1})")
                result = Runner.run_streamed(
                    reviewer, to_messages(f"Review:\n{draft}"), context=agent_context
                )
                async for event in stream_agent_response(agent_context, result):
                    await event_queue.put(event)
                review = result.final_output
                await close_workflow(agent_context)

                if "APPROVED" in review.upper():
                    break

                emit_phase_label(agent_context, f"Refinement (attempt {attempt + 1})")
                result = Runner.run_streamed(
                    refiner,
                    to_messages(f"Draft:\n{draft}\n\nFeedback:\n{review}"),
                    context=agent_context,
                )
                async for event in stream_agent_response(agent_context, result):
                    await event_queue.put(event)
                draft = result.final_output
                await close_workflow(agent_context)

            # Phase 4: Final response
            emit_phase_label(agent_context, "Final Response")
            result = Runner.run_streamed(
                responder, to_messages(f"Based on:\n{draft}"), context=agent_context
            )
            async for event in stream_agent_response(agent_context, result):
                await event_queue.put(event)
            await close_workflow(agent_context)

        except Exception:
            # Best-effort cleanup: a failure while closing must not mask the
            # original error, which is re-raised below.
            try:
                await close_workflow(agent_context)
            except Exception:
                pass
            raise
        finally:
            # Sentinel: tells the generator that no more events will arrive,
            # whether the flow finished, failed, or was cancelled.
            await event_queue.put(None)

    async def event_generator():
        """Yield queued events as SSE ``data:`` frames until the sentinel arrives."""
        # Keep a strong reference: the event loop only holds weak references
        # to tasks, so an unreferenced task can be garbage-collected mid-run.
        flow_task = asyncio.create_task(flow_logic())
        try:
            while True:
                event = await event_queue.get()
                if event is None:
                    break
                yield f"data: {event.model_dump_json()}\n\n"
            # Retrieve the task result so a failure in the flow surfaces here
            # instead of ending the stream silently.
            await flow_task
        finally:
            # If the consumer stops early (e.g. client disconnect), do not
            # leave the flow running in the background.
            if not flow_task.done():
                flow_task.cancel()

    return StreamingResponse(event_generator(), media_type="text/event-stream")

Problems:

  • Manual emit_phase_label() + close_workflow() for every phase
  • async for event in stream_agent_response() repeated everywhere
  • try/except/finally blocks to ensure close_workflow() runs on errors
  • Event queue management boilerplate
  • Business logic buried in infrastructure code

The same workflow using the agentic flow approach:

"""AgenticFlow - Same complex flow, clean code."""

import agentic_flow as af

# Agents are declared once at module level and called inside the flow below.
# Only the classifier passes explicit model_settings here; the other agents
# are created without a model_settings argument.
classifier = af.Agent(
    name="classifier",
    instructions="Classify as SIMPLE or COMPLEX.",
    model="gpt-5.2",
    model_settings=af.reasoning("medium"),
)
researcher = af.Agent(name="researcher", instructions="Research.", model="gpt-5.2")
reviewer = af.Agent(name="reviewer", instructions="Review. APPROVED or REJECTED.", model="gpt-5.2")
refiner = af.Agent(name="refiner", instructions="Refine based on feedback.", model="gpt-5.2")
responder = af.Agent(name="responder", instructions="Give final response.", model="gpt-5.2")


async def my_flow(user_message: str) -> str:
    """Classify, optionally research, review/refine up to 3 times, then respond.

    Only the final phase is opened with ``persist=True``; the earlier phases
    are internal steps that are not saved to the session.

    Args:
        user_message: The text of the user's request.

    Returns:
        The responder agent's streamed result for the final draft.
    """
    # Internal classification - not saved to session
    async with af.phase("Classification"):
        verdict = await classifier(user_message).stream()

    # Requests not classified as COMPLEX skip research and are reviewed as-is.
    draft = user_message
    if "COMPLEX" in verdict.upper():
        # Internal research - not saved to session
        async with af.phase("Research"):
            draft = await researcher(user_message).stream()

    for attempt in range(3):
        # Internal review - not saved to session
        review_prompt = f"Review:\n{draft}"
        async with af.phase(f"Review (attempt {attempt + 1})"):
            feedback = await reviewer(review_prompt).stream()

        approved = "APPROVED" in feedback.upper()
        if approved:
            break

        # Internal refinement - not saved to session
        refine_prompt = f"Draft:\n{draft}\n\nFeedback:\n{feedback}"
        async with af.phase(f"Refinement (attempt {attempt + 1})"):
            draft = await refiner(refine_prompt).stream()

    # persist=True saves the final response to session
    final_prompt = f"Based on:\n{draft}"
    async with af.phase("Final Response", persist=True):
        return await responder(final_prompt).stream()
"""AgenticFlow ChatKit Server - Minimal boilerplate."""

from agents import SQLiteSession
from chatkit.server import ChatKitServer

import agentic_flow as af

from .agenticflow_flow import my_flow


class MyServer(ChatKitServer):
    """ChatKit server that answers each request by running ``my_flow``."""

    async def respond(self, thread, item, context):
        """Yield the events produced by running the flow for one request.

        The user message is the text of the first content part of ``item``,
        or an empty string when ``item`` is falsy. Each call uses a SQLite
        session keyed by ``thread.id`` in ``chat.db``.
        """
        if item:
            user_message = item.content[0].text
        else:
            user_message = ""

        thread_session = SQLiteSession(session_id=thread.id, db_path="chat.db")
        flow_runner = af.Runner(flow=my_flow, session=thread_session)

        event_stream = af.chatkit.run_with_chatkit_context(
            flow_runner, thread, self.store, context, user_message
        )
        async for event in event_stream:
            yield event
"""AgenticFlow CLI - Simple streaming handler."""

from agents import SQLiteSession

import agentic_flow as af

from .agenticflow_flow import my_flow


def cli_handler(event):
    """Print the text delta carried by ``event``, if it has one.

    Events without a ``data`` attribute, or whose ``data`` has no ``delta``
    attribute, are ignored. Deltas are written without a trailing newline
    and flushed immediately so output appears as it streams.
    """
    payload = getattr(event, "data", None)
    if payload is None or not hasattr(payload, "delta"):
        return
    print(payload.delta, end="", flush=True)


# SQLiteSession's first positional argument is the session id, not the database
# path. Pass the file explicitly as db_path so the history is stored in chat.db
# (the same keyword form the ChatKit server example uses).
runner = af.Runner(
    flow=my_flow,
    session=SQLiteSession(session_id="cli", db_path="chat.db"),
    handler=cli_handler,
)
result = runner.run_sync("Explain quantum computing")
| Aspect           | Pure SDK    | AF        |
|------------------|-------------|-----------|
| Lines of code    | ~126        | ~43       |
| Phase management | Manual      | Automatic |
| Error handling   | try/finally | Automatic |
| Adding streaming | Rewrite     | .stream() |

How It Works: Call-Spec Discipline

The secret is simple: separate declaration from execution.

import agentic_flow as af

assistant = af.Agent(name="assistant", instructions="Help the user.", model="gpt-5.2")

# Declaration — creates a specification, NOT executed
spec = assistant("Hello")

# Execution — happens here, and ONLY here
# (as with any `await`, this line must run inside an async function or an
# async-aware REPL)
result = await spec
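| Expression                          | What it does             | Executes? |
|-------------------------------------|--------------------------|-----------|
| agent(prompt)                       | Creates ExecutionSpec[T] | No        |
| .stream() / .silent() / .isolated() | Adds modifiers           | No        |
| await spec                          | Runs the agent           | Yes       |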

This makes your code:

  • Readable — Execution points visible by scanning for await
  • Debuggable — Set breakpoints at the single execution trigger
  • Maintainable — Adding streaming is one modifier, not a structural rewrite

Learn more about Call-Spec Discipline


Quick Start

import agentic_flow as af
from agents import SQLiteSession

researcher = af.Agent(name="researcher", instructions="Research topics.", model="gpt-5.2")
responder = af.Agent(name="responder", instructions="Respond to user.", model="gpt-5.2")

async def my_flow(user_message: str) -> str:
    # Internal thinking - not saved to session
    async with af.phase("Research"):
        research = await researcher(user_message).stream()

    # persist=True saves the final response to session
    async with af.phase("Response", persist=True):
        return await responder(f"Based on: {research}").stream()

runner = af.Runner(flow=my_flow, session=SQLiteSession("chat.db"))
result = await runner("What is Python?")

Installation

# With uv (recommended)
uv add ds-agentic-flow

# With pip
pip install ds-agentic-flow

Next Steps

  • Getting Started


    Install and run your first agent in 5 minutes

    Quickstart

  • Concepts


    Deep dive into Call-Spec discipline

    Concepts

  • Examples


    Multi-agent workflows, review loops, and more

    Examples

  • API Reference


    Complete API documentation

    Reference