Skip to content

Multi-Agent Workflow

This example demonstrates a multi-agent research workflow with classification, research, and response phases.

Overview

graph LR
    A(User Message) --> B(Classifier)
    B -->|SIMPLE| C(Responder)
    B -->|COMPLEX| D(Researcher)
    D --> E(Responder)
    C --> F(Response)
    E --> F

Complete Example

import agentic_flow as af
from agents import SQLiteSession

# Define agents
classifier = af.Agent(
    name="classifier",
    instructions="""Classify the user's request as SIMPLE or COMPLEX.
    SIMPLE: Greetings, simple questions, basic information
    COMPLEX: Research questions, analysis, comparisons
    Respond with only SIMPLE or COMPLEX.""",
    model="gpt-5.2",
)

researcher = af.Agent(
    name="researcher",
    instructions="""Research the given topic thoroughly.
    Provide detailed findings with sources when possible.""",
    model="gpt-5.2",
    model_settings=af.reasoning("medium"),
)

responder = af.Agent(
    name="responder",
    instructions="""Provide a clear, helpful response to the user.
    If research findings are provided, base your response on them.""",
    model="gpt-5.2",
)


async def multi_agent_flow(user_message: str) -> str:
    # Phase 1: Classification
    async with af.phase("Classification"):
        classification = await classifier(user_message).stream()

    # Phase 2: Research (conditional)
    if "COMPLEX" in classification.upper():
        async with af.phase("Research"):
            research = await researcher(user_message).stream()
        context = f"Research findings:\n{research}"
    else:
        context = f"User message:\n{user_message}"

    # Phase 3: Response
    async with af.phase("Response", persist=True):
        return await responder(context).stream()


# Run
runner = af.Runner(
    flow=multi_agent_flow,
    session=SQLiteSession("multi_agent.db"),
)

result = runner.run_sync("What are the key differences between REST and GraphQL?")
print(result)

Key Points

Conditional Branching

Standard Python if statements control flow:

if "COMPLEX" in classification.upper():
    async with af.phase("Research"):
        research = await researcher(user_message).stream()

Phase Structure

  • Classification: Quick categorization
  • Research: Deep analysis (only for complex queries)
  • Response: User-facing answer (persisted)

Data Flow

Agent outputs flow through Python variables:

classification = await classifier(msg).stream()  # str
research = await researcher(msg).stream()        # str
response = await responder(context).stream()     # str

With Typed Output

Use Pydantic models for structured data:

from pydantic import BaseModel

class Classification(BaseModel):
    category: str  # "SIMPLE" or "COMPLEX"
    confidence: float
    reason: str

classifier = af.Agent(
    name="classifier",
    instructions="Classify the request.",
    output_type=Classification,
    model="gpt-5.2",
)

async def typed_flow(user_message: str) -> str:
    async with af.phase("Classification"):
        result: Classification = await classifier(user_message).stream()

    if result.category == "COMPLEX" and result.confidence > 0.7:
        async with af.phase("Research"):
            research = await researcher(user_message).stream()
        context = f"Research:\n{research}"
    else:
        context = user_message

    async with af.phase("Response", persist=True):
        return await responder(context).stream()

With Handler

Add streaming output for CLI:

import agentic_flow as af

def cli_handler(event):
    if isinstance(event, af.PhaseStarted):
        print(f"\n[{event.label}]")
    elif isinstance(event, af.PhaseEnded):
        print(f"\n[/{event.label}]")
    elif isinstance(event, af.AgentResult):
        print(event.content)

runner = af.Runner(
    flow=multi_agent_flow,
    session=SQLiteSession("multi_agent.db"),
    handler=cli_handler,
)

Parallel Research

For independent research tasks, use asyncio.gather with .isolated() or .snapshot():

  • .isolated() — No context at all (stateless, cheapest)
  • .snapshot() — Read-only context (sees conversation history, doesn't write)
import asyncio

async def parallel_research_flow(user_message: str) -> str:
    async with af.phase("Classification"):
        classification = await classifier(user_message).stream()

    if "COMPLEX" in classification.upper():
        async with af.phase("Research"):
            # Parallel research — each sees phase context but doesn't write
            results = await asyncio.gather(
                researcher(f"Technical aspects of: {user_message}").snapshot(),
                researcher(f"Practical applications of: {user_message}").snapshot(),
                researcher(f"Comparisons and alternatives for: {user_message}").snapshot(),
            )
            research = "\n\n".join(results)
    else:
        research = user_message

    async with af.phase("Response", persist=True):
        return await responder(f"Based on research:\n{research}").stream()

When to use .isolated() vs .snapshot()

Use .isolated() for pure transformations that don't need context (translation, formatting). Use .snapshot() when parallel agents need to see the accumulated conversation or phase context.


Next: Review Loop