Python实现Claude API流式输出教程(附完整代码)

分类:技术交流发布时间:建议阅读时长:31 分钟
作者:sodope llm

引言

在构建AI对话应用时,用户体验是决定产品成败的关键因素之一。当用户提交一个问题后,如果需要等待10秒才能看到完整答案,这种体验显然不够好。而通过**流式输出(Streaming)**技术,可以让模型一边生成内容一边输出,用户几乎立即就能看到文字不断出现,大幅提升体验感。

本文将详细介绍如何用Python实现Claude API的流式输出,从基础概念到完整生产代码,覆盖SSE、WebSocket两种方案的选择,以及如何通过国内AI API中转平台实现无翻墙的稳定访问。


一、什么是流式输出?为什么需要它?

1.1 传统请求 vs 流式输出

传统请求模式(同步阻塞):

  • 客户端发送请求 → 等待服务端完整生成 → 一次性返回全部内容
  • 对于Claude生成200字的回答,可能需要等待3-8秒

流式输出模式(Streaming):

  • 客户端发送请求 → 服务端边生成边推送 → 客户端实时显示
  • 用户在200毫秒内就能看到第一个字符出现

1.2 流式输出的技术原理

Claude API的流式输出基于**Server-Sent Events(SSE)**协议实现。SSE是HTTP协议的一个扩展,允许服务器持续向客户端推送数据,非常适合”单向实时数据流”场景。

基本流程如下:

  1. 客户端发送带有 stream=True 参数的请求
  2. 服务端以 text/event-stream 格式持续推送 delta 数据
  3. 每个 event 包含当前生成的文本片段(token)
  4. 收到 [DONE] 信号时流式传输结束

二、Python基础流式调用实现

2.1 安装依赖

pip install anthropic

2.2 最简单的流式输出

import anthropic
# 使用jiekou.ai中转,国内开发者无需翻墙
client = anthropic.Anthropic(
api_key="your-jiekou-api-key",
base_url="https://api.jiekou.ai/v1"
)
def stream_claude_response(prompt: str):
"""最基础的流式输出实现"""
with client.messages.stream(
model="claude-3-7-sonnet-20250219",
max_tokens=1024,
messages=[{"role": "user", "content": prompt}]
) as stream:
for text in stream.text_stream:
print(text, end="", flush=True)
print() # 换行
# 测试调用
stream_claude_response("请用100字介绍一下Python的优缺点")

2.3 获取完整的事件信息

有时候我们不只需要文本,还需要知道token使用量、模型信息等元数据:

import anthropic
client = anthropic.Anthropic(
api_key="your-jiekou-api-key",
base_url="https://api.jiekou.ai/v1"
)
def stream_with_metadata(prompt: str):
"""获取完整事件信息的流式输出"""
full_response = ""
with client.messages.stream(
model="claude-3-7-sonnet-20250219",
max_tokens=1024,
messages=[{"role": "user", "content": prompt}]
) as stream:
for event in stream:
# 处理文本增量事件
if hasattr(event, 'type'):
if event.type == 'content_block_delta':
if hasattr(event.delta, 'text'):
text = event.delta.text
full_response += text
print(text, end="", flush=True)
# 获取最终的使用统计
final_message = stream.get_final_message()
print(f"\n\n--- 统计信息 ---")
print(f"输入tokens: {final_message.usage.input_tokens}")
print(f"输出tokens: {final_message.usage.output_tokens}")
return full_response

三、生产环境:FastAPI + 流式输出

在实际产品中,我们通常需要通过后端API将Claude的流式输出转发给前端。下面是一个完整的FastAPI实现:

3.1 后端实现

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import anthropic
import json
app = FastAPI()
client = anthropic.Anthropic(
api_key="your-jiekou-api-key",
base_url="https://api.jiekou.ai/v1"
)
def generate_stream(prompt: str):
"""生成SSE格式的流式数据"""
try:
with client.messages.stream(
model="claude-3-7-sonnet-20250219",
max_tokens=2048,
messages=[{"role": "user", "content": prompt}]
) as stream:
for text in stream.text_stream:
# 以SSE格式发送数据
data = json.dumps({"content": text}, ensure_ascii=False)
yield f"data: {data}\n\n"
# 发送结束信号
yield "data: [DONE]\n\n"
except anthropic.APIError as e:
error_data = json.dumps({"error": str(e)})
yield f"data: {error_data}\n\n"
@app.post("/chat/stream")
async def chat_stream(request: dict):
prompt = request.get("message", "")
return StreamingResponse(
generate_stream(prompt),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no" # 关闭Nginx缓冲,确保实时推送
}
)
@app.get("/health")
async def health():
return {"status": "ok"}

3.2 前端JavaScript接收示例

async function sendMessage(userMessage) {
const eventSource = new EventSource('/chat/stream');
const response = await fetch('/chat/stream', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({message: userMessage})
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const {done, value} = await reader.read();
if (done) break;
const chunk = decoder.decode(value);
const lines = chunk.split('\n');
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.slice(6);
if (data === '[DONE]') return;
try {
const parsed = JSON.parse(data);
// 实时更新UI
appendToOutput(parsed.content);
} catch (e) {
// 忽略解析错误
}
}
}
}
}

四、进阶技巧

4.1 多轮对话的流式输出

def multi_turn_stream(conversation_history: list, new_message: str):
"""支持多轮对话的流式输出"""
# 将新消息追加到历史
conversation_history.append({
"role": "user",
"content": new_message
})
full_response = ""
with client.messages.stream(
model="claude-3-7-sonnet-20250219",
max_tokens=2048,
system="你是一个专业的Python编程助手,请用简洁清晰的语言回答问题。",
messages=conversation_history
) as stream:
for text in stream.text_stream:
full_response += text
print(text, end="", flush=True)
print()
# 将AI回复加入历史
conversation_history.append({
"role": "assistant",
"content": full_response
})
return full_response, conversation_history
# 使用示例
history = []
response1, history = multi_turn_stream(history, "Python的列表和元组有什么区别?")
response2, history = multi_turn_stream(history, "那什么时候应该用列表,什么时候用元组?")

4.2 超时与重试机制

在生产环境中,网络波动是常态,需要加入重试逻辑:

import time
from typing import Generator
def stream_with_retry(prompt: str, max_retries: int = 3) -> Generator:
"""带重试机制的流式输出"""
for attempt in range(max_retries):
try:
with client.messages.stream(
model="claude-3-7-sonnet-20250219",
max_tokens=1024,
messages=[{"role": "user", "content": prompt}]
) as stream:
for text in stream.text_stream:
yield text
return # 成功则退出重试循环
except anthropic.APIConnectionError as e:
if attempt < max_retries - 1:
wait_time = 2 ** attempt # 指数退避
print(f"\n连接失败,{wait_time}秒后重试(第{attempt+1}次)...")
time.sleep(wait_time)
else:
raise e
except anthropic.RateLimitError:
print("\n触发频率限制,等待60秒...")
time.sleep(60)

4.3 流式输出的取消控制

import threading
class StreamController:
def __init__(self):
self.cancelled = False
def cancel(self):
self.cancelled = True
def cancellable_stream(prompt: str, controller: StreamController):
"""可取消的流式输出"""
with client.messages.stream(
model="claude-3-7-sonnet-20250219",
max_tokens=2048,
messages=[{"role": "user", "content": prompt}]
) as stream:
for text in stream.text_stream:
if controller.cancelled:
print("\n[用户取消生成]")
break
print(text, end="", flush=True)
# 使用示例:3秒后自动取消
controller = StreamController()
thread = threading.Thread(
target=cancellable_stream,
args=("请写一篇关于Python的长文章", controller)
)
thread.start()
time.sleep(3)
controller.cancel()
thread.join()

五、SSE vs WebSocket:方案选择

很多开发者会困惑:流式输出应该用SSE还是WebSocket?

SSE(Server-Sent Events)适用场景:

  • 单向数据流(服务器→客户端)
  • 实现简单,兼容性好(HTTP协议)
  • 自动重连机制
  • AI对话的典型场景

WebSocket适用场景:

  • 双向实时通信
  • 需要客户端主动推送数据
  • 游戏、协作编辑等复杂场景

对于AI对话流式输出,SSE是更合适的选择:更简单、更轻量,完全满足需求。


总结

流式输出是AI对话应用中提升用户体验最有效的技术手段之一。通过本文的代码示例,你可以快速实现从基础流式调用到生产级FastAPI服务的完整链路。

对于国内开发者,直连Claude API存在网络不稳定的问题。使用 jiekou.ai 这样的国内AI API中转平台,不仅解决了访问问题,其稳定的网络连接对流式输出场景尤为重要——因为流式传输对网络延迟和稳定性要求更高,中间断流会导致用户体验大打折扣。jiekou.ai 支持Claude 3.7 Sonnet等主流模型,按量计费,非常适合产品化场景。

下一步建议:将本文的FastAPI示例部署到服务器,结合前端实现一个完整的AI对话界面,亲身体验流式输出带来的流畅感。

分享:
联系我们