Streaming — Python
Quick Start
```python
import anthropic

client = anthropic.Anthropic()  # reads ANTHROPIC_API_KEY from the environment

with client.messages.stream(
    model="{{OPUS_ID}}",
    max_tokens=64000,
    messages=[{"role": "user", "content": "Write a story"}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
```
Async
```python
import asyncio
import anthropic

async_client = anthropic.AsyncAnthropic()

async def main():
    async with async_client.messages.stream(
        model="{{OPUS_ID}}",
        max_tokens=64000,
        messages=[{"role": "user", "content": "Write a story"}]
    ) as stream:
        async for text in stream.text_stream:
            print(text, end="", flush=True)

asyncio.run(main())
```
Handling Different Content Types
Claude may return text, thinking blocks, or tool use. Handle each appropriately:
Opus 4.6: Use `thinking: {type: "adaptive"}`. On older models, use `thinking: {type: "enabled", budget_tokens: N}` instead.
```python
with client.messages.stream(
    model="{{OPUS_ID}}",
    max_tokens=64000,
    thinking={"type": "adaptive"},
    messages=[{"role": "user", "content": "Analyze this problem"}]
) as stream:
    for event in stream:
        if event.type == "content_block_start":
            if event.content_block.type == "thinking":
                print("\n[Thinking...]")
            elif event.content_block.type == "text":
                print("\n[Response:]")

        elif event.type == "content_block_delta":
            if event.delta.type == "thinking_delta":
                print(event.delta.thinking, end="", flush=True)
            elif event.delta.type == "text_delta":
                print(event.delta.text, end="", flush=True)
```
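When thinking and answer text need to be stored separately rather than printed, the same event checks can feed two buffers. The following is a minimal sketch: `split_stream` is a hypothetical helper name, and it assumes only that each event exposes the `type` and `delta` attributes used above. The stand-in events exist so the sketch runs without an API call.

```python
from types import SimpleNamespace


def split_stream(events):
    """Collect thinking and text deltas into separate strings.

    `events` is any iterable of stream events, for example the `stream`
    object yielded by `client.messages.stream(...)`. Hypothetical helper.
    """
    thinking_parts, text_parts = [], []
    for event in events:
        if event.type != "content_block_delta":
            continue
        if event.delta.type == "thinking_delta":
            thinking_parts.append(event.delta.thinking)
        elif event.delta.type == "text_delta":
            text_parts.append(event.delta.text)
    return "".join(thinking_parts), "".join(text_parts)


def _delta(kind, **fields):
    """Build a stand-in content_block_delta event (not an SDK class)."""
    return SimpleNamespace(
        type="content_block_delta",
        delta=SimpleNamespace(type=kind, **fields),
    )


fake_events = [
    SimpleNamespace(type="message_start"),
    _delta("thinking_delta", thinking="Check the units. "),
    _delta("text_delta", text="The answer "),
    _delta("text_delta", text="is 42."),
]
thinking, text = split_stream(fake_events)
```

In real use, pass the `stream` object itself in place of `fake_events`.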
Streaming with Tool Use
The Python tool runner currently returns complete messages. Use streaming for individual API calls within a manual loop if you need per-token streaming with tools:
```python
with client.messages.stream(
    model="{{OPUS_ID}}",
    max_tokens=64000,
    tools=tools,
    messages=messages
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

    response = stream.get_final_message()
    # Continue with tool execution if response.stop_reason == "tool_use"
```
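The "continue with tool execution" step could look roughly like the sketch below. It is an outline under assumptions, not the SDK's tool runner: `run_tools` and the `handlers` mapping are hypothetical names, and the stand-in response mimics only the `stop_reason` and `content` fields of a final message.

```python
from types import SimpleNamespace


def run_tools(response, handlers):
    """Return a user message holding one tool_result per tool_use block.

    `handlers` maps tool names to plain Python callables (hypothetical).
    Returns None when the model did not stop to call a tool.
    """
    if response.stop_reason != "tool_use":
        return None
    results = []
    for block in response.content:
        if block.type != "tool_use":
            continue
        output = handlers[block.name](**block.input)
        results.append({
            "type": "tool_result",
            "tool_use_id": block.id,
            "content": str(output),
        })
    return {"role": "user", "content": results}


# Stand-in for stream.get_final_message(), so the sketch runs offline.
fake_response = SimpleNamespace(
    stop_reason="tool_use",
    content=[
        SimpleNamespace(type="text", text="Let me add those."),
        SimpleNamespace(type="tool_use", id="toolu_01", name="add",
                        input={"a": 2, "b": 3}),
    ],
)
tool_message = run_tools(fake_response, {"add": lambda a, b: a + b})
```

In a manual loop, append the assistant turn (`{"role": "assistant", "content": response.content}`) and then this message to `messages` before opening the next stream.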
Getting the Final Message
```python
with client.messages.stream(
    model="{{OPUS_ID}}",
    max_tokens=64000,
    messages=[{"role": "user", "content": "Hello"}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

    # Get full message after streaming
    final_message = stream.get_final_message()
    print(f"\n\nTokens used: {final_message.usage.output_tokens}")
```
Streaming with Progress Updates
```python
def stream_with_progress(client, **kwargs):
    """Stream a response with progress updates."""
    total_tokens = 0
    content_parts = []

    with client.messages.stream(**kwargs) as stream:
        for event in stream:
            if event.type == "content_block_delta":
                if event.delta.type == "text_delta":
                    text = event.delta.text
                    content_parts.append(text)
                    print(text, end="", flush=True)

            elif event.type == "message_delta":
                # Usage arrives on the message-level delta near the end of the stream
                if event.usage and event.usage.output_tokens is not None:
                    total_tokens = event.usage.output_tokens

        final_message = stream.get_final_message()

    print(f"\n\n[Tokens used: {total_tokens}]")
    return "".join(content_parts)
```
Error Handling in Streams
```python
import anthropic

try:
    with client.messages.stream(
        model="{{OPUS_ID}}",
        max_tokens=64000,
        messages=[{"role": "user", "content": "Write a story"}]
    ) as stream:
        for text in stream.text_stream:
            print(text, end="", flush=True)
except anthropic.APIConnectionError:
    print("\nConnection lost. Please retry.")
except anthropic.RateLimitError:
    # Must come before APIStatusError, which is its parent class
    print("\nRate limited. Please wait and retry.")
except anthropic.APIStatusError as e:
    print(f"\nAPI error: {e.status_code}")
```
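The handlers above only report the failure. If the request should be retried, one option is a small backoff wrapper. This is a sketch under assumptions: `with_retries` is a hypothetical helper, the delays are illustrative, and in real use `retryable` would be something like `(anthropic.APIConnectionError, anthropic.RateLimitError)`. The demonstration uses the built-in `ConnectionError` as a stand-in so it runs offline.

```python
import time


def with_retries(run, retryable, max_attempts=3, base_delay=1.0, sleep=time.sleep):
    """Call `run()` until it succeeds or `max_attempts` is reached.

    Waits base_delay, then 2x, 4x, ... between attempts. The last
    failure is re-raised to the caller. Hypothetical helper.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return run()
        except retryable:
            if attempt == max_attempts:
                raise
            sleep(base_delay * 2 ** (attempt - 1))


# Offline demonstration: fail twice with a stand-in error, then succeed.
calls = {"count": 0}
delays = []  # records the waits instead of actually sleeping


def flaky_stream():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("simulated dropped connection")
    return "full response text"


result = with_retries(flaky_stream, (ConnectionError,), sleep=delays.append)
```

A retried stream starts over from the beginning, so any text already printed from the failed attempt will appear again unless the caller clears it.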
Stream Event Types
| Event Type | Description | When it fires |
|---|---|---|
| `message_start` | Contains message metadata | Once at the beginning |
| `content_block_start` | New content block beginning | When a text, thinking, or tool_use block starts |
| `content_block_delta` | Incremental content update | For each token/chunk |
| `content_block_stop` | Content block complete | When a block finishes |
| `message_delta` | Message-level updates, including `stop_reason` and usage | After the final content block, before `message_stop` |
| `message_stop` | Message complete | Once at the end |
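To make the event order in the table concrete, the sketch below walks a hand-built event sequence and records what each event type contributes. `summarize_events` is a hypothetical helper, and the events are stand-in objects rather than SDK classes; they carry only the fields the table mentions.

```python
from types import SimpleNamespace as NS


def summarize_events(events):
    """Record block types, stop reason, and output tokens from a stream."""
    summary = {"blocks": [], "stop_reason": None, "output_tokens": None, "done": False}
    for event in events:
        if event.type == "content_block_start":
            summary["blocks"].append(event.content_block.type)
        elif event.type == "message_delta":
            summary["stop_reason"] = event.delta.stop_reason
            summary["output_tokens"] = event.usage.output_tokens
        elif event.type == "message_stop":
            summary["done"] = True
    return summary


# Stand-in events in the order the table describes.
fake_events = [
    NS(type="message_start"),
    NS(type="content_block_start", content_block=NS(type="text")),
    NS(type="content_block_delta", delta=NS(type="text_delta", text="Hi")),
    NS(type="content_block_stop"),
    NS(type="message_delta", delta=NS(stop_reason="end_turn"), usage=NS(output_tokens=1)),
    NS(type="message_stop"),
]
summary = summarize_events(fake_events)
```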
Best Practices
- Always flush output: pass `flush=True` to `print` so tokens appear as soon as they arrive
- Handle partial responses: if the stream is interrupted, the content received so far may be incomplete
- Track token usage: the `message_delta` event carries the usage information, including `output_tokens`
- Use timeouts: set a `timeout` on the client that suits your application
- Default to streaming: call `.get_final_message()` to get the complete response even when streaming, which gives you timeout protection without handling individual events
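For the partial-response point, one approach is to keep whatever text arrived before the interruption and flag the result as incomplete. A minimal sketch, assuming the caller passes an iterable of text chunks such as `stream.text_stream`; `collect_text` is a hypothetical helper, and catching bare `Exception` is for illustration only.

```python
def collect_text(chunks):
    """Return (text, complete). `complete` is False if iteration failed."""
    parts = []
    try:
        for chunk in chunks:
            parts.append(chunk)
    except Exception:  # illustration only; catch specific errors in practice
        return "".join(parts), False
    return "".join(parts), True


# Stand-in generator that fails partway through, so this runs offline.
def interrupted():
    yield "Once upon "
    yield "a time"
    raise ConnectionError("simulated interruption")


partial, complete = collect_text(interrupted())
```

The caller can then decide whether to show the partial text, discard it, or retry the request.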