LangGraph生产踩坑录:语音Agent架构实战
作者:James,公众号:James的成长日记
前情提要
一篇我们排查了 LangGraph 生产踩坑录,把真实项目里的 10 个坑都扒出来了。这一篇我们进入一个很多同学心心念念但一直没落地的主题——语音能力接入。
说说你可能踩过的坑:用 OpenAI TTS 生成了一段音频,等 API 返回完整文件再播放,用户等了 3 秒,感觉像在打电话等接通。或者直接接了 Whisper API,一次性传整段录音,说了 30 秒话,识别完要 2 秒,整个对话节奏全断了。上线之后发现这两个问题几乎无解——因为根本就搭错架了。
给 Agent 加语音能力,有两种路子:一种是「端到端语音」,一个多模态模型直接吃音频吐音频;另一种是「三明治架构」,STT + Agent + TTS 三层串联。两种都能用,但选错了场景,就是一场噩梦。
01 两种架构的三角博弈:选哪个取决于你最怕什么
端到端(S2S):用户说话 → 音频流 → 多模态大模型(如 gpt-4o-audio-preview)→ 音频流,一步到位。
三明治架构:麦克风音频先进 STT 转成文字,再由 LangGraph Agent 推理,推理结果最后进 TTS 合成音频,三层各司其职。
端到端的优点:架构简单、对简单交互延迟低、能保留语气情绪信息。
端到端的缺点:模型选择极少(目前就 gpt-4o-audio-preview 等屈指可数几个)、工具调用能力弱、出问题难 debug。
三明治的优点:每层可以独立换提供商、Agent 能用全部文本模型能力、行为透明好排查。
三明治的缺点:三层协调更复杂,STT→文本转换会丢失语气信息。
实际项目里的决策逻辑:
| 关注点 | 选端到端 | 选三明治 |
|---|---|---|
| 工具调用/复杂推理 | ❌ | ✅ |
| 语气/情绪保留 | ✅ | ❌ |
| 模型可替换性 | ❌ | ✅ |
| 快速原型 | ✅ | ❌ |
| 生产级稳定性 | ❌ | ✅ |
| 延迟极限(可达) | <500ms | <700ms |
结论很清晰:生产级语音 Agent,首选三明治架构。 本文全程用三明治讲。
02 STT 核心:流式识别才是正确姿势,一次性传文件是死路
很多同学第一次做语音识别,用的是这个模式——等录音结束,一次性传文件给 Whisper API。这种方式的问题是:三个环节(识别、Agent 思考、TTS 合成)全是串行等待,加起来很容易超过 3-4 秒。
正确姿势是流式 STT:音频边录边传,边传边识别,识别出来的文字边传边给 Agent 处理。整个管道是并发的,不是串行的。
用 AssemblyAI 的 Streaming WebSocket API 为例,关键是生产者-消费者并发模式。如果你用 OpenAI Whisper API(暂时没有 WebSocket 流式接口),工程折中方案是分段识别——每隔 2-3 秒截断一段识别,多段结果拼接。
方案一:AssemblyAI 流式 WebSocket
import { WebSocket } from 'ws';
interface STTEvent {
type: 'stt_chunk' | 'stt_output';
transcript: string;
ts: number;
}
class AssemblyAIStreaming {
private ws: WebSocket | null = null;
private buffer: STTEvent[] = [];
private resolve: ((event: STTEvent) => void) | null = null;
async connect(apiKey: string, sampleRate = 16000): Promise<void> {
const params = new URLSearchParams({
sample_rate: sampleRate.toString(),
format_turns: 'true'
});
this.ws = new WebSocket(
`wss://streaming.assemblyai.com/v3/ws?${params}`,
{ headers: { Authorization: apiKey } }
);
this.ws.on('message', (data: Buffer) => {
const msg = JSON.parse(data.toString());
if (msg.type === 'Turn') {
const event: STTEvent = {
type: msg.turn_is_formatted ? 'stt_output' : 'stt_chunk',
transcript: msg.transcript,
ts: Date.now()
};
if (this.resolve) {
this.resolve(event);
this.resolve = null;
} else {
this.buffer.push(event);
}
}
});
await new Promise<void>((resolve) => {
this.ws!.on('open', () => resolve());
});
}
sendAudio(chunk: Buffer): void {
this.ws?.send(chunk);
}
async * receiveEvents(): AsyncGenerator<STTEvent> {
while (true) {
if (this.buffer.length > 0) {
yield this.buffer.shift()!;
} else {
yield await new Promise<STTEvent>((resolve) => {
this.resolve = resolve;
});
}
}
}
close(): void {
this.ws?.close();
}
}
方案二:OpenAI Whisper 分段识别(折中方案)
const openai = new OpenAI();
async function * chunkingSTT(audioStream: AsyncIterable<Buffer>): AsyncGenerator<string> {
const CHUNK_INTERVAL_MS = 2000;
let buffer: Buffer[] = [];
let lastFlushTime = Date.now();
for await (const chunk of audioStream) {
buffer.push(chunk);
if (Date.now() - lastFlushTime >= CHUNK_INTERVAL_MS) {
const combined = Buffer.concat(buffer);
buffer = [];
lastFlushTime = Date.now();
// 指定 language: 'zh',中文识别率提升 15-20%
const result = await openai.audio.transcriptions.create({
file: new File([combined], 'audio.pcm', { type: 'audio/pcm' }),
model: 'whisper-1',
language: 'zh',
response_format: 'text',
});
if (result.trim()) yield result;
}
}
// 处理剩余音频
if (buffer.length > 0) {
const result = await openai.audio.transcriptions.create({
file: new File([Buffer.concat(buffer)], 'audio.pcm', { type: 'audio/pcm' }),
model: 'whisper-1',
language: 'zh',
response_format: 'text',
});
if (result.trim()) yield result;
}
}
03 TTS 核心:分句触发才是打字机效果的语音版
TTS 也有同样的等待问题。等 LLM 输出完整回复再整段传给 TTS,用户要等两次。
正确做法:LLM 输出一句话,立刻传给 TTS,音频边合成边播放。实现的关键是句子边界检测:token 流来了先缓存,遇到句号/问号/感叹号就切断,把这句立刻送 TTS。
import OpenAI from 'openai';
const openai = new OpenAI();
// 分句缓冲器:把 token 流切成句子流
async function * sentenceSplitter(
tokenStream: AsyncIterable<string>
): AsyncGenerator<string> {
const END_CHARS = new Set(['。', '!', '?', '.', '!', '?', '\n']);
let buffer = '';
for await (const token of tokenStream) {
buffer += token;
if (END_CHARS.has(buffer[buffer.length - 1]) && buffer.trim().length > 0) {
yield buffer.trim();
buffer = '';
}
}
if (buffer.trim().length > 0) yield buffer.trim();
}
// 流式 TTS:合成一句话,边合成边吐音频块
// 关键:用 tts-1 不用 tts-1-hd,延迟差 2-3 倍
async function * ttsStream(text: string): AsyncGenerator<Buffer> {
const response = await openai.audio.speech.create({
model: 'tts-1', // ← 流式专用,tts-1-hd 延迟翻倍
voice: 'alloy',
input: text,
response_format: 'pcm', // PCM 直接播放,不需要解码
});
const reader = response.body!.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
yield Buffer.from(value);
}
}
// 串联:Agent token 流 → 分句 → TTS 流 → 音频块
async function * agentToSpeech(
agentTokenStream: AsyncIterable<string>
): AsyncGenerator<Buffer> {
for await (const sentence of sentenceSplitter(agentTokenStream)) {
for await (const audioChunk of ttsStream(sentence)) {
yield audioChunk;
}
}
}
这样做,LLM 说出第一句话后,音频就开始播了,而不是等整段回复生成完毕。
04 完整三明治管道:用异步迭代器串联三层
现在把 STT、LangGraph Agent、TTS 三层串起来。核心思路:用 AsyncGenerator 做无缝衔接,每一层的输出直接是下一层的输入。
import { createReactAgent } from '@langchain/langgraph/prebuilt';
import { ChatOpenAI } from '@langchain/openai';
import { MemorySaver } from '@langchain/langgraph';
import { HumanMessage } from '@langchain/core/messages';
class VoiceAgentPipeline {
private agent: ReturnType<typeof createReactAgent>;
private memory = new MemorySaver();
private threadId: string;
constructor() {
this.agent = createReactAgent({
llm: new ChatOpenAI({ model: 'gpt-4o', streaming: true }),
tools: [], // 在这里加你的工具
checkpointSaver: this.memory,
});
// 每个会话独立 threadId,保持记忆连续
this.threadId = `voice-session-${Date.now()}`;
}
// Agent 处理文字,流式吐 token(streamMode: 'messages')
private async * agentStream(transcript: string): AsyncGenerator<string> {
const stream = await this.agent.stream(
{ messages: [new HumanMessage(transcript)] },
{ configurable: { thread_id: this.threadId }, streamMode: 'messages' }
);
for await (const [message] of stream) {
if (
message.content && typeof message.content === 'string'
&& message.getType() === 'ai'
) {
yield message.content;
}
}
}
// 完整一轮对话:音频流 → 文字 → Agent → TTS → 音频流
async * processTurn(audioStream: AsyncIterable<Buffer>): AsyncGenerator<Buffer> {
const stt = new AssemblyAIStreaming();
await stt.connect(process.env.ASSEMBLYAI_API_KEY!);
const producer = (async () => {
for await (const chunk of audioStream) { stt.sendAudio(chunk); }
stt.close();
})();
for await (const event of stt.receiveEvents()) {
if (event.type === 'stt_output') {
yield * agentToSpeech(this.agentStream(event.transcript));
}
}
await producer;
}
}
整条数据流链路:麦克风音频 → STT WebSocket(实时文本)→ LangGraph Agent(token 流)→ 分句缓冲器(句子)→ OpenAI TTS(音频块)→ 扬声器播放,每个环节都是异步迭代器,上游产出下游消费,完全并发。
05 WebSocket 服务:浏览器 ↔ 服务器实时通信封装
语音 Agent 需要用 WebSocket,因为浏览器要持续发音频、持续接收音频,HTTP 请求-响应搞不定。下面是 Node.js 服务端的封装,把音频 base64 编码通过 WebSocket 双向传输。
import { WebSocketServer, WebSocket } from 'ws';
function createVoiceServer(port = 8765): void {
const wss = new WebSocketServer({ port });
wss.on('connection', (ws: WebSocket) => {
const pipeline = new VoiceAgentPipeline();
const audioQueue: Buffer[] = [];
let audioResolve: (() => void) | null = null;
let sessionEnded = false;
// 音频 buffer → AsyncGenerator
async function * audioStream(): AsyncGenerator<Buffer> {
while (!sessionEnded || audioQueue.length > 0) {
if (audioQueue.length > 0) {
yield audioQueue.shift()!;
} else {
await new Promise<void>((r) => { audioResolve = r; });
}
}
}
ws.on('message', async (data: Buffer) => {
const msg = JSON.parse(data.toString());
if (msg.type === 'audio_chunk' && msg.data) {
audioQueue.push(Buffer.from(msg.data, 'base64'));
audioResolve?.();
audioResolve = null;
} else if (msg.type === 'audio_end') {
sessionEnded = true;
audioResolve?.();
}
});
// 处理管道:接收音频 → 发回音频
(async () => {
for await (const chunk of pipeline.processTurn(audioStream())) {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({
type: 'audio_response',
data: chunk.toString('base64')
}));
}
}
ws.send(JSON.stringify({ type: 'response_end' }));
})().catch(console.error);
});
console.log(`语音 Agent 服务运行在 ws://localhost:${port}`);
}
createVoiceServer();
浏览器端配合这个服务,用 MediaRecorder API 录音,通过 WebSocket 持续发音频块,收到音频块后用 AudioContext 播放。格式注意:AssemblyAI 流式 API 要求 PCM 16kHz 单声道,浏览器录音默认是 opus 编码,需要转换。
06 延迟优化:那 700ms 是怎么做到的
生产环境不优化时延迟分解:STT 识别完成约 800ms(等说完话才开始识别)、Agent 首 token 约 1200ms(等完整回复生成)、TTS 首音频约 600ms(等整段文字合成完),总感知延迟约 2600ms。
优化之后:STT 流式实时识别、Agent 首 token 约 800ms、TTS 第一句话首音频约 200ms,总感知延迟压缩到 800-1000ms。
关键优化点汇总:
| 优化项 | 优化前 | 优化后 | 收益 |
|---|---|---|---|
| STT 方案 | 一次性传文件 | WebSocket 流式 | 节省 600-1000ms |
| TTS 触发时机 | 等完整回复 | 遇第一个句号即触发 | 节省 400ms+ |
| TTS 模型 | tts-1-hd | tts-1 | 延迟降低 50% |
| 音频格式 | mp3(需解码) | pcm(直接播放) | 节省 50-100ms |
| 中文识别 | 未指定语言 | language: 'zh' | 准确率+15-20% |
07 常见坑
坑 1:没有 VAD(语音活动检测),导致尴尬的安静
用户停顿 1 秒系统不知道他说完了没有,要么打断要么一直等。解法:前端用 hark.js 做 VAD,检测到说话停止超过 500ms 才发送 audio_end 事件。
坑 2:TTS 被工具调用打断,输出半截句子
Agent 调用工具时 token 流中断,TTS 收到半截句子合成,听起来很别扭。解法:在 sentenceSplitter 里加超时 flush,超过 300ms 没有新 token 就强制把 buffer 输出。
坑 3:对话记忆跨会话丢失
MemorySaver 是内存存储,Node.js 重启清空。解法:用 @langchain/langgraph-checkpoint-postgres 持久化。
import { PostgresSaver } from '@langchain/langgraph-checkpoint-postgres';
const checkpointer = PostgresSaver.fromConnString(process.env.DATABASE_URL!);
await checkpointer.setup();
const agent = createReactAgent({ llm, tools, checkpointSaver: checkpointer });
// threadId 和用户 ID 绑定,每个用户独立记忆
const threadId = `user-${userId}-voice`;
坑 4:中文 Whisper 识别率低
忘传 language: 'zh' 参数,模型默认自动检测,中文识别率比指定语言低 15-20%。
坑 5:多人并发时 threadId 混用
多个 WebSocket 连接共用同一个 threadId,对话记忆全乱。每个连接要用和用户 ID 挂钩的独立 threadId。
总结
- 架构选三明治:STT + LangGraph Agent + TTS,可控性高,生产稳定,<1s 延迟可达
- STT 要流式:边录边识别,生产者-消费者并发,不等录音结束
- TTS 要分句触发:第一个句号出来就合成,别等整段回复
- 格式细节很重要:PCM 16kHz 单声道、tts-1 模型、language: 'zh' 参数一个都不能少
- VAD 必须有:没有它,说话停顿时系统不知道该不该回,体验直接崩
- 持久化 checkpoint:跨会话记忆要上数据库,MemorySaver 只适合 demo
预告:下一篇是压轴篇——完整项目实战:用 LangChain + LangGraph + RAG + Tools + MCP + Memory 从零搭一个生产级 AI 助手。