05 — Streaming
Use ctx.stream_message() to receive the agent's response as an async generator of StreamEvent dicts. Each event carries a state update, artifact chunk, or the final message.
stream_message(parts)Returns an async generator — use
async for event in ctx.stream_message(...):statusUpdateState-transition events:
submitted → working → completed.artifactUpdateStructured output chunks streamed incrementally.
messageFinal assembled message — equivalent to what
send_message() returns.Event shape
The SDK normalises raw A2A events into a wrapped dict before yielding:
| Key | Present when |
|---|---|
event["statusUpdate"] | State transitions (submitted, working, completed, …) |
event["artifactUpdate"] | Structured output chunk available. |
event["message"] | Final assembled agent message. |
event["task"] | Full task object (first event only). |
Full code
"""
05 — Streaming.
Stream the agent's response event-by-event over SSE.
"""
import asyncio
from corti_agent_sdk import CortiClient, AgentsClient
async def main():
async with CortiClient(
tenant_name="YOUR_TENANT",
environment="eu",
auth={"client_id": "YOUR_ID", "client_secret": "YOUR_SECRET"},
) as client:
agents = AgentsClient(client)
agent = await agents.create(
name="stream-demo",
description="Demonstrates streaming responses.",
system_prompt="You are a helpful assistant.",
)
ctx = agent.create_context()
async for event in ctx.stream_message([{"kind": "text", "text": "Tell me about ICD-10."}]):
# State transitions
if su := event.get("statusUpdate"):
state = (su.get("status") or {}).get("state")
print(f" [{state}]")
# Final message text
if msg := event.get("message"):
for part in msg.get("parts", []):
if part.get("kind") == "text":
print("Agent:", part["text"])
print("Context ID:", ctx.id) # available after first event
asyncio.run(main())
Note:
stream_message() does not automatically forward credentials on auth-required the way send_message() does. For MCP auth flows, use send_message() or handle the retry yourself.