@corti/agent-sdk0.1.0-alpha

07 — State graph

stateGraph() maintains a shared state dict across nodes and routes dynamically — including cycles. This example builds a clinical triage pipeline: triage → code → review → (loop back if rejected).

Shared stateA plain dict accumulated across every node. Each node returns a partial update that is shallow-merged in.
NodesAsync functions async def(state) → dict. agent_node() wraps an AgentHandle into this shape.
EdgesStatic node names, END, or callables (state) → str | END that pick the next node after the current one updates state.
max_iterationsSafety limit (default 25). The graph stops with terminated_by="maxIterations" if the budget is exhausted.

Graph structure

#   triage ──(urgent?)──► coder ──► reviewer ──(approved?)──► END
#              │                        │
#              ▼ (routine)              ▼ (rejected)
#             END                    coder   ← loop

agent_node()

agent_node(agent, get_input, merge_response) wraps an AgentHandle as a node function:

ArgumentTypePurpose
agentAgentHandleThe agent to invoke.
get_input(state)Callable[[dict], str | list]Extract the agent's input from current state.
merge_response(response, state)Callable[[MessageResponse, dict], dict]Return the state patch to merge in.

You can also write a raw async node function directly:

async def my_node(state):
    result = await my_agent.run(state["note"])
    return {"severity": (result.text or "").strip()}

Full code

"""
07 — Stateful graph routing.

Clinical triage pipeline with a reviewer loop:

  triage → coder → reviewer → END
                 ↑         |
                 └─ reject ┘
"""
import asyncio
from corti_agent_sdk import CortiClient, AgentsClient, stateGraph, agent_node, END

async def main():
    async with CortiClient(
        tenant_name="YOUR_TENANT",
        environment="eu",
        auth={"client_id": "YOUR_ID", "client_secret": "YOUR_SECRET"},
    ) as client:
        agents = AgentsClient(client)

        triage_agent = await agents.create(
            name="sg-triage",
            description="Classifies clinical urgency.",
            system_prompt='Reply with exactly one word: "urgent" or "routine". No punctuation.',
        )
        coder_agent = await agents.create(
            name="sg-coder",
            description="Assigns ICD-10 codes.",
            system_prompt="Suggest up to three ICD-10 codes. Format: comma-separated codes only.",
        )
        reviewer_agent = await agents.create(
            name="sg-reviewer",
            description="Reviews proposed ICD-10 codes.",
            system_prompt=(
                'Your reply MUST begin with exactly "approved:" or "rejected:" '
                "(lowercase, colon). No preamble."
            ),
        )

        graph = (
            stateGraph()
            .add_node("triage",
                agent_node(
                    triage_agent,
                    lambda s: s["note"],
                    lambda r, s: {"severity": r.text or ""},
                ))
            .add_node("coder",
                agent_node(
                    coder_agent,
                    lambda s: s["note"],
                    lambda r, s: {"codes": r.text or ""},
                ))
            .add_node("reviewer",
                agent_node(
                    reviewer_agent,
                    lambda s: f"Note: {s['note']}\n\nProposed codes: {s['codes']}",
                    lambda r, s: {
                        "reviewer_feedback": r.text or "",
                        "approved": (r.text or "").strip().lower().startswith("approved"),
                    },
                ))
            # Only code urgent cases
            .add_edge("triage", lambda s: "coder" if "urgent" in s["severity"].lower() else END)
            # Coder always goes to reviewer
            .add_edge("coder", "reviewer")
            # Loop back if reviewer rejects
            .add_edge("reviewer", lambda s: END if s["approved"] else "coder")
        )

        initial_state = {
            "note": "Patient presents with sudden onset chest pain radiating to left arm.",
            "severity": "",
            "codes": "",
            "reviewer_feedback": "",
            "approved": False,
        }

        result = await graph.run("triage", initial_state, max_iterations=10)

        print("Severity:         ", result.state["severity"])
        print("ICD-10 codes:     ", result.state["codes"])
        print("Reviewer feedback:", result.state["reviewer_feedback"])
        print("Approved:         ", result.state["approved"])
        print()
        print("Execution trace:")
        for step in result.steps:
            print(f"  [{step.node}]", list(step.delta.keys()))
        print("Iterations:   ", result.iterations)
        print("Terminated by:", result.terminated_by)

asyncio.run(main())

Result shape

FieldDescription
result.stateFinal accumulated state dict.
result.stepsList of StateGraphStep(node, delta, state) — one per execution, including repeated nodes from cycles.
result.iterationsTotal node executions. Always equals len(result.steps).
result.terminated_by"end" (edge returned END) · "maxIterations" · "noEdge".
Python note: TypeScript uses terminatedBy (camelCase); Python uses terminated_by (snake_case). Same for stoppedEarlystopped_early in WorkflowResult.

vs workflow()

workflow()stateGraph()
ShapeOrdered list of stepsNamed nodes with explicit edges
Shared statePrevious response text onlyFull dict, accumulated across all nodes
Branchingwhen predicate (skip or run)Routing function — jump to any node
CyclesNot supportedSupported, bounded by max_iterations
Best forFixed, known pipelinesConditional flows, review loops, dynamic routing

Next steps