Python调用ChatGPT API实现流式输出教程

分类:技术交流发布时间:建议阅读时长:34 分钟
作者:sodope llm

你有没有注意到ChatGPT网页版的”打字机效果”——回复不是一次性出现,而是一个字一个字地显示出来?这种效果背后用的就是**流式输出(Streaming)**技术。

在实际应用中,流式输出有重要价值:

  • 用户体验更好:用户无需等待完整响应,立即看到内容开始出现
  • 长文本不超时:生成长文章时,避免等待超时问题
  • 实时反馈:用户可以提前判断输出方向,必要时提前终止

本文将完整讲解如何在Python中实现ChatGPT API的流式输出,从基础到高级。


基础:什么是流式输出?

普通请求 vs 流式请求

普通请求(一次性返回):

用户发送请求 → 等待... → 一次性收到完整回复

流式请求(逐块返回):

用户发送请求 → 立即收到第一个chunk → 收到第二个chunk → ... → 收到完整回复

底层实现上,流式输出使用的是HTTP的 Server-Sent Events (SSE) 协议,服务端持续推送数据块,直到生成完毕。


第一步:基础流式调用

from openai import OpenAI
client = OpenAI(
api_key="your-api-key",
base_url="https://api.jiekou.ai/v1" # 国内用户推荐
)
# 关键:设置 stream=True
stream = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "user", "content": "写一篇关于Python异步编程的介绍,大约300字"}
],
stream=True # 开启流式输出
)
# 遍历数据块
for chunk in stream:
content = chunk.choices[0].delta.content
if content is not None:
print(content, end="", flush=True) # flush=True确保立即输出
print() # 最后换行

关键点解析:

  • stream=True:开启流式模式
  • chunk.choices[0].delta.content:每个数据块中的文本片段
  • end="":不换行,内容连续拼接
  • flush=True:立即刷新输出缓冲区,确保实时显示

第二步:获取完整响应内容

流式输出时,需要手动拼接所有chunk来获取完整内容:

from openai import OpenAI
client = OpenAI(
api_key="your-api-key",
base_url="https://api.jiekou.ai/v1"
)
def stream_and_collect(prompt: str, model: str = "gpt-4o") -> str:
"""
流式输出的同时收集完整响应
:return: 完整的响应文本
"""
stream = client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}],
stream=True
)
full_content = []
print("AI回复:", end="", flush=True)
for chunk in stream:
delta = chunk.choices[0].delta
# 提取文本内容
if delta.content is not None:
print(delta.content, end="", flush=True)
full_content.append(delta.content)
# 检查结束原因
if chunk.choices[0].finish_reason == "stop":
print() # 换行
break
return "".join(full_content)
# 使用示例
full_response = stream_and_collect("解释一下Python的asyncio模块")
print(f"\n--- 完整响应(共{len(full_response)}字符)---")
print(f"前100字符:{full_response[:100]}...")

第三步:处理流式输出的完整信息

除了文本内容,还可以获取其他元信息:

from openai import OpenAI
client = OpenAI(
api_key="your-api-key",
base_url="https://api.jiekou.ai/v1"
)
def stream_with_metadata(prompt: str):
stream = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}],
stream=True,
stream_options={"include_usage": True} # 在最后一个chunk包含token用量
)
full_content = []
finish_reason = None
usage = None
for chunk in stream:
# 获取文本内容
if chunk.choices and chunk.choices[0].delta.content:
content = chunk.choices[0].delta.content
full_content.append(content)
print(content, end="", flush=True)
# 获取结束原因
if chunk.choices and chunk.choices[0].finish_reason:
finish_reason = chunk.choices[0].finish_reason
# 获取用量统计(最后一个chunk)
if chunk.usage:
usage = chunk.usage
print() # 换行
print(f"\n--- 统计信息 ---")
print(f"结束原因: {finish_reason}")
if usage:
print(f"输入tokens: {usage.prompt_tokens}")
print(f"输出tokens: {usage.completion_tokens}")
print(f"总tokens: {usage.total_tokens}")
return "".join(full_content)
stream_with_metadata("用3点总结Python的优缺点")

第四步:异步流式输出

在Web框架(FastAPI/Django等)中,通常需要使用异步流式输出:

import asyncio
from openai import AsyncOpenAI
async_client = AsyncOpenAI(
api_key="your-api-key",
base_url="https://api.jiekou.ai/v1"
)
async def async_stream(prompt: str):
"""异步流式输出"""
stream = await async_client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}],
stream=True
)
async for chunk in stream:
if chunk.choices[0].delta.content:
print(chunk.choices[0].delta.content, end="", flush=True)
print()
# 运行异步函数
asyncio.run(async_stream("写一个Python异步爬虫的示例"))

第五步:FastAPI流式响应接口

构建一个支持流式输出的API服务:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
import asyncio
app = FastAPI()
async_client = AsyncOpenAI(
api_key="your-api-key",
base_url="https://api.jiekou.ai/v1"
)
@app.post("/chat/stream")
async def stream_chat(user_message: str):
"""
流式聊天接口
前端可以用 fetch + ReadableStream 接收
"""
async def generate():
stream = await async_client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": user_message}],
stream=True
)
async for chunk in stream:
content = chunk.choices[0].delta.content
if content:
# SSE格式:data: {内容}\n\n
yield f"data: {content}\n\n"
yield "data: [DONE]\n\n" # 结束信号
return StreamingResponse(
generate(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no" # 禁用Nginx缓冲
}
)
# 启动:uvicorn main:app --reload

前端对应的JavaScript代码:

async function streamChat(message) {
const response = await fetch('/chat/stream?user_message=' + message);
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const text = decoder.decode(value);
const lines = text.split('\n');
for (const line of lines) {
if (line.startsWith('data: ')) {
const content = line.slice(6);
if (content === '[DONE]') return;
document.getElementById('output').textContent += content;
}
}
}
}

第六步:带超时控制的流式输出

import asyncio
from openai import AsyncOpenAI
async_client = AsyncOpenAI(
api_key="your-api-key",
base_url="https://api.jiekou.ai/v1"
)
async def stream_with_timeout(prompt: str, timeout_seconds: int = 30):
"""
带超时控制的流式输出
:param prompt: 用户输入
:param timeout_seconds: 超时时间(秒)
"""
try:
async with asyncio.timeout(timeout_seconds):
stream = await async_client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}],
stream=True
)
chunks = []
async for chunk in stream:
content = chunk.choices[0].delta.content
if content:
chunks.append(content)
print(content, end="", flush=True)
print()
return "".join(chunks)
except asyncio.TimeoutError:
print(f"\n⚠️ 超时!已超过{timeout_seconds}秒")
return None
asyncio.run(stream_with_timeout("写一部长篇小说..."))

第七步:中断流式输出

允许用户提前中断生成:

from openai import OpenAI
import threading
client = OpenAI(
api_key="your-api-key",
base_url="https://api.jiekou.ai/v1"
)
# 中断标志
should_stop = threading.Event()
def stream_with_interrupt(prompt: str):
"""支持中断的流式输出"""
stream = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}],
stream=True
)
content = []
try:
for chunk in stream:
# 检查是否需要中断
if should_stop.is_set():
print("\n[已中断]")
stream.close() # 关闭流连接
break
if chunk.choices[0].delta.content:
text = chunk.choices[0].delta.content
print(text, end="", flush=True)
content.append(text)
except Exception as e:
print(f"\n错误: {e}")
return "".join(content)
# 使用示例(在另一线程中设置中断信号)
def user_input_listener():
input("按回车键中断生成...\n")
should_stop.set()
interrupt_thread = threading.Thread(target=user_input_listener, daemon=True)
interrupt_thread.start()
result = stream_with_interrupt("写一篇非常长的文章...")

jiekou.ai:流式输出的最佳伙伴

流式输出对网络稳定性要求更高——连接中断意味着整个流需要重新开始。这也是推荐使用 jiekou.ai 的重要原因:

  • 国内直连:无代理转发,网络更稳定,流式输出更顺畅
  • 低延迟:减少每个chunk的传输延迟,打字机效果更流畅
  • 断线恢复:稳定的连接减少流式传输中断的概率
  • 多模型支持:同一接口,流式调用GPT-4o/Claude/Gemini
# 一行设置,解决所有网络问题
client = OpenAI(
api_key="your-jiekou-api-key",
base_url="https://api.jiekou.ai/v1"
)

常见问题

Q: 流式输出比普通输出贵吗? A: Token计费完全一样,不因为是流式输出而有额外费用。

Q: 流式输出时如何计算Token用量? A: 需要在请求中设置 stream_options={"include_usage": True},最后一个chunk会包含用量信息。

Q: 流式输出中途断了怎么办? A: 需要从头重新请求。建议做好错误处理和断点续传(记录已生成内容,断线时提示用户重新生成)。

Q: 能同时进行多个流式请求吗? A: 可以,使用异步编程(asyncio)可以并发处理多个流式请求。


总结

流式输出是AI应用必备的技术,本文覆盖了从基础到生产环境的所有场景:

场景实现方式
命令行打字机效果同步stream + print flush
收集完整响应拼接所有chunk
Web API服务FastAPI + StreamingResponse
高并发场景AsyncOpenAI + asyncio
超时控制asyncio.timeout
用户中断threading.Event

将这些技术应用到你的项目中,配合 jiekou.ai 的稳定网络,打造丝滑的AI用户体验。a’f

分享:
联系我们