Python实现Claude API流式输出教程(附完整代码)
引言
在构建AI对话应用时,用户体验是决定产品成败的关键因素之一。当用户提交一个问题后,如果需要等待10秒才能看到完整答案,这种体验显然不够好。而通过**流式输出(Streaming)**技术,可以让模型一边生成内容一边输出,用户几乎立即就能看到文字不断出现,大幅提升体验感。
本文将详细介绍如何用Python实现Claude API的流式输出,从基础概念到完整生产代码,覆盖SSE、WebSocket两种方案的选择,以及如何通过国内AI API中转平台实现无翻墙的稳定访问。
一、什么是流式输出?为什么需要它?
1.1 传统请求 vs 流式输出
传统请求模式(同步阻塞):
- 客户端发送请求 → 等待服务端完整生成 → 一次性返回全部内容
- 对于Claude生成200字的回答,可能需要等待3-8秒
流式输出模式(Streaming):
- 客户端发送请求 → 服务端边生成边推送 → 客户端实时显示
- 用户在200毫秒内就能看到第一个字符出现
1.2 流式输出的技术原理
Claude API的流式输出基于**Server-Sent Events(SSE)**协议实现。SSE是HTTP协议的一个扩展,允许服务器持续向客户端推送数据,非常适合”单向实时数据流”场景。
基本流程如下:
- 客户端发送带有
stream=True参数的请求 - 服务端以
text/event-stream格式持续推送 delta 数据 - 每个 event 包含当前生成的文本片段(token)
- 收到
[DONE]信号时流式传输结束
二、Python基础流式调用实现
2.1 安装依赖
pip install anthropic
2.2 最简单的流式输出
import anthropic# 使用jiekou.ai中转,国内开发者无需翻墙client = anthropic.Anthropic( api_key="your-jiekou-api-key", base_url="https://api.jiekou.ai/v1")def stream_claude_response(prompt: str): """最基础的流式输出实现""" with client.messages.stream( model="claude-3-7-sonnet-20250219", max_tokens=1024, messages=[{"role": "user", "content": prompt}] ) as stream: for text in stream.text_stream: print(text, end="", flush=True) print() # 换行# 测试调用stream_claude_response("请用100字介绍一下Python的优缺点")
2.3 获取完整的事件信息
有时候我们不只需要文本,还需要知道token使用量、模型信息等元数据:
import anthropicclient = anthropic.Anthropic( api_key="your-jiekou-api-key", base_url="https://api.jiekou.ai/v1")def stream_with_metadata(prompt: str): """获取完整事件信息的流式输出""" full_response = "" with client.messages.stream( model="claude-3-7-sonnet-20250219", max_tokens=1024, messages=[{"role": "user", "content": prompt}] ) as stream: for event in stream: # 处理文本增量事件 if hasattr(event, 'type'): if event.type == 'content_block_delta': if hasattr(event.delta, 'text'): text = event.delta.text full_response += text print(text, end="", flush=True) # 获取最终的使用统计 final_message = stream.get_final_message() print(f"\n\n--- 统计信息 ---") print(f"输入tokens: {final_message.usage.input_tokens}") print(f"输出tokens: {final_message.usage.output_tokens}") return full_response
三、生产环境:FastAPI + 流式输出
在实际产品中,我们通常需要通过后端API将Claude的流式输出转发给前端。下面是一个完整的FastAPI实现:
3.1 后端实现
from fastapi import FastAPIfrom fastapi.responses import StreamingResponseimport anthropicimport jsonapp = FastAPI()client = anthropic.Anthropic( api_key="your-jiekou-api-key", base_url="https://api.jiekou.ai/v1")def generate_stream(prompt: str): """生成SSE格式的流式数据""" try: with client.messages.stream( model="claude-3-7-sonnet-20250219", max_tokens=2048, messages=[{"role": "user", "content": prompt}] ) as stream: for text in stream.text_stream: # 以SSE格式发送数据 data = json.dumps({"content": text}, ensure_ascii=False) yield f"data: {data}\n\n" # 发送结束信号 yield "data: [DONE]\n\n" except anthropic.APIError as e: error_data = json.dumps({"error": str(e)}) yield f"data: {error_data}\n\n"@app.post("/chat/stream")async def chat_stream(request: dict): prompt = request.get("message", "") return StreamingResponse( generate_stream(prompt), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "X-Accel-Buffering": "no" # 关闭Nginx缓冲,确保实时推送 } )@app.get("/health")async def health(): return {"status": "ok"}
3.2 前端JavaScript接收示例
async function sendMessage(userMessage) { const eventSource = new EventSource('/chat/stream'); const response = await fetch('/chat/stream', { method: 'POST', headers: {'Content-Type': 'application/json'}, body: JSON.stringify({message: userMessage}) }); const reader = response.body.getReader(); const decoder = new TextDecoder(); while (true) { const {done, value} = await reader.read(); if (done) break; const chunk = decoder.decode(value); const lines = chunk.split('\n'); for (const line of lines) { if (line.startsWith('data: ')) { const data = line.slice(6); if (data === '[DONE]') return; try { const parsed = JSON.parse(data); // 实时更新UI appendToOutput(parsed.content); } catch (e) { // 忽略解析错误 } } } }}
四、进阶技巧
4.1 多轮对话的流式输出
def multi_turn_stream(conversation_history: list, new_message: str): """支持多轮对话的流式输出""" # 将新消息追加到历史 conversation_history.append({ "role": "user", "content": new_message }) full_response = "" with client.messages.stream( model="claude-3-7-sonnet-20250219", max_tokens=2048, system="你是一个专业的Python编程助手,请用简洁清晰的语言回答问题。", messages=conversation_history ) as stream: for text in stream.text_stream: full_response += text print(text, end="", flush=True) print() # 将AI回复加入历史 conversation_history.append({ "role": "assistant", "content": full_response }) return full_response, conversation_history# 使用示例history = []response1, history = multi_turn_stream(history, "Python的列表和元组有什么区别?")response2, history = multi_turn_stream(history, "那什么时候应该用列表,什么时候用元组?")
4.2 超时与重试机制
在生产环境中,网络波动是常态,需要加入重试逻辑:
import timefrom typing import Generatordef stream_with_retry(prompt: str, max_retries: int = 3) -> Generator: """带重试机制的流式输出""" for attempt in range(max_retries): try: with client.messages.stream( model="claude-3-7-sonnet-20250219", max_tokens=1024, messages=[{"role": "user", "content": prompt}] ) as stream: for text in stream.text_stream: yield text return # 成功则退出重试循环 except anthropic.APIConnectionError as e: if attempt < max_retries - 1: wait_time = 2 ** attempt # 指数退避 print(f"\n连接失败,{wait_time}秒后重试(第{attempt+1}次)...") time.sleep(wait_time) else: raise e except anthropic.RateLimitError: print("\n触发频率限制,等待60秒...") time.sleep(60)
4.3 流式输出的取消控制
import threadingclass StreamController: def __init__(self): self.cancelled = False def cancel(self): self.cancelled = Truedef cancellable_stream(prompt: str, controller: StreamController): """可取消的流式输出""" with client.messages.stream( model="claude-3-7-sonnet-20250219", max_tokens=2048, messages=[{"role": "user", "content": prompt}] ) as stream: for text in stream.text_stream: if controller.cancelled: print("\n[用户取消生成]") break print(text, end="", flush=True)# 使用示例:3秒后自动取消controller = StreamController()thread = threading.Thread( target=cancellable_stream, args=("请写一篇关于Python的长文章", controller))thread.start()time.sleep(3)controller.cancel()thread.join()
五、SSE vs WebSocket:方案选择
很多开发者会困惑:流式输出应该用SSE还是WebSocket?
SSE(Server-Sent Events)适用场景:
- 单向数据流(服务器→客户端)
- 实现简单,兼容性好(HTTP协议)
- 自动重连机制
- AI对话的典型场景
WebSocket适用场景:
- 双向实时通信
- 需要客户端主动推送数据
- 游戏、协作编辑等复杂场景
对于AI对话流式输出,SSE是更合适的选择:更简单、更轻量,完全满足需求。
总结
流式输出是AI对话应用中提升用户体验最有效的技术手段之一。通过本文的代码示例,你可以快速实现从基础流式调用到生产级FastAPI服务的完整链路。
对于国内开发者,直连Claude API存在网络不稳定的问题。使用 jiekou.ai 这样的国内AI API中转平台,不仅解决了访问问题,其稳定的网络连接对流式输出场景尤为重要——因为流式传输对网络延迟和稳定性要求更高,中间断流会导致用户体验大打折扣。jiekou.ai 支持Claude 3.7 Sonnet等主流模型,按量计费,非常适合产品化场景。
下一步建议:将本文的FastAPI示例部署到服务器,结合前端实现一个完整的AI对话界面,亲身体验流式输出带来的流畅感。