x

企业微信 api_server 并发优化方案(方案二)

文档版本:v1.0
创建日期:2026-05-05
方案类型:技术实施文档
负责人:王嘉成 / 杨彪
状态:待评审


一、技术架构总览

1.1 整体架构图

┌─────────────────────────────────────────────────────────────────────────┐
│                         企业微信用户(10人)                               │
│         XiaGuoDong BoWeiYa Xiao SiNeiKe ShiHuangZhe ShuGuang            │
│         ZhangYuXiaoWanZi LiuWenWei aiden YiYeZhiQiu                     │
└───────────────────────────────┬─────────────────────────────────────────┘
                                │ 企业微信回调
                                ▼
┌─────────────────────────────────────────────────────────────────────────┐
│                    企业微信服务器(WXQY Server)                          │
│                         回调地址:公网 HTTPS                              │
└───────────────────────────────┬─────────────────────────────────────────┘
                                │ HTTP POST (JSON)
                                ▼
┌─────────────────────────────────────────────────────────────────────────┐
│                      wecom_bridge 桥接服务                              │
│                    (独立进程,独立部署)                                   │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐               │
│  │ Webhook 接收  │→ │ 消息解析路由  │→ │ 回复推送     │               │
│  │  (aiohttp)    │  │  多租户隔离   │  │  异步队列    │               │
│  └──────────────┘  └──────────────┘  └──────────────┘               │
│                                                                          │
│  端口:8080(内部)  /  443(公网 HTTPS 通过 Nginx 反代)               │
└───────────────────────────────┬─────────────────────────────────────────┘
                                │ HTTP POST /v1/chat/completions
                                │ Header: X-Tenant-ID, X-User-ID, Authorization
                                ▼
┌─────────────────────────────────────────────────────────────────────────┐
│                      Hermes api_server                                  │
│                  (thread pool, _MAX_CONCURRENT_RUNS=10)                │
│  ┌──────────────────────────────────────────────────────────────────┐  │
│  │  /v1/chat/completions  →  _handle_chat_completions              │  │
│  │  /v1/runs             →  _handle_runs                           │  │
│  │  Session 路由:session_key = f"{tenant_id}:{user_id}"            │  │
│  └──────────────────────────────────────────────────────────────────┘  │
└───────────────────────────────┬─────────────────────────────────────────┘
                                │ Agent 处理
                                ▼
┌─────────────────────────────────────────────────────────────────────────┐
│                    Hermes LLM(MiniMax / DeepSeek)                     │
└───────────────────────────────┬─────────────────────────────────────────┘
                                │ 回复内容
                                ▼
┌─────────────────────────────────────────────────────────────────────────┐
│                      wecom_bridge 回复路由                               │
│            异步推送至企业微信 / 写入 Session 历史至 Redis                  │
└─────────────────────────────────────────────────────────────────────────┘

1.2 组件清单

组件 类型 职责 部署位置 依赖
wecom_bridge Python/FastAPI 桥接服务,Webhook 接收,消息路由 腾讯云重庆一区 aiohttp, redis, pydantic
Hermes api_server Python LLM 推理,多租户 session 管理 已在运行
Redis 缓存 多租户 Session 存储,历史消息缓存 腾讯云重庆一区
Nginx 反向代理 HTTPS 公网入口,SSL 终结 腾讯云重庆一区
企业微信服务器 第三方 消息回调,消息推送 微信官方 回调签名验证

1.3 数据流向

企业微信用户发消息
    ↓
WXQY 回调 POST → [公网] → Nginx HTTPS → [内网] → wecom_bridge:8080
    ↓
解析 FromUserName / Content / MsgType
    ↓
构建 Session Key: "{tenant_id}:{user_id}"(从 Header 或 Redis 获取)
    ↓
加载历史 Session → 追加用户消息
    ↓
HTTP POST /v1/chat/completions
  Header: X-Tenant-ID, X-User-ID, Authorization: Bearer {token}
  Body: { model, messages, session_key }
    ↓
api_server thread pool 并发处理(上限10并发)
    ↓
Agent 推理 → 回复内容
    ↓
提取 reply.text → 异步 HTTP POST → 企业微信消息推送 API
    ↓
更新 Session 历史(追加 assistant 回复)→ Redis TTL 刷新
    ↓
用户收到回复

1.4 与现有 Gateway 的关系

维度 现有 Gateway(default Profile) 新桥接服务
定位 处理所有平台消息(WeCom/WeChat/其他) 仅处理 WeCom 消息的并发路由
部署关系 继续运行,不修改 独立部署,不影响现有 Gateway
用户影响 WeCom 用户切换到桥接服务 WeCom 用户体验到并发改善
回滚方式 无需改动 关闭桥接服务 → 恢复原 Gateway 接收即可

结论:并行部署,不替代现有 Gateway。 出问题可秒级切回。

1.5 技术选型依据

选型 选择理由
FastAPI + aiohttp 异步非阻塞,高并发支持,Python 生态成熟,与 Hermes 一致
Redis Session 利用现有 Redis,无需引入新组件,支持 TTL 自动清理
独立进程部署 与 Hermes Gateway 解耦,崩溃不影响主服务
多租户 Header 路由 零代码改造 api_server,只在 bridge 层注入 tenant/user 上下文
thread pool(api_server 内置) 已实现 _MAX_CONCURRENT_RUNS=10,直接复用,无需额外开发

二、代码模块详解

2.1 桥接服务主程序(wecom_bridge.py)

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
企业微信桥接服务 - WeCom Bridge Service
========================================
负责:Webhook 接收、消息解析、调用 api_server、回复推送

Author: 爱优技术团队
Version: 1.0.0
"""

import asyncio
import hashlib
import hmac
import json
import logging
import os
import time
from dataclasses import dataclass
from typing import Any, Dict, Optional

import aiohttp
import redis.asyncio as redis
import yaml
from aiohttp import web

# ============================================================================
# 配置加载
# ============================================================================
CONFIG_PATH = os.environ.get("WECOM_BRIDGE_CONFIG", "/etc/wecom_bridge/config.yaml")


@dataclass
class WeComConfig:
    corp_id: str
    corp_secret: str
    agent_id: str
    token: str  # 回调验证 Token
    encoding_aes_key: str  # AES 加密 Key(可选,本方案不启用加密模式)
    api_base_url: str = "https://qyapi.weixin.qq.com"


@dataclass
class BridgeConfig:
    wecom: WeComConfig
    api_server_url: str  # 例如:http://127.0.0.1:8000
    api_key: str  # 访问 api_server 的 Bearer Token
    redis_url: str
    redis_session_prefix: str = "wecom:session:"
    redis_session_ttl: int = 3600
    max_concurrent_requests: int = 10
    request_timeout: int = 30
    reply_queue_maxsize: int = 1000


def load_config(path: str) -> BridgeConfig:
    with open(path) as f:
        raw = yaml.safe_load(f)

    wecom_cfg = WeComConfig(
        corp_id=raw["wecom"]["corp_id"],
        corp_secret=raw["wecom"]["corp_secret"],
        agent_id=raw["wecom"]["agent_id"],
        token=raw["wecom"]["token"],
        encoding_aes_key=raw["wecom"].get("encoding_aes_key", ""),
    )
    return BridgeConfig(
        wecom=wecom_cfg,
        api_server_url=raw["api_server"]["url"],
        api_key=raw["api_server"]["api_key"],
        redis_url=raw["redis"]["url"],
    )


# ============================================================================
# 日志配置
# ============================================================================
def setup_logging(debug: bool = False):
    logging.basicConfig(
        level=logging.DEBUG if debug else logging.INFO,
        format="[%(asctime)s] %(levelname)-8s %(name)s | %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )


# ============================================================================
# 企业微信签名验证
# ============================================================================
class WeComSignatureVerifier:
    """企业微信回调签名验证器"""

    def __init__(self, token: str):
        self.token = token

    def verify(self, msg_signature: str, timestamp: str, nonce: str, echostr: str = None, post_data: str = None) -> bool:
        """验证签名是否合法"""
        # 将 token、timestamp、nonce、echostr(或 post_data)按字典序排序
        sort_list = sorted([self.token, timestamp, nonce, echostr or post_data or ""])
        sign_str = "".join(sort_list)

        # SHA1 哈希
        expected = hashlib.sha1(sign_str.encode()).hexdigest()

        # 使用 hmac.compare_digest 防止时序攻击
        return hmac.compare_digest(expected, msg_signature)


# ============================================================================
# Session 管理(基于 Redis)
# ============================================================================
class SessionManager:
    """多租户 Session 管理器"""

    def __init__(self, redis_url: str, prefix: str, ttl: int):
        self.redis_url = redis_url
        self.prefix = prefix
        self.ttl = ttl
        self._redis: Optional[redis.Redis] = None

    async def initialize(self):
        self._redis = redis.from_url(
            self.redis_url,
            encoding="utf-8",
            decode_responses=True,
            max_connections=20,
        )

    async def close(self):
        if self._redis:
            await self._redis.close()

    def _session_key(self, tenant_id: str, user_id: str) -> str:
        return f"{self.prefix}{tenant_id}:{user_id}"

    async def get_history(self, tenant_id: str, user_id: str) -> list:
        """获取历史消息"""
        key = self._session_key(tenant_id, user_id)
        data = await self._redis.get(key)
        if data:
            return json.loads(data)
        return []

    async def append_message(self, tenant_id: str, user_id: str, role: str, content: str):
        """追加消息到历史"""
        key = self._session_key(tenant_id, user_id)
        history = await self.get_history(tenant_id, user_id)
        history.append({"role": role, "content": content})

        # 保持最近 N 条消息(防止无限膨胀)
        MAX_HISTORY = 50
        if len(history) > MAX_HISTORY:
            history = history[-MAX_HISTORY:]

        await self._redis.setex(key, self.ttl, json.dumps(history, ensure_ascii=False))

    async def get_tenant_user_from_msg(self, msg: Dict) -> tuple:
        """从消息体提取 tenant_id 和 user_id

        企业微信回调消息中:
        - FromUserName 是用户的 user_id(UserID)
        - ToUserName 是企业的 corp_id 部分
        - AgentID 是应用的 agent_id

        这里简化处理:tenant_id = corp_id,user_id = FromUserName
        """
        corp_id = msg.get("ToUserName", "")
        user_id = msg.get("FromUserName", "")
        return corp_id, user_id


# ============================================================================
# API Server 客户端
# ============================================================================
class ApiServerClient:
    """调用 Hermes api_server 的客户端"""

    def __init__(self, base_url: str, api_key: str, timeout: int):
        self.base_url = base_url.rstrip("/")
        self.api_key = api_key
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self._session: Optional[aiohttp.ClientSession] = None

    async def _get_session(self) -> aiohttp.ClientSession:
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession(timeout=self.timeout)
        return self._session

    async def chat_completions(
        self,
        tenant_id: str,
        user_id: str,
        messages: list,
        model: str = "MiniMax-M2.7-highspeed",
    ) -> Dict[str, Any]:
        """调用 /v1/chat/completions"""
        session = await self._get_session()

        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}",
            "X-Tenant-ID": tenant_id,
            "X-User-ID": user_id,
        }

        payload = {
            "model": model,
            "messages": messages,
            "stream": False,
        }

        async with session.post(
            f"{self.base_url}/v1/chat/completions",
            headers=headers,
            json=payload,
        ) as resp:
            if resp.status != 200:
                error_text = await resp.text()
                logging.error(f"api_server error: {resp.status} {error_text}")
                return {"error": f"api_server returned {resp.status}"}

            return await resp.json()

    async def close(self):
        if self._session and not self._session.closed:
            await self._session.close()


# ============================================================================
# 企业微信消息推送
# ============================================================================
class WeComReplyClient:
    """推送回复消息到企业微信"""

    def __init__(self, config: WeComConfig, redis_url: str):
        self.config = config
        self._redis_url = redis_url
        self._access_token: Optional[str] = None
        self._token_expires_at: float = 0
        self._session: Optional[aiohttp.ClientSession] = None

    async def _get_session(self) -> aiohttp.ClientSession:
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession()
        return self._session

    async def _ensure_token(self):
        """获取或刷新 access_token"""
        if time.time() < self._token_expires_at - 60:
            return

        session = await self._get_session()
        url = f"{self.config.api_base_url}/cgi-bin/gettoken"
        params = {"corpid": self.config.corp_id, "corpsecret": self.config.corp_secret}

        async with session.get(url, params=params) as resp:
            data = await resp.json()
            if data.get("errcode", 0) != 0:
                raise RuntimeError(f"Failed to get token: {data}")
            self._access_token = data["access_token"]
            self._token_expires_at = time.time() + data["expires_in"]

    async def send_text(self, to_user: str, content: str, agent_id: str = None):
        """发送文本消息"""
        await self._ensure_token()
        session = await self._get_session()

        url = f"{self.config.api_base_url}/cgi-bin/message/send"
        params = {"access_token": self._access_token}

        payload = {
            "touser": to_user,
            "msgtype": "text",
            "agentid": agent_id or self.config.agent_id,
            "text": {"content": content},
        }

        async with session.post(url, params=params, json=payload) as resp:
            data = await resp.json()
            if data.get("errcode", 0) != 0:
                logging.error(f"Failed to send message: {data}")
                return False
            return True


# ============================================================================
# Webhook 处理(核心路由)
# ============================================================================
class WeComWebhookHandler:
    """企业微信回调处理器"""

    def __init__(
        self,
        config: BridgeConfig,
        session_mgr: SessionManager,
        api_client: ApiServerClient,
        reply_client: WeComReplyClient,
        verifier: WeComSignatureVerifier,
    ):
        self.config = config
        self.session_mgr = session_mgr
        self.api_client = api_client
        self.reply_client = reply_client
        self.verifier = verifier
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=config.reply_queue_maxsize)
        self._worker_task: Optional[asyncio.Task] = None

    async def start(self):
        """启动异步回复 worker"""
        self._worker_task = asyncio.create_task(self._reply_worker())
        logging.info("回复 worker 已启动")

    async def stop(self):
        """停止 worker"""
        if self._worker_task:
            self._worker_task.cancel()
            try:
                await self._worker_task
            except asyncio.CancelledError:
                pass

    async def _reply_worker(self):
        """异步回复队列 worker"""
        while True:
            try:
                item = await self._queue.get()
                await self.reply_client.send_text(
                    to_user=item["to_user"],
                    content=item["content"],
                    agent_id=item.get("agent_id"),
                )
                self._queue.task_done()
            except asyncio.CancelledError:
                break
            except Exception as e:
                logging.exception(f"回复 worker 异常: {e}")

    async def handle(self, request: web.Request) -> web.Response:
        """处理企业微信回调"""
        # ================================================================
        # Step 1: 解析消息体
        # ================================================================
        try:
            msg = await request.json()
        except Exception:
            return web.Response(text="", status=200)

        msg_type = msg.get("MsgType", "")
        event = msg.get("Event", "")

        logging.info(f"收到消息: type={msg_type} event={event} from={msg.get('FromUserName')}")

        # ================================================================
        # Step 2: URL 验证(首次配置回调地址时,企业微信会发送 GET 请求)
        # ================================================================
        if request.method == "GET":
            echo_str = msg.get("echostr", "")
            msg_signature = request.query.get("msg_signature", "")
            timestamp = request.query.get("timestamp", "")
            nonce = request.query.get("nonce", "")

            if self.verifier.verify(msg_signature, timestamp, nonce, echostr=echo_str):
                logging.info("URL 验证通过")
                return web.Response(text=echo_str)
            else:
                logging.warning("URL 验证失败")
                return web.Response(status=403, text="signature verification failed")

        # ================================================================
        # Step 3: 消息签名验证
        # ================================================================
        msg_signature = request.query.get("msg_signature", "")
        timestamp = request.query.get("timestamp", "")
        nonce = request.query.get("nonce", "")
        post_data = await request.text()

        if not self.verifier.verify(msg_signature, timestamp, nonce, post_data=post_data):
            logging.warning("消息签名验证失败")
            return web.Response(status=403, text="signature verification failed")

        # ================================================================
        # Step 4: 处理不同类型的消息
        # ================================================================
        if msg_type == "text":
            return await self._handle_text(msg)
        elif msg_type == "event":
            return await self._handle_event(msg)
        else:
            # 其他类型消息暂不处理,直接返回 success
            return web.Response(text="success")

    async def _handle_text(self, msg: Dict) -> web.Response:
        """处理文本消息"""
        from_user = msg.get("FromUserName", "")
        content = msg.get("Content", "").strip()
        agent_id = msg.get("AgentID", "")

        # 企业微信 corp_id 在 ToUserName 中
        corp_id = msg.get("ToUserName", "")

        # 获取历史会话
        history = await self.session_mgr.get_history(corp_id, from_user)

        # 构建消息列表(包含历史)
        messages = history + [{"role": "user", "content": content}]

        # 立即返回 "success"(企业微信要求 5 秒内响应)
        asyncio.create_task(self._process_and_reply(corp_id, from_user, messages, agent_id))

        return web.Response(text="success")

    async def _handle_event(self, msg: Dict) -> web.Response:
        """处理事件消息"""
        event = msg.get("Event", "")
        from_user = msg.get("FromUserName", "")

        # 关注/取消关注事件
        if event == "subscribe":
            reply_text = "🎉 感谢关注!有什么可以帮助您的吗?"
            await self._queue.put({
                "to_user": from_user,
                "content": reply_text,
                "agent_id": msg.get("AgentID", ""),
            })
        elif event == "unsubscribe":
            logging.info(f"用户 {from_user} 取消关注")
        elif event == "click":
            # 点击菜单事件
            event_key = msg.get("EventKey", "")
            reply_text = f"您点击了菜单:{event_key}"
            await self._queue.put({
                "to_user": from_user,
                "content": reply_text,
                "agent_id": msg.get("AgentID", ""),
            })

        return web.Response(text="success")

    async def _process_and_reply(
        self,
        corp_id: str,
        from_user: str,
        messages: list,
        agent_id: str,
    ):
        """异步处理消息并回复(不阻塞回调)"""
        try:
            # 保存用户消息到历史
            user_content = messages[-1]["content"]
            await self.session_mgr.append_message(corp_id, from_user, "user", user_content)

            # 追加历史消息(去掉最后一条用户消息,因为要在 api_server 中处理)
            history_for_api = messages[:-1]

            # 构建发送给 api_server 的消息(含历史)
            api_messages = history_for_api + [{"role": "user", "content": user_content}]

            # 调用 api_server
            result = await self.api_client.chat_completions(
                tenant_id=corp_id,
                user_id=from_user,
                messages=api_messages,
            )

            if "error" in result:
                reply_text = f"⚠️ 处理失败:{result['error']}"
            else:
                # 解析回复(OpenAI 兼容格式)
                choices = result.get("choices", [])
                if choices:
                    reply_text = choices[0].get("message", {}).get("content", "收到")
                    reply_text = reply_text.strip()
                else:
                    reply_text = "收到"

            # 保存助手回复到历史
            await self.session_mgr.append_message(corp_id, from_user, "assistant", reply_text)

            # 放入回复队列
            await self._queue.put({
                "to_user": from_user,
                "content": reply_text,
                "agent_id": agent_id,
            })

            logging.info(f"回复已加入队列: to={from_user} len={len(reply_text)}")

        except asyncio.TimeoutError:
            logging.error("api_server 调用超时")
            await self._queue.put({
                "to_user": from_user,
                "content": "⏱️ 处理超时,请稍后再试",
                "agent_id": agent_id,
            })
        except Exception as e:
            logging.exception(f"处理消息异常: {e}")
            await self._queue.put({
                "to_user": from_user,
                "content": f"❌ 处理异常:{str(e)[:50]}",
                "agent_id": agent_id,
            })


# ============================================================================
# 健康检查
# ============================================================================
async def health_check(request: web.Request) -> web.Response:
    return web.json_response({"status": "healthy", "service": "wecom_bridge"})


# ============================================================================
# 主程序
# ============================================================================
async def create_app() -> web.Application:
    config = load_config(CONFIG_PATH)
    setup_logging(debug=os.environ.get("DEBUG"))

    logging.info("=" * 60)
    logging.info("  企业微信桥接服务 (WeCom Bridge Service)")
    logging.info(f"  api_server: {config.api_server_url}")
    logging.info(f"  redis: {config.redis_url}")
    logging.info("=" * 60)

    # 初始化组件
    session_mgr = SessionManager(
        redis_url=config.redis_url,
        prefix=config.redis_session_prefix,
        ttl=config.redis_session_ttl,
    )
    await session_mgr.initialize()

    api_client = ApiServerClient(
        base_url=config.api_server_url,
        api_key=config.api_key,
        timeout=config.request_timeout,
    )

    reply_client = WeComReplyClient(config.wecom, config.redis_url)

    verifier = WeComSignatureVerifier(config.wecom.token)

    handler = WeComWebhookHandler(config, session_mgr, api_client, reply_client, verifier)
    await handler.start()

    # 构建 app
    app = web.Application()
    app["handler"] = handler
    app["session_mgr"] = session_mgr
    app["api_client"] = api_client

    app.router.add_get("/wecom/callback", handler.handle)       # URL 验证
    app.router.add_post("/wecom/callback", handler.handle)      # 消息回调
    app.router.add_get("/health", health_check)

    # 清理函数
    async def cleanup(app):
        logging.info("关闭桥接服务...")
        await handler.stop()
        await session_mgr.close()
        await api_client.close()

    app.on_cleanup.append(cleanup)

    return app


def main():
    app = create_app()
    port = int(os.environ.get("PORT", 8080))
    logging.info(f"启动服务,监听端口 {port}")
    web.run_app(app, host="0.0.0.0", port=port, access_log=None)


if __name__ == "__main__":
    main()

2.2 配置文件(config.yaml)

# /etc/wecom_bridge/config.yaml

wecom:
  corp_id: "YOUR_CORP_ID"           # 企业微信企业 ID
  corp_secret: "YOUR_CORP_SECRET"   # 应用密钥
  agent_id: "YOUR_AGENT_ID"         # 应用 AgentID
  token: "YOUR_CALLBACK_TOKEN"      # 回调验证 Token(自定义随机字符串)
  encoding_aes_key: ""              # 回调加密 Key(可选,不启用加密模式可不填)

api_server:
  url: "http://127.0.0.1:8000"     # Hermes api_server 地址(本地)
  api_key: "your-api-server-key"   # Bearer Token

redis:
  url: "redis://localhost:6379/0"   # Redis 地址

# 可通过环境变量覆盖
# WECOM_BRIDGE_CONFIG=/etc/wecom_bridge/config.yaml
# PORT=8080
# DEBUG=true

2.3 多租户 Session 隔离机制

核心逻辑:

session_key = f"wecom:session:{corp_id}:{user_id}"
字段 来源 示例
corp_id 企业微信消息 ToUserName ww1234567890abcdef
user_id 企业微信消息 FromUserName ZhangSan
session_key 拼接后 Redis Key wecom:session:ww1234567890abcdef:ZhangSan

隔离策略:

  • 每个用户有独立的 Redis Key,互不干扰
  • TTL 3600 秒,无交互自动过期
  • 最近 50 条消息存储,防止内存膨胀
  • 异步队列 asyncio.Queue 保证回复顺序

三、安全与权限

3.1 企业微信回调签名验证

企业微信回调使用 HMAC-SHA256(实际为 SHA1)签名验证,确保请求来自真实的企业微信服务器:

签名原文字符串 = 字典序排序( Token + Timestamp + Nonce + EchoStr/PostData )
签名 = SHA1( 签名原文 )

验证流程:

  1. 配置回调 URL 时,企业微信发送 GET 请求(携带 echostr
  2. 使用 Token + Timestamp + Nonce + Echostr 计算签名,比对 msg_signature
  3. 验证通过后返回解密后的 echostr
  4. 正式消息回调使用 POST,同样需要签名验证

桥接服务配置项:

wecom:
  token: "随机字符串(与企业在企业微信后台配置一致)"
  encoding_aes_key: ""  # 不启用加密模式填空

3.2 API Server 访问认证

桥接服务访问 Hermes api_server 使用 Bearer Token

# 请求头注入
headers = {
    "Authorization": f"Bearer {config.api_key}",
    "X-Tenant-ID": corp_id,      # 多租户隔离
    "X-User-ID": from_user,      # 用户隔离
}

Token 配置在 config.yaml 中,不硬编码在代码中

api_server:
  api_key: "your-secure-api-key"  # 与 Hermes config.yaml 中的一致

3.3 敏感信息处理

敏感字段 处理方式
企业微信 corp_secret 存入 config.yaml,不打印日志
api_server api_key 存入 config.yaml,日志中脱敏
用户消息内容 日志仅打印前 50 字符
Session 历史 存储在 Redis 中,设置 TTL 自动过期

日志脱敏规则:

# 禁止打印的字段
SENSITIVE_KEYS = ["corp_secret", "api_key", "token", "password"]

def mask_sensitive(data: dict) -> dict:
    result = {}
    for k, v in data.items():
        if k in SENSITIVE_KEYS:
            result[k] = "***REDACTED***"
        elif isinstance(v, str) and len(v) > 100:
            result[k] = v[:50] + "..."
        else:
            result[k] = v
    return result

3.4 网络安全

防护措施 说明
Nginx 反向代理 桥接服务仅监听内网 8080,Nginx 对外暴露 443
HTTPS 强制 企业微信要求回调地址必须为 HTTPS
最小化端口 仅开放 /wecom/callback/health 两个端点
访问来源限制 可在 Nginx 层限制仅允许微信服务器 IP 访问(allow qy.weixin.qq.com;

四、风险评估与应对

4.1 风险矩阵

风险 概率 影响 等级 应对策略
桥接服务崩溃 🔴 P0 自动重启(systemd)+ 监控告警 + 回滚脚本
api_server 并发打满(超过10) 🟠 P1 限流排队 + 告警提前预警
企业微信回调超时(5秒限制) 🟡 P2 立即返回 success + 异步处理
Redis 连接耗尽 🟠 P1 连接池复用(max_connections=20)+ 限流
Session 混乱(同一用户并发请求) 🟠 P1 Session 粒度锁 + 消息队列串号化同用户消息
消息重复推送 🟡 P2 企业微信消息 ID 去重(msg_id)
Token 过期(access_token 2小时) 🟡 P2 自动刷新 + 过期前 60 秒提前更新
LLM 推理超时(30秒) 🟡 P2 超时降级返回「处理中」+ 异步队列重试

4.2 核心 P0 风险应对

风险 1:桥接服务崩溃

预防:systemd 守护进程,自动重启(Restart=always)
检测:/health 端点监控 + Prometheus 告警
止血:systemctl restart wecom_bridge(30秒内恢复)
回滚:关闭桥接服务 → 恢复原 Gateway WeCom 接收
MTTR:< 1 分钟

风险 2:api_server 并发超限

api_server 的 _MAX_CONCURRENT_RUNS = 10
超过后新请求在 thread pool 队列中等待

预防:桥接服务在调用前检查当前并发数
检测:监控 active_connections > 8(80%阈值)触发告警
应对:
  - 并发 8-10:请求排队,延迟但可接受
  - 并发 > 10:队列溢出拒绝 + 告警 + 降级提示用户

4.3 降级策略

降级级别 条件 动作
L0 - 桥接服务崩溃 进程退出 / /health 失败 自动重启;若 3 分钟内重启 3 次则触发 P0 告警
L1 - api_server 不可用 HTTP 503 / 超时 返回「服务繁忙,请稍后再试」+ 队列重试
L2 - Redis 不可用 连接失败 使用内存 Session(单次有效)+ 降级提示
L3 - 完全回滚 无法在 5 分钟内恢复 一键切回原 Gateway 模式

五、快速回滚方案

5.1 回滚触发条件(量化阈值)

指标 阈值 触发动作
错误率(消息处理失败 / 总消息) ≥ 5%,持续 2 分钟 P0 → 立即回滚
P99 响应时间 ≥ 10 秒,持续 3 分钟 P0 → 立即回滚
桥接服务 /health 失败 连续 3 次健康检查失败 P0 → 立即回滚
消息送达率 < 95%,持续 5 分钟 P1 → 评估是否回滚
Redis 连接失败 持续 1 分钟 P1 → 评估是否降级

5.2 一键回滚脚本

#!/bin/bash
# rollback_wecom_bridge.sh - 企业微信桥接服务一键回滚
#
# 用法: ./rollback_wecom_bridge.sh
#
# 效果:
#   1. 停止 wecom_bridge 服务
#   2. 确保原 Gateway 正常运行
#   3. 验证 WeCom 回调恢复(检查 Nginx 路由)
#
# 回滚后状态:企业微信消息恢复由原 Hermes Gateway 处理

set -e

RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m'

log_info()  { echo -e "${GREEN}[INFO] $(date '+%H:%M:%S') $1${NC}"; }
log_warn()  { echo -e "${YELLOW}[WARN] $(date '+%H:%M:%S') $1${NC}"; }
log_error() { echo -e "${RED}[ERROR] $(date '+%H:%M:%S') $1${NC}"; }

echo "=========================================="
echo "  企业微信桥接服务 - 一键回滚"
echo "=========================================="

# Step 1: 停止桥接服务
log_info "[1/4] 停止桥接服务..."
if systemctl is-active wecom_bridge &>/dev/null; then
    systemctl stop wecom_bridge
    log_info "wecom_bridge 已停止"
else
    log_warn "wecom_bridge 未运行,跳过停止"
fi

# Step 2: 确认原 Gateway 正在运行
log_info "[2/4] 检查 Hermes Gateway 状态..."
if systemctl is-active hermes_gateway &>/dev/null; then
    log_info "Hermes Gateway 运行正常"
else
    log_warn "Hermes Gateway 未运行,尝试启动..."
    systemctl start hermes_gateway || log_error "Gateway 启动失败,请手动检查"
fi

# Step 3: 检查 Nginx 路由配置
# 若原配置注释了 /wecom/callback 的桥接反代,则无需操作
# 此处检查回调是否重新路由到 Gateway
log_info "[3/4] 检查 Nginx 路由..."
NGINX_CONF="/etc/nginx/conf.d/wecom_bridge.conf"
if [ -f "$NGINX_CONF" ]; then
    # 备份并禁用桥接服务的 Nginx 配置
    mv "$NGINX_CONF" "${NGINX_CONF}.bak"
    log_info "已禁用桥接 Nginx 配置(备份到 ${NGINX_CONF}.bak)"

    # 测试 Nginx 配置
    nginx -t && systemctl reload nginx
    log_info "Nginx 配置已重载"
fi

# Step 4: 验证
log_info "[4/4] 验证 WeCom 回调..."
sleep 3

# 发送测试消息(或通过企业微信手机端发送测试消息)
log_info "请在企业微信中发送测试消息,验证原 Gateway 是否正常响应"
log_info "若正常,回滚完成"
log_info "若异常,请手动检查 Hermes Gateway 日志:journalctl -u hermes_gateway -f"

echo ""
log_info "=========================================="
log_info "  回滚脚本执行完成"
log_info "  桥接服务: 已停止"
log_info "  Gateway:  已接管 WeCom"
log_info "=========================================="

使用方式:

# 紧急回滚(需在部署了桥接服务的机器上以 root 执行)
sudo ./rollback_wecom_bridge.sh

5.3 回滚后验证清单

 wecom_bridge 进程已退出(ps aux | grep wecom_bridge
 hermes_gateway 进程运行中(systemctl status hermes_gateway
 企业微信用户发送测试消息,原 Gateway 正常响应
 错误日志中无新错误(tail -100 /var/log/hermes/app.log
 监控面板显示 WeCom 消息处理恢复正常

5.4 MTTR 预估

场景 最快 典型 目标
桥接服务崩溃(自动重启恢复) 30 秒 1 分钟 < 1 分钟
手动执行回滚脚本 2 分钟 3-5 分钟 < 5 分钟
完全故障需人工介入 5 分钟 10-15 分钟 < 15 分钟

六、部门协作与上线计划

6.1 角色与职责

角色 姓名 主要职责 联系方式
研发负责人 王嘉成 核心功能开发、架构把控、代码评审 18229886512
研发 杨彪 功能实现、问题修复 17674756108
运维 陈章涛 环境部署、发布、监控配置 18684580210
产品/业务 夏雨 需求确认、验收协调 13524603318
测试 测试用例编写、功能回归

6.2 部署步骤

环境准备(Week 1 Day 1-2)
  ├─ 运维:陈章涛在测试环境部署 wecom_bridge
  ├─ 运维:配置 config.yaml(corp_id/secret/agent_id/api_key)
  ├─ 运维:配置 Nginx 反向代理(/wecom/callback → bridge:8080)
  └─ 研发:王嘉成确认 api_server 端点可访问

开发完成(Week 1 Day 3-5)
  ├─ 研发:wecom_bridge 核心功能完成
  ├─ 研发:单元测试 + 自测
  └─ 研发:代码 review(杨彪)

测试验证(Week 2)
  ├─ 测试:功能测试 + 回归测试
  ├─ 测试:性能压测(模拟 10 用户并发)
  ├─ 产品:业务场景验收(夏雨)
  └─ 研发:问题修复

灰度发布(Week 3)
  ├─ Day 1:灰度 2 用户(王嘉成 + 杨彪)
  ├─ Day 2:灰度 5 用户(+ 3 名测试/运维)
  ├─ Day 3-5:灰度 10 用户(全部)
  └─ 全程:监控错误率、延迟、队列深度

全量上线(Week 4 Day 1)
  ├─ 运维:陈章涛执行生产部署
  ├─ 各方:观察生产指标
  └─ Day 3:复盘会议

6.3 验收标准

类型 指标 验收方式
功能 10 个 WeCom 用户都能正常对话 夏雨逐一测试
功能 消息历史正确存储和加载 查看 Redis 验证
性能 P99 响应时间 < 5 秒 监控面板查看
稳定性 连续 72 小时无崩溃 监控面板 + 日志
回滚 回滚脚本执行 < 5 分钟 演练测试

6.4 灰度策略

阶段 用户 时间 重点观察
阶段一 王嘉成、杨彪(2人) Day 1 核心功能、基本流程
阶段二 + 陈章涛/刘文伟/张家学(5人) Day 2-3 并发稳定性
阶段三 全部 10 人 Day 4-5 真实场景、压力测试

灰度期间错误率 > 5% 或 P99 > 10s,立即暂停灰度并回滚。

6.5 沟通机制

  • 每日站会:王嘉成/杨彪/测试/夏雨 同步进度
  • 灰度日报:陈章涛每日输出监控摘要至微信群
  • 问题升级:P0 级 5 分钟内响应,P1 级 15 分钟内响应

七、运维监控

7.1 核心监控指标

指标 采集方式 正常范围 告警阈值(P1)
桥接服务 /health 健康检查 返回 200 连续 3 次失败
消息处理错误率 日志统计 < 1% > 5%
消息送达率 成功推送数/总消息 ≥ 99% < 95%
WeCom 回调延迟 P99 日志埋点 < 3s > 5s
api_server 并发数 内部计数 < 8 > 8(80%)
Redis 连接数 Redis info < 15 > 18
回复队列深度 asyncio.Queue.qsize() < 100 > 500
LLM 推理时间 日志埋点 < 10s(P99) > 30s
企业微信 access_token 内部状态 有效 过期前 5 分钟告警

7.2 告警阈值(量化)

级别 条件 通知方式 响应时效
P0 /health 失败连续 3 次 电话 + 微信群 5 分钟
P0 错误率 > 5% 持续 2 分钟 电话 + 微信群 5 分钟
P1 并发数 > 8 持续 5 分钟 微信群 15 分钟
P1 队列深度 > 500 微信群 15 分钟
P2 P99 延迟 > 5s 微信群 1 小时
P2 Token 即将过期(< 5 分钟) 微信群 1 小时

7.3 日志规范与快速定位命令

日志路径: /var/log/wecom_bridge/app.log

日志格式:

[2026-05-05 14:30:01] INFO     webhook     | 收到消息: type=text from=ZhangSan
[2026-05-05 14:30:01] INFO     session    | 加载历史: corp=wwxxx user=ZhangSan msgs=5
[2026-05-05 14:30:02] INFO     api_client | 调用 api_server: tenant=wwxxx user=ZhangSan
[2026-05-05 14:30:06] INFO     reply      | 回复已加入队列: to=ZhangSan len=128
[2026-05-05 14:30:06] INFO     session    | 保存历史: corp=wwxxx user=ZhangSan msgs=6

快速定位命令:

# 检查消息处理错误
grep -E "ERROR|FATAL" /var/log/wecom_bridge/app.log | tail -20

# 检查 api_server 调用失败
grep "api_server error" /var/log/wecom_bridge/app.log

# 检查回复队列堆积
grep "队列深度" /var/log/wecom_bridge/app.log | tail -5

# 检查 WeCom 回调延迟
grep "回调延迟" /var/log/wecom_bridge/app.log | awk '{print $NF}' | sort -rn | head -10

# 实时监控
tail -f /var/log/wecom_bridge/app.log | grep -E "ERROR|WARN"

# 检查消息送达成功率
grep "回复已加入队列" /var/log/wecom_bridge/app.log | wc -l  # 成功数
grep "回复失败" /var/log/wecom_bridge/app.log | wc -l        # 失败数

7.4 日常巡检清单

每日(09:00):

 systemctl status wecom_bridge    Running
 curl -s http://localhost:8080/health    {"status": "healthy"}
 grep -c "ERROR" /var/log/wecom_bridge/app.log(昨日)→ < 10 
 监控面板:消息送达率  99%
 监控面板:P99 延迟 < 5s
 Redis 连接数 < 18

每周(周一 10:00):

 日志轮转正常(logrotate)
□ 备份完整性(config.yaml + Session 数据)
□ 监控告警无未处理项
□ SSL 证书有效期 > 30 天
□ 外部依赖可用性(企业微信 API

7.5 应急响应流程

P0 - 桥接服务崩溃:

1. [0-5min] 收到告警  确认 wecom_bridge 进程
2. [5-8min] systemctl restart wecom_bridge
3. [8-10min] 验证 /health 恢复
4. [10-15min] 确认消息处理恢复正常
5. 若重启失败  执行 rollback_wecom_bridge.sh
6. 发送故障通报

P1 - 并发打满 / 队列堆积:

1. [0-5min] 查看并发数 + 队列深度
2. [5-15min] 评估是否需要限流或回滚
3. [15-30min] 执行降级或回滚
4. 观察监控恢复情况

7.6 监控工具建议

工具 用途 现状
journalctl 查看 wecom_bridge 系统日志 直接可用
curl /health 健康检查 内置端点
Prometheus + Grafana 指标采集 + 可视化大盘 可后续接入
企业微信群机器人 告警通知 已配置

附录

A. 相关文件路径

文件 路径
桥接服务代码 /opt/wecom_bridge/wecom_bridge.py
配置文件 /etc/wecom_bridge/config.yaml
systemd 服务 /etc/systemd/system/wecom_bridge.service
Nginx 配置 /etc/nginx/conf.d/wecom_bridge.conf
应用日志 /var/log/wecom_bridge/app.log
回滚脚本 /opt/scripts/rollback_wecom_bridge.sh
健康检查 http://localhost:8080/health

B. API Server 关键配置

# Hermes api_server 端点(已内置)
POST /v1/chat/completions
Header: Authorization: Bearer {token}
Header: X-Tenant-ID: {corp_id}
Header: X-User-ID: {user_id}
Body: { model, messages, stream }

# api_server 并发上限(已内置)
_MAX_CONCURRENT_RUNS = 10  # gateway/platforms/api_server.py:2373

C. 企业微信 API 参考

接口 用途
GET /cgi-bin/gettoken 获取 access_token
POST /cgi-bin/message/send 推送消息给用户
POST /cgi-bin/getcallbackip 获取企业微信服务器 IP(配置防火墙用)

文档版本:v1.0 | 2026-05-05 | 待评审

Left-click: follow link, Right-click: select node, Scroll: zoom
x