企业微信 api_server 并发优化方案(方案二)
文档版本:v1.0
创建日期:2026-05-05
方案类型:技术实施文档
负责人:王嘉成 / 杨彪
状态:待评审
一、技术架构总览
1.1 整体架构图
┌─────────────────────────────────────────────────────────────────────────┐
│ 企业微信用户(10人) │
│ XiaGuoDong BoWeiYa Xiao SiNeiKe ShiHuangZhe ShuGuang │
│ ZhangYuXiaoWanZi LiuWenWei aiden YiYeZhiQiu │
└───────────────────────────────┬─────────────────────────────────────────┘
│ 企业微信回调
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ 企业微信服务器(WXQY Server) │
│ 回调地址:公网 HTTPS │
└───────────────────────────────┬─────────────────────────────────────────┘
│ HTTP POST (JSON)
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ wecom_bridge 桥接服务 │
│ (独立进程,独立部署) │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Webhook 接收 │→ │ 消息解析路由 │→ │ 回复推送 │ │
│ │ (aiohttp) │ │ 多租户隔离 │ │ 异步队列 │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │
│ 端口:8080(内部) / 443(公网 HTTPS 通过 Nginx 反代) │
└───────────────────────────────┬─────────────────────────────────────────┘
│ HTTP POST /v1/chat/completions
│ Header: X-Tenant-ID, X-User-ID, Authorization
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ Hermes api_server │
│ (thread pool, _MAX_CONCURRENT_RUNS=10) │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ /v1/chat/completions → _handle_chat_completions │ │
│ │ /v1/runs → _handle_runs │ │
│ │ Session 路由:session_key = f"{tenant_id}:{user_id}" │ │
│ └──────────────────────────────────────────────────────────────────┘ │
└───────────────────────────────┬─────────────────────────────────────────┘
│ Agent 处理
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ Hermes LLM(MiniMax / DeepSeek) │
└───────────────────────────────┬─────────────────────────────────────────┘
│ 回复内容
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ wecom_bridge 回复路由 │
│ 异步推送至企业微信 / 写入 Session 历史至 Redis │
└─────────────────────────────────────────────────────────────────────────┘
1.2 组件清单
| 组件 | 类型 | 职责 | 部署位置 | 依赖 |
|---|---|---|---|---|
wecom_bridge |
Python/FastAPI | 桥接服务,Webhook 接收,消息路由 | 腾讯云重庆一区 | aiohttp, redis, pydantic |
Hermes api_server |
Python | LLM 推理,多租户 session 管理 | 已在运行 | — |
Redis |
缓存 | 多租户 Session 存储,历史消息缓存 | 腾讯云重庆一区 | — |
Nginx |
反向代理 | HTTPS 公网入口,SSL 终结 | 腾讯云重庆一区 | — |
| 企业微信服务器 | 第三方 | 消息回调,消息推送 | 微信官方 | 回调签名验证 |
1.3 数据流向
企业微信用户发消息
↓
WXQY 回调 POST → [公网] → Nginx HTTPS → [内网] → wecom_bridge:8080
↓
解析 FromUserName / Content / MsgType
↓
构建 Session Key: "{tenant_id}:{user_id}"(从 Header 或 Redis 获取)
↓
加载历史 Session → 追加用户消息
↓
HTTP POST /v1/chat/completions
Header: X-Tenant-ID, X-User-ID, Authorization: Bearer {token}
Body: { model, messages, session_key }
↓
api_server thread pool 并发处理(上限10并发)
↓
Agent 推理 → 回复内容
↓
提取 reply.text → 异步 HTTP POST → 企业微信消息推送 API
↓
更新 Session 历史(追加 assistant 回复)→ Redis TTL 刷新
↓
用户收到回复
1.4 与现有 Gateway 的关系
| 维度 | 现有 Gateway(default Profile) | 新桥接服务 |
|---|---|---|
| 定位 | 处理所有平台消息(WeCom/WeChat/其他) | 仅处理 WeCom 消息的并发路由 |
| 部署关系 | 继续运行,不修改 | 独立部署,不影响现有 Gateway |
| 用户影响 | WeCom 用户切换到桥接服务 | WeCom 用户体验到并发改善 |
| 回滚方式 | 无需改动 | 关闭桥接服务 → 恢复原 Gateway 接收即可 |
结论:并行部署,不替代现有 Gateway。 出问题可秒级切回。
1.5 技术选型依据
| 选型 | 选择理由 |
|---|---|
| FastAPI + aiohttp | 异步非阻塞,高并发支持,Python 生态成熟,与 Hermes 一致 |
| Redis Session | 利用现有 Redis,无需引入新组件,支持 TTL 自动清理 |
| 独立进程部署 | 与 Hermes Gateway 解耦,崩溃不影响主服务 |
| 多租户 Header 路由 | 零代码改造 api_server,只在 bridge 层注入 tenant/user 上下文 |
| thread pool(api_server 内置) | 已实现 _MAX_CONCURRENT_RUNS=10,直接复用,无需额外开发 |
二、代码模块详解
2.1 桥接服务主程序(wecom_bridge.py)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
企业微信桥接服务 - WeCom Bridge Service
========================================
负责:Webhook 接收、消息解析、调用 api_server、回复推送
Author: 爱优技术团队
Version: 1.0.0
"""
import asyncio
import hashlib
import hmac
import json
import logging
import os
import time
from dataclasses import dataclass
from typing import Any, Dict, Optional
import aiohttp
import redis.asyncio as redis
import yaml
from aiohttp import web
# ============================================================================
# 配置加载
# ============================================================================
CONFIG_PATH = os.environ.get("WECOM_BRIDGE_CONFIG", "/etc/wecom_bridge/config.yaml")
@dataclass
class WeComConfig:
corp_id: str
corp_secret: str
agent_id: str
token: str # 回调验证 Token
encoding_aes_key: str # AES 加密 Key(可选,本方案不启用加密模式)
api_base_url: str = "https://qyapi.weixin.qq.com"
@dataclass
class BridgeConfig:
wecom: WeComConfig
api_server_url: str # 例如:http://127.0.0.1:8000
api_key: str # 访问 api_server 的 Bearer Token
redis_url: str
redis_session_prefix: str = "wecom:session:"
redis_session_ttl: int = 3600
max_concurrent_requests: int = 10
request_timeout: int = 30
reply_queue_maxsize: int = 1000
def load_config(path: str) -> BridgeConfig:
with open(path) as f:
raw = yaml.safe_load(f)
wecom_cfg = WeComConfig(
corp_id=raw["wecom"]["corp_id"],
corp_secret=raw["wecom"]["corp_secret"],
agent_id=raw["wecom"]["agent_id"],
token=raw["wecom"]["token"],
encoding_aes_key=raw["wecom"].get("encoding_aes_key", ""),
)
return BridgeConfig(
wecom=wecom_cfg,
api_server_url=raw["api_server"]["url"],
api_key=raw["api_server"]["api_key"],
redis_url=raw["redis"]["url"],
)
# ============================================================================
# 日志配置
# ============================================================================
def setup_logging(debug: bool = False):
logging.basicConfig(
level=logging.DEBUG if debug else logging.INFO,
format="[%(asctime)s] %(levelname)-8s %(name)s | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
# ============================================================================
# 企业微信签名验证
# ============================================================================
class WeComSignatureVerifier:
"""企业微信回调签名验证器"""
def __init__(self, token: str):
self.token = token
def verify(self, msg_signature: str, timestamp: str, nonce: str, echostr: str = None, post_data: str = None) -> bool:
"""验证签名是否合法"""
# 将 token、timestamp、nonce、echostr(或 post_data)按字典序排序
sort_list = sorted([self.token, timestamp, nonce, echostr or post_data or ""])
sign_str = "".join(sort_list)
# SHA1 哈希
expected = hashlib.sha1(sign_str.encode()).hexdigest()
# 使用 hmac.compare_digest 防止时序攻击
return hmac.compare_digest(expected, msg_signature)
# ============================================================================
# Session 管理(基于 Redis)
# ============================================================================
class SessionManager:
"""多租户 Session 管理器"""
def __init__(self, redis_url: str, prefix: str, ttl: int):
self.redis_url = redis_url
self.prefix = prefix
self.ttl = ttl
self._redis: Optional[redis.Redis] = None
async def initialize(self):
self._redis = redis.from_url(
self.redis_url,
encoding="utf-8",
decode_responses=True,
max_connections=20,
)
async def close(self):
if self._redis:
await self._redis.close()
def _session_key(self, tenant_id: str, user_id: str) -> str:
return f"{self.prefix}{tenant_id}:{user_id}"
async def get_history(self, tenant_id: str, user_id: str) -> list:
"""获取历史消息"""
key = self._session_key(tenant_id, user_id)
data = await self._redis.get(key)
if data:
return json.loads(data)
return []
async def append_message(self, tenant_id: str, user_id: str, role: str, content: str):
"""追加消息到历史"""
key = self._session_key(tenant_id, user_id)
history = await self.get_history(tenant_id, user_id)
history.append({"role": role, "content": content})
# 保持最近 N 条消息(防止无限膨胀)
MAX_HISTORY = 50
if len(history) > MAX_HISTORY:
history = history[-MAX_HISTORY:]
await self._redis.setex(key, self.ttl, json.dumps(history, ensure_ascii=False))
async def get_tenant_user_from_msg(self, msg: Dict) -> tuple:
"""从消息体提取 tenant_id 和 user_id
企业微信回调消息中:
- FromUserName 是用户的 user_id(UserID)
- ToUserName 是企业的 corp_id 部分
- AgentID 是应用的 agent_id
这里简化处理:tenant_id = corp_id,user_id = FromUserName
"""
corp_id = msg.get("ToUserName", "")
user_id = msg.get("FromUserName", "")
return corp_id, user_id
# ============================================================================
# API Server 客户端
# ============================================================================
class ApiServerClient:
"""调用 Hermes api_server 的客户端"""
def __init__(self, base_url: str, api_key: str, timeout: int):
self.base_url = base_url.rstrip("/")
self.api_key = api_key
self.timeout = aiohttp.ClientTimeout(total=timeout)
self._session: Optional[aiohttp.ClientSession] = None
async def _get_session(self) -> aiohttp.ClientSession:
if self._session is None or self._session.closed:
self._session = aiohttp.ClientSession(timeout=self.timeout)
return self._session
async def chat_completions(
self,
tenant_id: str,
user_id: str,
messages: list,
model: str = "MiniMax-M2.7-highspeed",
) -> Dict[str, Any]:
"""调用 /v1/chat/completions"""
session = await self._get_session()
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.api_key}",
"X-Tenant-ID": tenant_id,
"X-User-ID": user_id,
}
payload = {
"model": model,
"messages": messages,
"stream": False,
}
async with session.post(
f"{self.base_url}/v1/chat/completions",
headers=headers,
json=payload,
) as resp:
if resp.status != 200:
error_text = await resp.text()
logging.error(f"api_server error: {resp.status} {error_text}")
return {"error": f"api_server returned {resp.status}"}
return await resp.json()
async def close(self):
if self._session and not self._session.closed:
await self._session.close()
# ============================================================================
# 企业微信消息推送
# ============================================================================
class WeComReplyClient:
"""推送回复消息到企业微信"""
def __init__(self, config: WeComConfig, redis_url: str):
self.config = config
self._redis_url = redis_url
self._access_token: Optional[str] = None
self._token_expires_at: float = 0
self._session: Optional[aiohttp.ClientSession] = None
async def _get_session(self) -> aiohttp.ClientSession:
if self._session is None or self._session.closed:
self._session = aiohttp.ClientSession()
return self._session
async def _ensure_token(self):
"""获取或刷新 access_token"""
if time.time() < self._token_expires_at - 60:
return
session = await self._get_session()
url = f"{self.config.api_base_url}/cgi-bin/gettoken"
params = {"corpid": self.config.corp_id, "corpsecret": self.config.corp_secret}
async with session.get(url, params=params) as resp:
data = await resp.json()
if data.get("errcode", 0) != 0:
raise RuntimeError(f"Failed to get token: {data}")
self._access_token = data["access_token"]
self._token_expires_at = time.time() + data["expires_in"]
async def send_text(self, to_user: str, content: str, agent_id: str = None):
"""发送文本消息"""
await self._ensure_token()
session = await self._get_session()
url = f"{self.config.api_base_url}/cgi-bin/message/send"
params = {"access_token": self._access_token}
payload = {
"touser": to_user,
"msgtype": "text",
"agentid": agent_id or self.config.agent_id,
"text": {"content": content},
}
async with session.post(url, params=params, json=payload) as resp:
data = await resp.json()
if data.get("errcode", 0) != 0:
logging.error(f"Failed to send message: {data}")
return False
return True
# ============================================================================
# Webhook 处理(核心路由)
# ============================================================================
class WeComWebhookHandler:
"""企业微信回调处理器"""
def __init__(
self,
config: BridgeConfig,
session_mgr: SessionManager,
api_client: ApiServerClient,
reply_client: WeComReplyClient,
verifier: WeComSignatureVerifier,
):
self.config = config
self.session_mgr = session_mgr
self.api_client = api_client
self.reply_client = reply_client
self.verifier = verifier
self._queue: asyncio.Queue = asyncio.Queue(maxsize=config.reply_queue_maxsize)
self._worker_task: Optional[asyncio.Task] = None
async def start(self):
"""启动异步回复 worker"""
self._worker_task = asyncio.create_task(self._reply_worker())
logging.info("回复 worker 已启动")
async def stop(self):
"""停止 worker"""
if self._worker_task:
self._worker_task.cancel()
try:
await self._worker_task
except asyncio.CancelledError:
pass
async def _reply_worker(self):
"""异步回复队列 worker"""
while True:
try:
item = await self._queue.get()
await self.reply_client.send_text(
to_user=item["to_user"],
content=item["content"],
agent_id=item.get("agent_id"),
)
self._queue.task_done()
except asyncio.CancelledError:
break
except Exception as e:
logging.exception(f"回复 worker 异常: {e}")
async def handle(self, request: web.Request) -> web.Response:
"""处理企业微信回调"""
# ================================================================
# Step 1: 解析消息体
# ================================================================
try:
msg = await request.json()
except Exception:
return web.Response(text="", status=200)
msg_type = msg.get("MsgType", "")
event = msg.get("Event", "")
logging.info(f"收到消息: type={msg_type} event={event} from={msg.get('FromUserName')}")
# ================================================================
# Step 2: URL 验证(首次配置回调地址时,企业微信会发送 GET 请求)
# ================================================================
if request.method == "GET":
echo_str = msg.get("echostr", "")
msg_signature = request.query.get("msg_signature", "")
timestamp = request.query.get("timestamp", "")
nonce = request.query.get("nonce", "")
if self.verifier.verify(msg_signature, timestamp, nonce, echostr=echo_str):
logging.info("URL 验证通过")
return web.Response(text=echo_str)
else:
logging.warning("URL 验证失败")
return web.Response(status=403, text="signature verification failed")
# ================================================================
# Step 3: 消息签名验证
# ================================================================
msg_signature = request.query.get("msg_signature", "")
timestamp = request.query.get("timestamp", "")
nonce = request.query.get("nonce", "")
post_data = await request.text()
if not self.verifier.verify(msg_signature, timestamp, nonce, post_data=post_data):
logging.warning("消息签名验证失败")
return web.Response(status=403, text="signature verification failed")
# ================================================================
# Step 4: 处理不同类型的消息
# ================================================================
if msg_type == "text":
return await self._handle_text(msg)
elif msg_type == "event":
return await self._handle_event(msg)
else:
# 其他类型消息暂不处理,直接返回 success
return web.Response(text="success")
async def _handle_text(self, msg: Dict) -> web.Response:
"""处理文本消息"""
from_user = msg.get("FromUserName", "")
content = msg.get("Content", "").strip()
agent_id = msg.get("AgentID", "")
# 企业微信 corp_id 在 ToUserName 中
corp_id = msg.get("ToUserName", "")
# 获取历史会话
history = await self.session_mgr.get_history(corp_id, from_user)
# 构建消息列表(包含历史)
messages = history + [{"role": "user", "content": content}]
# 立即返回 "success"(企业微信要求 5 秒内响应)
asyncio.create_task(self._process_and_reply(corp_id, from_user, messages, agent_id))
return web.Response(text="success")
async def _handle_event(self, msg: Dict) -> web.Response:
"""处理事件消息"""
event = msg.get("Event", "")
from_user = msg.get("FromUserName", "")
# 关注/取消关注事件
if event == "subscribe":
reply_text = "🎉 感谢关注!有什么可以帮助您的吗?"
await self._queue.put({
"to_user": from_user,
"content": reply_text,
"agent_id": msg.get("AgentID", ""),
})
elif event == "unsubscribe":
logging.info(f"用户 {from_user} 取消关注")
elif event == "click":
# 点击菜单事件
event_key = msg.get("EventKey", "")
reply_text = f"您点击了菜单:{event_key}"
await self._queue.put({
"to_user": from_user,
"content": reply_text,
"agent_id": msg.get("AgentID", ""),
})
return web.Response(text="success")
async def _process_and_reply(
self,
corp_id: str,
from_user: str,
messages: list,
agent_id: str,
):
"""异步处理消息并回复(不阻塞回调)"""
try:
# 保存用户消息到历史
user_content = messages[-1]["content"]
await self.session_mgr.append_message(corp_id, from_user, "user", user_content)
# 追加历史消息(去掉最后一条用户消息,因为要在 api_server 中处理)
history_for_api = messages[:-1]
# 构建发送给 api_server 的消息(含历史)
api_messages = history_for_api + [{"role": "user", "content": user_content}]
# 调用 api_server
result = await self.api_client.chat_completions(
tenant_id=corp_id,
user_id=from_user,
messages=api_messages,
)
if "error" in result:
reply_text = f"⚠️ 处理失败:{result['error']}"
else:
# 解析回复(OpenAI 兼容格式)
choices = result.get("choices", [])
if choices:
reply_text = choices[0].get("message", {}).get("content", "收到")
reply_text = reply_text.strip()
else:
reply_text = "收到"
# 保存助手回复到历史
await self.session_mgr.append_message(corp_id, from_user, "assistant", reply_text)
# 放入回复队列
await self._queue.put({
"to_user": from_user,
"content": reply_text,
"agent_id": agent_id,
})
logging.info(f"回复已加入队列: to={from_user} len={len(reply_text)}")
except asyncio.TimeoutError:
logging.error("api_server 调用超时")
await self._queue.put({
"to_user": from_user,
"content": "⏱️ 处理超时,请稍后再试",
"agent_id": agent_id,
})
except Exception as e:
logging.exception(f"处理消息异常: {e}")
await self._queue.put({
"to_user": from_user,
"content": f"❌ 处理异常:{str(e)[:50]}",
"agent_id": agent_id,
})
# ============================================================================
# 健康检查
# ============================================================================
async def health_check(request: web.Request) -> web.Response:
return web.json_response({"status": "healthy", "service": "wecom_bridge"})
# ============================================================================
# 主程序
# ============================================================================
async def create_app() -> web.Application:
config = load_config(CONFIG_PATH)
setup_logging(debug=os.environ.get("DEBUG"))
logging.info("=" * 60)
logging.info(" 企业微信桥接服务 (WeCom Bridge Service)")
logging.info(f" api_server: {config.api_server_url}")
logging.info(f" redis: {config.redis_url}")
logging.info("=" * 60)
# 初始化组件
session_mgr = SessionManager(
redis_url=config.redis_url,
prefix=config.redis_session_prefix,
ttl=config.redis_session_ttl,
)
await session_mgr.initialize()
api_client = ApiServerClient(
base_url=config.api_server_url,
api_key=config.api_key,
timeout=config.request_timeout,
)
reply_client = WeComReplyClient(config.wecom, config.redis_url)
verifier = WeComSignatureVerifier(config.wecom.token)
handler = WeComWebhookHandler(config, session_mgr, api_client, reply_client, verifier)
await handler.start()
# 构建 app
app = web.Application()
app["handler"] = handler
app["session_mgr"] = session_mgr
app["api_client"] = api_client
app.router.add_get("/wecom/callback", handler.handle) # URL 验证
app.router.add_post("/wecom/callback", handler.handle) # 消息回调
app.router.add_get("/health", health_check)
# 清理函数
async def cleanup(app):
logging.info("关闭桥接服务...")
await handler.stop()
await session_mgr.close()
await api_client.close()
app.on_cleanup.append(cleanup)
return app
def main():
app = create_app()
port = int(os.environ.get("PORT", 8080))
logging.info(f"启动服务,监听端口 {port}")
web.run_app(app, host="0.0.0.0", port=port, access_log=None)
if __name__ == "__main__":
main()
2.2 配置文件(config.yaml)
# /etc/wecom_bridge/config.yaml
wecom:
corp_id: "YOUR_CORP_ID" # 企业微信企业 ID
corp_secret: "YOUR_CORP_SECRET" # 应用密钥
agent_id: "YOUR_AGENT_ID" # 应用 AgentID
token: "YOUR_CALLBACK_TOKEN" # 回调验证 Token(自定义随机字符串)
encoding_aes_key: "" # 回调加密 Key(可选,不启用加密模式可不填)
api_server:
url: "http://127.0.0.1:8000" # Hermes api_server 地址(本地)
api_key: "your-api-server-key" # Bearer Token
redis:
url: "redis://localhost:6379/0" # Redis 地址
# 可通过环境变量覆盖
# WECOM_BRIDGE_CONFIG=/etc/wecom_bridge/config.yaml
# PORT=8080
# DEBUG=true
2.3 多租户 Session 隔离机制
核心逻辑:
session_key = f"wecom:session:{corp_id}:{user_id}"
| 字段 | 来源 | 示例 |
|---|---|---|
corp_id |
企业微信消息 ToUserName | ww1234567890abcdef |
user_id |
企业微信消息 FromUserName | ZhangSan |
session_key |
拼接后 Redis Key | wecom:session:ww1234567890abcdef:ZhangSan |
隔离策略:
- 每个用户有独立的 Redis Key,互不干扰
- TTL 3600 秒,无交互自动过期
- 最近 50 条消息存储,防止内存膨胀
- 异步队列
asyncio.Queue保证回复顺序
三、安全与权限
3.1 企业微信回调签名验证
企业微信回调使用 HMAC-SHA256(实际为 SHA1)签名验证,确保请求来自真实的企业微信服务器:
签名原文字符串 = 字典序排序( Token + Timestamp + Nonce + EchoStr/PostData )
签名 = SHA1( 签名原文 )
验证流程:
- 配置回调 URL 时,企业微信发送 GET 请求(携带
echostr) - 使用 Token + Timestamp + Nonce + Echostr 计算签名,比对
msg_signature - 验证通过后返回解密后的
echostr - 正式消息回调使用 POST,同样需要签名验证
桥接服务配置项:
wecom:
token: "随机字符串(与企业在企业微信后台配置一致)"
encoding_aes_key: "" # 不启用加密模式填空
3.2 API Server 访问认证
桥接服务访问 Hermes api_server 使用 Bearer Token:
# 请求头注入
headers = {
"Authorization": f"Bearer {config.api_key}",
"X-Tenant-ID": corp_id, # 多租户隔离
"X-User-ID": from_user, # 用户隔离
}
Token 配置在 config.yaml 中,不硬编码在代码中:
api_server:
api_key: "your-secure-api-key" # 与 Hermes config.yaml 中的一致
3.3 敏感信息处理
| 敏感字段 | 处理方式 |
|---|---|
企业微信 corp_secret |
存入 config.yaml,不打印日志 |
api_server api_key |
存入 config.yaml,日志中脱敏 |
| 用户消息内容 | 日志仅打印前 50 字符 |
| Session 历史 | 存储在 Redis 中,设置 TTL 自动过期 |
日志脱敏规则:
# 禁止打印的字段
SENSITIVE_KEYS = ["corp_secret", "api_key", "token", "password"]
def mask_sensitive(data: dict) -> dict:
result = {}
for k, v in data.items():
if k in SENSITIVE_KEYS:
result[k] = "***REDACTED***"
elif isinstance(v, str) and len(v) > 100:
result[k] = v[:50] + "..."
else:
result[k] = v
return result
3.4 网络安全
| 防护措施 | 说明 |
|---|---|
| Nginx 反向代理 | 桥接服务仅监听内网 8080,Nginx 对外暴露 443 |
| HTTPS 强制 | 企业微信要求回调地址必须为 HTTPS |
| 最小化端口 | 仅开放 /wecom/callback 和 /health 两个端点 |
| 访问来源限制 | 可在 Nginx 层限制仅允许微信服务器 IP 访问(allow qy.weixin.qq.com;) |
四、风险评估与应对
4.1 风险矩阵
| 风险 | 概率 | 影响 | 等级 | 应对策略 |
|---|---|---|---|---|
| 桥接服务崩溃 | 低 | 高 | 🔴 P0 | 自动重启(systemd)+ 监控告警 + 回滚脚本 |
| api_server 并发打满(超过10) | 中 | 中 | 🟠 P1 | 限流排队 + 告警提前预警 |
| 企业微信回调超时(5秒限制) | 中 | 低 | 🟡 P2 | 立即返回 success + 异步处理 |
| Redis 连接耗尽 | 低 | 中 | 🟠 P1 | 连接池复用(max_connections=20)+ 限流 |
| Session 混乱(同一用户并发请求) | 低 | 中 | 🟠 P1 | Session 粒度锁 + 消息队列串号化同用户消息 |
| 消息重复推送 | 低 | 低 | 🟡 P2 | 企业微信消息 ID 去重(msg_id) |
| Token 过期(access_token 2小时) | 中 | 低 | 🟡 P2 | 自动刷新 + 过期前 60 秒提前更新 |
| LLM 推理超时(30秒) | 低 | 低 | 🟡 P2 | 超时降级返回「处理中」+ 异步队列重试 |
4.2 核心 P0 风险应对
风险 1:桥接服务崩溃
预防:systemd 守护进程,自动重启(Restart=always)
检测:/health 端点监控 + Prometheus 告警
止血:systemctl restart wecom_bridge(30秒内恢复)
回滚:关闭桥接服务 → 恢复原 Gateway WeCom 接收
MTTR:< 1 分钟
风险 2:api_server 并发超限
api_server 的 _MAX_CONCURRENT_RUNS = 10
超过后新请求在 thread pool 队列中等待
预防:桥接服务在调用前检查当前并发数
检测:监控 active_connections > 8(80%阈值)触发告警
应对:
- 并发 8-10:请求排队,延迟但可接受
- 并发 > 10:队列溢出拒绝 + 告警 + 降级提示用户
4.3 降级策略
| 降级级别 | 条件 | 动作 |
|---|---|---|
| L0 - 桥接服务崩溃 | 进程退出 / /health 失败 | 自动重启;若 3 分钟内重启 3 次则触发 P0 告警 |
| L1 - api_server 不可用 | HTTP 503 / 超时 | 返回「服务繁忙,请稍后再试」+ 队列重试 |
| L2 - Redis 不可用 | 连接失败 | 使用内存 Session(单次有效)+ 降级提示 |
| L3 - 完全回滚 | 无法在 5 分钟内恢复 | 一键切回原 Gateway 模式 |
五、快速回滚方案
5.1 回滚触发条件(量化阈值)
| 指标 | 阈值 | 触发动作 |
|---|---|---|
| 错误率(消息处理失败 / 总消息) | ≥ 5%,持续 2 分钟 | P0 → 立即回滚 |
| P99 响应时间 | ≥ 10 秒,持续 3 分钟 | P0 → 立即回滚 |
| 桥接服务 /health 失败 | 连续 3 次健康检查失败 | P0 → 立即回滚 |
| 消息送达率 | < 95%,持续 5 分钟 | P1 → 评估是否回滚 |
| Redis 连接失败 | 持续 1 分钟 | P1 → 评估是否降级 |
5.2 一键回滚脚本
#!/bin/bash
# rollback_wecom_bridge.sh - 企业微信桥接服务一键回滚
#
# 用法: ./rollback_wecom_bridge.sh
#
# 效果:
# 1. 停止 wecom_bridge 服务
# 2. 确保原 Gateway 正常运行
# 3. 验证 WeCom 回调恢复(检查 Nginx 路由)
#
# 回滚后状态:企业微信消息恢复由原 Hermes Gateway 处理
set -e
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m'
log_info() { echo -e "${GREEN}[INFO] $(date '+%H:%M:%S') $1${NC}"; }
log_warn() { echo -e "${YELLOW}[WARN] $(date '+%H:%M:%S') $1${NC}"; }
log_error() { echo -e "${RED}[ERROR] $(date '+%H:%M:%S') $1${NC}"; }
echo "=========================================="
echo " 企业微信桥接服务 - 一键回滚"
echo "=========================================="
# Step 1: 停止桥接服务
log_info "[1/4] 停止桥接服务..."
if systemctl is-active wecom_bridge &>/dev/null; then
systemctl stop wecom_bridge
log_info "wecom_bridge 已停止"
else
log_warn "wecom_bridge 未运行,跳过停止"
fi
# Step 2: 确认原 Gateway 正在运行
log_info "[2/4] 检查 Hermes Gateway 状态..."
if systemctl is-active hermes_gateway &>/dev/null; then
log_info "Hermes Gateway 运行正常"
else
log_warn "Hermes Gateway 未运行,尝试启动..."
systemctl start hermes_gateway || log_error "Gateway 启动失败,请手动检查"
fi
# Step 3: 检查 Nginx 路由配置
# 若原配置注释了 /wecom/callback 的桥接反代,则无需操作
# 此处检查回调是否重新路由到 Gateway
log_info "[3/4] 检查 Nginx 路由..."
NGINX_CONF="/etc/nginx/conf.d/wecom_bridge.conf"
if [ -f "$NGINX_CONF" ]; then
# 备份并禁用桥接服务的 Nginx 配置
mv "$NGINX_CONF" "${NGINX_CONF}.bak"
log_info "已禁用桥接 Nginx 配置(备份到 ${NGINX_CONF}.bak)"
# 测试 Nginx 配置
nginx -t && systemctl reload nginx
log_info "Nginx 配置已重载"
fi
# Step 4: 验证
log_info "[4/4] 验证 WeCom 回调..."
sleep 3
# 发送测试消息(或通过企业微信手机端发送测试消息)
log_info "请在企业微信中发送测试消息,验证原 Gateway 是否正常响应"
log_info "若正常,回滚完成"
log_info "若异常,请手动检查 Hermes Gateway 日志:journalctl -u hermes_gateway -f"
echo ""
log_info "=========================================="
log_info " 回滚脚本执行完成"
log_info " 桥接服务: 已停止"
log_info " Gateway: 已接管 WeCom"
log_info "=========================================="
使用方式:
# 紧急回滚(需在部署了桥接服务的机器上以 root 执行)
sudo ./rollback_wecom_bridge.sh
5.3 回滚后验证清单
□ wecom_bridge 进程已退出(ps aux | grep wecom_bridge)
□ hermes_gateway 进程运行中(systemctl status hermes_gateway)
□ 企业微信用户发送测试消息,原 Gateway 正常响应
□ 错误日志中无新错误(tail -100 /var/log/hermes/app.log)
□ 监控面板显示 WeCom 消息处理恢复正常
5.4 MTTR 预估
| 场景 | 最快 | 典型 | 目标 |
|---|---|---|---|
| 桥接服务崩溃(自动重启恢复) | 30 秒 | 1 分钟 | < 1 分钟 |
| 手动执行回滚脚本 | 2 分钟 | 3-5 分钟 | < 5 分钟 |
| 完全故障需人工介入 | 5 分钟 | 10-15 分钟 | < 15 分钟 |
六、部门协作与上线计划
6.1 角色与职责
| 角色 | 姓名 | 主要职责 | 联系方式 |
|---|---|---|---|
| 研发负责人 | 王嘉成 | 核心功能开发、架构把控、代码评审 | 18229886512 |
| 研发 | 杨彪 | 功能实现、问题修复 | 17674756108 |
| 运维 | 陈章涛 | 环境部署、发布、监控配置 | 18684580210 |
| 产品/业务 | 夏雨 | 需求确认、验收协调 | 13524603318 |
| 测试 | — | 测试用例编写、功能回归 | — |
6.2 部署步骤
环境准备(Week 1 Day 1-2)
├─ 运维:陈章涛在测试环境部署 wecom_bridge
├─ 运维:配置 config.yaml(corp_id/secret/agent_id/api_key)
├─ 运维:配置 Nginx 反向代理(/wecom/callback → bridge:8080)
└─ 研发:王嘉成确认 api_server 端点可访问
开发完成(Week 1 Day 3-5)
├─ 研发:wecom_bridge 核心功能完成
├─ 研发:单元测试 + 自测
└─ 研发:代码 review(杨彪)
测试验证(Week 2)
├─ 测试:功能测试 + 回归测试
├─ 测试:性能压测(模拟 10 用户并发)
├─ 产品:业务场景验收(夏雨)
└─ 研发:问题修复
灰度发布(Week 3)
├─ Day 1:灰度 2 用户(王嘉成 + 杨彪)
├─ Day 2:灰度 5 用户(+ 3 名测试/运维)
├─ Day 3-5:灰度 10 用户(全部)
└─ 全程:监控错误率、延迟、队列深度
全量上线(Week 4 Day 1)
├─ 运维:陈章涛执行生产部署
├─ 各方:观察生产指标
└─ Day 3:复盘会议
6.3 验收标准
| 类型 | 指标 | 验收方式 |
|---|---|---|
| 功能 | 10 个 WeCom 用户都能正常对话 | 夏雨逐一测试 |
| 功能 | 消息历史正确存储和加载 | 查看 Redis 验证 |
| 性能 | P99 响应时间 < 5 秒 | 监控面板查看 |
| 稳定性 | 连续 72 小时无崩溃 | 监控面板 + 日志 |
| 回滚 | 回滚脚本执行 < 5 分钟 | 演练测试 |
6.4 灰度策略
| 阶段 | 用户 | 时间 | 重点观察 |
|---|---|---|---|
| 阶段一 | 王嘉成、杨彪(2人) | Day 1 | 核心功能、基本流程 |
| 阶段二 | + 陈章涛/刘文伟/张家学(5人) | Day 2-3 | 并发稳定性 |
| 阶段三 | 全部 10 人 | Day 4-5 | 真实场景、压力测试 |
灰度期间错误率 > 5% 或 P99 > 10s,立即暂停灰度并回滚。
6.5 沟通机制
- 每日站会:王嘉成/杨彪/测试/夏雨 同步进度
- 灰度日报:陈章涛每日输出监控摘要至微信群
- 问题升级:P0 级 5 分钟内响应,P1 级 15 分钟内响应
七、运维监控
7.1 核心监控指标
| 指标 | 采集方式 | 正常范围 | 告警阈值(P1) |
|---|---|---|---|
| 桥接服务 /health | 健康检查 | 返回 200 | 连续 3 次失败 |
| 消息处理错误率 | 日志统计 | < 1% | > 5% |
| 消息送达率 | 成功推送数/总消息 | ≥ 99% | < 95% |
| WeCom 回调延迟 P99 | 日志埋点 | < 3s | > 5s |
| api_server 并发数 | 内部计数 | < 8 | > 8(80%) |
| Redis 连接数 | Redis info | < 15 | > 18 |
| 回复队列深度 | asyncio.Queue.qsize() | < 100 | > 500 |
| LLM 推理时间 | 日志埋点 | < 10s(P99) | > 30s |
| 企业微信 access_token | 内部状态 | 有效 | 过期前 5 分钟告警 |
7.2 告警阈值(量化)
| 级别 | 条件 | 通知方式 | 响应时效 |
|---|---|---|---|
| P0 | /health 失败连续 3 次 | 电话 + 微信群 | 5 分钟 |
| P0 | 错误率 > 5% 持续 2 分钟 | 电话 + 微信群 | 5 分钟 |
| P1 | 并发数 > 8 持续 5 分钟 | 微信群 | 15 分钟 |
| P1 | 队列深度 > 500 | 微信群 | 15 分钟 |
| P2 | P99 延迟 > 5s | 微信群 | 1 小时 |
| P2 | Token 即将过期(< 5 分钟) | 微信群 | 1 小时 |
7.3 日志规范与快速定位命令
日志路径: /var/log/wecom_bridge/app.log
日志格式:
[2026-05-05 14:30:01] INFO webhook | 收到消息: type=text from=ZhangSan
[2026-05-05 14:30:01] INFO session | 加载历史: corp=wwxxx user=ZhangSan msgs=5
[2026-05-05 14:30:02] INFO api_client | 调用 api_server: tenant=wwxxx user=ZhangSan
[2026-05-05 14:30:06] INFO reply | 回复已加入队列: to=ZhangSan len=128
[2026-05-05 14:30:06] INFO session | 保存历史: corp=wwxxx user=ZhangSan msgs=6
快速定位命令:
# 检查消息处理错误
grep -E "ERROR|FATAL" /var/log/wecom_bridge/app.log | tail -20
# 检查 api_server 调用失败
grep "api_server error" /var/log/wecom_bridge/app.log
# 检查回复队列堆积
grep "队列深度" /var/log/wecom_bridge/app.log | tail -5
# 检查 WeCom 回调延迟
grep "回调延迟" /var/log/wecom_bridge/app.log | awk '{print $NF}' | sort -rn | head -10
# 实时监控
tail -f /var/log/wecom_bridge/app.log | grep -E "ERROR|WARN"
# 检查消息送达成功率
grep "回复已加入队列" /var/log/wecom_bridge/app.log | wc -l # 成功数
grep "回复失败" /var/log/wecom_bridge/app.log | wc -l # 失败数
7.4 日常巡检清单
每日(09:00):
□ systemctl status wecom_bridge → Running
□ curl -s http://localhost:8080/health → {"status": "healthy"}
□ grep -c "ERROR" /var/log/wecom_bridge/app.log(昨日)→ < 10 条
□ 监控面板:消息送达率 ≥ 99%
□ 监控面板:P99 延迟 < 5s
□ Redis 连接数 < 18
每周(周一 10:00):
□ 日志轮转正常(logrotate)
□ 备份完整性(config.yaml + Session 数据)
□ 监控告警无未处理项
□ SSL 证书有效期 > 30 天
□ 外部依赖可用性(企业微信 API)
7.5 应急响应流程
P0 - 桥接服务崩溃:
1. [0-5min] 收到告警 → 确认 wecom_bridge 进程
2. [5-8min] systemctl restart wecom_bridge
3. [8-10min] 验证 /health 恢复
4. [10-15min] 确认消息处理恢复正常
5. 若重启失败 → 执行 rollback_wecom_bridge.sh
6. 发送故障通报
P1 - 并发打满 / 队列堆积:
1. [0-5min] 查看并发数 + 队列深度
2. [5-15min] 评估是否需要限流或回滚
3. [15-30min] 执行降级或回滚
4. 观察监控恢复情况
7.6 监控工具建议
| 工具 | 用途 | 现状 |
|---|---|---|
journalctl |
查看 wecom_bridge 系统日志 | 直接可用 |
curl /health |
健康检查 | 内置端点 |
Prometheus + Grafana |
指标采集 + 可视化大盘 | 可后续接入 |
| 企业微信群机器人 | 告警通知 | 已配置 |
附录
A. 相关文件路径
| 文件 | 路径 |
|---|---|
| 桥接服务代码 | /opt/wecom_bridge/wecom_bridge.py |
| 配置文件 | /etc/wecom_bridge/config.yaml |
| systemd 服务 | /etc/systemd/system/wecom_bridge.service |
| Nginx 配置 | /etc/nginx/conf.d/wecom_bridge.conf |
| 应用日志 | /var/log/wecom_bridge/app.log |
| 回滚脚本 | /opt/scripts/rollback_wecom_bridge.sh |
| 健康检查 | http://localhost:8080/health |
B. API Server 关键配置
# Hermes api_server 端点(已内置)
POST /v1/chat/completions
Header: Authorization: Bearer {token}
Header: X-Tenant-ID: {corp_id}
Header: X-User-ID: {user_id}
Body: { model, messages, stream }
# api_server 并发上限(已内置)
_MAX_CONCURRENT_RUNS = 10 # gateway/platforms/api_server.py:2373
C. 企业微信 API 参考
| 接口 | 用途 |
|---|---|
GET /cgi-bin/gettoken |
获取 access_token |
POST /cgi-bin/message/send |
推送消息给用户 |
POST /cgi-bin/getcallbackip |
获取企业微信服务器 IP(配置防火墙用) |
文档版本:v1.0 | 2026-05-05 | 待评审