企业微信并发优化方案对比
问题背景
Hermes Gateway 采用单 Agent 串行循环架构——同一时刻只有一个用户的同一消息在被处理,所有其他用户的消息都在队列中等待。
用户A ─┐
用户B ─┤
用户C ─┼──► [Gateway 队列] ──► [单 Agent 处理] ──► 响应
用户D ─┤
用户N ─┘
当前环境:微信 1 用户 + 企业微信 10 用户。如果 11 个用户同时发消息,第 2-11 名用户都需要等待第 1 名处理完才能轮到。
方案二:api_server 桥接模式
核心原理
api_server(gateway/platforms/api_server.py)是无状态 HTTP 请求/响应架构,天然支持并发:
# api_server.py 第 2373 行
_MAX_CONCURRENT_RUNS = 10 # 已内置并发限制
# 每个请求在独立线程池里跑 agent
return await loop.run_in_executor(None, _run) # 第 2367 行
已有架构基础
| 端点 | 功能 |
|---|---|
POST /v1/chat/completions |
OpenAI 兼容格式,单次对话 |
POST /v1/runs |
异步运行,立即返回 run_id,支持 SSE 事件流 |
GET /v1/runs/{run_id}/events |
SSE 事件流(tool.started/completed, agent.output) |
落地步骤
第一步:企业微信消息桥接服务
企业微信的 Webhook 回调触发这个桥接服务,将消息转发到 api_server:
# wecom_bridge.py(新增服务)
from aiohttp import web
import aiohttp
API_SERVER_URL = "http://localhost:8000/v1/chat/completions"
API_SERVER_TOKEN = "your-token-here"
async def wecom_webhook(request):
"""企业微信消息回调入口"""
msg = await request.json()
user_id = msg.get("FromUserName") # 发送者
content = msg.get("Content") # 文本内容
chat_id = msg.get("ToUserName") # 接收者(机器人)
# 调用 api_server
async with aiohttp.ClientSession() as sess:
resp = await sess.post(
API_SERVER_URL,
json={
"messages": [{"role": "user", "content": content}],
"stream": False
},
headers={
"Authorization": f"Bearer {API_SERVER_TOKEN}",
"X-Tenant-ID": "wecom",
"X-User-ID": user_id,
"X-Chat-ID": chat_id,
}
)
result = await resp.json()
reply = result["choices"][0]["message"]["content"]
# 通过企业微信主动推送回复
await send_wecom_message(user_id, reply)
return {"errcode": 0}
async def send_wecom_message(to_user, content):
"""调用企业微信应用消息接口"""
# 实现参见企业微信官方API
app = web.Application()
app.router.add_post("/wecom/webhook", wecom_webhook)
web.run_app(app, host="0.0.0.0", port=8080)
第二步:多租户 Session 隔离
api_server 当前每个请求跑独立 Agent,但 session 混在一起。需要在 header 中提取租户信息:
# 在 api_server.py 的 _handle_chat_completions() 里新增
# 从 header 提取 tenant/user
tenant_id = request.headers.get("X-Tenant-ID", "default")
user_id = request.headers.get("X-User-ID", "default")
chat_id = request.headers.get("X-Chat-ID", "default")
# 构建隔离的 session_key
session_key = f"{tenant_id}:{user_id}:{chat_id}:dm"
第三步:启动桥接服务
# 独立运行桥接服务
python wecom_bridge.py &
# 或用 systemd 管理
# /etc/systemd/system/wecom-bridge.service
架构图
企业微信用户 ──► 企业微信服务器 ──► Webhook ──► wecom_bridge ──► api_server
│ │
│ ├──► Agent 实例 1
│ ├──► Agent 实例 2
│ │ ...
│ └──► Agent 实例 10(MAX)
│
└──► 回复路由
优点
- 改动最小:api_server 已实现线程级并发,只需写桥接服务
- 不影响现有架构:现有 Gateway 继续跑微信,新服务跑企业微信
- 水平扩展:加机器加并发,上限高
- SSE 流式响应:已有的事件流基础设施
缺点
- 实时性挑战:企业微信 Webhook 是回调模式,响应延迟依赖桥接服务
- 多租户隔离需开发:session 路由、记忆隔离需要额外实现
- 独立维护:桥接服务需要自己部署和监控
方案四:Agent 池代码改造
核心问题分析
当前 Gateway 单 Agent 串行的根因在 run.py 第 5307-5314 行:
# 每个 session_key 领取一个 sentinel,表示"这个 session 正在处理"
self._running_agents[_quick_key] = _AGENT_PENDING_SENTINEL
self._running_agents_ts[_quick_key] = time.time()
# 中间大量 await 点(vision、STT、hooks)...
# 真正处理(整个过程是串行的)
_agent_result = await self._handle_message_with_agent(...)
_handle_message_with_agent(第 5568 行)是 async def,但 Gateway 主事件循环是单线程,一次只处理一个 await。
SQLite WAL 并发问题
Session store 使用 SQLite WAL 模式,多并发 Agent 会同时写同一个 session.db。WAL 允许多读一写,但写操作需要排他锁:
Agent 1(写)──► SQLite 排他锁 ──► 阻塞其他写
Agent 2(等)──────────────────► 等锁
Agent 3(等)──────────────────► 等锁
改造方案:Session 级信号量
核心思路:
1. 加一个全局信号量控制并发数(如 5)
2. 每个 session_key 有自己的锁(同一 session 消息串行,不同 session 并发)
第一步:修改 Gateway 类初始化
# run.py Gateway.__init__() 里新增
# 全局并发控制信号量(可配置)
self._agent_semaphore = asyncio.Semaphore(5)
# per-session 锁字典(不同 session 不互相阻塞)
self._session_locks: Dict[str, asyncio.Lock] = {}
第二步:改造消息处理入口
# run.py _handle_message() 改造
async def _handle_message(self, event: MessageEvent) -> Optional[str]:
session_key = self._session_key_for_source(source)
# 每个 session 有自己的锁(同一 session 串行,不同 session 并发)
session_lock = self._session_locks.setdefault(session_key, asyncio.Lock())
# 全局限流:最多 5 个并发 Agent
async with self._agent_semaphore:
# session 级串行
async with session_lock:
return await self._handle_message_with_agent(
event, source, _quick_key, run_generation
)
第三步:解决 SQLite 并发写
# 方案A:写操作加全局锁(简单但影响性能)
self._session_write_lock = asyncio.Lock()
# 方案B:每个并发 slot 用独立 SQLite 文件
# 按 session_key 哈希到不同 DB 文件
# 方案C:改用 PostgreSQL(改动最大但最彻底)
第四步:配置化
# config.yaml
gateway:
max_concurrent_agents: 5 # 新增配置项
# run.py __init__() 读取配置
max_concurrent = self.config.get("gateway", {}).get("max_concurrent_agents", 5)
self._agent_semaphore = asyncio.Semaphore(max_concurrent)
架构图(改造后)
用户A msg ─┐
用户B msg ─┼─► _agent_semaphore(5) ──► Agent Pool(5个并发slot)
用户C msg ─┤ │
用户D msg ─┤ ├──► session_lock_A ──► 同一session串行
用户E msg ─┘ ├──► session_lock_B
├──► session_lock_C
└──► ...
优点
- 改动集中:只需改
run.py核心逻辑 - 精确控制:可配置并发数,粒度到 session 级别
- 复用现有基础设施:Gateway 的 session 管理、memory、hooks 全部复用
- 上线风险低:不影响现有平台适配器
缺点
- SQLite 并发瓶颈:写操作需要排他锁,多并发时性能受限
- 边缘 case 多:session 锁的生命周期管理复杂
- 单实例瓶颈:受限于单机的 CPU 和内存
- 测试成本高:核心代码改动,需要完整回归测试
两种方案对比
| 对比维度 | 方案二(api_server 桥接) | 方案四(Agent 池) |
|---|---|---|
| 改动范围 | 新增桥接服务 | 修改 run.py 核心逻辑 |
| 并发粒度 | 请求级(每个 HTTP 请求独立 Agent) | Session 级(可精确控制 5 并发) |
| 已有基础 | api_server 已实现线程池 | 从零写并发控制 |
| 最难部分 | WeCom 回调实时性 | SQLite 并发写 |
| 扩展性 | 加机器就加并发 | 受限于单实例性能 |
| 上线风险 | 低(不影响现有架构) | 中(核心代码改动) |
| 维护成本 | 中(桥接服务独立维护) | 低(复用现有 Gateway) |
| 实时性 | 依赖 Webhook 回调延迟 | Gateway 内实时处理 |
| 内存占用 | 每个请求独立 Agent 实例 | 共享 Agent 池 |
| 代码复用度 | 低(需要重写消息路由) | 高(全部复用) |
关键代码位置参考
| 文件 | 行号 | 内容 |
|---|---|---|
gateway/run.py |
5307-5314 | sentinel 领取逻辑(改造点) |
gateway/run.py |
5568 | _handle_message_with_agent 主处理 |
gateway/run.py |
2373 | _MAX_CONCURRENT_RUNS(api_server 并发上限) |
gateway/run.py |
1012-1013 | _running_agents 字典初始化 |
gateway/platforms/api_server.py |
2318-2367 | _run_agent 线程池执行 |
gateway/platforms/api_server.py |
2438-2448 | /v1/runs 端点并发限制 |
推荐路径
| 场景 | 推荐方案 |
|---|---|
| 先解决燃眉之急 | 方案二:微信/企业微信拆开跑两个 Profile(最简单的临时方案) |
| 用户多、想真正并发 | 方案二 + 桥接服务:api_server 已实现并发,复用成本低 |
| 愿意改代码一步到位 | 方案四:改动集中,但需要处理 SQLite 并发 |
| 长期维护 | 方案二(桥接)+ 方案四(Gateway 改造)结合 |
相关文档
- Hermes api_server 平台适配方案 — api_server 架构详解
- Hermes-Profile隔离详解 — 多 Profile 隔离部署
- 智能子Agent协作详解 — 多 Agent 协作模式