x

企业微信多租户智能体技术方案

本文档为企业微信多租户智能体的完整技术方案,整合了并发优化架构与多租户记忆隔离两大核心模块。

文档版本:v1.0
创建日期:2026-05-05
方案状态:待评审
负责人:王嘉成 / 杨彪


一、整体架构总览

1.1 系统架构图

┌─────────────────────────────────────────────────────────────────────────┐
                         企业微信用户10                               
   XiaGuoDong  BoWeiYa  Xiao  SiNeiKe  ShiHuangZhe  ShuGuang            
   ZhangYuXiaoWanZi  LiuWenWei  aiden  YiYeZhiQiu                     
└───────────────────────────────┬─────────────────────────────────────────┘
                                 回调 / 推送双向 TLS
                                
┌─────────────────────────────────────────────────────────────────────────┐
                    企业微信服务器WXQY Server                          
                    回调地址https://api.5jin.top/wecom                 
└───────────────────────────────┬─────────────────────────────────────────┘
                                 HTTP POST (JSON) / HTTP GET (URL验证)
                                
┌─────────────────────────────────────────────────────────────────────────┐
                         Nginx 反向代理                                   
              端口443公网 HTTPS)→ 8080内网                       
              SSL 终结 / 负载分发 / 限流                                  
└───────────────────────────────┬─────────────────────────────────────────┘
                                
                ┌───────────────┴───────────────┐
                         内网 8080              
                                               
┌────────────────────────┐      ┌────────────────────────────────────────┐
   现有 Gateway                       wecom_bridge 桥接服务            
   (default Profile)                 (独立进程独立部署)              
   处理所有平台消息             ┌────────────┐  ┌────────────┐        
   不改变继续运行             Webhook接收 │→  消息解析           
                                (aiohttp)     +路由             
                               └────────────┘  └─────┬──────┘        
                                                                    
                               ┌────────────┐  ┌─────▼──────┐        
                               异步回复队列 │←  调用api_server     
                               (asyncio.Queue) HTTP POST         
                               └────────────┘  └────────────┘        
└────────────────────────┘      └────────────────┬──────────────────────┘
                                                  
                                                   HTTP POST /v1/chat/completions
                                                   Headers: X-Tenant-ID, X-User-ID
                                                   Body: { model, messages, session_key }
                                                  
┌─────────────────────────────────────────────────────────────────────────┐
                      Hermes api_server                                  
              (thread pool, _MAX_CONCURRENT_RUNS = 10)                  
  ┌────────────────────────────────────────────────────────────────┐  
    /v1/chat/completions    _handle_chat_completions               
    /v1/runs               _handle_runs                            
    ─────────────────────────────────────────────────────────────    
    Session Key 构建: "{platform}:{dm|group}:{user_id}"              
    多租户 Header 解析: X-Tenant-ID, X-User-ID                       
  └────────────────────────────────────────────────────────────────┘  
└───────────────────────────────┬─────────────────────────────────────────┘
                                 Agent 推理
                                
┌─────────────────────────────────────────────────────────────────────────┐
                       Agent 推理层                                       
  ┌────────────────────────────────────────────────────────────────┐  
    读取记忆: 全局记忆 + 用户私有记忆 (wecom/{user_id}/MEMORY.md)    
    写入记忆: 默认  用户私有记忆; [GLOBAL]  全局记忆               
  └────────────────────────────────────────────────────────────────┘  
└───────────────────────────────┬─────────────────────────────────────────┘
                                 回复内容
                                
┌─────────────────────────────────────────────────────────────────────────┐
                       Redis Session 存储                                 
           Key: wecom:session:{tenant_id}:{user_id}                      
           TTL: 3600s / 保留最近 50 条消息                               
└───────────────────────────────┬─────────────────────────────────────────┘
                                
                                
┌─────────────────────────────────────────────────────────────────────────┐
                     wecom_bridge 回复路由                                
          异步队列推送  企业微信消息推送 API / 刷新 Redis TTL              
└─────────────────────────────────────────────────────────────────────────┘

1.2 核心设计原则

  1. 并行部署,互不影响:wecom_bridge 与现有 Gateway 共存,WeCom 流量切换到桥接服务,其他平台(WeChat 等)继续走 Gateway
  2. 零侵入 api_server:多租户上下文通过 HTTP Header 注入,api_server 本身不做代码改动
  3. 记忆隔离在 Agent 层实现:session key 路由 + 用户私有记忆文件读写,与 Bridge 解耦
  4. 异步非阻塞:全链路异步(aiohttp + asyncio),支持 10 用户并发
  5. 可秒级回滚:关闭桥接服务 → Nginx 切回 Gateway → 恢复原状

二、组件矩阵

组件 类型 职责 部署位置 依赖 状态
wecom_bridge Python/aiohttp Webhook 接收、消息解析、调用 api_server、异步回复队列 腾讯云重庆一区(独立进程) aiohttp, redis, pydantic, yaml 待开发
RateLimiter Python 滑动窗口限流、用户配额管理、配额余量查询 集成于 wecom_bridge 内 redis, asyncio 待开发
Logger Python 全量日志采集、双目标存储、异常检测 集成于 wecom_bridge 内 redis, asyncio 待开发
Hermes api_server Python LLM 推理、thread pool 并发管理、多租户 session 路由 已在运行(162.14.208.87) 已有
Hermes Agent Python 推理、记忆读写、工具调用 同 api_server memories/, skills/, state.db 待改造
Redis 缓存 多租户 Session 存储、历史消息缓存、TTL 管理 腾讯云重庆一区 已有
Nginx 反向代理 HTTPS 公网入口、SSL 终结、/wecom 路径分发 腾讯云重庆一区 已有
企业微信服务器 第三方 消息回调、消息推送 微信官方 回调签名验证

三、数据流向详解

3.1 完整消息生命周期

[用户] → [企业微信服务器] → [Nginx 443] → [wecom_bridge :8080]
                                                          
                                                          
                                                    [Step 1] 解析消息
                                              FromUserName / Content / MsgType
                                                          
                                                          
                                                    [Step 2] 构建 Session Key
                                              "wecom:dm:{user_id}"  (从消息体提取)
                                                          
                                                          
                                                    [Step 3] Redis 加载历史
                                              wecom:session:{corp_id}:{user_id}
                                                          
                                                          
                                                    [Step 4] 追加用户消息
                                              history.append({role:"user", content:"..."})
                                                          
                                                          
                                                    [Step 5] 调用 api_server
                                              POST /v1/chat/completions
                                              Headers: X-Tenant-ID, X-User-ID
                                              Body: {model, messages, session_key}
                                                          
                                                          
                                                    [Step 6] api_server 路由
                                              Thread Pool (上限10) → _handle_chat_completions
                                              提取 Header 中 X-User-ID → 注入 Agent 上下文
                                                          
                                                          
                                                    [Step 7] Agent 推理
                                              读取: 全局记忆 + wecom/{user_id}/MEMORY.md
                                              写入: wecom/{user_id}/MEMORY.md (默认)
                                              调用 LLM (MiniMax-M2.7-highspeed)
                                                          
                                                          
                                                    [Step 8] 返回回复内容
                                              {"choices":[{"message":{"content":"..."}}]}
                                                          
                                                          
                                                    [Step 9] 异步回复队列
                                              asyncio.Queue.put({to_user, content})
                                                          
                                                          
                                                    [Step 10] 推送至企业微信
                                              POST /cgi-bin/message/send
                                              access_token 自动刷新
                                                          
                                                          
                                                    [Step 11] 更新 Redis Session
                                              追加 assistant 回复 → setex TTL 刷新
                                                          
                                                          
[用户] ← [企业微信推送消息]

3.2 多租户 Header 体系

Header 名称 来源 说明 示例
X-Tenant-ID wecom_bridge 注入 企业标识,等于 corp_id wxcrmb00xxxx
X-User-ID wecom_bridge 注入 用户标识,等于 FromUserName XiaGuoDong
Authorization 配置注入 Bearer Token 访问 api_server Bearer eyJ...

3.3 Session Key 体系

场景 Session Key 格式 存储位置
WeCom 私聊 wecom:dm:{user_id} Redis + JSONL 双写
WeCom 群聊 wecom:group:{group_id} Redis + JSONL 双写
其他平台 weixin:dm:{user_id} JSONL(现状)

Redis 与 JSONL 的分工

  • Redis:wecom_bridge 写入,用于快速加载最近 50 条会话历史(短时窗口)
  • JSONL 文件:Agent 写入,用于持久化长期会话历史(跨重启保留)

四、记忆体系设计(核心)

4.1 记忆分层架构

Agent 记忆读取顺序从顶到底依次查找):
┌─────────────────────────────────────────────────────────┐
  1. 系统模板System Prompt                            
     - 角色定义 / 行为规则 / 平台适配指令                   
├─────────────────────────────────────────────────────────┤
  2. 全局记忆~/.hermes/memories/MEMORY.md             
     - 公司架构 / 技能文档 / 系统配置                      
     - 所有用户共享可读                                   
├─────────────────────────────────────────────────────────┤
  3. 用户私有记忆~/.hermes/memories/wecom/{user_id}/  
     - 该用户专属的对话历史项目上下文                   
     - 其他用户不可见                                     
├─────────────────────────────────────────────────────────┤
  4. 当前会话上下文session messages                   
     - 当前对话链中的最近消息 Redis / JSONL 注入     
└─────────────────────────────────────────────────────────┘

Agent 记忆写入规则
┌─────────────────────────────────────────────────────────┐
  普通对话记忆  wecom/{user_id}/MEMORY.md              
  全局共享信息  MEMORY.md需标注 [GLOBAL]            
  群聊共享信息  wecom/group:{group_id}/MEMORY.md       
└─────────────────────────────────────────────────────────┘

4.2 记忆文件物理结构

~/.hermes/
├── memories/
│   ├── MEMORY.md                      # 全局记忆(所有用户共享)
│   ├── USER.md                        # 全局用户配置
│   ├── skills/                        # 全局技能(所有用户共享)
│   └── wecom/                         # WeCom 用户私有记忆
│       ├── Xiao/
│       │   ├── MEMORY.md              # 徐滔私有记忆
│       │   └── USER.md               # 徐滔私有配置
│       ├── SiNeiKe/
│       ├── YiYeZhiQiu/
│       ├── aiden/
│       ├── ShuGuang/
│       ├── LiuWenWei/
│       ├── ZhangYuXiaoWanZi/
│       ├── ShiHuangZhe/
│       ├── BoWeiYa/
│       └── group:xxxxx/              # 群聊记忆(按群ID隔离)
│           └── MEMORY.md
├── sessions/
│   └── (按 session_id 的 .jsonl 文件)
└── state.db                          # 全局共用(暂不拆分)

4.3 用户目录与 WeCom ID 映射表

WeCom ID 真名 私有记忆目录
XiaGuoDong 夏雨 default(全局视角,不隔离)
Xiao 徐滔 wecom/Xiao/
SiNeiKe 杨卓 wecom/SiNeiKe/
YiYeZhiQiu 王嘉成 wecom/YiYeZhiQiu/
aiden 杨彪 wecom/aiden/
ShuGuang 张家学 wecom/ShuGuang/
LiuWenWei 刘文伟 wecom/LiuWenWei/
ZhangYuXiaoWanZi 张玉杰 wecom/ZhangYuXiaoWanZi/
ShiHuangZhe 朱一花 wecom/ShiHuangZhe/
BoWeiYa 戴美秋 wecom/BoWeiYa/

4.4 记忆读写核心逻辑

# 记忆读取(Agent 推理前)
def read_memory(user_id: str) -> str:
    global_mem = read_file("~/.hermes/memories/MEMORY.md")
    user_mem_path = f"~/.hermes/memories/wecom/{user_id}/MEMORY.md"
    user_mem = read_file(user_mem_path) if exists(user_mem_path) else ""
    return f"{global_mem}\n\n{user_mem}"

# 记忆写入(Agent 推理后)
def write_memory(user_id: str, content: str):
    if content.strip().startswith("[GLOBAL]"):
        append_to_file("~/.hermes/memories/MEMORY.md", content)
    else:
        user_mem_path = f"~/.hermes/memories/wecom/{user_id}/MEMORY.md"
        append_to_file(user_mem_path, content)
        # 写入前去重(最近20条)
        deduplicate(user_mem_path, keep_recent=20)

# 群聊记忆
def write_group_memory(group_id: str, content: str):
    group_mem_path = f"~/.hermes/memories/wecom/group:{group_id}/MEMORY.md"
    append_to_file(group_mem_path, content)

五、wecom_bridge 桥接服务详解

5.1 核心模块职责

模块 职责 关键技术
Webhook 接收 接收企业微信回调,URL 验证 aiohttp, asyncio
签名验证 验证消息签名合法 hashlib, hmac
消息解析 提取 FromUserName/Content/MsgType pydantic
Session 管理 Redis 读写历史会话 redis.asyncio
API Server 客户端 调用 Hermes api_server aiohttp
异步回复队列 批量推送企业微信消息 asyncio.Queue
Access Token 管理 自动获取/刷新微信 API Token 缓存 + 过期刷新

5.2 异步队列设计

┌──────────────────────────────────────────────────┐
            asyncio.Queue (maxsize=1000)           
                                                   
  put({to_user, content, agent_id})   生产者     
                                                 
                                                 
  [_reply_worker]  串行发送消息到企业微信          
                                                 
                                                 
  失败  重试3次  记录日志  丢弃               
└──────────────────────────────────────────────────┘

回复优先级
- 文本消息正常队列
- 消息类型的异常如重复回调):优先处理

5.3 与现有 Gateway 的部署关系

维度 现有 Gateway wecom_bridge
处理范围 所有平台(WeCom/WeChat/其他) 仅 WeCom 消息
部署方式 Systemd 托管 独立进程 + Systemd
端口 内部 8000 内部 8080(Nginx 分流)
Session 存储 JSONL 文件 Redis + JSONL 双写
并发模型 单线程同步 asyncio 异步多用户
回滚 关闭 bridge → Nginx 切回 Gateway

Nginx 路由策略

/wecom/*  → wecom_bridge:8080  (WeCom 回调)
/api/*    → Hermes api_server  (OpenAI 兼容 API)
/         → 现有 Gateway:8000  (其他所有)

六、api_server 端改造

6.1 多租户 Header 解析

_handle_chat_completions 入口处,从 HTTP Header 提取租户/用户 ID:

# 在现有 request handler 中增加:
tenant_id = request.headers.get("X-Tenant-ID", "default")
user_id = request.headers.get("X-User-ID", "unknown")

# 注入到 session_key 构建逻辑
session_key = f"wecom:dm:{user_id}"  # 替换原有构建逻辑

6.2 Session Key 路由到 Agent

# session.py 或 run.py 改造
def build_session_key(platform: str, chat_type: str, user_id: str) -> str:
    return f"{platform}:{chat_type}:{user_id}"

# Agent 推理时接收 user_id 参数
async def run_agent(user_id: str, session_key: str, messages: list):
    # 读取用户私有记忆
    user_mem = read_user_memory(user_id)
    # 追加到 system prompt 或 messages
    messages = inject_memory_context(messages, user_mem)
    # 执行推理...

6.3 现有 thread pool 并发复用

api_server 已实现 _MAX_CONCURRENT_RUNS = 10 的线程池,无需额外开发,wecom_bridge 的 10 用户并发模型直接复用此线程池。


七、完整目录结构

/www/wwwroot/ma.5jin.top/           # Vault 和 wiki 发布根目录
/
├── etc/
│   └── wecom_bridge/
│       └── config.yaml              # 桥接服务配置(敏感信息)
│
├── opt/
│   └── wecom_bridge/
│       ├── wecom_bridge.py         # 桥接服务主程序
│       ├── requirements.txt         # Python 依赖
│       └── systemd/
│           └── wecom-bridge.service # Systemd 启动文件
│
~/.hermes/                          # Hermes Agent 根目录
├── memories/
│   ├── MEMORY.md                  # 全局记忆(共享)
│   ├── USER.md                    # 全局配置
│   ├── skills/                    # 技能目录
│   └── wecom/                     # WeCom 用户私有记忆
│       ├── Xiao/MEMORY.md
│       ├── SiNeiKe/MEMORY.md
│       ├── YiYeZhiQiu/MEMORY.md
│       ├── aiden/MEMORY.md
│       ├── ShuGuang/MEMORY.md
│       ├── LiuWenWei/MEMORY.md
│       ├── ZhangYuXiaoWanZi/MEMORY.md
│       ├── ShiHuangZhe/MEMORY.md
│       ├── BoWeiYa/MEMORY.md
│       └── group:{group_id}/MEMORY.md  # 群聊记忆
├── sessions/                      # JSONL 会话历史(按 session_id)
└── state.db                       # 全局状态 DB(暂不拆分)

八、实施步骤

Phase 1:基础设施准备(约 30 分钟)

  1. Nginx 配置:增加 /wecom 路径反向代理到 127.0.0.1:8080
  2. Redis 确认:确认 Redis 已安装并可连接(用于 Session 存储)
  3. Python 依赖安装pip install aiohttp redis pyyaml pydantic
  4. 配置目录创建/etc/wecom_bridge/config.yaml

Phase 2:wecom_bridge 开发(约 6.5 小时)

模块 预计工时 优先级
主程序骨架 + 配置加载 30 分钟 P0
Webhook 接收 + URL 验证 1 小时 P0
签名验证 30 分钟 P0
SessionManager (Redis) 1 小时 P0
ApiServerClient 30 分钟 P0
WeComReplyClient + Token 管理 1 小时 P0
异步回复队列 30 分钟 P1
RateLimiter 滑动窗口限流器 1 小时 P0
Logger 全量日志采集器 1.5 小时 P0
Systemd 启动文件 15 分钟 P1

Phase 3:Agent 记忆层改造(约 2 小时)

模块 预计工时 优先级
目录结构创建脚本 15 分钟 P0
记忆读取改造(全局 + 用户私有合并) 1 小时 P0
记忆写入改造([GLOBAL] 标签支持) 30 分钟 P0
用户配置加载(USER.md) 15 分钟 P1
去重机制 15 分钟 P1
群聊记忆处理 15 分钟 P1
现有全局记忆迁移到各用户 1 小时 P2

Phase 4:api_server Header 注入(约 1 小时)

模块 预计工时 优先级
X-Tenant-ID / X-User-ID 解析 30 分钟 P0
session_key 构建逻辑调整 30 分钟 P0

Phase 5:联调测试(约 2 小时)

测试场景 预期结果
企业微信发消息 → 收到回复 全流程正常工作
10 用户同时发消息 Thread pool 限流,顺序处理不崩溃
张家学说"我是客服" → 写入 wecom/ShuGuang/MEMORY.md 记忆文件正确
王嘉成发消息 → 不读取到张家学的记忆 记忆隔离正常
关闭 wecom_bridge → Nginx 切回 Gateway 回滚成功,WeCom 消息仍能到达
重启后 Session 历史恢复 Redis TTL 刷新正常

九、用户限流机制

9.1 限流设计原则

原则 说明
分层限流 第一层:wecom_bridge 消息接收层;第二层:api_server LLM 调用层
滑动窗口 使用 Redis ZSET 实现精确滑动窗口,避免固定窗口边界突发问题
规则可配置 限流规则写在 config.yaml,不硬编码
白名单豁免 管理员(夏总)不受限流约束
限流不阻塞 超限后消息进入排队列,不直接拒绝,用户感知友好

9.2 限流维度与配额表

用户 角色 每小时上限 每天上限 备注
XiaGuoDong 管理员 不限流
Xiao / SiNeiKe / YiYeZhiQiu / aiden 技术部 100 条/小时 500 条/天 开发调试
ShuGuang / LiuWenWei / ZhangYuXiaoWanZi / ShiHuangZhe 客服部 60 条/小时 300 条/天 日常办公
BoWeiYa 招商部 60 条/小时 300 条/天 日常办公
新用户(试用期 30 天) 试用 20 条/小时 100 条/天 限制

配额余量提醒:当用户剩余配额低于 20% 时,在回复末尾提示"本月还剩 X 条额度"。

9.3 Redis 滑动窗口实现

限流 Key 设计(ZSET 滑动窗口):
  ratelimit:{user_id}:hourly  → ZSET  (score=时间戳, member=msg_id)
  ratelimit:{user_id}:daily   → ZSET  (score=时间戳, member=msg_id)

检查逻辑:
  1. ZREMRANGEBYSCORE 清除窗口外的旧记录
  2. ZCARD 计数当前窗口内消息数
  3. 若超限 → 消息进入 asyncio.Queue 排队等待
  4. 若未超限 → ZADD 写入当前消息 → 继续处理
# 滑动窗口限流器(RateLimiter)
import time
import uuid
import redis.asyncio as redis

class RateLimiter:
    """基于 Redis ZSET 的滑动窗口限流器"""

    def __init__(self, redis_url: str):
        self.redis = redis.from_url(redis_url, encoding="utf-8", decode_responses=True)

    async def is_allowed(
        self,
        user_id: str,
        hourly_limit: int,
        daily_limit: int,
    ) -> tuple[bool, str]:
        """
        检查用户是否在限流配额内
        返回 (是否允许, 剩余配额描述)
        """
        now = time.time()
        hourly_key = f"ratelimit:{user_id}:hourly"
        daily_key = f"ratelimit:{user_id}:daily"
        msg_id = f"{now}:{uuid.uuid4().hex[:8]}"

        # 清除过期记录(窗口外)
        hour_ago = now - 3600
        day_ago = now - 86400
        await self.redis.zremrangebyscore(hourly_key, 0, hour_ago)
        await self.redis.zremrangebyscore(daily_key, 0, day_ago)

        # 计数
        hourly_count = await self.redis.zcard(hourly_key)
        daily_count = await self.redis.zcard(daily_key)

        # 超限检查
        if hourly_count >= hourly_limit:
            return False, f"小时级限流:已达上限 {hourly_limit} 条/小时,请稍后再试"
        if daily_count >= daily_limit:
            return False, f"日级限流:已达上限 {daily_limit} 条/天,请明天再试"

        # 写入当前消息
        pipe = self.redis.pipeline()
        pipe.zadd(hourly_key, {msg_id: now})
        pipe.zadd(daily_key, {msg_id: now})
        pipe.expire(hourly_key, 3700)  # 多留100秒防边界
        pipe.expire(daily_key, 86500)
        await pipe.execute()

        remaining_hourly = hourly_limit - hourly_count - 1
        remaining_daily = daily_limit - daily_count - 1
        return True, f"remaining_hourly={remaining_hourly}, remaining_daily={remaining_daily}"

    async def get_remaining(self, user_id: str, hourly_limit: int, daily_limit: int) -> dict:
        """查询用户当前剩余配额"""
        now = time.time()
        hour_ago = now - 3600
        day_ago = now - 86400

        hourly_key = f"ratelimit:{user_id}:hourly"
        daily_key = f"ratelimit:{user_id}:daily"

        await self.redis.zremrangebyscore(hourly_key, 0, hour_ago)
        await self.redis.zremrangebyscore(daily_key, 0, day_ago)

        hourly_count = await self.redis.zcard(hourly_key)
        daily_count = await self.redis.zcard(daily_key)

        return {
            "hourly_remaining": max(0, hourly_limit - hourly_count),
            "daily_remaining": max(0, daily_limit - daily_count),
        }

9.4 限流在 wecom_bridge 中的集成位置

消息到达 wecom_bridge
    │
    ▼
[Step 1] 解析 FromUserName / MsgType
    │
    ▼
[Step 2] 检查限流(RateLimiter.is_allowed)
    │
    ├─ 允许 → 正常处理流程
    │
    └─ 拒绝 → 立即返回限流提示消息
                  "您本周期的消息额度已用完(60/60),
                   请联系管理员提升配额或等待配额重置。"

9.5 超出限流后的处理策略

策略 行为 适用场景
队列等待 消息进入 asyncio.Queue 排队,等配额刷新后再处理 临时突发、非紧急
友好拒绝 立即返回限流提示,不进入队列 配额确实耗尽
只读模式 限流后只能触发只读操作(查数据),不能触发 LLM 调用 严格成本控制
升级提示 限流后提示"配额即将用完,升级企业套餐请联系我们" 商业化场景

推荐方案:日常用户使用"队列等待",超出日限额后使用"友好拒绝"。

9.6 特殊限流场景

突发消息(Burst)

  • 部分用户偶尔需要一次性发多条消息(如批量汇报)
  • 令牌桶算法支持突发:redis.call('INCRBY', key, n) 一次消耗 n 个令牌
  • 在配置中标记 burst_allowed: true 的用户可临时超配

试用期用户

  • 前 30 天受严格限制(20/100)
  • 30 天后自动升级为正式用户配额(根据部门)
  • 试用期标记存在 wecom/{user_id}/USER.mdjoin_date 字段

LLM Token 限流(进阶)

  • 在 api_server 层增加 token 计数器(比消息数更精确)
  • Key:tokens:{user_id}:{hour},每次 LLM 调用后累加 usage.total_tokens
  • 超出 token 配额时返回 429,由 wecom_bridge 展示友好提示

9.7 限流配置表(config.yaml)

ratelimit:
  enabled: true

  # 用户配额配置
  quotas:
    # 管理员不限
    XiaGuoDong:
      hourly: -1    # -1 表示不限
      daily: -1

    # 技术部
    Xiao:
    SiNeiKe:
    YiYeZhiQiu:
    aiden:
      hourly: 100
      daily: 500
      burst_allowed: true

    # 客服部
    ShuGuang:
    LiuWenWei:
    ZhangYuXiaoWanZi:
    ShiHuangZhe:
      hourly: 60
      daily: 300
      burst_allowed: false

    # 招商部
    BoWeiYa:
      hourly: 60
      daily: 300
      burst_allowed: false

  # 试用期默认配额(新用户自动匹配)
  trial:
    hourly: 20
    daily: 100
    duration_days: 30

  # 超限行为
  overflow_action: "queue"   # queue | reject | read_only
  queue_maxsize: 100         # 排队列上限
  warning_threshold: 0.2      # 剩余 20% 时提示

9.8 监控与告警

# 限流指标采集(Prometheus 格式)
ratelimit_total{user_id}           # 用户总请求数
ratelimit_rejected{user_id}       # 被限流拒绝数
ratelimit_queue_size{user_id}      # 用户当前排队列长度

# 告警规则
- 当某用户被拒绝率 > 10%/小时  发送告警可能是异常调用
- 当某用户日配额使用率 > 90%  提醒用户配额即将耗尽
- 当排队列长度 > 50  提示系统繁忙

十、用户日志管理

10.1 日志设计原则

原则 说明
全量采集 每条消息的完整生命周期都要记录,不可丢失
结构化日志 JSON 格式,便于后续查询和分析
用户隔离 各用户日志物理隔离在独立目录/Redis key
脱敏合规 手机号、身份证等敏感字段自动打码
分层存储 热数据存 Redis,冷数据归档到文件
可查询 支持按用户/时间/事件类型/关键词查询

10.2 日志采集维度

每条消息从发送到回复,完整记录以下节点:

节点 字段 说明
消息到达 msg_id, user_id, timestamp, msg_type, content_preview 消息何时来、内容摘要(脱敏)
限流检查 rate_limit_checked, quota_hourly_used, quota_daily_used, allowed 是否超限流
Session加载 session_key, history_length, redis_hit 历史会话长度
LLM调用 model, prompt_tokens, completion_tokens, total_tokens, latency_ms token消耗和延迟
记忆读写 memory_read_global, memory_read_user, memory_written_to, memory_lines_added 记忆操作
回复发送 reply_length, reply_preview, send_success, send_latency_ms 回复结果
异常记录 error_type, error_message, stack_trace 错误信息

10.3 日志存储架构

┌─────────────────────────────────────────────────────────────────┐
                      wecom_bridge                               
  ┌─────────────────────────────────────────────────────────┐  
    日志采集器 (Logger)                                       
    每一步操作  异步写入  双目标                            
  └─────────────────────────────────────────────────────────┘  
                                                               
          ┌─────────────┴─────────────┐                        
                                                             
  ┌───────────────────┐    ┌───────────────────┐             
    Redis (热存储)          文件 (冷存储)                  
    logs:{user_id}:        /var/log/wecom/               
      {date}:{hour}         {user_id}/                   
    TTL: 7                 {date}.jsonl                
  └───────────────────┘    └───────────────────┘             
└─────────────────────────────────────────────────────────────────┘

存储分层策略

  • Redis:最近 7 天热数据,ZSET 按时间戳排序,支持范围查询
  • 文件:7 天后归档为 JSONL,按用户/日期分目录
  • 保留期限:默认 90 天,到期自动删除或归档到对象存储

10.4 Redis 日志 Key 设计

# 用户日志(ZSET,按时间戳排序)
logs:{user_id}:{YYYY-MM-DD}     → ZSET (score=timestamp, member=json_log)
                                    TTL: 8天(多留1天防边界)

# 用户当日索引(方便按小时查)
logs:{user_id}:{YYYY-MM-DD}:hour:{HH}  → ZSET (score=timestamp, member=msg_id)
                                    TTL: 2天

# 全局操作日志(所有用户的汇总,用于监控)
logs:global:{YYYY-MM-DD}         → ZSET (score=timestamp, member=json_log)
                                    TTL: 30天

10.5 日志格式(JSON Schema)

{
  "msg_id": "wx2msg_abc123",
  "user_id": "ShuGuang",
  "tenant_id": "wxcrmb00xxxx",
  "timestamp": "2026-05-05T19:30:00.123+08:00",
  "event": "message_sent",
  "session_key": "wecom:dm:ShuGuang",
  "platform": "wecom",
  "data": {
    "msg_type": "text",
    "content_preview": "帮我查今天的销售数据",
    "content_length": 50,
    "has_attachment": false
  },
  "context": {
    "rate_limit": {
      "hourly_used": 12,
      "hourly_limit": 60,
      "daily_used": 45,
      "daily_limit": 300,
      "allowed": true
    },
    "session": {
      "history_length": 38,
      "redis_hit": true
    },
    "llm": {
      "model": "MiniMax-M2.7-highspeed",
      "prompt_tokens": 1200,
      "completion_tokens": 180,
      "total_tokens": 1380,
      "latency_ms": 1250
    },
    "memory": {
      "read_global_lines": 45,
      "read_user_lines": 23,
      "written_to": "wecom/ShuGuang/MEMORY.md",
      "lines_added": 3
    },
    "reply": {
      "length": 320,
      "send_success": true,
      "send_latency_ms": 230
    }
  },
  "error": null
}

content_preview 脱敏规则

def sanitize_content(content: str) -> str:
    """脱敏处理,防止敏感信息写入日志"""
    # 手机号打码
    content = re.sub(r'1[3-9]\d{9}', '1**********', content)
    # 身份证号打码
    content = re.sub(r'\d{17}[\dXx]', '**************', content)
    # 银行卡号打码
    content = re.sub(r'\d{12,19}', '************', content)
    # 截断超长内容
    if len(content) > 200:
        content = content[:200] + '...[truncated]'
    return content

10.6 日志查询接口

# wecom_bridge 提供内部查询接口(仅管理员可调用)

async def query_logs(
    user_id: str = None,
    start_time: datetime = None,
    end_time: datetime = None,
    event_type: str = None,
    limit: int = 100,
) -> list[dict]:
    """
    查询日志
    - user_id: 限定用户,不填则查所有
    - start_time / end_time: 时间范围
    - event_type: message_sent / llm_call / error 等
    - limit: 返回条数限制
    """
    pass

# 示例:查询张家学今天的所有消息
GET /admin/logs?user_id=ShuGuang&date=2026-05-05&limit=50

# 示例:查询今天所有异常
GET /admin/logs?event_type=error&date=2026-05-05

# 示例:查询某条消息的完整链路
GET /admin/logs?msg_id=wx2msg_abc123

10.7 日志分析功能

用量统计Dashboard

指标 说明
每小时消息量 各用户每小时消息数(折线图)
日token消耗 按用户/部门汇总token用量
平均响应延迟 LLM latency P50/P95/P99
限流触发统计 各用户被限流次数
错误率 按错误类型分类统计

异常行为检测

# 异常检测规则
ANOMALY_RULES = [
    # 1小时内消息超过配额的80%
    {"type": "high_frequency", "threshold": "quota * 0.8", "window": "1h"},
    # 连续3次LLM调用失败
    {"type": "llm_failure", "threshold": 3, "window": "5m"},
    # 消息内容重复率>70%(可能是机器人刷屏)
    {"type": "content_duplication", "threshold": 0.7, "window": "10m"},
    # 单条消息token消耗异常高(>10k)
    {"type": "high_token", "threshold": 10000, "window": "single"},
]

# 触发异常时:
# 1. 记录到 logs:global:{date} 的 ERROR 级别日志
# 2. 发送告警(企业微信管理员通知)
# 3. 可选:临时冻结该用户(需管理员确认)

10.8 日志配置(config.yaml)

logging:
  enabled: true

  # 日志级别
  level: "INFO"              # DEBUG | INFO | WARN | ERROR

  # 存储策略
  storage:
    # Redis 热存储
    redis:
      enabled: true
      ttl_days: 7
      key_prefix: "logs"

    # 文件冷存储
    file:
      enabled: true
      base_path: "/var/log/wecom"
      retention_days: 90
      rotation: "daily"       # daily | hourly | 100mb
      compress: true         # gzip 压缩旧日志

  # 脱敏规则
  sanitization:
    enabled: true
    rules:
      - field: "content"
        patterns:
          - '1[3-9]\d{9}'           # 手机号
          - '\d{17}[\dXx]'          # 身份证
          - '\d{12,19}'             # 银行卡
        replacement: "[REDACTED]"

  # 采集维度
  events:
    - "message_received"       # 消息到达
    - "rate_limit_check"       # 限流检查
    - "session_load"           # Session加载
    - "llm_call"               # LLM调用
    - "memory_operation"       # 记忆读写
    - "reply_sent"             # 回复发送
    - "error"                  # 异常

  # 异常检测
  anomaly_detection:
    enabled: true
    alert_threshold:
      high_frequency_rate: 0.8    # 超过配额80%触发
      error_rate_per_minute: 5     # 每分钟5次错误触发
      high_token_single: 10000     # 单次超过10k token触发

10.9 安全与合规

维度 措施
访问控制 日志查询接口仅管理员(XiaGuoDong)可调用,需 Bearer Token 认证
数据隔离 各用户只能查询自己的日志,管理员可查所有人日志
传输加密 日志接口强制 HTTPS,Redis 连接支持 TLS
敏感过滤 手机号、身份证、银行卡等自动脱敏
保留期限 Redis 7天 → 文件90天 → 自动删除
审计日志 管理员查询日志的行为本身也要记录(审计追踪)

10.10 与现有系统的联动

  • 与限流系统联动:限流触发时记录 rate_limit_check 事件
  • 与 Session 管理联动:Session 加载/保存时记录 session_load 事件
  • 与记忆系统联动:记忆读写时记录 memory_operation 事件
  • 与监控告警联动:异常日志 → Prometheus metrics → 告警通知

十一、注意点与避坑指南

11.1 企业微信回调验证

  • URL 验证时机:首次配置回调地址时,企业微信会发送 GET 请求验证 URL有效性,需在 5 秒内返回 echostr
  • 消息签名验证:每次 POST 回调都必须验证 msg_signature,使用 hmac.compare_digest 防止时序攻击
  • Token 刷新:微信 access_token 有效期 2 小时,需在过期前主动刷新,代码中以 expires_in - 60 作为提前刷新阈值

11.2 并发控制

  • api_server thread pool 上限MAX_CONCURRENT_RUNS = 10,超过的请求在 queue 中排队
  • wecom_bridge 队列上限asyncio.Queue(maxsize=1000),满时 put 阻塞
  • Redis 连接池max_connections=20,确保高并发下不耗尽连接

11.3 Session Key 唯一性

  • 同一条消息不要重复处理:企业微信可能因未收到确认而重发同一消息,需要在 Redis 中记录已处理的 msg_id 进行去重
  • 群聊与私聊的 session key 必须严格区分:格式不同 (wecom:dm:user vs wecom:group:groupid)

11.4 记忆文件并发写入

  • 同一用户并发写入:asyncio 场景下需要文件锁(fcntl.flock)防止多条消息同时追加导致文件损坏
  • 去重检查:写入前读取最近 20 条,比对内容重复则跳过

11.5 wecom_bridge 与 Gateway 的 Session 冲突

  • 双写策略:wecom_bridge 写 Redis,Gateway 写 JSONL,两者互补
  • 不要让两边写同一个 Redis key,否则 Gateway 重启后会出现短暂记忆丢失(建议 Redis 作为 WeCom 专用,Gateway 保持只写 JSONL)
  • 最终方案:WeCom 流量全部切换到 wecom_bridge,Gateway 只处理 WeChat 等其他平台

11.6 [GLOBAL] 标签的记忆归属判断

  • Agent 在写入记忆前需要判断该记忆是否具有全局性质(公司制度、技术文档等)
  • 判断规则
  • 涉及"我们公司"、"公司规定" → 写全局
  • 涉及个人任务、私人偏好 → 写用户私有
  • 模糊场景 → 默认写用户私有,agent 可事后纠正

11.7 新用户自动创建目录

  • wecom/{user_id}/ 目录不存在时,在首次消息处理时自动创建
  • 初始化 USER.md 和空 MEMORY.md,避免 404 文件错误

11.8 迁移现有全局记忆

  • 现有 ~/.hermes/memories/MEMORY.md 内容需要按用户拆分
  • 策略:根据历史会话中用户的发言模式判断归属(模糊的保留全局)
  • 建议:先不动,等用户自然积累新记忆后,全局文件中与具体用户相关的条目再逐步迁移

11.9 夏总(XiaGuoDong)的特殊处理

  • 夏总使用 default profile(不走 wecom/XiaGuoDong/),始终拥有全局视角
  • 夏总可选择性查看指定用户的私有记忆(如主动问"张家学最近在做什么")

11.10 state.db 暂不拆分

  • 当前 190MB,拆分风险高
  • user_id 在 SQL 层面过滤即可实现逻辑隔离
  • 后续如出现性能问题,再考虑按用户拆分到独立 DB 文件

十二、风险评估

风险 概率 影响 应对
企业微信重发消息导致重复处理 Redis 记录 msg_id 去重
并发写入记忆文件损坏 文件锁 fcntl.flock
Redis 连接池耗尽 监控 + 连接池上限 20
迁移记忆时用户归属错误 保守策略:模糊条目保留全局
Bridge 崩溃影响 WeCom 服务 Systemd 自动重启 + Nginx 快速切回 Gateway
api_server thread pool 满载 队列排队 + 超时熔断
限流计数器被绕过(同一msg_id重放) msg_id 用 UUID,每次唯一
用户配额耗尽导致队列积压 监控队列长度 + 限流告警
日志写入失败导致消息丢失 写入失败时降级到文件日志,不阻塞主流程
日志量过大撑满磁盘 严格按 retention_days 执行清理,配监控告警
脱敏规则不全导致敏感信息泄露 定期审查正则表达式,上线前 review

十三、后续扩展方向

  1. 技能权限隔离skills/shared/ + skills/private/{user_id}/,客服岗看不到服务器操作技能
  2. 工具权限分级:技术部全开,客服岗禁用高危工具(服务器命令等)
  3. 记忆可视化面板:用户可手动查看/编辑自己的私有记忆
  4. 跨用户共享上下文:显式 @mention 其他用户时,可临时读取对方记忆片段
  5. 多企业支持:通过 X-Tenant-ID 支持多个企业微信账号接入
Left-click: follow link, Right-click: select node, Scroll: zoom
x