
Claude Streaming: Real-Time API Responses (2026)

How to stream responses from Claude's API: Python SDK, async streaming, SSE for web UIs, TypeScript, tool use, and error handling.


Streaming lets Claude start sending text back to you immediately — token by token — instead of waiting for the entire response to finish before returning anything. For users, this feels dramatically faster. For developers, it opens up real-time UIs, progressive rendering, and tighter feedback loops.

The Anthropic Python and TypeScript SDKs handle the underlying Server-Sent Events (SSE) protocol for you, exposing clean iterators and context managers so you can focus on what to do with each chunk rather than parsing raw event streams.
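To make concrete what the SDKs are hiding, here is a minimal sketch of parsing an SSE stream by hand. The parse_sse helper and the SAMPLE payload are illustrative assumptions, not part of the SDK; the sample follows the event and data line layout of the streaming wire format, and a production parser would need to handle more of the SSE specification (ids, retry fields, reconnects).

```python
import json
from typing import Iterable, Iterator, Tuple


def parse_sse(lines: Iterable[str]) -> Iterator[Tuple[str, dict]]:
    """Yield (event_name, parsed_json) pairs from raw SSE lines (hypothetical helper)."""
    event_name = "message"  # SSE default when no "event:" line is given
    data_lines = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line terminates the current event
            if data_lines:
                yield event_name, json.loads("\n".join(data_lines))
            event_name, data_lines = "message", []
        elif line.startswith(":"):
            continue  # comment line, often used as a keep-alive
        elif line.startswith("event:"):
            event_name = line[len("event:"):].strip()
        elif line.startswith("data:"):
            value = line[len("data:"):]
            # The spec strips exactly one leading space after the colon
            data_lines.append(value[1:] if value.startswith(" ") else value)


# Illustrative payload in the shape of the streaming wire format
SAMPLE = """event: content_block_delta
data: {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "Hello"}}

event: content_block_delta
data: {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": " world"}}

event: message_stop
data: {"type": "message_stop"}

"""

text = "".join(
    data["delta"]["text"]
    for name, data in parse_sse(SAMPLE.splitlines())
    if name == "content_block_delta"
)
print(text)
```

With the SDK, all of this collapses into a single loop over text_stream, which is why the rest of this guide uses the helpers.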

Why Use Streaming?

Without streaming, a long Claude response (say, a 500-word explanation) leaves the user staring at a loading spinner until generation finishes, often for several seconds, and then everything appears at once. With streaming, the first words typically appear almost immediately and the response builds in real time. This is the standard experience in Claude.ai, and users now expect it.

Streaming also benefits backend pipelines: you can begin processing early chunks while later chunks are still generating, reducing end-to-end latency in multi-step workflows.
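As a sketch of that pipeline idea, the helper below turns a stream of arbitrary text chunks into complete sentences, so a downstream step (translation, text-to-speech, moderation) can start on sentence one while sentence two is still generating. The sentences function and its simple punctuation rule are assumptions for illustration; it accepts any iterable of strings, including text_stream.

```python
import re
from typing import Iterable, Iterator

# Split on whitespace that follows sentence-ending punctuation
_SENTENCE_END = re.compile(r"(?<=[.!?])\s+")


def sentences(chunks: Iterable[str]) -> Iterator[str]:
    """Yield complete sentences as soon as enough chunks have arrived."""
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        parts = _SENTENCE_END.split(buffer)
        # Every part except the last is a finished sentence
        for sentence in parts[:-1]:
            yield sentence
        buffer = parts[-1]
    # Flush whatever is left when the stream ends
    if buffer.strip():
        yield buffer.strip()


# Stand-in chunks; in real code this would be stream.text_stream
demo_chunks = ["Streaming is fa", "st. It feels ", "responsive! Do", "ne"]
for sentence in sentences(demo_chunks):
    print(sentence)
```

The punctuation rule is deliberately naive (it will split on abbreviations like "e.g."), so treat it as a starting point rather than a sentence tokenizer.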

Basic Streaming in Python

The Anthropic Python SDK provides a stream() method that returns a context manager. The simplest way to stream and print text:

import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Explain quantum entanglement simply."}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

print()  # newline after stream ends

stream.text_stream is an iterator that yields each text delta as a string as it arrives. It filters out all non-text events automatically, so you only see the actual words Claude is writing.

Getting the Final Message After Streaming

Sometimes you want to stream for the user experience but still access the complete final message (for logging, storage, or downstream processing). Call stream.get_final_message() inside the context manager; it waits for the stream to finish and returns the assembled message:

with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Write a haiku about recursion."}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
    
    # Available after the stream completes (still inside context manager)
    final_message = stream.get_final_message()
    
print(f"\nStop reason: {final_message.stop_reason}")
print(f"Input tokens: {final_message.usage.input_tokens}")
print(f"Output tokens: {final_message.usage.output_tokens}")

The final_message is a standard Message object identical to what non-streaming client.messages.create() returns. Token counts, stop reason, and model are all present.

Async Streaming

For web servers, async frameworks (FastAPI, aiohttp), or any code using asyncio, use the async client and async with:

import asyncio
import anthropic

async def stream_response(prompt: str) -> str:
    client = anthropic.AsyncAnthropic()
    full_text = ""
    
    async with client.messages.stream(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}]
    ) as stream:
        async for text in stream.text_stream:
            print(text, end="", flush=True)
            full_text += text
    
    return full_text

asyncio.run(stream_response("What is the best sorting algorithm?"))

The async version is identical in structure to the sync version. Swap anthropic.Anthropic() for anthropic.AsyncAnthropic(), with for async with, and for text for async for text.
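One practical payoff of the async client is that several streams can share a single event loop. The sketch below shows the shape of that with asyncio.gather. To keep it runnable without an API key, fake_stream is a hypothetical stand-in for stream.text_stream, and collect is an illustrative helper rather than an SDK function.

```python
import asyncio
from typing import AsyncIterator, List, Tuple


async def fake_stream(chunks: List[str], delay: float) -> AsyncIterator[str]:
    """Hypothetical stand-in for stream.text_stream: yields chunks with a pause."""
    for chunk in chunks:
        await asyncio.sleep(delay)
        yield chunk


async def collect(label: str, stream: AsyncIterator[str], log: List[str]) -> str:
    """Consume one stream, recording which stream produced each chunk."""
    parts = []
    async for text in stream:
        log.append(label)
        parts.append(text)
    return "".join(parts)


async def main() -> Tuple[str, str, List[str]]:
    log: List[str] = []
    # Both streams are consumed concurrently on the same event loop
    first, second = await asyncio.gather(
        collect("A", fake_stream(["Hel", "lo"], 0.01), log),
        collect("B", fake_stream(["Wor", "ld"], 0.01), log),
    )
    return first, second, log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

In a real service each collect call would wrap its own async with client.messages.stream(...) block, and the log would be replaced by whatever delivers chunks to each user.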

Streaming in TypeScript / Node.js

The TypeScript SDK provides the same streaming pattern:

import Anthropic from "@anthropic-ai/sdk";

const client = new Anthropic();

const stream = client.messages.stream({
  model: "claude-sonnet-4-6",
  max_tokens: 1024,
  messages: [{ role: "user", content: "Explain streaming APIs in one paragraph." }],
});

for await (const event of stream) {
  if (
    event.type === "content_block_delta" &&
    event.delta.type === "text_delta"
  ) {
    process.stdout.write(event.delta.text);
  }
}

const finalMessage = await stream.finalMessage();
console.log(`\nTokens used: ${finalMessage.usage.output_tokens}`);

If you prefer a higher-level abstraction, the TypeScript SDK also exposes a .on("text", callback) pattern:

const stream = client.messages
  .stream({
    model: "claude-sonnet-4-6",
    max_tokens: 1024,
    messages: [{ role: "user", content: "Tell me a joke." }],
  })
  .on("text", (text) => {
    process.stdout.write(text);
  });

await stream.finalMessage();

Building a Streaming Web UI with Server-Sent Events

The most common production use case: a web app where the user sees Claude's response stream in real time. The architecture is:

  1. Browser sends a fetch request to your backend
  2. Backend opens a streaming request to the Claude API
  3. Backend forwards chunks to the browser via SSE
  4. Browser JavaScript appends each chunk to the DOM

FastAPI (Python) SSE endpoint example:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import anthropic

app = FastAPI()
client = anthropic.AsyncAnthropic()

@app.get("/stream")
async def stream_chat(prompt: str):
    async def generate():
        async with client.messages.stream(
            model="claude-sonnet-4-6",
            max_tokens=1024,
            messages=[{"role": "user", "content": prompt}]
        ) as stream:
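            async for text in stream.text_stream:
                # SSE format: one "data: " line per line of text, then a blank line.
                # Splitting keeps a newline inside a chunk from ending the event early.
                payload = "".join(f"data: {line}\n" for line in text.split("\n"))
                yield payload + "\n"
        yield "data: [DONE]\n\n"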
    
    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
    )

Frontend JavaScript to consume the SSE stream:

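async function streamResponse(prompt) {
  const output = document.getElementById("output");
  output.textContent = "";

  const response = await fetch(`/stream?prompt=${encodeURIComponent(prompt)}`);
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // stream: true keeps multi-byte characters intact across reads
    buffer += decoder.decode(value, { stream: true });

    // Events end with a blank line; keep any incomplete event in the buffer
    const events = buffer.split("\n\n");
    buffer = events.pop();

    for (const event of events) {
      // Multiple data lines in one event are joined with newlines
      const text = event
        .split("\n")
        .filter((line) => line.startsWith("data: "))
        .map((line) => line.slice(6))
        .join("\n");
      if (text === "[DONE]") return;
      output.textContent += text;
    }
  }
}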

The X-Accel-Buffering: no header prevents Nginx from buffering the stream. Without it, chunks batch up and the real-time effect disappears.

Low-Level Event Streaming

If you need full control over every event in the stream — including message_start, content_block_start, content_block_delta, and message_stop — iterate over the raw stream instead of text_stream:

with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hello"}]
) as stream:
    for event in stream:
        if event.type == "content_block_delta":
            if event.delta.type == "text_delta":
                print(f"Text chunk: {event.delta.text!r}")
        elif event.type == "message_stop":
            print("Stream complete")

This matters for tool use: when Claude calls a tool, the delta type is input_json_delta (not text_delta), and you need the raw event to detect which tool is being called and accumulate its JSON arguments.
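To show how those raw events fit together, here is a small sketch that rebuilds content blocks from a sequence of events. It works on plain dicts shaped like the JSON payloads on the wire rather than on SDK event objects, so it runs without a network call. The assemble_blocks helper, the EVENTS sample, and the toolu_example id are all illustrative assumptions.

```python
import json
from typing import Dict, Iterable, List


def assemble_blocks(events: Iterable[dict]) -> List[dict]:
    """Rebuild content blocks from wire-format stream events (illustrative)."""
    blocks: Dict[int, dict] = {}
    partial_json: Dict[int, str] = {}
    for event in events:
        kind = event["type"]
        if kind == "content_block_start":
            blocks[event["index"]] = dict(event["content_block"])
            partial_json[event["index"]] = ""
        elif kind == "content_block_delta":
            index, delta = event["index"], event["delta"]
            if delta["type"] == "text_delta":
                blocks[index]["text"] += delta["text"]
            elif delta["type"] == "input_json_delta":
                # Tool arguments arrive as fragments of one JSON document
                partial_json[index] += delta["partial_json"]
        elif kind == "content_block_stop":
            index = event["index"]
            if blocks[index]["type"] == "tool_use":
                blocks[index]["input"] = json.loads(partial_json[index] or "{}")
    return [blocks[i] for i in sorted(blocks)]


# Made-up sample: a text block followed by a tool call
EVENTS = [
    {"type": "message_start", "message": {"role": "assistant", "content": []}},
    {"type": "content_block_start", "index": 0,
     "content_block": {"type": "text", "text": ""}},
    {"type": "content_block_delta", "index": 0,
     "delta": {"type": "text_delta", "text": "Checking"}},
    {"type": "content_block_delta", "index": 0,
     "delta": {"type": "text_delta", "text": " the weather."}},
    {"type": "content_block_stop", "index": 0},
    {"type": "content_block_start", "index": 1,
     "content_block": {"type": "tool_use", "id": "toolu_example",
                       "name": "get_weather", "input": {}}},
    {"type": "content_block_delta", "index": 1,
     "delta": {"type": "input_json_delta", "partial_json": '{"city": '}},
    {"type": "content_block_delta", "index": 1,
     "delta": {"type": "input_json_delta", "partial_json": '"Tokyo"}'}},
    {"type": "content_block_stop", "index": 1},
    {"type": "message_stop"},
]

for block in assemble_blocks(EVENTS):
    print(block)
```

This is roughly the bookkeeping that get_final_message() does for you, which is a good reason to reach for raw events only when you need to react before the block is complete.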

Streaming with Tool Use

Tool call arguments stream as partial JSON in input_json_delta events. The recommended pattern is to accumulate the JSON string, then parse it when the content block stops:

import json
import anthropic

client = anthropic.Anthropic()

tools = [{
    "name": "get_weather",
    "description": "Get current weather for a city.",
    "input_schema": {
        "type": "object",
        "properties": {"city": {"type": "string"}},
        "required": ["city"]
    }
}]

with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=1024,
    tools=tools,
    messages=[{"role": "user", "content": "What's the weather in Tokyo?"}]
) as stream:
    current_tool_json = ""
    current_tool_name = ""
    
    for event in stream:
        if event.type == "content_block_start":
            # Only tool_use blocks carry streamed JSON arguments
            if event.content_block.type == "tool_use":
                current_tool_name = event.content_block.name
                current_tool_json = ""
        
        elif event.type == "content_block_delta":
            if event.delta.type == "text_delta":
                print(event.delta.text, end="", flush=True)
            elif event.delta.type == "input_json_delta":
                current_tool_json += event.delta.partial_json
        
        elif event.type == "content_block_stop" and current_tool_name:
            # A tool with no arguments may stream no JSON at all
            tool_input = json.loads(current_tool_json or "{}")
            print(f"\nTool call: {current_tool_name}({tool_input})")
            current_tool_name = ""
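Once the arguments are parsed, the usual next step is to run the tool and send the result back in a follow-up user message. The sketch below shows one way to do the local half of that. TOOL_HANDLERS, run_tool, and the get_weather stub are hypothetical names for illustration; the returned dict follows the tool_result content block shape, and tool_use_id is the id that arrives on the tool_use block in content_block_start.

```python
import json
from typing import Callable, Dict


def get_weather(city: str) -> str:
    """Hypothetical stand-in for a real weather lookup."""
    return f"Sunny in {city}"


# Map tool names (as declared in the tools list) to local functions
TOOL_HANDLERS: Dict[str, Callable[..., str]] = {"get_weather": get_weather}


def run_tool(name: str, raw_json: str, tool_use_id: str) -> dict:
    """Run a tool call and wrap the outcome as a tool_result content block."""
    try:
        handler = TOOL_HANDLERS[name]
        args = json.loads(raw_json or "{}")
        content, is_error = handler(**args), False
    except Exception as exc:
        # Report failures back to the model instead of crashing the loop
        content, is_error = f"{type(exc).__name__}: {exc}", True
    return {
        "type": "tool_result",
        "tool_use_id": tool_use_id,
        "content": content,
        "is_error": is_error,
    }


result = run_tool("get_weather", '{"city": "Tokyo"}', "toolu_example")
print(result)
```

The result block goes into the content list of a user message that follows the assistant message containing the tool call, and that second request can be streamed the same way as the first.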

Error Handling in Streams

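Errors in streaming requests are raised as exceptions, same as non-streaming calls. The key difference is timing. A RateLimitError is normally raised before any text arrives, because the API rejects the request up front. Once the stream is open, a failure (for example an overloaded_error event or a dropped connection) surfaces as an exception partway through iteration, after some text has already been delivered.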

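import anthropic
from anthropic import RateLimitError, APIStatusError

client = anthropic.Anthropic()

def stream_text(prompt: str):
    """Yield text chunks for a prompt, reporting API errors instead of crashing."""
    try:
        with client.messages.stream(
            model="claude-sonnet-4-6",
            max_tokens=1024,
            messages=[{"role": "user", "content": prompt}]
        ) as stream:
            for text in stream.text_stream:
                yield text  # or print, or send to client

    # RateLimitError is a subclass of APIStatusError, so it must come first
    except RateLimitError:
        # Back off and retry, or queue the request
        print("Rate limit hit: retry after delay")
    except APIStatusError as e:
        print(f"API error {e.status_code}: {e.message}")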
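The "back off and retry" comment can be made concrete with a small wrapper. This is a sketch under a few assumptions: stream_with_retry is a hypothetical helper, start_stream is any zero-argument function that returns a fresh iterator of text chunks, and the policy is exponential backoff with jitter. It only retries if nothing has been delivered yet, because replaying a half-sent answer would duplicate text for the user.

```python
import random
import time
from typing import Callable, Iterable, Iterator, Tuple, Type, Union

ExceptionTypes = Union[Type[BaseException], Tuple[Type[BaseException], ...]]


def stream_with_retry(
    start_stream: Callable[[], Iterable[str]],
    retry_on: ExceptionTypes,
    max_attempts: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Iterator[str]:
    """Yield chunks from start_stream(), retrying failures that happen before any output."""
    for attempt in range(max_attempts):
        delivered = False
        try:
            for chunk in start_stream():
                delivered = True
                yield chunk
            return
        except retry_on:
            # Give up if text already reached the caller or attempts ran out
            if delivered or attempt == max_attempts - 1:
                raise
            # Exponential backoff with full jitter
            sleep(random.uniform(0, base_delay * 2 ** attempt))


# Stand-in stream that fails once before producing any text
attempts = {"count": 0}


def flaky_stream() -> Iterator[str]:
    attempts["count"] += 1
    if attempts["count"] == 1:
        raise ConnectionError("simulated failure")
    yield "ok"


print(list(stream_with_retry(flaky_stream, ConnectionError, sleep=lambda s: None)))
```

The SDK clients also accept a max_retries option for automatic retries of some failures, so check whether that already covers your case before layering a wrapper like this on top.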

If you are streaming to a web client and an error occurs mid-stream, you will need a convention to signal the error over SSE. A common pattern is to send data: [ERROR] message here and handle it in the frontend.
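A minimal sketch of that convention follows, written as a synchronous generator so it runs on its own; the same shape works with async for inside a FastAPI endpoint. The sse_data and forward_as_sse helpers and the wording of the public message are assumptions for illustration.

```python
from typing import Iterable, Iterator


def sse_data(text: str) -> str:
    """Frame text as one SSE event, one data line per line of text."""
    return "".join(f"data: {line}\n" for line in text.split("\n")) + "\n"


def forward_as_sse(
    chunks: Iterable[str], public_message: str = "stream interrupted"
) -> Iterator[str]:
    """Forward chunks as SSE events, ending with [DONE] or an [ERROR] event."""
    try:
        for text in chunks:
            yield sse_data(text)
    except Exception:
        # Real code would catch the SDK's exception types and log the details.
        # The browser only gets a generic message, not internal error text.
        yield sse_data(f"[ERROR] {public_message}")
        return
    yield sse_data("[DONE]")


# Stand-in stream that fails after one chunk
def failing_chunks() -> Iterator[str]:
    yield "Hel"
    raise RuntimeError("upstream failure")


for frame in forward_as_sse(failing_chunks()):
    print(repr(frame))
```

On the frontend, a data payload that starts with [ERROR] can then be routed to an error banner instead of being appended to the output.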

Streaming vs. Non-Streaming: When to Use Each

Use Case | Recommendation
User-facing chat / assistant | Always stream: dramatic UX improvement
Background batch processing | Non-streaming: simpler, no SSE overhead
Classification tasks (short output) | Non-streaming: response is fast anyway
Code generation with live preview | Stream: users can start reading early
Multi-step pipelines (output feeds next step) | Either: stream if intermediate steps are fast enough; non-streaming if you need the full output before proceeding
Logging / auditing (need complete message) | Stream for UX, then call get_final_message() for the record

Token Counts with Streaming

Final token usage is only known once the stream completes, because the output count keeps growing while Claude is still generating. Call get_final_message().usage once the text loop has finished to get exact counts for billing and monitoring.

with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=2048,
    messages=messages
) as stream:
    for text in stream.text_stream:
        deliver_to_client(text)
    
    usage = stream.get_final_message().usage

# Log after stream is done
log_usage(input_tokens=usage.input_tokens, output_tokens=usage.output_tokens)
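If you are working with raw events instead of get_final_message(), the same numbers can be assembled by hand. As far as the streaming wire format goes, message_start carries the input token count and message_delta carries a cumulative output count. The sketch below reads both from plain dicts shaped like those payloads. The usage_from_events helper and the sample numbers are made up for illustration.

```python
from typing import Dict, Iterable


def usage_from_events(events: Iterable[dict]) -> Dict[str, int]:
    """Collect token usage from wire-format stream events (illustrative)."""
    input_tokens = 0
    output_tokens = 0
    for event in events:
        if event["type"] == "message_start":
            usage = event["message"]["usage"]
            input_tokens = usage["input_tokens"]
            output_tokens = usage.get("output_tokens", 0)
        elif event["type"] == "message_delta":
            # Cumulative, so the latest value replaces the previous one
            output_tokens = event["usage"]["output_tokens"]
    return {"input_tokens": input_tokens, "output_tokens": output_tokens}


# Made-up sample events
SAMPLE_EVENTS = [
    {"type": "message_start",
     "message": {"usage": {"input_tokens": 25, "output_tokens": 1}}},
    {"type": "content_block_delta", "index": 0,
     "delta": {"type": "text_delta", "text": "Hi"}},
    {"type": "message_delta",
     "delta": {"stop_reason": "end_turn"},
     "usage": {"output_tokens": 42}},
    {"type": "message_stop"},
]

print(usage_from_events(SAMPLE_EVENTS))
```

For most applications get_final_message().usage is the simpler route; this form is mainly useful when you already process raw events for other reasons.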

The Bottom Line

Streaming is the right default for any user-facing Claude integration. The Python and TypeScript SDKs make it straightforward: client.messages.stream() with text_stream covers most cases. For SSE web UIs, forward chunks with the data: prefix and a sentinel like [DONE]. For tool use or raw event inspection, iterate the stream directly and handle content_block_delta events by type.

Next in this series: Claude Vision and Multimodal — how to send images, PDFs, and mixed content to the API. For related topics, see our guides on prompt caching (cuts streaming costs when your system prompt is large) and context window management.

