API Reference¶
Complete API documentation for AF.
Agent¶
Agent[T]¶
Wrapper around SDK Agent that enables callable form.
class af.Agent(Generic[T]):
def __init__(
self,
*,
output_type: type[T] | None = None,
**sdk_kwargs: Any,
) -> None: ...
def __call__(self, input: str) -> af.ExecutionSpec[T]: ...
Parameters:
| Parameter | Type | Description |
|---|---|---|
output_type |
type[T] \| None |
Pydantic model for typed output. If None, T = str. |
**sdk_kwargs |
Any |
All arguments passed to agents.Agent |
Common sdk_kwargs:
| Argument | Type | Description |
|---|---|---|
name |
str |
Agent name (required) |
instructions |
str |
System instructions |
model |
str |
Model name (e.g., "gpt-5.2") |
model_settings |
ModelSettings |
Model configuration |
tools |
list |
Tool functions |
handoffs |
list |
Handoff agents |
Example:
import agentic_flow as af
from pydantic import BaseModel
class Analysis(BaseModel):
sentiment: str
score: float
# str output
assistant = af.Agent(name="assistant", instructions="...", model="gpt-5.2")
# Typed output
analyzer = af.Agent(name="analyzer", instructions="...", output_type=Analysis, model="gpt-5.2")
ExecutionSpec¶
ExecutionSpec[T]¶
Awaitable execution specification. Created by agent(prompt).
@dataclass
class ExecutionSpec(Generic[T]):
sdk_agent: SDKAgent
input: str = ""
is_streaming: bool = False
is_isolated: bool = False
is_snapshot: bool = False
is_silent: bool = False
max_turns_limit: int | None = None
run_kwargs: dict[str, Any] = field(default_factory=dict)
# WHERE axis
def isolated(self) -> ExecutionSpec[T]: ...
def snapshot(self) -> ExecutionSpec[T]: ...
# HOW axis
def stream(self) -> ExecutionSpec[T]: ...
def silent(self) -> ExecutionSpec[T]: ...
# LIMITS axis
def max_turns(self, max_turns: int) -> ExecutionSpec[T]: ...
# SDK pass-through
def run_config(self, run_config: RunConfig) -> ExecutionSpec[T]: ...
def context(self, context: Any) -> ExecutionSpec[T]: ...
def run_kwarg(self, **kwargs: Any) -> ExecutionSpec[T]: ...
def __await__(self): ...
Methods:
| Method | Returns | Axis | Description |
|---|---|---|---|
isolated() |
ExecutionSpec[T] |
WHERE | Execute without context |
snapshot() |
ExecutionSpec[T] |
WHERE | Read-only context snapshot |
stream() |
ExecutionSpec[T] |
HOW | Enable streaming mode |
silent() |
ExecutionSpec[T] |
HOW | Suppress UI display |
max_turns(n) |
ExecutionSpec[T] |
LIMITS | Limit execution turns |
run_config(cfg) |
ExecutionSpec[T] |
SDK | Set RunConfig |
context(ctx) |
ExecutionSpec[T] |
SDK | Inject context (DI) |
run_kwarg(**kw) |
ExecutionSpec[T] |
SDK | Set arbitrary SDK params |
__await__ |
T |
- | Execute and return result |
Example:
spec = assistant("Hello") # ExecutionSpec[str]
spec = spec.stream() # ExecutionSpec[str] with is_streaming=True
result = await spec # str
# With SDK pass-through
result = await agent("task") \
.max_turns(5) \
.context(app_ctx) \
.stream()
Runner¶
Runner¶
Flow execution container with session and handler injection.
class Runner:
def __init__(
self,
flow: Callable[[str], Awaitable[T]],
session: Session | None = None,
handler: af.Handler | None = None,
) -> None: ...
async def __call__(self, user_message: str) -> T: ...
def run(self, user_message: str) -> af.RunHandle: ...
def run_sync(self, user_message: str) -> T: ...
Parameters:
| Parameter | Type | Description |
|---|---|---|
flow |
Callable[[str], Awaitable[T]] |
Async function to execute |
session |
Session \| None |
SDK Session for history |
handler |
Handler \| None |
Event handler |
Methods:
| Method | Returns | Description |
|---|---|---|
__call__(msg) |
T |
Execute flow (async) |
run(msg) |
RunHandle |
Create deferred handle |
run_sync(msg) |
T |
Execute synchronously |
Example:
import agentic_flow as af
from agents import SQLiteSession
runner = af.Runner(
flow=my_flow,
session=SQLiteSession("chat.db"),
handler=my_handler,
)
# Async
result = await runner("Hello")
# Sync
result = runner.run_sync("Hello")
# Deferred
result = runner.run("Hello").sync()
RunHandle¶
RunHandle¶
Deferred execution handle for synchronous contexts.
Methods:
| Method | Returns | Description |
|---|---|---|
sync() |
T |
Execute synchronously |
__await__ |
T |
Execute asynchronously |
phase¶
phase()¶
Context manager for workflow phases.
@asynccontextmanager
async def phase(
label: str,
share_context: bool = True,
persist: bool = False,
) -> AsyncIterator[PhaseSession | None]: ...
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
label |
str |
(required) | Phase display name |
share_context |
bool |
True |
Create PhaseSession |
persist |
bool |
False |
Write to Session at end |
Example:
async with af.phase("Research"):
result = await agent(query).stream()
async with af.phase("Response", persist=True):
return await agent(result).stream()
PhaseSession¶
PhaseSession¶
SessionABC-compliant session for phase execution.
class PhaseSession(SessionABC):
session_id: str
label: str
data: dict
inherited_history: list[TResponseInputItem]
items: list[TResponseInputItem]
async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]: ...
async def add_items(self, items: list[TResponseInputItem]) -> None: ...
async def pop_item(self) -> TResponseInputItem | None: ...
async def clear_session(self) -> None: ...
Attributes:
| Attribute | Type | Description |
|---|---|---|
label |
str |
Phase label |
inherited_history |
list |
Session history at phase start (read-only) |
items |
list |
Messages within phase (SDK-managed) |
data |
dict |
Custom data storage |
Methods:
| Method | Returns | Description |
|---|---|---|
get_items(limit) |
list |
inherited_history + items (async) |
add_items(items) |
None |
Add items to items (called by SDK) |
pop_item() |
item \| None |
Pop from items |
clear_session() |
None |
Clear items |
Example:
async with af.phase("Research") as ctx:
await agent(query).stream()
ctx.my_note = "important"
print(ctx.my_note)
Event Types¶
Event¶
Union type for all events:
PhaseStarted¶
PhaseEnded¶
@dataclass(frozen=True)
class PhaseEnded:
type: Literal["phase.ended"]
label: str
elapsed_ms: int
ts: float
AgentResult¶
Handler¶
Handler Events¶
Handler receives AF events only. SDK streaming deltas are consumed internally and not forwarded to the handler. Display is always full-text-at-once.
Events the handler receives:
| Event | When | Key Fields |
|---|---|---|
AgentResult |
After each agent execution | content: Any (full output) |
PhaseStarted |
On phase() entry |
label: str |
PhaseEnded |
On phase() exit |
label: str, elapsed_ms: int |
Display fallback: ChatKit > Handler > print() (mutually exclusive).
Example: Handler
import agentic_flow as af
def handler(event):
if isinstance(event, af.PhaseStarted):
print(f"\n[{event.label}]")
elif isinstance(event, af.PhaseEnded):
print(f" ({event.elapsed_ms}ms)")
elif isinstance(event, af.AgentResult):
print(event.content)
Utilities¶
reasoning()¶
Create ModelSettings with reasoning enabled.
def reasoning(
effort: Literal["low", "medium", "high"] = "medium",
summary: Literal["auto", "concise", "detailed"] = "auto",
**model_settings_kwargs: Any,
) -> ModelSettings: ...
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
effort |
str |
"medium" |
Reasoning effort level |
summary |
str |
"auto" |
Summary style |
**kwargs |
Any |
Additional ModelSettings args |
Example:
import agentic_flow as af
agent = af.Agent(
name="thinker",
instructions="Think step by step.",
model="gpt-5.2",
model_settings=af.reasoning("high"),
)
ChatKit Integration¶
run_with_chatkit_context()¶
Execute Runner with ChatKit context.
async def run_with_chatkit_context(
runner: Runner,
thread: ThreadMetadata,
store: Store,
context: dict[str, Any],
user_message: str,
) -> AsyncIterator[ThreadStreamEvent]: ...
Parameters:
| Parameter | Type | Description |
|---|---|---|
runner |
Runner |
Runner instance |
thread |
ThreadMetadata |
ChatKit thread |
store |
Store |
ChatKit store |
context |
dict |
Request context |
user_message |
str |
User message |
Yields: ThreadStreamEvent