claude-code-system-prompts/system-prompts/data-claude-api-reference-python.md
2026-06-09 10:53:08 -06:00


Claude API — Python

Installation

pip install anthropic

Client Initialization

import anthropic

# Default — resolves credentials from the environment:
# ANTHROPIC_API_KEY, or ANTHROPIC_AUTH_TOKEN, or an `ant auth login` profile.
# Prefer this for local dev; don't hardcode a key.
client = anthropic.Anthropic()

# Explicit API key (only when you must inject a specific key)
client = anthropic.Anthropic(api_key="your-api-key")

# Async client
async_client = anthropic.AsyncAnthropic()

Client Configuration

Per-request overrides

Use with_options() to override client settings for a single call without mutating the client:

client.with_options(timeout=5.0, max_retries=5).messages.create(
    model="{{OPUS_ID}}",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello"}],
)

Timeouts

The default request timeout is 10 minutes. Pass a float (seconds) or an httpx.Timeout for granular control. Timed-out requests are retried up to max_retries times; if they still fail, the SDK raises anthropic.APITimeoutError.

import httpx

client = anthropic.Anthropic(timeout=20.0)
client = anthropic.Anthropic(
    timeout=httpx.Timeout(60.0, read=5.0, write=10.0, connect=2.0),
)

Retries

The SDK automatically retries connection errors and responses with status 408, 409, 429, or ≥500, using exponential backoff (2 retries by default). Set max_retries on the client or via with_options(); max_retries=0 disables retries.
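The retry rule above can be written as a small predicate, which may help when deciding whether custom code should retry a failure the SDK has already given up on. This is only a sketch: is_retryable_status is a hypothetical helper, not an SDK function, and it encodes nothing beyond the status codes listed above.

```python
# Hypothetical helper (not part of the anthropic SDK): mirrors the status
# codes that the text above lists as automatically retried.
RETRYABLE_STATUS_CODES = frozenset({408, 409, 429})


def is_retryable_status(status_code: int) -> bool:
    """Return True for 408, 409, 429, and any status code >= 500."""
    return status_code in RETRYABLE_STATUS_CODES or status_code >= 500


if __name__ == "__main__":
    for code in (400, 408, 429, 503):
        print(code, is_retryable_status(code))
```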

Async performance (aiohttp backend)

For high-concurrency async workloads, install anthropic[aiohttp] and pass DefaultAioHttpClient instead of the default httpx backend:

from anthropic import AsyncAnthropic, DefaultAioHttpClient

async with AsyncAnthropic(http_client=DefaultAioHttpClient()) as client:
    ...

Custom HTTP client (proxy, base URL)

Use DefaultHttpxClient / DefaultAsyncHttpxClient — not raw httpx.Client — so the SDK's default timeouts and connection limits are preserved:

from anthropic import Anthropic, DefaultHttpxClient

client = Anthropic(
    base_url="http://my.test.server.example.com:8083",  # or ANTHROPIC_BASE_URL env var
    http_client=DefaultHttpxClient(proxy="http://my.test.proxy.example.com"),
)

Logging

Set ANTHROPIC_LOG=debug (or info) to enable SDK logging via the standard logging module.


Basic Message Request

response = client.messages.create(
    model="{{OPUS_ID}}",
    max_tokens=16000,
    messages=[
        {"role": "user", "content": "What is the capital of France?"}
    ]
)
# response.content is a list of content block objects (TextBlock, ThinkingBlock,
# ToolUseBlock, ...). Check .type before accessing .text.
for block in response.content:
    if block.type == "text":
        print(block.text)
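
Because a response can hold several text blocks mixed with other block types, it can be convenient to collect the text in one place. The sketch below assumes only that each block has a .type attribute and that text blocks have .text; extract_text is a hypothetical helper, and the SimpleNamespace objects are stand-ins for real SDK blocks so the example runs without a network call.

```python
from types import SimpleNamespace
from typing import Iterable


def extract_text(content: Iterable) -> str:
    """Join the text of every block whose type is "text"; skip all others."""
    return "".join(block.text for block in content if block.type == "text")


# Stand-in blocks for illustration; a real call returns SDK block objects.
fake_content = [
    SimpleNamespace(type="thinking", thinking="(reasoning)"),
    SimpleNamespace(type="text", text="The capital of France "),
    SimpleNamespace(type="text", text="is Paris."),
]

print(extract_text(fake_content))  # The capital of France is Paris.
```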

System Prompts

response = client.messages.create(
    model="{{OPUS_ID}}",
    max_tokens=16000,
    system="You are a helpful coding assistant. Always provide examples in Python.",
    messages=[{"role": "user", "content": "How do I read a JSON file?"}]
)

Mid-conversation system messages (beta, model-gated)

For operator instructions that arrive mid-conversation (mode switches, injected state), append {"role": "system", ...} to messages instead of editing top-level system — this preserves the cached prefix and carries operator authority. Must follow a user message; cannot be messages[0]. Unsupported models return a 400 (role 'system' is not supported on this model). See shared/prompt-caching.md for when to use this vs. top-level system.

response = client.messages.create(
    model=MODEL_ID,  # must support mid-conversation system messages
    max_tokens=16000,
    system=[{"type": "text", "text": STABLE_SYSTEM, "cache_control": {"type": "ephemeral"}}],
    messages=history + [
        {"role": "user", "content": user_message},
        {"role": "system", "content": "Terse mode enabled — keep responses under 40 words."},
    ],
    extra_headers={"anthropic-beta": "mid-conversation-system-2026-04-07"},
)

Vision (Images)

Base64

import base64

with open("image.png", "rb") as f:
    image_data = base64.standard_b64encode(f.read()).decode("utf-8")

response = client.messages.create(
    model="{{OPUS_ID}}",
    max_tokens=16000,
    messages=[{
        "role": "user",
        "content": [
            {
                "type": "image",
                "source": {
                    "type": "base64",
                    "media_type": "image/png",
                    "data": image_data
                }
            },
            {"type": "text", "text": "What's in this image?"}
        ]
    }]
)
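
The base64 request above hardcodes image/png. A small helper can derive the media type from the file name instead. This is a sketch under assumptions: image_block_from_path is a hypothetical name, the media type is guessed from the file extension only (the bytes are not inspected), and the set of accepted types reflects my understanding of the Messages API rather than anything stated in this document.

```python
import base64
import mimetypes
from pathlib import Path

# Assumption: the image media types the Messages API accepts.
SUPPORTED_IMAGE_TYPES = {"image/jpeg", "image/png", "image/gif", "image/webp"}


def image_block_from_path(path: str) -> dict:
    """Build a base64 image content block for the file at `path`."""
    # Guess from the extension only; the file contents are not checked.
    media_type, _ = mimetypes.guess_type(path)
    if media_type not in SUPPORTED_IMAGE_TYPES:
        raise ValueError(f"Unsupported or unknown image type for {path!r}: {media_type}")
    data = base64.standard_b64encode(Path(path).read_bytes()).decode("utf-8")
    return {
        "type": "image",
        "source": {"type": "base64", "media_type": media_type, "data": data},
    }
```

The returned dict can be placed directly in a user message's content list, next to a text block.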

URL

response = client.messages.create(
    model="{{OPUS_ID}}",
    max_tokens=16000,
    messages=[{
        "role": "user",
        "content": [
            {
                "type": "image",
                "source": {
                    "type": "url",
                    "url": "https://example.com/image.png"
                }
            },
            {"type": "text", "text": "Describe this image"}
        ]
    }]
)

Prompt Caching

Cache large context to reduce costs (up to 90% savings). Caching is a prefix match — any byte change anywhere in the prefix invalidates everything after it. For placement patterns, architectural guidance (frozen system prompt, deterministic tool order, where to put volatile content), and the silent-invalidator audit checklist, read shared/prompt-caching.md.

Use top-level cache_control to automatically cache the last cacheable block in the request — no need to annotate individual content blocks:

response = client.messages.create(
    model="{{OPUS_ID}}",
    max_tokens=16000,
    cache_control={"type": "ephemeral"},  # auto-caches the last cacheable block
    system="You are an expert on this large document...",
    messages=[{"role": "user", "content": "Summarize the key points"}]
)

Manual Cache Control

For fine-grained control, add cache_control to specific content blocks:

response = client.messages.create(
    model="{{OPUS_ID}}",
    max_tokens=16000,
    system=[{
        "type": "text",
        "text": "You are an expert on this large document...",
        "cache_control": {"type": "ephemeral"}  # default TTL is 5 minutes
    }],
    messages=[{"role": "user", "content": "Summarize the key points"}]
)

# With explicit TTL (time-to-live)
response = client.messages.create(
    model="{{OPUS_ID}}",
    max_tokens=16000,
    system=[{
        "type": "text",
        "text": "You are an expert on this large document...",
        "cache_control": {"type": "ephemeral", "ttl": "1h"}  # 1 hour TTL
    }],
    messages=[{"role": "user", "content": "Summarize the key points"}]
)

Verifying Cache Hits

print(response.usage.cache_creation_input_tokens)  # tokens written to cache (~1.25x cost)
print(response.usage.cache_read_input_tokens)      # tokens served from cache (~0.1x cost)
print(response.usage.input_tokens)                 # uncached tokens (full cost)
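
The three usage fields can be folded into a rough input-cost estimate and a hit ratio. The sketch below takes the approximate multipliers from the comments above (1.25x for cache writes, 0.1x for cache reads); estimate_input_cost and cache_hit_ratio are hypothetical helpers, the price is a parameter you supply, and the SimpleNamespace objects stand in for response.usage.

```python
from types import SimpleNamespace

CACHE_WRITE_MULTIPLIER = 1.25  # approximate, from the comments above
CACHE_READ_MULTIPLIER = 0.10   # approximate, from the comments above


def estimate_input_cost(usage, price_per_mtok: float) -> float:
    """Estimate input cost in USD; `price_per_mtok` is USD per 1M input tokens."""
    weighted_tokens = (
        usage.input_tokens
        + (usage.cache_creation_input_tokens or 0) * CACHE_WRITE_MULTIPLIER
        + (usage.cache_read_input_tokens or 0) * CACHE_READ_MULTIPLIER
    )
    return weighted_tokens * price_per_mtok / 1_000_000


def cache_hit_ratio(usage) -> float:
    """Fraction of all input tokens that were served from the cache."""
    read = usage.cache_read_input_tokens or 0
    total = usage.input_tokens + (usage.cache_creation_input_tokens or 0) + read
    return read / total if total else 0.0


# Stand-ins for response.usage on a cold (cache write) and warm (cache read) call.
cold = SimpleNamespace(input_tokens=50, cache_creation_input_tokens=10_000, cache_read_input_tokens=0)
warm = SimpleNamespace(input_tokens=50, cache_creation_input_tokens=0, cache_read_input_tokens=10_000)

for name, usage in (("cold", cold), ("warm", warm)):
    print(name, f"{estimate_input_cost(usage, 5.0):.5f}", f"{cache_hit_ratio(usage):.3f}")
```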

If cache_read_input_tokens is zero across repeated identical-prefix requests, a silent invalidator is at work — datetime.now() or a UUID in the system prompt, unsorted json.dumps(), or a varying tool set. See shared/prompt-caching.md for the full audit table.
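
One way to hunt for a silent invalidator is to log a fingerprint of the prefix on every request and compare it across calls. The sketch below is an approximation: it hashes a local JSON rendering of the inputs, not the exact bytes the SDK sends, and both request_fingerprint and stable_tools are hypothetical helpers. The fingerprint deliberately applies no normalization, so ordering drift and volatile values show up as a changed hash.

```python
import hashlib
import json


def request_fingerprint(system: str, tools: list) -> str:
    """Hash the prefix exactly as given (no sorting), so any drift changes the hash."""
    payload = json.dumps({"system": system, "tools": tools}, ensure_ascii=False)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def stable_tools(tools: list) -> list:
    """Return the tools in a deterministic order (sorted by name)."""
    return sorted(tools, key=lambda tool: tool["name"])


search = {"name": "search", "description": "Search the docs"}
calc = {"name": "calc", "description": "Evaluate arithmetic"}
system = "You are an expert on this large document..."

# Same tools in a different order: the prefix differs.
drifted = request_fingerprint(system, [search, calc]) != request_fingerprint(system, [calc, search])
# Sorting before building the request removes that source of drift.
fixed = request_fingerprint(system, stable_tools([search, calc])) == request_fingerprint(
    system, stable_tools([calc, search])
)
print(drifted, fixed)  # True True
```

If the logged fingerprint changes between requests that are meant to share a prefix, compare the two payloads to find the volatile field.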


Extended Thinking

Fable 5, Opus 4.8, Opus 4.7, Opus 4.6, and Sonnet 4.6: Use adaptive thinking. budget_tokens is removed on Fable 5, Opus 4.8, and 4.7 (400 if sent); deprecated on Opus 4.6 and Sonnet 4.6. Older models: Use thinking: {type: "enabled", budget_tokens: N} (must be < max_tokens, min 1024).

# Fable 5 / Opus 4.8 / 4.7 / 4.6 and Sonnet 4.6: adaptive thinking (recommended)
response = client.messages.create(
    model="{{OPUS_ID}}",
    max_tokens=16000,
    thinking={"type": "adaptive"},
    output_config={"effort": "high"},  # low | medium | high | max
    messages=[{"role": "user", "content": "Solve this step by step..."}]
)

# Access thinking and response
for block in response.content:
    if block.type == "thinking":
        print(f"Thinking: {block.thinking}")
    elif block.type == "text":
        print(f"Response: {block.text}")
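
For the older models that still take budget_tokens, the two constraints stated above (at least 1024, and strictly less than max_tokens) can be checked before the request is sent. This is a minimal sketch; budget_thinking_config is a hypothetical helper, and it only builds the dict to pass as thinking=.

```python
def budget_thinking_config(budget_tokens: int, max_tokens: int) -> dict:
    """Build the budget-based thinking parameter, enforcing the limits stated above."""
    if budget_tokens < 1024:
        raise ValueError("budget_tokens must be at least 1024")
    if budget_tokens >= max_tokens:
        raise ValueError("budget_tokens must be less than max_tokens")
    return {"type": "enabled", "budget_tokens": budget_tokens}


print(budget_thinking_config(budget_tokens=8000, max_tokens=16000))
# {'type': 'enabled', 'budget_tokens': 8000}
```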

Error Handling

import anthropic

try:
    response = client.messages.create(...)
except anthropic.BadRequestError as e:
    print(f"Bad request: {e.message}")
except anthropic.AuthenticationError:
    print("Invalid API key")
except anthropic.PermissionDeniedError:
    print("API key lacks required permissions")
except anthropic.NotFoundError:
    print("Invalid model or endpoint")
except anthropic.RateLimitError as e:
    retry_after = int(e.response.headers.get("retry-after", "60"))
    print(f"Rate limited. Retry after {retry_after}s.")
except anthropic.APIStatusError as e:
    if e.status_code >= 500:
        print(f"Server error ({e.status_code}). Retry later.")
    else:
        print(f"API error: {e.message}")
except anthropic.APIConnectionError:
    print("Network error. Check internet connection.")
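
The int(...) parse in the rate-limit handler above assumes the retry-after header is a whole number of seconds. A more forgiving parse might look like the sketch below. parse_retry_after is a hypothetical helper, the 60-second fallback simply mirrors the default used above, and HTTP-date values are not handled (they fall back to the default).

```python
from typing import Mapping


def parse_retry_after(headers: Mapping[str, str], default: float = 60.0) -> float:
    """Return the retry-after delay in seconds, or `default` if missing or unparsable."""
    # Note: a plain dict is case-sensitive; the SDK's response headers are not.
    raw = headers.get("retry-after")
    if raw is None:
        return default
    try:
        seconds = float(raw)
    except ValueError:
        return default  # e.g. an HTTP-date value; not handled in this sketch
    return max(seconds, 0.0)


print(parse_retry_after({"retry-after": "1.5"}))  # 1.5
print(parse_retry_after({}))                      # 60.0
```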

Response Helpers

Every response object exposes _request_id (populated from the request-id header) — log it when reporting failures to Anthropic. Despite the underscore prefix, this property is public.

message = client.messages.create(...)
print(message._request_id)       # req_018EeWyXxfu5pfWkrYcMdjWG
print(message.to_json())          # serialize the Pydantic model
print(message.to_dict())          # plain dict

To access raw headers or other response metadata, use .with_raw_response:

raw = client.messages.with_raw_response.create(
    model="{{OPUS_ID}}",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello"}],
)
print(raw.headers.get("request-id"))
message = raw.parse()  # the Message object messages.create() would have returned

Multi-Turn Conversations

The API is stateless — send the full conversation history each time.

from typing import Optional

class ConversationManager:
    """Manage multi-turn conversations with the Claude API."""

    def __init__(self, client: anthropic.Anthropic, model: str, system: Optional[str] = None):
        self.client = client
        self.model = model
        self.system = system
        self.messages = []

    def send(self, user_message: str, **kwargs) -> str:
        """Send a message and get a response."""
        self.messages.append({"role": "user", "content": user_message})

        # Pop max_tokens so it is not passed twice (explicitly and via **kwargs).
        max_tokens = kwargs.pop("max_tokens", 16000)
        # Send `system` only when one was provided; omit it rather than pass None.
        if self.system is not None:
            kwargs.setdefault("system", self.system)

        response = self.client.messages.create(
            model=self.model,
            max_tokens=max_tokens,
            messages=self.messages,
            **kwargs
        )

        # Keep only the first text block; fall back to "" if there is none.
        assistant_message = next(
            (b.text for b in response.content if b.type == "text"), ""
        )
        self.messages.append({"role": "assistant", "content": assistant_message})

        return assistant_message

# Usage
conversation = ConversationManager(
    client=anthropic.Anthropic(),
    model="{{OPUS_ID}}",
    system="You are a helpful assistant."
)

response1 = conversation.send("My name is Alice.")
response2 = conversation.send("What's my name?")  # Claude remembers "Alice"

Rules:

  • Consecutive same-role messages are allowed — the API combines them into a single turn
  • First message must be user
  • role: "system" messages are allowed mid-conversation under the mid-conversation-system-2026-04-07 beta on supporting models — see § Mid-conversation system messages above
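
The rules above lend themselves to a quick client-side check before a request is sent. The sketch below is hypothetical (validate_messages is not an SDK function) and covers only two rules from this document: the first message must be a user message, and a mid-conversation system message must directly follow a user message.

```python
def validate_messages(messages: list) -> list:
    """Return a list of rule violations; an empty list means no problems were found."""
    problems = []
    if messages and messages[0]["role"] != "user":
        problems.append("first message must have role 'user'")
    for index, message in enumerate(messages):
        if index > 0 and message["role"] == "system" and messages[index - 1]["role"] != "user":
            problems.append(f"system message at index {index} must follow a user message")
    return problems


history = [
    {"role": "user", "content": "My name is Alice."},
    {"role": "assistant", "content": "Nice to meet you, Alice."},
    {"role": "user", "content": "What's my name?"},
    {"role": "system", "content": "Terse mode enabled."},
]
print(validate_messages(history))  # []
```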

Compaction (long conversations)

Beta. Supported on Fable 5, Opus 4.8, Opus 4.7, Opus 4.6, and Sonnet 4.6. When a conversation approaches the 200K context window, compaction automatically summarizes earlier context server-side. The API returns a compaction block that you must pass back on subsequent requests, so append response.content, not just the text.

import anthropic

client = anthropic.Anthropic()
messages = []

def chat(user_message: str) -> str:
    messages.append({"role": "user", "content": user_message})

    response = client.beta.messages.create(
        betas=["compact-2026-01-12"],
        model="{{OPUS_ID}}",
        max_tokens=16000,
        messages=messages,
        context_management={
            "edits": [{"type": "compact_20260112"}]
        }
    )

    # Append full content — compaction blocks must be preserved
    messages.append({"role": "assistant", "content": response.content})

    # Default to "" so a response without a text block does not raise StopIteration
    return next((block.text for block in response.content if block.type == "text"), "")

# Compaction triggers automatically when context grows large
print(chat("Help me build a Python web scraper"))
print(chat("Add support for JavaScript-rendered pages"))
print(chat("Now add rate limiting and error handling"))

Stop Reasons

The stop_reason field in the response indicates why the model stopped generating:

| Value | Meaning |
| --- | --- |
| end_turn | Claude finished its response naturally |
| max_tokens | Hit the max_tokens limit; increase it or use streaming |
| stop_sequence | Hit a custom stop sequence |
| tool_use | Claude wants to call a tool; execute it and continue |
| pause_turn | Model paused and can be resumed (agentic flows) |
| refusal | Claude refused for safety reasons; check stop_details |
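
In an agent loop, the table above typically becomes a dispatch on stop_reason. The sketch below maps each value to a coarse next step. The action labels ("done", "truncated", and so on) and the next_step helper are hypothetical, chosen for illustration; only the stop_reason values themselves come from the table.

```python
# Hypothetical action labels keyed by the stop_reason values in the table above.
NEXT_STEP_BY_STOP_REASON = {
    "end_turn": "done",
    "stop_sequence": "done",
    "max_tokens": "truncated",      # raise max_tokens or stream, then retry
    "tool_use": "run_tools",        # execute the tool and continue the conversation
    "pause_turn": "resume",         # send the conversation back to let the model continue
    "refusal": "inspect_refusal",   # look at stop_details
}


def next_step(stop_reason: str) -> str:
    """Map a stop_reason to a coarse next action; fail loudly on unknown values."""
    try:
        return NEXT_STEP_BY_STOP_REASON[stop_reason]
    except KeyError:
        raise ValueError(f"Unrecognized stop_reason: {stop_reason!r}") from None


print(next_step("tool_use"))  # run_tools
```

Raising on an unknown value is a design choice here: it surfaces newly introduced stop reasons instead of silently treating them as a finished turn.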

Structured Stop Details

When stop_reason is "refusal", the response includes a stop_details object with structured information about the refusal:

if response.stop_reason == "refusal" and response.stop_details:
    print(f"Category: {response.stop_details.category}")   # "cyber" | "bio" | None
    print(f"Explanation: {response.stop_details.explanation}")

Cost Optimization Strategies

1. Use Prompt Caching for Repeated Context

# Automatic caching (simplest — caches the last cacheable block)
response = client.messages.create(
    model="{{OPUS_ID}}",
    max_tokens=16000,
    cache_control={"type": "ephemeral"},
    system=large_document_text,  # e.g., 50KB of context
    messages=[{"role": "user", "content": "Summarize the key points"}]
)

# First request: pays the cache write (~1.25x input cost for the cached portion)
# Subsequent requests within the TTL: ~90% cheaper for the cached portion

2. Choose the Right Model

# Default to Opus for most tasks
response = client.messages.create(
    model="{{OPUS_ID}}",  # $5.00/$25.00 per 1M tokens
    max_tokens=16000,
    messages=[{"role": "user", "content": "Explain quantum computing"}]
)

# Use Sonnet for high-volume production workloads
standard_response = client.messages.create(
    model="{{SONNET_ID}}",  # $3.00/$15.00 per 1M tokens
    max_tokens=16000,
    messages=[{"role": "user", "content": "Summarize this document"}]
)

# Use Haiku only for simple, speed-critical tasks
simple_response = client.messages.create(
    model="{{HAIKU_ID}}",  # $1.00/$5.00 per 1M tokens
    max_tokens=256,
    messages=[{"role": "user", "content": "Classify this as positive or negative"}]
)

3. Use Token Counting Before Requests

count_response = client.messages.count_tokens(
    model="{{OPUS_ID}}",
    messages=messages,
    system=system
)

estimated_input_cost = count_response.input_tokens * 0.000005  # $5/1M tokens
print(f"Estimated input cost: ${estimated_input_cost:.4f}")
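
The same arithmetic extends to output tokens and to the other model tiers. The sketch below copies the per-1M-token prices from the comments in the model examples above; the tier keys ("opus", "sonnet", "haiku") and estimate_cost are hypothetical names, and the output token count has to be an estimate of your own, since count_tokens only reports input tokens.

```python
# USD per 1M tokens as (input, output), copied from the comments in the
# model examples above. The tier keys are illustrative, not model IDs.
PRICES_PER_MTOK = {
    "opus": (5.00, 25.00),
    "sonnet": (3.00, 15.00),
    "haiku": (1.00, 5.00),
}


def estimate_cost(tier: str, input_tokens: int, output_tokens: int) -> float:
    """Estimate request cost in USD, ignoring any prompt-caching discounts."""
    input_price, output_price = PRICES_PER_MTOK[tier]
    return (input_tokens * input_price + output_tokens * output_price) / 1_000_000


for tier in PRICES_PER_MTOK:
    print(tier, f"${estimate_cost(tier, input_tokens=20_000, output_tokens=1_000):.4f}")
```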

Retry with Exponential Backoff

Note: The Anthropic SDK already retries connection errors and 408, 409, 429, and 5xx responses with exponential backoff. Configure this with max_retries (default: 2). Implement custom retry logic only if you need behavior beyond what the SDK provides.

import time
import random
import anthropic

def call_with_retry(
    client: anthropic.Anthropic,
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    **kwargs
):
    """Call the API with exponential backoff, making up to max_retries attempts.

    Pass a client created with max_retries=0 to avoid stacking these retries
    on top of the SDK's built-in ones.
    """
    last_exception = None

    for attempt in range(max_retries):
        try:
            return client.messages.create(**kwargs)
        except anthropic.RateLimitError as e:
            last_exception = e
        except anthropic.APIStatusError as e:
            if e.status_code >= 500:
                last_exception = e
            else:
                raise  # Client errors (4xx except 429) should not be retried

        if attempt == max_retries - 1:
            break  # Attempts exhausted; don't sleep before giving up

        delay = min(base_delay * (2 ** attempt) + random.uniform(0, 1), max_delay)
        print(f"Attempt {attempt + 1}/{max_retries} failed; retrying in {delay:.1f}s")
        time.sleep(delay)

    if last_exception is None:
        raise ValueError("max_retries must be at least 1")
    raise last_exception
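
To see what the delay formula produces, it helps to print the schedule without jitter. The sketch below reproduces only the deterministic part, min(base_delay * 2**attempt, max_delay); the function above additionally adds up to one second of random jitter before applying the cap. backoff_schedule is a hypothetical helper for inspection, not something the retry function needs.

```python
def backoff_schedule(attempts: int, base_delay: float = 1.0, max_delay: float = 60.0) -> list:
    """Return the capped exponential delay (in seconds) for each attempt, without jitter."""
    return [min(base_delay * (2 ** attempt), max_delay) for attempt in range(attempts)]


print(backoff_schedule(5))  # [1.0, 2.0, 4.0, 8.0, 16.0]
print(backoff_schedule(8))  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```

With the defaults, the cap only takes effect from the seventh attempt onward, so max_delay matters mainly for long retry budgets.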