@corti/agent-sdk0.1.0-alpha

03 — Workflow

A Workflow is a deterministic, code-first pipeline. Steps run in order; each step receives the previous step's text as input. Use when, transform, and retries to add conditional logic.

workflow(steps)Factory that accepts AgentHandles, Parallel groups, or WorkflowStep dicts.
whenOptional predicate (prev: MessageResponse) → bool. If it returns False, the step is skipped.
transformOptional function (prev: MessageResponse) → str | list[Part]. Reshape the output before passing to the next step.
retriesRe-attempt the step up to N extra times on failure. Combine with retry_delay (seconds).

Step options

KeyTypeDefaultDescription
agentAgentHandle | ParallelrequiredThe agent or parallel group to run.
whenCallable[[MessageResponse], bool]always runSkip this step if the predicate returns False.
transformCallable[[MessageResponse], str | list]pass text throughShape the previous output before passing to this step.
retriesint0Extra attempts on "failed" status.
retry_delayfloat1.0Seconds to wait between retries.

Full code

"""
03 — Workflow.

A three-step pipeline: extract → code → review (conditional).
"""
import asyncio
from corti_agent_sdk import CortiClient, AgentsClient, workflow, WorkflowStep

async def main():
    async with CortiClient(
        tenant_name="YOUR_TENANT",
        environment="eu",
        auth={"client_id": "YOUR_ID", "client_secret": "YOUR_SECRET"},
    ) as client:
        agents = AgentsClient(client)

        extractor = await agents.create(
            name="extractor",
            description="Extracts key symptoms from a clinical note.",
            system_prompt="List the key symptoms as a comma-separated string. No prose.",
        )
        coder = await agents.create(
            name="coder",
            description="Maps symptoms to ICD-10 codes.",
            system_prompt="Return only comma-separated ICD-10 codes.",
        )
        reviewer = await agents.create(
            name="reviewer",
            description="Flags potentially incorrect codes.",
            system_prompt='Reply "ok" if the codes look correct, otherwise explain why.',
        )

        result = await workflow([
            extractor,   # bare AgentHandle — shorthand for WorkflowStep(agent=extractor)

            coder,

            WorkflowStep(
                agent=reviewer,
                # Only run if coder returned something
                when=lambda r: bool(r.text),
                # Prepend context so the reviewer knows what note was coded
                transform=lambda r: f"Codes: {r.text}",
                retries=1,
                retry_delay=2.0,
            ),
        ]).run("Patient presents with chest pain and dyspnoea.")

        print("Final output:", result.output.text)
        print("Steps run:   ", len(result.steps))
        print("Stopped early:", result.stopped_early)

asyncio.run(main())

Result shape

FieldDescription
result.outputMessageResponse from the last executed step.
result.stepsList of MessageResponse for every step that ran (skipped steps excluded).
result.stopped_earlyTrue if a step's status was "failed" and execution halted.

Next steps