# Streaming — Python

## Quick Start

```python
import anthropic

# Reads ANTHROPIC_API_KEY from the environment; later examples reuse `client`.
client = anthropic.Anthropic()

with client.messages.stream(
    model="{{OPUS_ID}}",
    max_tokens=64000,
    messages=[{"role": "user", "content": "Write a story"}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
```

### Async

```python
import asyncio

async_client = anthropic.AsyncAnthropic()

async def main():
    async with async_client.messages.stream(
        model="{{OPUS_ID}}",
        max_tokens=64000,
        messages=[{"role": "user", "content": "Write a story"}]
    ) as stream:
        async for text in stream.text_stream:
            print(text, end="", flush=True)

asyncio.run(main())
```

---

## Handling Different Content Types

Claude may return text, thinking blocks, or tool use. Handle each appropriately:

> **Opus 4.7 / Opus 4.6:** Use `thinking: {type: "adaptive"}`. On older models, use `thinking: {type: "enabled", budget_tokens: N}` instead.

```python
with client.messages.stream(
    model="{{OPUS_ID}}",
    max_tokens=64000,
    thinking={"type": "adaptive"},
    messages=[{"role": "user", "content": "Analyze this problem"}]
) as stream:
    for event in stream:
        # A new block is starting: label it by type.
        if event.type == "content_block_start":
            if event.content_block.type == "thinking":
                print("\n[Thinking...]")
            elif event.content_block.type == "text":
                print("\n[Response:]")

        # Incremental content for the current block.
        elif event.type == "content_block_delta":
            if event.delta.type == "thinking_delta":
                print(event.delta.thinking, end="", flush=True)
            elif event.delta.type == "text_delta":
                print(event.delta.text, end="", flush=True)
```

---

## Streaming with Tool Use

The Python tool runner currently returns complete messages. If you need per-token streaming with tools, stream each API call yourself inside a manual loop:

```python
# `tools` and `messages` are defined by your application.
with client.messages.stream(
    model="{{OPUS_ID}}",
    max_tokens=64000,
    tools=tools,
    messages=messages
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

    response = stream.get_final_message()
    # Continue with tool execution if response.stop_reason == "tool_use"
```

---

## Getting the Final Message

```python
with client.messages.stream(
    model="{{OPUS_ID}}",
    max_tokens=64000,
    messages=[{"role": "user", "content": "Hello"}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

    # Get full message after streaming
    final_message = stream.get_final_message()
    print(f"\n\nTokens used: {final_message.usage.output_tokens}")
```

---

## Streaming with Progress Updates

```python
def stream_with_progress(client, **kwargs):
    """Stream a response, echoing text as it arrives and reporting token usage."""
    total_tokens = 0
    content_parts = []

    with client.messages.stream(**kwargs) as stream:
        for event in stream:
            if event.type == "content_block_delta":
                if event.delta.type == "text_delta":
                    text = event.delta.text
                    content_parts.append(text)
                    print(text, end="", flush=True)

            # message_delta carries the output token count.
            elif event.type == "message_delta":
                if event.usage and event.usage.output_tokens is not None:
                    total_tokens = event.usage.output_tokens

    print(f"\n\n[Tokens used: {total_tokens}]")
    return "".join(content_parts)
```

---

## Error Handling in Streams

```python
try:
    with client.messages.stream(
        model="{{OPUS_ID}}",
        max_tokens=64000,
        messages=[{"role": "user", "content": "Write a story"}]
    ) as stream:
        for text in stream.text_stream:
            print(text, end="", flush=True)
except anthropic.APIConnectionError:
    print("\nConnection lost. Please retry.")
# RateLimitError is a subclass of APIStatusError, so it must be caught first.
except anthropic.RateLimitError:
    print("\nRate limited. Please wait and retry.")
except anthropic.APIStatusError as e:
    print(f"\nAPI error: {e.status_code}")
```

---

## Stream Event Types

| Event Type            | Description                                | When it fires                                    |
| --------------------- | ------------------------------------------ | ------------------------------------------------ |
| `message_start`       | Contains message metadata                  | Once at the beginning                            |
| `content_block_start` | New content block beginning                | When a text, thinking, or tool_use block starts  |
| `content_block_delta` | Incremental content update                 | For each token/chunk                             |
| `content_block_stop`  | Content block complete                     | When a block finishes                            |
| `message_delta`       | Message-level updates (`stop_reason`, usage) | After the content blocks, before `message_stop` |
| `message_stop`        | Message complete                           | Once at the end                                  |

## Best Practices

1. **Always flush output**: Use `flush=True` so tokens appear immediately.
2. **Handle partial responses**: If the stream is interrupted, the content received so far may be incomplete.
3. **Track token usage**: The `message_delta` event contains usage information.
4. **Use timeouts**: Set a request timeout that fits your application's latency budget.
5. **Default to streaming**: Stream long responses and call `.get_final_message()` to obtain the complete message; this gives you timeout protection without needing to handle individual events.
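
The advice on handling partial responses can be sketched as follows. This is a minimal illustration, not part of the SDK: `collect_text` and `flaky_stream` are hypothetical names, and the helper accepts any iterable of text chunks so it can be exercised without a network call. It keeps whatever text arrived before the failure and reports whether the stream finished.

```python
from typing import Iterable, Tuple, Type


def collect_text(
    chunks: Iterable[str],
    recoverable: Tuple[Type[BaseException], ...] = (Exception,),
) -> Tuple[str, bool]:
    """Accumulate streamed text, keeping whatever arrived before a failure.

    Returns (text, completed). `completed` is False when the iterator raised
    one of the `recoverable` exception types before it was exhausted.
    """
    parts = []
    try:
        for chunk in chunks:
            parts.append(chunk)
    except recoverable:
        # Interrupted: return the partial text instead of discarding it.
        return "".join(parts), False
    return "".join(parts), True


def flaky_stream():
    """Hypothetical stand-in for `stream.text_stream` that fails partway through."""
    yield "Once upon "
    yield "a time"
    raise ConnectionError("simulated dropped connection")


text, completed = collect_text(flaky_stream(), recoverable=(ConnectionError,))
print(repr(text), completed)  # 'Once upon a time' False
```

With the SDK, one would presumably pass `stream.text_stream` as `chunks` and `(anthropic.APIConnectionError,)` as `recoverable`, calling the helper inside the `with client.messages.stream(...)` block. A client-level timeout such as `anthropic.Anthropic(timeout=60.0)` pairs naturally with this, since timeouts surface as `anthropic.APITimeoutError`, a subclass of `APIConnectionError`. When `completed` is `False`, the caller can decide whether to show the partial text, retry, or both.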