Python调用ChatGPT API实现流式输出教程
你有没有注意到ChatGPT网页版的”打字机效果”——回复不是一次性出现,而是一个字一个字地显示出来?这种效果背后用的就是**流式输出(Streaming)**技术。
在实际应用中,流式输出有重要价值:
- 用户体验更好:用户无需等待完整响应,立即看到内容开始出现
- 长文本不超时:生成长文章时,避免等待超时问题
- 实时反馈:用户可以提前判断输出方向,必要时提前终止
本文将完整讲解如何在Python中实现ChatGPT API的流式输出,从基础到高级。
基础:什么是流式输出?
普通请求 vs 流式请求
普通请求(一次性返回):
用户发送请求 → 等待... → 一次性收到完整回复
流式请求(逐块返回):
用户发送请求 → 立即收到第一个chunk → 收到第二个chunk → ... → 收到完整回复
底层实现上,流式输出使用的是HTTP的 Server-Sent Events (SSE) 协议,服务端持续推送数据块,直到生成完毕。
第一步:基础流式调用
from openai import OpenAIclient = OpenAI( api_key="your-api-key", base_url="https://api.jiekou.ai/v1" # 国内用户推荐)# 关键:设置 stream=Truestream = client.chat.completions.create( model="gpt-4o", messages=[ {"role": "user", "content": "写一篇关于Python异步编程的介绍,大约300字"} ], stream=True # 开启流式输出)# 遍历数据块for chunk in stream: content = chunk.choices[0].delta.content if content is not None: print(content, end="", flush=True) # flush=True确保立即输出print() # 最后换行
关键点解析:
stream=True:开启流式模式chunk.choices[0].delta.content:每个数据块中的文本片段end="":不换行,内容连续拼接flush=True:立即刷新输出缓冲区,确保实时显示
第二步:获取完整响应内容
流式输出时,需要手动拼接所有chunk来获取完整内容:
from openai import OpenAIclient = OpenAI( api_key="your-api-key", base_url="https://api.jiekou.ai/v1")def stream_and_collect(prompt: str, model: str = "gpt-4o") -> str: """ 流式输出的同时收集完整响应 :return: 完整的响应文本 """ stream = client.chat.completions.create( model=model, messages=[{"role": "user", "content": prompt}], stream=True ) full_content = [] print("AI回复:", end="", flush=True) for chunk in stream: delta = chunk.choices[0].delta # 提取文本内容 if delta.content is not None: print(delta.content, end="", flush=True) full_content.append(delta.content) # 检查结束原因 if chunk.choices[0].finish_reason == "stop": print() # 换行 break return "".join(full_content)# 使用示例full_response = stream_and_collect("解释一下Python的asyncio模块")print(f"\n--- 完整响应(共{len(full_response)}字符)---")print(f"前100字符:{full_response[:100]}...")
第三步:处理流式输出的完整信息
除了文本内容,还可以获取其他元信息:
from openai import OpenAIclient = OpenAI( api_key="your-api-key", base_url="https://api.jiekou.ai/v1")def stream_with_metadata(prompt: str): stream = client.chat.completions.create( model="gpt-4o", messages=[{"role": "user", "content": prompt}], stream=True, stream_options={"include_usage": True} # 在最后一个chunk包含token用量 ) full_content = [] finish_reason = None usage = None for chunk in stream: # 获取文本内容 if chunk.choices and chunk.choices[0].delta.content: content = chunk.choices[0].delta.content full_content.append(content) print(content, end="", flush=True) # 获取结束原因 if chunk.choices and chunk.choices[0].finish_reason: finish_reason = chunk.choices[0].finish_reason # 获取用量统计(最后一个chunk) if chunk.usage: usage = chunk.usage print() # 换行 print(f"\n--- 统计信息 ---") print(f"结束原因: {finish_reason}") if usage: print(f"输入tokens: {usage.prompt_tokens}") print(f"输出tokens: {usage.completion_tokens}") print(f"总tokens: {usage.total_tokens}") return "".join(full_content)stream_with_metadata("用3点总结Python的优缺点")
第四步:异步流式输出
在Web框架(FastAPI/Django等)中,通常需要使用异步流式输出:
import asynciofrom openai import AsyncOpenAIasync_client = AsyncOpenAI( api_key="your-api-key", base_url="https://api.jiekou.ai/v1")async def async_stream(prompt: str): """异步流式输出""" stream = await async_client.chat.completions.create( model="gpt-4o", messages=[{"role": "user", "content": prompt}], stream=True ) async for chunk in stream: if chunk.choices[0].delta.content: print(chunk.choices[0].delta.content, end="", flush=True) print()# 运行异步函数asyncio.run(async_stream("写一个Python异步爬虫的示例"))
第五步:FastAPI流式响应接口
构建一个支持流式输出的API服务:
from fastapi import FastAPIfrom fastapi.responses import StreamingResponsefrom openai import AsyncOpenAIimport asyncioapp = FastAPI()async_client = AsyncOpenAI( api_key="your-api-key", base_url="https://api.jiekou.ai/v1")@app.post("/chat/stream")async def stream_chat(user_message: str): """ 流式聊天接口 前端可以用 fetch + ReadableStream 接收 """ async def generate(): stream = await async_client.chat.completions.create( model="gpt-4o", messages=[{"role": "user", "content": user_message}], stream=True ) async for chunk in stream: content = chunk.choices[0].delta.content if content: # SSE格式:data: {内容}\n\n yield f"data: {content}\n\n" yield "data: [DONE]\n\n" # 结束信号 return StreamingResponse( generate(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no" # 禁用Nginx缓冲 } )# 启动:uvicorn main:app --reload
前端对应的JavaScript代码:
async function streamChat(message) { const response = await fetch('/chat/stream?user_message=' + message); const reader = response.body.getReader(); const decoder = new TextDecoder(); while (true) { const { done, value } = await reader.read(); if (done) break; const text = decoder.decode(value); const lines = text.split('\n'); for (const line of lines) { if (line.startsWith('data: ')) { const content = line.slice(6); if (content === '[DONE]') return; document.getElementById('output').textContent += content; } } }}
第六步:带超时控制的流式输出
import asynciofrom openai import AsyncOpenAIasync_client = AsyncOpenAI( api_key="your-api-key", base_url="https://api.jiekou.ai/v1")async def stream_with_timeout(prompt: str, timeout_seconds: int = 30): """ 带超时控制的流式输出 :param prompt: 用户输入 :param timeout_seconds: 超时时间(秒) """ try: async with asyncio.timeout(timeout_seconds): stream = await async_client.chat.completions.create( model="gpt-4o", messages=[{"role": "user", "content": prompt}], stream=True ) chunks = [] async for chunk in stream: content = chunk.choices[0].delta.content if content: chunks.append(content) print(content, end="", flush=True) print() return "".join(chunks) except asyncio.TimeoutError: print(f"\n⚠️ 超时!已超过{timeout_seconds}秒") return Noneasyncio.run(stream_with_timeout("写一部长篇小说..."))
第七步:中断流式输出
允许用户提前中断生成:
from openai import OpenAIimport threadingclient = OpenAI( api_key="your-api-key", base_url="https://api.jiekou.ai/v1")# 中断标志should_stop = threading.Event()def stream_with_interrupt(prompt: str): """支持中断的流式输出""" stream = client.chat.completions.create( model="gpt-4o", messages=[{"role": "user", "content": prompt}], stream=True ) content = [] try: for chunk in stream: # 检查是否需要中断 if should_stop.is_set(): print("\n[已中断]") stream.close() # 关闭流连接 break if chunk.choices[0].delta.content: text = chunk.choices[0].delta.content print(text, end="", flush=True) content.append(text) except Exception as e: print(f"\n错误: {e}") return "".join(content)# 使用示例(在另一线程中设置中断信号)def user_input_listener(): input("按回车键中断生成...\n") should_stop.set()interrupt_thread = threading.Thread(target=user_input_listener, daemon=True)interrupt_thread.start()result = stream_with_interrupt("写一篇非常长的文章...")
jiekou.ai:流式输出的最佳伙伴
流式输出对网络稳定性要求更高——连接中断意味着整个流需要重新开始。这也是推荐使用 jiekou.ai 的重要原因:
- 国内直连:无代理转发,网络更稳定,流式输出更顺畅
- 低延迟:减少每个chunk的传输延迟,打字机效果更流畅
- 断线恢复:稳定的连接减少流式传输中断的概率
- 多模型支持:同一接口,流式调用GPT-4o/Claude/Gemini
# 一行设置,解决所有网络问题client = OpenAI( api_key="your-jiekou-api-key", base_url="https://api.jiekou.ai/v1")
常见问题
Q: 流式输出比普通输出贵吗? A: Token计费完全一样,不因为是流式输出而有额外费用。
Q: 流式输出时如何计算Token用量? A: 需要在请求中设置 stream_options={"include_usage": True},最后一个chunk会包含用量信息。
Q: 流式输出中途断了怎么办? A: 需要从头重新请求。建议做好错误处理和断点续传(记录已生成内容,断线时提示用户重新生成)。
Q: 能同时进行多个流式请求吗? A: 可以,使用异步编程(asyncio)可以并发处理多个流式请求。
总结
流式输出是AI应用必备的技术,本文覆盖了从基础到生产环境的所有场景:
| 场景 | 实现方式 |
| 命令行打字机效果 | 同步stream + print flush |
| 收集完整响应 | 拼接所有chunk |
| Web API服务 | FastAPI + StreamingResponse |
| 高并发场景 | AsyncOpenAI + asyncio |
| 超时控制 | asyncio.timeout |
| 用户中断 | threading.Event |
将这些技术应用到你的项目中,配合 jiekou.ai 的稳定网络,打造丝滑的AI用户体验。a’f