claude-code-system-prompts/system-prompts/data-streaming-reference-python.md
2026-02-18 14:50:58 -07:00

Streaming — Python

Quick Start

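```python
import anthropic

client = anthropic.Anthropic()  # reads ANTHROPIC_API_KEY from the environment

with client.messages.stream(
    model="claude-opus-4-6",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Write a story"}],
) as stream:
    # text_stream yields only the text deltas, in order
    for text in stream.text_stream:
        print(text, end="", flush=True)
```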

Async

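```python
import asyncio

import anthropic

async_client = anthropic.AsyncAnthropic()


async def main():
    # `async with` is only valid inside a coroutine
    async with async_client.messages.stream(
        model="claude-opus-4-6",
        max_tokens=1024,
        messages=[{"role": "user", "content": "Write a story"}],
    ) as stream:
        async for text in stream.text_stream:
            print(text, end="", flush=True)


asyncio.run(main())
```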


Handling Different Content Types

Claude may return text, thinking blocks, or tool use blocks. Check each event's type and handle it accordingly.

On Opus 4.6, use `thinking={"type": "adaptive"}`. On older models, use `thinking={"type": "enabled", "budget_tokens": N}` instead.

```python
with client.messages.stream(
    model="claude-opus-4-6",
    max_tokens=16000,
    thinking={"type": "adaptive"},
    messages=[{"role": "user", "content": "Analyze this problem"}],
) as stream:
    for event in stream:
        if event.type == "content_block_start":
            if event.content_block.type == "thinking":
                print("\n[Thinking...]")
            elif event.content_block.type == "text":
                print("\n[Response:]")

        elif event.type == "content_block_delta":
            if event.delta.type == "thinking_delta":
                print(event.delta.thinking, end="", flush=True)
            elif event.delta.type == "text_delta":
                print(event.delta.text, end="", flush=True)
```
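
The loop above prints thinking and text as they arrive but does not handle tool use. As a minimal sketch, the same event check can collect each kind of content into its own buffer, including tool input, which streams as JSON fragments in `input_json_delta` events. The helper name `collect_by_block_type` and the shape of the returned dictionary are assumptions made for illustration, not part of the SDK.

```python
import json


def collect_by_block_type(events):
    """Group streamed deltas by kind (hypothetical helper, not an SDK function).

    `events` is any iterable of stream events, for example the `stream`
    object yielded by `client.messages.stream(...)`.
    """
    thinking, text, tool_json = [], [], {}
    for event in events:
        if event.type != "content_block_delta":
            continue
        delta = event.delta
        if delta.type == "thinking_delta":
            thinking.append(delta.thinking)
        elif delta.type == "text_delta":
            text.append(delta.text)
        elif delta.type == "input_json_delta":
            # Tool input arrives as JSON fragments; keep them per block index
            # so that several tool calls in one message stay separate.
            tool_json.setdefault(event.index, []).append(delta.partial_json)
    return {
        "thinking": "".join(thinking),
        "text": "".join(text),
        # Parse only after all fragments of a block have been joined.
        "tool_inputs": {
            index: json.loads("".join(parts) or "{}")
            for index, parts in tool_json.items()
        },
    }
```

Parsing is deferred until the stream ends because an individual fragment is usually not valid JSON on its own.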


Streaming with Tool Use

The Python tool runner currently returns complete messages rather than streaming them. If you need per-token streaming with tools, stream each API call yourself inside a manual loop:

```python
# `tools` and `messages` are assumed to be defined earlier
with client.messages.stream(
    model="claude-opus-4-6",
    max_tokens=4096,
    tools=tools,
    messages=messages,
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

    response = stream.get_final_message()
    # Continue with tool execution if response.stop_reason == "tool_use"
```
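
The snippet above covers a single call. One way the surrounding manual loop might look is sketched below: stream a turn, and if the final message stops with `tool_use`, run the requested tools, append the results, and stream again. The function `run_tool_loop`, the `handlers` registry mapping tool names to Python callables, and the `max_turns` cap are all hypothetical names introduced for this example.

```python
def run_tool_loop(client, messages, tools, handlers, max_turns=5, **kwargs):
    """Stream each turn and run requested tools until Claude stops asking.

    Hypothetical helper. `handlers` maps a tool name to a callable that
    accepts the tool's input as keyword arguments.
    """
    messages = list(messages)  # do not mutate the caller's list
    for _ in range(max_turns):
        with client.messages.stream(
            messages=messages, tools=tools, **kwargs
        ) as stream:
            for text in stream.text_stream:
                print(text, end="", flush=True)
            response = stream.get_final_message()

        if response.stop_reason != "tool_use":
            return response

        # Echo the assistant turn back, then answer every tool_use block.
        messages.append({"role": "assistant", "content": response.content})
        results = []
        for block in response.content:
            if block.type == "tool_use":
                output = handlers[block.name](**block.input)
                results.append(
                    {
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": str(output),
                    }
                )
        messages.append({"role": "user", "content": results})

    raise RuntimeError("tool loop did not finish within max_turns")
```

A call could look like `run_tool_loop(client, messages, tools, {"get_weather": get_weather}, model="claude-opus-4-6", max_tokens=4096)`, where `get_weather` stands in for your own function. The turn cap is a design choice to keep a misbehaving tool from looping indefinitely.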


Getting the Final Message

```python
with client.messages.stream(
    model="claude-opus-4-6",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello"}],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

    # Get the full message after streaming
    final_message = stream.get_final_message()
    print(f"\n\nTokens used: {final_message.usage.output_tokens}")
```


Streaming with Progress Updates

```python
def stream_with_progress(client, **kwargs):
    """Stream a response with progress updates."""
    total_tokens = 0
    content_parts = []

    with client.messages.stream(**kwargs) as stream:
        for event in stream:
            if event.type == "content_block_delta":
                if event.delta.type == "text_delta":
                    text = event.delta.text
                    content_parts.append(text)
                    print(text, end="", flush=True)

            elif event.type == "message_delta":
                # Usage is reported at message level, not per token
                if event.usage and event.usage.output_tokens is not None:
                    total_tokens = event.usage.output_tokens

        # Complete Message object, in case more than the text is needed
        final_message = stream.get_final_message()

    print(f"\n\n[Tokens used: {total_tokens}]")
    return "".join(content_parts)
```


Error Handling in Streams

```python
import anthropic

try:
    with client.messages.stream(
        model="claude-opus-4-6",
        max_tokens=1024,
        messages=[{"role": "user", "content": "Write a story"}],
    ) as stream:
        for text in stream.text_stream:
            print(text, end="", flush=True)
except anthropic.APIConnectionError:
    print("\nConnection lost. Please retry.")
except anthropic.RateLimitError:
    # Must come before APIStatusError, which is its base class
    print("\nRate limited. Please wait and retry.")
except anthropic.APIStatusError as e:
    print(f"\nAPI error: {e.status_code}")
```
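
The handlers above only tell the user to retry. A minimal sketch of automating "wait and retry" with exponential backoff follows. The helper `call_with_retries` and its parameters are hypothetical, and the delay schedule (base delay doubling each attempt, plus up to 10% jitter) is one common choice rather than anything the SDK prescribes.

```python
import random
import time


def call_with_retries(fn, retryable, max_attempts=4, base_delay=1.0,
                      sleep=time.sleep):
    """Call fn(), retrying on `retryable` exceptions with backoff.

    Hypothetical helper. `sleep` is injectable so the wait can be
    replaced in tests.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except retryable:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            # 1x, 2x, 4x ... the base delay, plus up to 10% jitter
            delay = base_delay * 2 ** (attempt - 1)
            sleep(delay + random.uniform(0, delay * 0.1))
```

With the SDK, `retryable` would typically be something like `(anthropic.APIConnectionError, anthropic.RateLimitError)` and `fn` a function that opens the stream and consumes it. A retried stream starts again from the first token, so discard or clearly separate any partial output already shown.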


Stream Event Types

| Event Type | Description | When it fires |
| --- | --- | --- |
| `message_start` | Contains message metadata | Once at the beginning |
| `content_block_start` | New content block beginning | When a text, thinking, or tool_use block starts |
| `content_block_delta` | Incremental content update | For each token/chunk |
| `content_block_stop` | Content block complete | When a block finishes |
| `message_delta` | Message-level updates | After the content blocks; contains `stop_reason` and usage |
| `message_stop` | Message complete | Once at the end |
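
To see this sequence for a real response, a small sketch like the one below can tally the events by type and pick `stop_reason` and usage out of `message_delta`. The helper name `summarize_events` is an assumption for illustration. The SDK's stream object may also yield convenience events under other type names; this sketch simply counts them alongside the ones in the table.

```python
from collections import Counter


def summarize_events(events):
    """Count events by type and capture message-level fields.

    Hypothetical helper; `events` is any iterable of stream events.
    """
    counts = Counter()
    stop_reason = None
    output_tokens = None
    for event in events:
        counts[event.type] += 1
        if event.type == "message_delta":
            # Message-level updates arrive here, not on message_stop
            stop_reason = event.delta.stop_reason
            output_tokens = event.usage.output_tokens
    return {
        "counts": dict(counts),
        "stop_reason": stop_reason,
        "output_tokens": output_tokens,
    }
```

Passing the `stream` object from `client.messages.stream(...)` to this function consumes the stream, so use it for inspection rather than alongside a loop that prints text.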

Best Practices

  1. Always flush output: use `flush=True` so tokens appear immediately.
  2. Handle partial responses: if the stream is interrupted, the content received so far may be incomplete.
  3. Track token usage: the `message_delta` event carries usage information.
  4. Use timeouts: set a timeout suited to your application, for example with the client's `timeout` option.
  5. Default to streaming: even when you do not need individual events, stream the request and call `.get_final_message()` to get the complete response. This gives you timeout protection on long requests.
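
For the second practice, one possible approach is to keep whatever text arrived before an interruption and report whether the stream completed. The sketch below assumes a hypothetical helper `stream_keep_partial`. Its `interruptions` parameter defaults to the broad `Exception` purely for illustration; with the SDK you would more likely pass `(anthropic.APIConnectionError,)`.

```python
def stream_keep_partial(client, interruptions=(Exception,), **kwargs):
    """Stream text and return (text_so_far, completed).

    Hypothetical helper. If an exception listed in `interruptions` is
    raised mid-stream, the text received up to that point is returned
    with completed=False instead of being lost.
    """
    parts = []
    try:
        with client.messages.stream(**kwargs) as stream:
            for text in stream.text_stream:
                parts.append(text)
                print(text, end="", flush=True)
    except interruptions:
        return "".join(parts), False
    return "".join(parts), True
```

The caller can then decide whether to show the partial text with a warning, or discard it and retry.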