x

企业微信并发优化方案对比

问题背景

Hermes Gateway 采用单 Agent 串行循环架构——同一时刻只有一个用户的同一消息在被处理,所有其他用户的消息都在队列中等待。

用户A ─┐
用户B ─┤
用户C ─┼──► [Gateway 队列] ──► [单 Agent 处理] ──► 响应
用户D ─┤
用户N ─┘

当前环境:微信 1 用户 + 企业微信 10 用户。如果 11 个用户同时发消息,第 2-11 名用户都需要等待第 1 名处理完才能轮到。

方案二:api_server 桥接模式

核心原理

api_server(gateway/platforms/api_server.py)是无状态 HTTP 请求/响应架构,天然支持并发:

# api_server.py 第 2373 行
_MAX_CONCURRENT_RUNS = 10  # 已内置并发限制

# 每个请求在独立线程池里跑 agent
return await loop.run_in_executor(None, _run)  # 第 2367 行

已有架构基础

端点 功能
POST /v1/chat/completions OpenAI 兼容格式,单次对话
POST /v1/runs 异步运行,立即返回 run_id,支持 SSE 事件流
GET /v1/runs/{run_id}/events SSE 事件流(tool.started/completed, agent.output)

落地步骤

第一步:企业微信消息桥接服务

企业微信的 Webhook 回调触发这个桥接服务,将消息转发到 api_server:

# wecom_bridge.py(新增服务)

from aiohttp import web
import aiohttp

API_SERVER_URL = "http://localhost:8000/v1/chat/completions"
API_SERVER_TOKEN = "your-token-here"

async def wecom_webhook(request):
    """企业微信消息回调入口"""
    msg = await request.json()
    user_id = msg.get("FromUserName")       # 发送者
    content = msg.get("Content")             # 文本内容
    chat_id = msg.get("ToUserName")         # 接收者(机器人)

    # 调用 api_server
    async with aiohttp.ClientSession() as sess:
        resp = await sess.post(
            API_SERVER_URL,
            json={
                "messages": [{"role": "user", "content": content}],
                "stream": False
            },
            headers={
                "Authorization": f"Bearer {API_SERVER_TOKEN}",
                "X-Tenant-ID": "wecom",
                "X-User-ID": user_id,
                "X-Chat-ID": chat_id,
            }
        )
        result = await resp.json()

    reply = result["choices"][0]["message"]["content"]

    # 通过企业微信主动推送回复
    await send_wecom_message(user_id, reply)
    return {"errcode": 0}

async def send_wecom_message(to_user, content):
    """调用企业微信应用消息接口"""
    # 实现参见企业微信官方API

app = web.Application()
app.router.add_post("/wecom/webhook", wecom_webhook)
web.run_app(app, host="0.0.0.0", port=8080)

第二步:多租户 Session 隔离

api_server 当前每个请求跑独立 Agent,但 session 混在一起。需要在 header 中提取租户信息:

# 在 api_server.py 的 _handle_chat_completions() 里新增

# 从 header 提取 tenant/user
tenant_id = request.headers.get("X-Tenant-ID", "default")
user_id = request.headers.get("X-User-ID", "default")
chat_id = request.headers.get("X-Chat-ID", "default")

# 构建隔离的 session_key
session_key = f"{tenant_id}:{user_id}:{chat_id}:dm"

第三步:启动桥接服务

# 独立运行桥接服务
python wecom_bridge.py &

# 或用 systemd 管理
# /etc/systemd/system/wecom-bridge.service

架构图

企业微信用户 ──► 企业微信服务器 ──► Webhook ──► wecom_bridge ──► api_server
                                              │                      │
                                              │                      ├──► Agent 实例 1
                                              │                      ├──► Agent 实例 2
                                              │                      │     ...
                                              │                      └──► Agent 实例 10(MAX)
                                              │
                                              └──► 回复路由

优点

  • 改动最小:api_server 已实现线程级并发,只需写桥接服务
  • 不影响现有架构:现有 Gateway 继续跑微信,新服务跑企业微信
  • 水平扩展:加机器加并发,上限高
  • SSE 流式响应:已有的事件流基础设施

缺点

  • 实时性挑战:企业微信 Webhook 是回调模式,响应延迟依赖桥接服务
  • 多租户隔离需开发:session 路由、记忆隔离需要额外实现
  • 独立维护:桥接服务需要自己部署和监控

方案四:Agent 池代码改造

核心问题分析

当前 Gateway 单 Agent 串行的根因在 run.py 第 5307-5314 行:

# 每个 session_key 领取一个 sentinel,表示"这个 session 正在处理"
self._running_agents[_quick_key] = _AGENT_PENDING_SENTINEL
self._running_agents_ts[_quick_key] = time.time()

# 中间大量 await 点(vision、STT、hooks)...

# 真正处理(整个过程是串行的)
_agent_result = await self._handle_message_with_agent(...)

_handle_message_with_agent(第 5568 行)是 async def,但 Gateway 主事件循环是单线程,一次只处理一个 await

SQLite WAL 并发问题

Session store 使用 SQLite WAL 模式,多并发 Agent 会同时写同一个 session.db。WAL 允许多读一写,但写操作需要排他锁:

Agent 1(写)──► SQLite 排他锁 ──► 阻塞其他写
Agent 2(等)──────────────────► 等锁
Agent 3(等)──────────────────► 等锁

改造方案:Session 级信号量

核心思路:
1. 加一个全局信号量控制并发数(如 5)
2. 每个 session_key 有自己的锁(同一 session 消息串行,不同 session 并发)

第一步:修改 Gateway 类初始化

# run.py Gateway.__init__() 里新增

# 全局并发控制信号量(可配置)
self._agent_semaphore = asyncio.Semaphore(5)

# per-session 锁字典(不同 session 不互相阻塞)
self._session_locks: Dict[str, asyncio.Lock] = {}

第二步:改造消息处理入口

# run.py _handle_message() 改造

async def _handle_message(self, event: MessageEvent) -> Optional[str]:
    session_key = self._session_key_for_source(source)

    # 每个 session 有自己的锁(同一 session 串行,不同 session 并发)
    session_lock = self._session_locks.setdefault(session_key, asyncio.Lock())

    # 全局限流:最多 5 个并发 Agent
    async with self._agent_semaphore:
        # session 级串行
        async with session_lock:
            return await self._handle_message_with_agent(
                event, source, _quick_key, run_generation
            )

第三步:解决 SQLite 并发写

# 方案A:写操作加全局锁(简单但影响性能)
self._session_write_lock = asyncio.Lock()

# 方案B:每个并发 slot 用独立 SQLite 文件
# 按 session_key 哈希到不同 DB 文件

# 方案C:改用 PostgreSQL(改动最大但最彻底)

第四步:配置化

# config.yaml
gateway:
  max_concurrent_agents: 5  # 新增配置项
# run.py __init__() 读取配置
max_concurrent = self.config.get("gateway", {}).get("max_concurrent_agents", 5)
self._agent_semaphore = asyncio.Semaphore(max_concurrent)

架构图(改造后)

用户A msg ─┐
用户B msg ─┼─► _agent_semaphore(5) ──► Agent Pool(5个并发slot)
用户C msg ─┤                       │
用户D msg ─┤                       ├──► session_lock_A ──► 同一session串行
用户E msg ─┘                       ├──► session_lock_B
                                   ├──► session_lock_C
                                   └──► ...

优点

  • 改动集中:只需改 run.py 核心逻辑
  • 精确控制:可配置并发数,粒度到 session 级别
  • 复用现有基础设施:Gateway 的 session 管理、memory、hooks 全部复用
  • 上线风险低:不影响现有平台适配器

缺点

  • SQLite 并发瓶颈:写操作需要排他锁,多并发时性能受限
  • 边缘 case 多:session 锁的生命周期管理复杂
  • 单实例瓶颈:受限于单机的 CPU 和内存
  • 测试成本高:核心代码改动,需要完整回归测试

两种方案对比

对比维度 方案二(api_server 桥接) 方案四(Agent 池)
改动范围 新增桥接服务 修改 run.py 核心逻辑
并发粒度 请求级(每个 HTTP 请求独立 Agent) Session 级(可精确控制 5 并发)
已有基础 api_server 已实现线程池 从零写并发控制
最难部分 WeCom 回调实时性 SQLite 并发写
扩展性 加机器就加并发 受限于单实例性能
上线风险 低(不影响现有架构) 中(核心代码改动)
维护成本 中(桥接服务独立维护) 低(复用现有 Gateway)
实时性 依赖 Webhook 回调延迟 Gateway 内实时处理
内存占用 每个请求独立 Agent 实例 共享 Agent 池
代码复用度 低(需要重写消息路由) 高(全部复用)

关键代码位置参考

文件 行号 内容
gateway/run.py 5307-5314 sentinel 领取逻辑(改造点)
gateway/run.py 5568 _handle_message_with_agent 主处理
gateway/run.py 2373 _MAX_CONCURRENT_RUNS(api_server 并发上限)
gateway/run.py 1012-1013 _running_agents 字典初始化
gateway/platforms/api_server.py 2318-2367 _run_agent 线程池执行
gateway/platforms/api_server.py 2438-2448 /v1/runs 端点并发限制

推荐路径

场景 推荐方案
先解决燃眉之急 方案二:微信/企业微信拆开跑两个 Profile(最简单的临时方案)
用户多、想真正并发 方案二 + 桥接服务:api_server 已实现并发,复用成本低
愿意改代码一步到位 方案四:改动集中,但需要处理 SQLite 并发
长期维护 方案二(桥接)+ 方案四(Gateway 改造)结合

相关文档

Left-click: follow link, Right-click: select node, Scroll: zoom
x