Streaming lets Claude start sending text back to you immediately — token by token — instead of waiting for the entire response to finish before returning anything. For users, this feels dramatically faster. For developers, it opens up real-time UIs, progressive rendering, and tighter feedback loops.
The Anthropic Python and TypeScript SDKs handle the underlying Server-Sent Events (SSE) protocol for you, exposing clean iterators and context managers so you can focus on what to do with each chunk rather than parsing raw event streams.
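To make concrete what the SDKs abstract away, here is a minimal sketch of parsing a raw SSE body with only the standard library. The sample payload is hand-written and abbreviated for illustration, and `parse_sse` and `extract_text` are hypothetical helpers, not part of either SDK.

```python
import json

# Hand-written, abbreviated sample of an SSE body (illustrative only).
RAW_SSE = (
    'event: content_block_delta\n'
    'data: {"type": "content_block_delta", "index": 0, '
    '"delta": {"type": "text_delta", "text": "Hello"}}\n'
    '\n'
    'event: content_block_delta\n'
    'data: {"type": "content_block_delta", "index": 0, '
    '"delta": {"type": "text_delta", "text": ", world"}}\n'
    '\n'
    'event: message_stop\n'
    'data: {"type": "message_stop"}\n'
    '\n'
)


def parse_sse(raw: str):
    """Yield (event_name, payload_dict) for each blank-line-separated SSE event."""
    for block in raw.split("\n\n"):
        event_name, data_lines = None, []
        for line in block.splitlines():
            if line.startswith("event:"):
                event_name = line[len("event:"):].strip()
            elif line.startswith("data:"):
                data_lines.append(line[len("data:"):].strip())
        if data_lines:  # skip the empty block after the final separator
            yield event_name, json.loads("\n".join(data_lines))


def extract_text(raw: str) -> str:
    """Concatenate the text of every text_delta event in the body."""
    parts = []
    for name, payload in parse_sse(raw):
        if name == "content_block_delta" and payload["delta"]["type"] == "text_delta":
            parts.append(payload["delta"]["text"])
    return "".join(parts)


print(extract_text(RAW_SSE))
```

A real client also has to cope with events split across network reads, reconnection, and error events, which is exactly the bookkeeping the SDK iterators take care of.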
Why Use Streaming?
Without streaming, a long Claude response — say, a 500-word explanation — forces the user to stare at a loading spinner for 10-15 seconds, then see everything appear at once. With streaming, they see the first word in under a second and watch the response build in real time. This is the standard experience in Claude.ai, and users now expect it.
Streaming also benefits backend pipelines: you can begin processing early chunks while later chunks are still generating, reducing end-to-end latency in multi-step workflows.
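As a rough illustration of processing early chunks while later ones are still being generated, the sketch below regroups arbitrary text chunks into complete sentences so that a downstream step (translation, text-to-speech, moderation) can start on the first sentence immediately. The `sentences_from_chunks` helper and the sample chunks are assumptions made for this example, and the sentence detection is deliberately naive.

```python
import re
from typing import Iterable, Iterator

# Shortest run of text ending in ., ! or ? that is followed by whitespace.
_SENTENCE_END = re.compile(r"(.+?[.!?])\s+", re.DOTALL)


def sentences_from_chunks(chunks: Iterable[str]) -> Iterator[str]:
    """Yield complete sentences as soon as they are available in the stream."""
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        while True:
            match = _SENTENCE_END.match(buffer)
            if match is None:
                break
            yield match.group(1).strip()
            buffer = buffer[match.end():]
    # Flush whatever remains once the stream ends.
    if buffer.strip():
        yield buffer.strip()


# Stand-in for stream.text_stream: chunk boundaries fall mid-sentence.
fake_chunks = ["Streaming is fa", "st. It feels re", "sponsive! Does it", " help? Yes"]
for sentence in sentences_from_chunks(fake_chunks):
    print(sentence)
```

Because the helper only needs an iterable of strings, the same function could be pointed at a real text stream; abbreviations such as "e.g." would need a smarter splitter.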
Basic Streaming in Python
The Anthropic Python SDK provides a stream() method that returns a context manager. The simplest way to stream and print text:

    import anthropic

    client = anthropic.Anthropic()

    with client.messages.stream(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        messages=[{"role": "user", "content": "Explain quantum entanglement simply."}],
    ) as stream:
        for text in stream.text_stream:
            print(text, end="", flush=True)
    print()  # newline after the stream ends

stream.text_stream is a generator that yields text delta strings as they arrive. It filters out all non-text events automatically, so you only see the actual words Claude is writing.
Getting the Final Message After Streaming
Sometimes you want to stream for the user experience but still access the complete final message (for logging, storage, or downstream processing). Call stream.get_final_message() inside the context manager, after the loop. It waits for the stream to finish if necessary and returns the accumulated message:

    with client.messages.stream(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        messages=[{"role": "user", "content": "Write a haiku about recursion."}],
    ) as stream:
        for text in stream.text_stream:
            print(text, end="", flush=True)
        # Available after the stream completes (still inside context manager)
        final_message = stream.get_final_message()
        print(f"\nStop reason: {final_message.stop_reason}")
        print(f"Input tokens: {final_message.usage.input_tokens}")
        print(f"Output tokens: {final_message.usage.output_tokens}")

The final_message is a standard Message object identical to what non-streaming client.messages.create() returns. Token counts, stop reason, and model are all present.
Async Streaming
For web servers, async frameworks (FastAPI, aiohttp), or any code using asyncio, use the async client and async with:

    import asyncio
    import anthropic

    async def stream_response(prompt: str) -> str:
        client = anthropic.AsyncAnthropic()
        full_text = ""
        async with client.messages.stream(
            model="claude-sonnet-4-6",
            max_tokens=1024,
            messages=[{"role": "user", "content": prompt}],
        ) as stream:
            async for text in stream.text_stream:
                print(text, end="", flush=True)
                full_text += text
        return full_text

    asyncio.run(stream_response("What is the best sorting algorithm?"))

The async version has the same structure as the sync version. Create the client with anthropic.AsyncAnthropic() instead of anthropic.Anthropic(), then swap with for async with and for text for async for text.
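One practical payoff of the async form is that several responses can be consumed concurrently on a single event loop. The sketch below shows the shape of that pattern with a stand-in async generator in place of a real API call; `fake_text_stream` and `collect` are hypothetical names, and the delays are arbitrary.

```python
import asyncio
from typing import AsyncIterator, List


async def fake_text_stream(chunks: List[str], delay: float) -> AsyncIterator[str]:
    """Stand-in for stream.text_stream: yields chunks with a pause between them."""
    for chunk in chunks:
        await asyncio.sleep(delay)
        yield chunk


async def collect(name: str, chunks: List[str], delay: float, log: List[str]) -> str:
    """Consume one stream, recording which stream each arriving chunk belongs to."""
    full_text = ""
    async for text in fake_text_stream(chunks, delay):
        log.append(name)
        full_text += text
    return full_text


async def main():
    log: List[str] = []
    # Both streams are consumed concurrently on one event loop.
    results = await asyncio.gather(
        collect("a", ["Hello", ", ", "world"], 0.01, log),
        collect("b", ["Bon", "jour"], 0.01, log),
    )
    return results, log


results, arrival_order = asyncio.run(main())
print(results)
print(arrival_order)
```

asyncio.gather returns the results in the order the coroutines were passed, while the arrival log shows chunks from the two streams interleaving rather than one stream blocking the other.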
Streaming in TypeScript / Node.js
The TypeScript SDK provides the same streaming pattern:

    import Anthropic from "@anthropic-ai/sdk";

    const client = new Anthropic();

    const stream = client.messages.stream({
      model: "claude-sonnet-4-6",
      max_tokens: 1024,
      messages: [{ role: "user", content: "Explain streaming APIs in one paragraph." }],
    });

    for await (const event of stream) {
      if (
        event.type === "content_block_delta" &&
        event.delta.type === "text_delta"
      ) {
        process.stdout.write(event.delta.text);
      }
    }

    const finalMessage = await stream.finalMessage();
    console.log(`\nTokens used: ${finalMessage.usage.output_tokens}`);

If you prefer a higher-level abstraction, the TypeScript SDK also exposes a .on("text", callback) pattern:

    const stream = client.messages
      .stream({
        model: "claude-sonnet-4-6",
        max_tokens: 1024,
        messages: [{ role: "user", content: "Tell me a joke." }],
      })
      .on("text", (text) => {
        process.stdout.write(text);
      });

    await stream.finalMessage();

Building a Streaming Web UI with Server-Sent Events
The most common production use case: a web app where the user sees Claude's response stream in real time. The architecture is:
- Browser sends a fetch request to your backend
- Backend opens a streaming request to the Claude API
- Backend forwards chunks to the browser via SSE
- Browser JavaScript appends each chunk to the DOM
FastAPI (Python) SSE endpoint example:

    import json

    from fastapi import FastAPI
    from fastapi.responses import StreamingResponse
    import anthropic

    app = FastAPI()
    client = anthropic.AsyncAnthropic()

    @app.get("/stream")
    async def stream_chat(prompt: str):
        async def generate():
            async with client.messages.stream(
                model="claude-sonnet-4-6",
                max_tokens=1024,
                messages=[{"role": "user", "content": prompt}],
            ) as stream:
                async for text in stream.text_stream:
                    # SSE format: "data: " prefix + double newline.
                    # JSON-encode the chunk so newlines in the text cannot break the framing.
                    yield f"data: {json.dumps(text)}\n\n"
            yield "data: [DONE]\n\n"

        return StreamingResponse(
            generate(),
            media_type="text/event-stream",
            headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
        )

Frontend JavaScript to consume the SSE stream:

    async function streamResponse(prompt) {
      const output = document.getElementById("output");
      output.textContent = "";
      const response = await fetch(`/stream?prompt=${encodeURIComponent(prompt)}`);
      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      let buffer = "";
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        // stream: true keeps multi-byte characters intact across reads
        buffer += decoder.decode(value, { stream: true });
        // Events end with a blank line; keep any partial event in the buffer
        const events = buffer.split("\n\n");
        buffer = events.pop();
        for (const event of events) {
          if (!event.startsWith("data: ")) continue;
          const data = event.slice(6);
          if (data === "[DONE]") return;
          output.textContent += JSON.parse(data);
        }
      }
    }

Each chunk is JSON-encoded on the server and decoded with JSON.parse in the browser, because raw text containing newlines would otherwise break the SSE framing, and a single network read may contain a partial event or several events at once. The X-Accel-Buffering: no header prevents Nginx from buffering the stream. Without it, chunks batch up and the real-time effect disappears.
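SSE frames are newline-delimited, so a text chunk that itself contains a newline needs care. One option defined by the SSE format is to put each line of the payload on its own data: line; a standards-compliant client joins those lines back together with newlines. A minimal sketch of that framing, where `format_sse` and `parse_sse_frame` are hypothetical helper names and payloads are assumed not to contain carriage returns:

```python
from typing import Optional, Tuple


def format_sse(data: str, event: Optional[str] = None) -> str:
    """Build one SSE frame, emitting one "data:" line per line of payload."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    # split("\n") rather than splitlines() so trailing empty lines survive.
    for part in data.split("\n"):
        lines.append(f"data: {part}")
    return "\n".join(lines) + "\n\n"


def parse_sse_frame(frame: str) -> Tuple[Optional[str], str]:
    """Inverse of format_sse for a single frame: returns (event, data)."""
    event, data_parts = None, []
    for line in frame.rstrip("\n").split("\n"):
        if line.startswith("event: "):
            event = line[len("event: "):]
        elif line.startswith("data: "):
            data_parts.append(line[len("data: "):])
    return event, "\n".join(data_parts)


frame = format_sse("line one\nline two", event="delta")
print(repr(frame))
print(parse_sse_frame(frame))
```

The round trip preserves blank lines inside a chunk, which matters when Claude streams paragraph breaks or code.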
Low-Level Event Streaming
If you need full control over every event in the stream — including message_start, content_block_start, content_block_delta, and message_stop — iterate over the raw stream instead of text_stream:

    with client.messages.stream(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        messages=[{"role": "user", "content": "Hello"}],
    ) as stream:
        for event in stream:
            if event.type == "content_block_delta":
                if event.delta.type == "text_delta":
                    print(f"Text chunk: {event.delta.text!r}")
            elif event.type == "message_stop":
                print("Stream complete")

This matters for tool use: when Claude calls a tool, the delta type is input_json_delta (not text_delta), and you need the raw event to detect which tool is being called and accumulate its JSON arguments.
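To show how these events fit together without calling the API, the sketch below replays a hand-written event sequence as plain dicts and rebuilds each content block from its start, delta, and stop events. The sequence is abbreviated and illustrative; `assemble_blocks`, the `get_weather` tool name, and the `toolu_example` id are assumptions made for this example.

```python
import json

# Hand-written events shaped like the stream's JSON payloads (illustrative only).
EVENTS = [
    {"type": "message_start"},
    {"type": "content_block_start", "index": 0, "content_block": {"type": "text"}},
    {"type": "content_block_delta", "index": 0,
     "delta": {"type": "text_delta", "text": "Let me "}},
    {"type": "content_block_delta", "index": 0,
     "delta": {"type": "text_delta", "text": "check."}},
    {"type": "content_block_stop", "index": 0},
    {"type": "content_block_start", "index": 1,
     "content_block": {"type": "tool_use", "id": "toolu_example", "name": "get_weather"}},
    {"type": "content_block_delta", "index": 1,
     "delta": {"type": "input_json_delta", "partial_json": '{"ci'}},
    {"type": "content_block_delta", "index": 1,
     "delta": {"type": "input_json_delta", "partial_json": 'ty": "Tokyo"}'}},
    {"type": "content_block_stop", "index": 1},
    {"type": "message_stop"},
]


def assemble_blocks(events):
    """Rebuild each content block from its start, delta, and stop events."""
    blocks = {}
    for event in events:
        kind = event["type"]
        if kind == "content_block_start":
            block = dict(event["content_block"])
            block["buffer"] = ""
            blocks[event["index"]] = block
        elif kind == "content_block_delta":
            delta = event["delta"]
            if delta["type"] == "text_delta":
                blocks[event["index"]]["buffer"] += delta["text"]
            elif delta["type"] == "input_json_delta":
                blocks[event["index"]]["buffer"] += delta["partial_json"]
        elif kind == "content_block_stop":
            block = blocks[event["index"]]
            if block["type"] == "tool_use":
                # The JSON is only guaranteed to be complete once the block stops.
                block["input"] = json.loads(block["buffer"] or "{}")
    return [blocks[i] for i in sorted(blocks)]


for block in assemble_blocks(EVENTS):
    print(block["type"], block.get("input", block["buffer"]))
```

Keying the buffers by the event's index keeps text and tool arguments separate even when a response contains several content blocks.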
Streaming with Tool Use
Tool call arguments stream as partial JSON in input_json_delta events. The recommended pattern is to accumulate the JSON string, then parse it when the content block stops:

    import json
    import anthropic

    client = anthropic.Anthropic()

    tools = [{
        "name": "get_weather",
        "description": "Get current weather for a city.",
        "input_schema": {
            "type": "object",
            "properties": {"city": {"type": "string"}},
            "required": ["city"],
        },
    }]

    with client.messages.stream(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        tools=tools,
        messages=[{"role": "user", "content": "What's the weather in Tokyo?"}],
    ) as stream:
        current_tool_json = ""
        current_tool_name = ""
        for event in stream:
            if event.type == "content_block_start":
                # Only tool_use blocks carry a tool name
                if event.content_block.type == "tool_use":
                    current_tool_name = event.content_block.name
                    current_tool_json = ""
            elif event.type == "content_block_delta":
                if event.delta.type == "text_delta":
                    print(event.delta.text, end="", flush=True)
                elif event.delta.type == "input_json_delta":
                    current_tool_json += event.delta.partial_json
            elif event.type == "content_block_stop" and current_tool_name:
                # A tool called without arguments may stream no JSON, so default to {}
                tool_input = json.loads(current_tool_json or "{}")
                print(f"\nTool call: {current_tool_name}({tool_input})")
                current_tool_name = ""

Error Handling in Streams
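Errors in streaming requests are raised as exceptions, same as non-streaming calls. The key difference is timing. Errors the API detects up front, such as a RateLimitError, normally surface when the request is opened, before any text arrives. Once the response has started, a failure (for example an overloaded_error event or a dropped connection) can still interrupt the stream partway through, after you have already delivered some text.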

    from anthropic import APIStatusError, RateLimitError

    def stream_text(prompt: str):
        """Yield text chunks, handling API errors raised before or during the stream."""
        try:
            with client.messages.stream(
                model="claude-sonnet-4-6",
                max_tokens=1024,
                messages=[{"role": "user", "content": prompt}],
            ) as stream:
                for text in stream.text_stream:
                    yield text  # or print, or send to client
        except RateLimitError:
            # Back off and retry, or queue the request
            print("Rate limit hit; retry after delay")
        except APIStatusError as e:
            print(f"API error {e.status_code}: {e.message}")

If you are streaming to a web client and an error occurs mid-stream, you will need a convention to signal the error over SSE. A common pattern is to send data: [ERROR] message here and handle it in the frontend.
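One way to implement such a convention is sketched below. It is a variation that tags every frame with a type field in a JSON payload instead of using bracketed sentinels, which is purely a choice made for this example; `sse_frames` and `flaky_chunks` are hypothetical names, and the failing generator stands in for a stream that breaks partway through.

```python
import json
from typing import Iterable, Iterator


def _frame(payload: dict) -> str:
    """Serialize one payload as a single-line SSE data frame."""
    return f"data: {json.dumps(payload)}\n\n"


def sse_frames(chunks: Iterable[str]) -> Iterator[str]:
    """Wrap text chunks in SSE frames, reporting a mid-stream failure in-band."""
    try:
        for text in chunks:
            yield _frame({"type": "text", "text": text})
    except Exception as exc:  # real code would catch the SDK's specific exceptions
        yield _frame({"type": "error", "message": str(exc)})
    else:
        yield _frame({"type": "done"})


def flaky_chunks():
    """Stand-in for a text stream that fails after the first chunk."""
    yield "Hello"
    raise RuntimeError("upstream connection dropped")


for frame in sse_frames(flaky_chunks()):
    print(frame, end="")
```

The frontend can then switch on the type field: append text frames to the page, show a retry prompt on an error frame, and stop reading on a done frame. In production you would likely send a generic message rather than the raw exception text.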
Streaming vs. Non-Streaming: When to Use Each
| Use Case | Recommendation |
|---|---|
| User-facing chat / assistant | Always stream — dramatic UX improvement |
| Background batch processing | Non-streaming — simpler, no SSE overhead |
| Classification tasks (short output) | Non-streaming — response is fast anyway |
| Code generation with live preview | Stream — users can start reading early |
| Multi-step pipelines (output feeds next step) | Either: stream if the next step can start working on partial output; non-streaming if it needs the full output before proceeding |
| Logging / auditing (need complete message) | Stream for UX, then call get_final_message() for the record |
Token Counts with Streaming
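Final token usage is only available after the stream completes, because the output token count keeps growing while Claude is still generating. The message_start event already reports input tokens and message_delta events carry a cumulative output count, but the total is known only at the end. Call get_final_message().usage once the text has been consumed to get exact counts for billing and monitoring.

    # deliver_to_client and log_usage are placeholders for your own functions
    with client.messages.stream(
        model="claude-sonnet-4-6",
        max_tokens=2048,
        messages=messages,
    ) as stream:
        for text in stream.text_stream:
            deliver_to_client(text)
        # The stream is fully consumed here, so the usage figures are final
        usage = stream.get_final_message().usage

    # Log after stream is done
    log_usage(input_tokens=usage.input_tokens, output_tokens=usage.output_tokens)

The Bottom Line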
Streaming is the right default for any user-facing Claude integration. The Python and TypeScript SDKs make it straightforward: client.messages.stream() with text_stream covers most cases. For SSE web UIs, forward chunks with the data: prefix and a sentinel like [DONE]. For tool use or raw event inspection, iterate the stream directly and handle content_block_delta events by type.
Next in this series: Claude Vision and Multimodal — how to send images, PDFs, and mixed content to the API. For related topics, see our guides on prompt caching (cuts streaming costs when your system prompt is large) and context window management.