企业微信多租户智能体技术方案
本文档为企业微信多租户智能体的完整技术方案,整合了并发优化架构与多租户记忆隔离两大核心模块。
文档版本:v1.0
创建日期:2026-05-05
方案状态:待评审
负责人:王嘉成 / 杨彪
一、整体架构总览
1.1 系统架构图
┌─────────────────────────────────────────────────────────────────────────┐
│ 企业微信用户(10人) │
│ XiaGuoDong BoWeiYa Xiao SiNeiKe ShiHuangZhe ShuGuang │
│ ZhangYuXiaoWanZi LiuWenWei aiden YiYeZhiQiu │
└───────────────────────────────┬─────────────────────────────────────────┘
│ 回调 / 推送(双向 TLS)
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ 企业微信服务器(WXQY Server) │
│ 回调地址:https://api.5jin.top/wecom │
└───────────────────────────────┬─────────────────────────────────────────┘
│ HTTP POST (JSON) / HTTP GET (URL验证)
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ Nginx 反向代理 │
│ 端口:443(公网 HTTPS)→ 8080(内网) │
│ SSL 终结 / 负载分发 / 限流 │
└───────────────────────────────┬─────────────────────────────────────────┘
│
┌───────────────┴───────────────┐
│ 内网 8080 │
▼ ▼
┌────────────────────────┐ ┌────────────────────────────────────────┐
│ 现有 Gateway │ │ wecom_bridge 桥接服务 │
│ (default Profile) │ │ (独立进程,独立部署) │
│ 处理所有平台消息 │ │ ┌────────────┐ ┌────────────┐ │
│ 不改变,继续运行 │ │ │Webhook接收 │→ │ 消息解析 │ │
│ │ │ │ (aiohttp) │ │ +路由 │ │
│ │ │ └────────────┘ └─────┬──────┘ │
│ │ │ │ │
│ │ │ ┌────────────┐ ┌─────▼──────┐ │
│ │ │ │异步回复队列 │← │ 调用api_server│ │
│ │ │ │(asyncio.Queue)│ │HTTP POST │ │
│ │ │ └────────────┘ └────────────┘ │
└────────────────────────┘ └────────────────┬──────────────────────┘
│
│ HTTP POST /v1/chat/completions
│ Headers: X-Tenant-ID, X-User-ID
│ Body: { model, messages, session_key }
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ Hermes api_server │
│ (thread pool, _MAX_CONCURRENT_RUNS = 10) │
│ ┌────────────────────────────────────────────────────────────────┐ │
│ │ /v1/chat/completions → _handle_chat_completions │ │
│ │ /v1/runs → _handle_runs │ │
│ │ ───────────────────────────────────────────────────────────── │ │
│ │ Session Key 构建: "{platform}:{dm|group}:{user_id}" │ │
│ │ 多租户 Header 解析: X-Tenant-ID, X-User-ID │ │
│ └────────────────────────────────────────────────────────────────┘ │
└───────────────────────────────┬─────────────────────────────────────────┘
│ Agent 推理
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ Agent 推理层 │
│ ┌────────────────────────────────────────────────────────────────┐ │
│ │ 读取记忆: 全局记忆 + 用户私有记忆 (wecom/{user_id}/MEMORY.md) │ │
│ │ 写入记忆: 默认 → 用户私有记忆; [GLOBAL] → 全局记忆 │ │
│ └────────────────────────────────────────────────────────────────┘ │
└───────────────────────────────┬─────────────────────────────────────────┘
│ 回复内容
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ Redis Session 存储 │
│ Key: wecom:session:{tenant_id}:{user_id} │
│ TTL: 3600s / 保留最近 50 条消息 │
└───────────────────────────────┬─────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ wecom_bridge 回复路由 │
│ 异步队列推送 → 企业微信消息推送 API / 刷新 Redis TTL │
└─────────────────────────────────────────────────────────────────────────┘
1.2 核心设计原则
- 并行部署,互不影响:wecom_bridge 与现有 Gateway 共存,WeCom 流量切换到桥接服务,其他平台(WeChat 等)继续走 Gateway
- 零侵入 api_server:多租户上下文通过 HTTP Header 注入,api_server 本身不做代码改动
- 记忆隔离在 Agent 层实现:session key 路由 + 用户私有记忆文件读写,与 Bridge 解耦
- 异步非阻塞:全链路异步(aiohttp + asyncio),支持 10 用户并发
- 可秒级回滚:关闭桥接服务 → Nginx 切回 Gateway → 恢复原状
二、组件矩阵
| 组件 |
类型 |
职责 |
部署位置 |
依赖 |
状态 |
| wecom_bridge |
Python/aiohttp |
Webhook 接收、消息解析、调用 api_server、异步回复队列 |
腾讯云重庆一区(独立进程) |
aiohttp, redis, pydantic, yaml |
待开发 |
| RateLimiter |
Python |
滑动窗口限流、用户配额管理、配额余量查询 |
集成于 wecom_bridge 内 |
redis, asyncio |
待开发 |
| Logger |
Python |
全量日志采集、双目标存储、异常检测 |
集成于 wecom_bridge 内 |
redis, asyncio |
待开发 |
| Hermes api_server |
Python |
LLM 推理、thread pool 并发管理、多租户 session 路由 |
已在运行(162.14.208.87) |
— |
已有 |
| Hermes Agent |
Python |
推理、记忆读写、工具调用 |
同 api_server |
memories/, skills/, state.db |
待改造 |
| Redis |
缓存 |
多租户 Session 存储、历史消息缓存、TTL 管理 |
腾讯云重庆一区 |
— |
已有 |
| Nginx |
反向代理 |
HTTPS 公网入口、SSL 终结、/wecom 路径分发 |
腾讯云重庆一区 |
— |
已有 |
| 企业微信服务器 |
第三方 |
消息回调、消息推送 |
微信官方 |
回调签名验证 |
— |
三、数据流向详解
3.1 完整消息生命周期
[用户] → [企业微信服务器] → [Nginx 443] → [wecom_bridge :8080]
│
▼
[Step 1] 解析消息
FromUserName / Content / MsgType
│
▼
[Step 2] 构建 Session Key
"wecom:dm:{user_id}" (从消息体提取)
│
▼
[Step 3] Redis 加载历史
wecom:session:{corp_id}:{user_id}
│
▼
[Step 4] 追加用户消息
history.append({role:"user", content:"..."})
│
▼
[Step 5] 调用 api_server
POST /v1/chat/completions
Headers: X-Tenant-ID, X-User-ID
Body: {model, messages, session_key}
│
▼
[Step 6] api_server 路由
Thread Pool (上限10) → _handle_chat_completions
提取 Header 中 X-User-ID → 注入 Agent 上下文
│
▼
[Step 7] Agent 推理
读取: 全局记忆 + wecom/{user_id}/MEMORY.md
写入: wecom/{user_id}/MEMORY.md (默认)
调用 LLM (MiniMax-M2.7-highspeed)
│
▼
[Step 8] 返回回复内容
{"choices":[{"message":{"content":"..."}}]}
│
▼
[Step 9] 异步回复队列
asyncio.Queue.put({to_user, content})
│
▼
[Step 10] 推送至企业微信
POST /cgi-bin/message/send
access_token 自动刷新
│
▼
[Step 11] 更新 Redis Session
追加 assistant 回复 → setex TTL 刷新
│
▼
[用户] ← [企业微信推送消息]
| Header 名称 |
来源 |
说明 |
示例 |
X-Tenant-ID |
wecom_bridge 注入 |
企业标识,等于 corp_id |
wxcrmb00xxxx |
X-User-ID |
wecom_bridge 注入 |
用户标识,等于 FromUserName |
XiaGuoDong |
Authorization |
配置注入 |
Bearer Token 访问 api_server |
Bearer eyJ... |
3.3 Session Key 体系
| 场景 |
Session Key 格式 |
存储位置 |
| WeCom 私聊 |
wecom:dm:{user_id} |
Redis + JSONL 双写 |
| WeCom 群聊 |
wecom:group:{group_id} |
Redis + JSONL 双写 |
| 其他平台 |
weixin:dm:{user_id} 等 |
JSONL(现状) |
Redis 与 JSONL 的分工:
- Redis:wecom_bridge 写入,用于快速加载最近 50 条会话历史(短时窗口)
- JSONL 文件:Agent 写入,用于持久化长期会话历史(跨重启保留)
四、记忆体系设计(核心)
4.1 记忆分层架构
Agent 记忆读取顺序(从顶到底,依次查找):
┌─────────────────────────────────────────────────────────┐
│ 1. 系统模板(System Prompt) │
│ - 角色定义 / 行为规则 / 平台适配指令 │
├─────────────────────────────────────────────────────────┤
│ 2. 全局记忆(~/.hermes/memories/MEMORY.md) │
│ - 公司架构 / 技能文档 / 系统配置 │
│ - 所有用户共享可读 │
├─────────────────────────────────────────────────────────┤
│ 3. 用户私有记忆(~/.hermes/memories/wecom/{user_id}/) │
│ - 该用户专属的对话历史、项目上下文 │
│ - 其他用户不可见 │
├─────────────────────────────────────────────────────────┤
│ 4. 当前会话上下文(session messages) │
│ - 当前对话链中的最近消息(由 Redis / JSONL 注入) │
└─────────────────────────────────────────────────────────┘
Agent 记忆写入规则:
┌─────────────────────────────────────────────────────────┐
│ 普通对话记忆 → wecom/{user_id}/MEMORY.md │
│ 全局共享信息 → MEMORY.md(需标注 [GLOBAL]) │
│ 群聊共享信息 → wecom/group:{group_id}/MEMORY.md │
└─────────────────────────────────────────────────────────┘
4.2 记忆文件物理结构
~/.hermes/
├── memories/
│ ├── MEMORY.md # 全局记忆(所有用户共享)
│ ├── USER.md # 全局用户配置
│ ├── skills/ # 全局技能(所有用户共享)
│ └── wecom/ # WeCom 用户私有记忆
│ ├── Xiao/
│ │ ├── MEMORY.md # 徐滔私有记忆
│ │ └── USER.md # 徐滔私有配置
│ ├── SiNeiKe/
│ ├── YiYeZhiQiu/
│ ├── aiden/
│ ├── ShuGuang/
│ ├── LiuWenWei/
│ ├── ZhangYuXiaoWanZi/
│ ├── ShiHuangZhe/
│ ├── BoWeiYa/
│ └── group:xxxxx/ # 群聊记忆(按群ID隔离)
│ └── MEMORY.md
├── sessions/
│ └── (按 session_id 的 .jsonl 文件)
└── state.db # 全局共用(暂不拆分)
| WeCom ID |
真名 |
私有记忆目录 |
| XiaGuoDong |
夏雨 |
default(全局视角,不隔离) |
| Xiao |
徐滔 |
wecom/Xiao/ |
| SiNeiKe |
杨卓 |
wecom/SiNeiKe/ |
| YiYeZhiQiu |
王嘉成 |
wecom/YiYeZhiQiu/ |
| aiden |
杨彪 |
wecom/aiden/ |
| ShuGuang |
张家学 |
wecom/ShuGuang/ |
| LiuWenWei |
刘文伟 |
wecom/LiuWenWei/ |
| ZhangYuXiaoWanZi |
张玉杰 |
wecom/ZhangYuXiaoWanZi/ |
| ShiHuangZhe |
朱一花 |
wecom/ShiHuangZhe/ |
| BoWeiYa |
戴美秋 |
wecom/BoWeiYa/ |
4.4 记忆读写核心逻辑
# 记忆读取(Agent 推理前)
def read_memory(user_id: str) -> str:
global_mem = read_file("~/.hermes/memories/MEMORY.md")
user_mem_path = f"~/.hermes/memories/wecom/{user_id}/MEMORY.md"
user_mem = read_file(user_mem_path) if exists(user_mem_path) else ""
return f"{global_mem}\n\n{user_mem}"
# 记忆写入(Agent 推理后)
def write_memory(user_id: str, content: str):
if content.strip().startswith("[GLOBAL]"):
append_to_file("~/.hermes/memories/MEMORY.md", content)
else:
user_mem_path = f"~/.hermes/memories/wecom/{user_id}/MEMORY.md"
append_to_file(user_mem_path, content)
# 写入前去重(最近20条)
deduplicate(user_mem_path, keep_recent=20)
# 群聊记忆
def write_group_memory(group_id: str, content: str):
group_mem_path = f"~/.hermes/memories/wecom/group:{group_id}/MEMORY.md"
append_to_file(group_mem_path, content)
五、wecom_bridge 桥接服务详解
5.1 核心模块职责
| 模块 |
职责 |
关键技术 |
| Webhook 接收 |
接收企业微信回调,URL 验证 |
aiohttp, asyncio |
| 签名验证 |
验证消息签名合法 |
hashlib, hmac |
| 消息解析 |
提取 FromUserName/Content/MsgType |
pydantic |
| Session 管理 |
Redis 读写历史会话 |
redis.asyncio |
| API Server 客户端 |
调用 Hermes api_server |
aiohttp |
| 异步回复队列 |
批量推送企业微信消息 |
asyncio.Queue |
| Access Token 管理 |
自动获取/刷新微信 API Token |
缓存 + 过期刷新 |
5.2 异步队列设计
┌──────────────────────────────────────────────────┐
│ asyncio.Queue (maxsize=1000) │
│ │
│ put({to_user, content, agent_id}) ← 生产者 │
│ │ │
│ ▼ │
│ [_reply_worker] → 串行发送消息到企业微信 │
│ │ │
│ ▼ │
│ 失败 → 重试3次 → 记录日志 → 丢弃 │
└──────────────────────────────────────────────────┘
回复优先级:
- 文本消息:正常队列
- 消息类型的异常(如重复回调):优先处理
5.3 与现有 Gateway 的部署关系
| 维度 |
现有 Gateway |
wecom_bridge |
| 处理范围 |
所有平台(WeCom/WeChat/其他) |
仅 WeCom 消息 |
| 部署方式 |
Systemd 托管 |
独立进程 + Systemd |
| 端口 |
内部 8000 |
内部 8080(Nginx 分流) |
| Session 存储 |
JSONL 文件 |
Redis + JSONL 双写 |
| 并发模型 |
单线程同步 |
asyncio 异步多用户 |
| 回滚 |
— |
关闭 bridge → Nginx 切回 Gateway |
Nginx 路由策略:
/wecom/* → wecom_bridge:8080 (WeCom 回调)
/api/* → Hermes api_server (OpenAI 兼容 API)
/ → 现有 Gateway:8000 (其他所有)
六、api_server 端改造
在 _handle_chat_completions 入口处,从 HTTP Header 提取租户/用户 ID:
# 在现有 request handler 中增加:
tenant_id = request.headers.get("X-Tenant-ID", "default")
user_id = request.headers.get("X-User-ID", "unknown")
# 注入到 session_key 构建逻辑
session_key = f"wecom:dm:{user_id}" # 替换原有构建逻辑
6.2 Session Key 路由到 Agent
# session.py 或 run.py 改造
def build_session_key(platform: str, chat_type: str, user_id: str) -> str:
return f"{platform}:{chat_type}:{user_id}"
# Agent 推理时接收 user_id 参数
async def run_agent(user_id: str, session_key: str, messages: list):
# 读取用户私有记忆
user_mem = read_user_memory(user_id)
# 追加到 system prompt 或 messages
messages = inject_memory_context(messages, user_mem)
# 执行推理...
6.3 现有 thread pool 并发复用
api_server 已实现 _MAX_CONCURRENT_RUNS = 10 的线程池,无需额外开发,wecom_bridge 的 10 用户并发模型直接复用此线程池。
七、完整目录结构
/www/wwwroot/ma.5jin.top/ # Vault 和 wiki 发布根目录
/
├── etc/
│ └── wecom_bridge/
│ └── config.yaml # 桥接服务配置(敏感信息)
│
├── opt/
│ └── wecom_bridge/
│ ├── wecom_bridge.py # 桥接服务主程序
│ ├── requirements.txt # Python 依赖
│ └── systemd/
│ └── wecom-bridge.service # Systemd 启动文件
│
~/.hermes/ # Hermes Agent 根目录
├── memories/
│ ├── MEMORY.md # 全局记忆(共享)
│ ├── USER.md # 全局配置
│ ├── skills/ # 技能目录
│ └── wecom/ # WeCom 用户私有记忆
│ ├── Xiao/MEMORY.md
│ ├── SiNeiKe/MEMORY.md
│ ├── YiYeZhiQiu/MEMORY.md
│ ├── aiden/MEMORY.md
│ ├── ShuGuang/MEMORY.md
│ ├── LiuWenWei/MEMORY.md
│ ├── ZhangYuXiaoWanZi/MEMORY.md
│ ├── ShiHuangZhe/MEMORY.md
│ ├── BoWeiYa/MEMORY.md
│ └── group:{group_id}/MEMORY.md # 群聊记忆
├── sessions/ # JSONL 会话历史(按 session_id)
└── state.db # 全局状态 DB(暂不拆分)
八、实施步骤
Phase 1:基础设施准备(约 30 分钟)
- Nginx 配置:增加
/wecom 路径反向代理到 127.0.0.1:8080
- Redis 确认:确认 Redis 已安装并可连接(用于 Session 存储)
- Python 依赖安装:
pip install aiohttp redis pyyaml pydantic
- 配置目录创建:
/etc/wecom_bridge/config.yaml
Phase 2:wecom_bridge 开发(约 6.5 小时)
| 模块 |
预计工时 |
优先级 |
| 主程序骨架 + 配置加载 |
30 分钟 |
P0 |
| Webhook 接收 + URL 验证 |
1 小时 |
P0 |
| 签名验证 |
30 分钟 |
P0 |
| SessionManager (Redis) |
1 小时 |
P0 |
| ApiServerClient |
30 分钟 |
P0 |
| WeComReplyClient + Token 管理 |
1 小时 |
P0 |
| 异步回复队列 |
30 分钟 |
P1 |
| RateLimiter 滑动窗口限流器 |
1 小时 |
P0 |
| Logger 全量日志采集器 |
1.5 小时 |
P0 |
| Systemd 启动文件 |
15 分钟 |
P1 |
Phase 3:Agent 记忆层改造(约 2 小时)
| 模块 |
预计工时 |
优先级 |
| 目录结构创建脚本 |
15 分钟 |
P0 |
| 记忆读取改造(全局 + 用户私有合并) |
1 小时 |
P0 |
| 记忆写入改造([GLOBAL] 标签支持) |
30 分钟 |
P0 |
| 用户配置加载(USER.md) |
15 分钟 |
P1 |
| 去重机制 |
15 分钟 |
P1 |
| 群聊记忆处理 |
15 分钟 |
P1 |
| 现有全局记忆迁移到各用户 |
1 小时 |
P2 |
| 模块 |
预计工时 |
优先级 |
| X-Tenant-ID / X-User-ID 解析 |
30 分钟 |
P0 |
| session_key 构建逻辑调整 |
30 分钟 |
P0 |
Phase 5:联调测试(约 2 小时)
| 测试场景 |
预期结果 |
| 企业微信发消息 → 收到回复 |
全流程正常工作 |
| 10 用户同时发消息 |
Thread pool 限流,顺序处理不崩溃 |
张家学说"我是客服" → 写入 wecom/ShuGuang/MEMORY.md |
记忆文件正确 |
| 王嘉成发消息 → 不读取到张家学的记忆 |
记忆隔离正常 |
| 关闭 wecom_bridge → Nginx 切回 Gateway |
回滚成功,WeCom 消息仍能到达 |
| 重启后 Session 历史恢复 |
Redis TTL 刷新正常 |
九、用户限流机制
9.1 限流设计原则
| 原则 |
说明 |
| 分层限流 |
第一层:wecom_bridge 消息接收层;第二层:api_server LLM 调用层 |
| 滑动窗口 |
使用 Redis ZSET 实现精确滑动窗口,避免固定窗口边界突发问题 |
| 规则可配置 |
限流规则写在 config.yaml,不硬编码 |
| 白名单豁免 |
管理员(夏总)不受限流约束 |
| 限流不阻塞 |
超限后消息进入排队列,不直接拒绝,用户感知友好 |
9.2 限流维度与配额表
| 用户 |
角色 |
每小时上限 |
每天上限 |
备注 |
| XiaGuoDong |
管理员 |
∞ |
∞ |
不限流 |
| Xiao / SiNeiKe / YiYeZhiQiu / aiden |
技术部 |
100 条/小时 |
500 条/天 |
开发调试 |
| ShuGuang / LiuWenWei / ZhangYuXiaoWanZi / ShiHuangZhe |
客服部 |
60 条/小时 |
300 条/天 |
日常办公 |
| BoWeiYa |
招商部 |
60 条/小时 |
300 条/天 |
日常办公 |
| 新用户(试用期 30 天) |
试用 |
20 条/小时 |
100 条/天 |
限制 |
配额余量提醒:当用户剩余配额低于 20% 时,在回复末尾提示"本月还剩 X 条额度"。
9.3 Redis 滑动窗口实现
限流 Key 设计(ZSET 滑动窗口):
ratelimit:{user_id}:hourly → ZSET (score=时间戳, member=msg_id)
ratelimit:{user_id}:daily → ZSET (score=时间戳, member=msg_id)
检查逻辑:
1. ZREMRANGEBYSCORE 清除窗口外的旧记录
2. ZCARD 计数当前窗口内消息数
3. 若超限 → 消息进入 asyncio.Queue 排队等待
4. 若未超限 → ZADD 写入当前消息 → 继续处理
# 滑动窗口限流器(RateLimiter)
import time
import uuid
import redis.asyncio as redis
class RateLimiter:
"""基于 Redis ZSET 的滑动窗口限流器"""
def __init__(self, redis_url: str):
self.redis = redis.from_url(redis_url, encoding="utf-8", decode_responses=True)
async def is_allowed(
self,
user_id: str,
hourly_limit: int,
daily_limit: int,
) -> tuple[bool, str]:
"""
检查用户是否在限流配额内
返回 (是否允许, 剩余配额描述)
"""
now = time.time()
hourly_key = f"ratelimit:{user_id}:hourly"
daily_key = f"ratelimit:{user_id}:daily"
msg_id = f"{now}:{uuid.uuid4().hex[:8]}"
# 清除过期记录(窗口外)
hour_ago = now - 3600
day_ago = now - 86400
await self.redis.zremrangebyscore(hourly_key, 0, hour_ago)
await self.redis.zremrangebyscore(daily_key, 0, day_ago)
# 计数
hourly_count = await self.redis.zcard(hourly_key)
daily_count = await self.redis.zcard(daily_key)
# 超限检查
if hourly_count >= hourly_limit:
return False, f"小时级限流:已达上限 {hourly_limit} 条/小时,请稍后再试"
if daily_count >= daily_limit:
return False, f"日级限流:已达上限 {daily_limit} 条/天,请明天再试"
# 写入当前消息
pipe = self.redis.pipeline()
pipe.zadd(hourly_key, {msg_id: now})
pipe.zadd(daily_key, {msg_id: now})
pipe.expire(hourly_key, 3700) # 多留100秒防边界
pipe.expire(daily_key, 86500)
await pipe.execute()
remaining_hourly = hourly_limit - hourly_count - 1
remaining_daily = daily_limit - daily_count - 1
return True, f"remaining_hourly={remaining_hourly}, remaining_daily={remaining_daily}"
async def get_remaining(self, user_id: str, hourly_limit: int, daily_limit: int) -> dict:
"""查询用户当前剩余配额"""
now = time.time()
hour_ago = now - 3600
day_ago = now - 86400
hourly_key = f"ratelimit:{user_id}:hourly"
daily_key = f"ratelimit:{user_id}:daily"
await self.redis.zremrangebyscore(hourly_key, 0, hour_ago)
await self.redis.zremrangebyscore(daily_key, 0, day_ago)
hourly_count = await self.redis.zcard(hourly_key)
daily_count = await self.redis.zcard(daily_key)
return {
"hourly_remaining": max(0, hourly_limit - hourly_count),
"daily_remaining": max(0, daily_limit - daily_count),
}
9.4 限流在 wecom_bridge 中的集成位置
消息到达 wecom_bridge
│
▼
[Step 1] 解析 FromUserName / MsgType
│
▼
[Step 2] 检查限流(RateLimiter.is_allowed)
│
├─ 允许 → 正常处理流程
│
└─ 拒绝 → 立即返回限流提示消息
"您本周期的消息额度已用完(60/60),
请联系管理员提升配额或等待配额重置。"
9.5 超出限流后的处理策略
| 策略 |
行为 |
适用场景 |
| 队列等待 |
消息进入 asyncio.Queue 排队,等配额刷新后再处理 |
临时突发、非紧急 |
| 友好拒绝 |
立即返回限流提示,不进入队列 |
配额确实耗尽 |
| 只读模式 |
限流后只能触发只读操作(查数据),不能触发 LLM 调用 |
严格成本控制 |
| 升级提示 |
限流后提示"配额即将用完,升级企业套餐请联系我们" |
商业化场景 |
推荐方案:日常用户使用"队列等待",超出日限额后使用"友好拒绝"。
9.6 特殊限流场景
突发消息(Burst):
- 部分用户偶尔需要一次性发多条消息(如批量汇报)
- 令牌桶算法支持突发:
redis.call('INCRBY', key, n) 一次消耗 n 个令牌
- 在配置中标记
burst_allowed: true 的用户可临时超配
试用期用户:
- 前 30 天受严格限制(20/100)
- 30 天后自动升级为正式用户配额(根据部门)
- 试用期标记存在
wecom/{user_id}/USER.md 的 join_date 字段
LLM Token 限流(进阶):
- 在 api_server 层增加 token 计数器(比消息数更精确)
- Key:
tokens:{user_id}:{hour},每次 LLM 调用后累加 usage.total_tokens
- 超出 token 配额时返回 429,由 wecom_bridge 展示友好提示
9.7 限流配置表(config.yaml)
ratelimit:
enabled: true
# 用户配额配置
quotas:
# 管理员不限
XiaGuoDong:
hourly: -1 # -1 表示不限
daily: -1
# 技术部
Xiao:
SiNeiKe:
YiYeZhiQiu:
aiden:
hourly: 100
daily: 500
burst_allowed: true
# 客服部
ShuGuang:
LiuWenWei:
ZhangYuXiaoWanZi:
ShiHuangZhe:
hourly: 60
daily: 300
burst_allowed: false
# 招商部
BoWeiYa:
hourly: 60
daily: 300
burst_allowed: false
# 试用期默认配额(新用户自动匹配)
trial:
hourly: 20
daily: 100
duration_days: 30
# 超限行为
overflow_action: "queue" # queue | reject | read_only
queue_maxsize: 100 # 排队列上限
warning_threshold: 0.2 # 剩余 20% 时提示
9.8 监控与告警
# 限流指标采集(Prometheus 格式)
ratelimit_total{user_id} # 用户总请求数
ratelimit_rejected{user_id} # 被限流拒绝数
ratelimit_queue_size{user_id} # 用户当前排队列长度
# 告警规则
- 当某用户被拒绝率 > 10%/小时 → 发送告警(可能是异常调用)
- 当某用户日配额使用率 > 90% → 提醒用户配额即将耗尽
- 当排队列长度 > 50 → 提示系统繁忙
十、用户日志管理
10.1 日志设计原则
| 原则 |
说明 |
| 全量采集 |
每条消息的完整生命周期都要记录,不可丢失 |
| 结构化日志 |
JSON 格式,便于后续查询和分析 |
| 用户隔离 |
各用户日志物理隔离在独立目录/Redis key |
| 脱敏合规 |
手机号、身份证等敏感字段自动打码 |
| 分层存储 |
热数据存 Redis,冷数据归档到文件 |
| 可查询 |
支持按用户/时间/事件类型/关键词查询 |
10.2 日志采集维度
每条消息从发送到回复,完整记录以下节点:
| 节点 |
字段 |
说明 |
| 消息到达 |
msg_id, user_id, timestamp, msg_type, content_preview |
消息何时来、内容摘要(脱敏) |
| 限流检查 |
rate_limit_checked, quota_hourly_used, quota_daily_used, allowed |
是否超限流 |
| Session加载 |
session_key, history_length, redis_hit |
历史会话长度 |
| LLM调用 |
model, prompt_tokens, completion_tokens, total_tokens, latency_ms |
token消耗和延迟 |
| 记忆读写 |
memory_read_global, memory_read_user, memory_written_to, memory_lines_added |
记忆操作 |
| 回复发送 |
reply_length, reply_preview, send_success, send_latency_ms |
回复结果 |
| 异常记录 |
error_type, error_message, stack_trace |
错误信息 |
10.3 日志存储架构
┌─────────────────────────────────────────────────────────────────┐
│ wecom_bridge │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 日志采集器 (Logger) │ │
│ │ 每一步操作 → 异步写入 → 双目标 │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌─────────────┴─────────────┐ │
│ ▼ ▼ │
│ ┌───────────────────┐ ┌───────────────────┐ │
│ │ Redis (热存储) │ │ 文件 (冷存储) │ │
│ │ logs:{user_id}: │ │ /var/log/wecom/ │ │
│ │ {date}:{hour} │ │ {user_id}/ │ │
│ │ TTL: 7天 │ │ {date}.jsonl │ │
│ └───────────────────┘ └───────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
存储分层策略:
- Redis:最近 7 天热数据,ZSET 按时间戳排序,支持范围查询
- 文件:7 天后归档为 JSONL,按用户/日期分目录
- 保留期限:默认 90 天,到期自动删除或归档到对象存储
10.4 Redis 日志 Key 设计
# 用户日志(ZSET,按时间戳排序)
logs:{user_id}:{YYYY-MM-DD} → ZSET (score=timestamp, member=json_log)
TTL: 8天(多留1天防边界)
# 用户当日索引(方便按小时查)
logs:{user_id}:{YYYY-MM-DD}:hour:{HH} → ZSET (score=timestamp, member=msg_id)
TTL: 2天
# 全局操作日志(所有用户的汇总,用于监控)
logs:global:{YYYY-MM-DD} → ZSET (score=timestamp, member=json_log)
TTL: 30天
10.5 日志格式(JSON Schema)
{
"msg_id": "wx2msg_abc123",
"user_id": "ShuGuang",
"tenant_id": "wxcrmb00xxxx",
"timestamp": "2026-05-05T19:30:00.123+08:00",
"event": "message_sent",
"session_key": "wecom:dm:ShuGuang",
"platform": "wecom",
"data": {
"msg_type": "text",
"content_preview": "帮我查今天的销售数据",
"content_length": 50,
"has_attachment": false
},
"context": {
"rate_limit": {
"hourly_used": 12,
"hourly_limit": 60,
"daily_used": 45,
"daily_limit": 300,
"allowed": true
},
"session": {
"history_length": 38,
"redis_hit": true
},
"llm": {
"model": "MiniMax-M2.7-highspeed",
"prompt_tokens": 1200,
"completion_tokens": 180,
"total_tokens": 1380,
"latency_ms": 1250
},
"memory": {
"read_global_lines": 45,
"read_user_lines": 23,
"written_to": "wecom/ShuGuang/MEMORY.md",
"lines_added": 3
},
"reply": {
"length": 320,
"send_success": true,
"send_latency_ms": 230
}
},
"error": null
}
content_preview 脱敏规则:
def sanitize_content(content: str) -> str:
"""脱敏处理,防止敏感信息写入日志"""
# 手机号打码
content = re.sub(r'1[3-9]\d{9}', '1**********', content)
# 身份证号打码
content = re.sub(r'\d{17}[\dXx]', '**************', content)
# 银行卡号打码
content = re.sub(r'\d{12,19}', '************', content)
# 截断超长内容
if len(content) > 200:
content = content[:200] + '...[truncated]'
return content
10.6 日志查询接口
# wecom_bridge 提供内部查询接口(仅管理员可调用)
async def query_logs(
user_id: str = None,
start_time: datetime = None,
end_time: datetime = None,
event_type: str = None,
limit: int = 100,
) -> list[dict]:
"""
查询日志
- user_id: 限定用户,不填则查所有
- start_time / end_time: 时间范围
- event_type: message_sent / llm_call / error 等
- limit: 返回条数限制
"""
pass
# 示例:查询张家学今天的所有消息
GET /admin/logs?user_id=ShuGuang&date=2026-05-05&limit=50
# 示例:查询今天所有异常
GET /admin/logs?event_type=error&date=2026-05-05
# 示例:查询某条消息的完整链路
GET /admin/logs?msg_id=wx2msg_abc123
10.7 日志分析功能
用量统计Dashboard:
| 指标 |
说明 |
| 每小时消息量 |
各用户每小时消息数(折线图) |
| 日token消耗 |
按用户/部门汇总token用量 |
| 平均响应延迟 |
LLM latency P50/P95/P99 |
| 限流触发统计 |
各用户被限流次数 |
| 错误率 |
按错误类型分类统计 |
异常行为检测:
# 异常检测规则
ANOMALY_RULES = [
# 1小时内消息超过配额的80%
{"type": "high_frequency", "threshold": "quota * 0.8", "window": "1h"},
# 连续3次LLM调用失败
{"type": "llm_failure", "threshold": 3, "window": "5m"},
# 消息内容重复率>70%(可能是机器人刷屏)
{"type": "content_duplication", "threshold": 0.7, "window": "10m"},
# 单条消息token消耗异常高(>10k)
{"type": "high_token", "threshold": 10000, "window": "single"},
]
# 触发异常时:
# 1. 记录到 logs:global:{date} 的 ERROR 级别日志
# 2. 发送告警(企业微信管理员通知)
# 3. 可选:临时冻结该用户(需管理员确认)
10.8 日志配置(config.yaml)
logging:
enabled: true
# 日志级别
level: "INFO" # DEBUG | INFO | WARN | ERROR
# 存储策略
storage:
# Redis 热存储
redis:
enabled: true
ttl_days: 7
key_prefix: "logs"
# 文件冷存储
file:
enabled: true
base_path: "/var/log/wecom"
retention_days: 90
rotation: "daily" # daily | hourly | 100mb
compress: true # gzip 压缩旧日志
# 脱敏规则
sanitization:
enabled: true
rules:
- field: "content"
patterns:
- '1[3-9]\d{9}' # 手机号
- '\d{17}[\dXx]' # 身份证
- '\d{12,19}' # 银行卡
replacement: "[REDACTED]"
# 采集维度
events:
- "message_received" # 消息到达
- "rate_limit_check" # 限流检查
- "session_load" # Session加载
- "llm_call" # LLM调用
- "memory_operation" # 记忆读写
- "reply_sent" # 回复发送
- "error" # 异常
# 异常检测
anomaly_detection:
enabled: true
alert_threshold:
high_frequency_rate: 0.8 # 超过配额80%触发
error_rate_per_minute: 5 # 每分钟5次错误触发
high_token_single: 10000 # 单次超过10k token触发
10.9 安全与合规
| 维度 |
措施 |
| 访问控制 |
日志查询接口仅管理员(XiaGuoDong)可调用,需 Bearer Token 认证 |
| 数据隔离 |
各用户只能查询自己的日志,管理员可查所有人日志 |
| 传输加密 |
日志接口强制 HTTPS,Redis 连接支持 TLS |
| 敏感过滤 |
手机号、身份证、银行卡等自动脱敏 |
| 保留期限 |
Redis 7天 → 文件90天 → 自动删除 |
| 审计日志 |
管理员查询日志的行为本身也要记录(审计追踪) |
10.10 与现有系统的联动
- 与限流系统联动:限流触发时记录
rate_limit_check 事件
- 与 Session 管理联动:Session 加载/保存时记录
session_load 事件
- 与记忆系统联动:记忆读写时记录
memory_operation 事件
- 与监控告警联动:异常日志 → Prometheus metrics → 告警通知
十一、注意点与避坑指南
11.1 企业微信回调验证
- URL 验证时机:首次配置回调地址时,企业微信会发送
GET 请求验证 URL有效性,需在 5 秒内返回 echostr
- 消息签名验证:每次 POST 回调都必须验证
msg_signature,使用 hmac.compare_digest 防止时序攻击
- Token 刷新:微信
access_token 有效期 2 小时,需在过期前主动刷新,代码中以 expires_in - 60 作为提前刷新阈值
11.2 并发控制
- api_server thread pool 上限:
MAX_CONCURRENT_RUNS = 10,超过的请求在 queue 中排队
- wecom_bridge 队列上限:
asyncio.Queue(maxsize=1000),满时 put 阻塞
- Redis 连接池:
max_connections=20,确保高并发下不耗尽连接
11.3 Session Key 唯一性
- 同一条消息不要重复处理:企业微信可能因未收到确认而重发同一消息,需要在 Redis 中记录已处理的
msg_id 进行去重
- 群聊与私聊的 session key 必须严格区分:格式不同 (
wecom:dm:user vs wecom:group:groupid)
11.4 记忆文件并发写入
- 同一用户并发写入:asyncio 场景下需要文件锁(
fcntl.flock)防止多条消息同时追加导致文件损坏
- 去重检查:写入前读取最近 20 条,比对内容重复则跳过
11.5 wecom_bridge 与 Gateway 的 Session 冲突
- 双写策略:wecom_bridge 写 Redis,Gateway 写 JSONL,两者互补
- 不要让两边写同一个 Redis key,否则 Gateway 重启后会出现短暂记忆丢失(建议 Redis 作为 WeCom 专用,Gateway 保持只写 JSONL)
- 最终方案:WeCom 流量全部切换到 wecom_bridge,Gateway 只处理 WeChat 等其他平台
11.6 [GLOBAL] 标签的记忆归属判断
- Agent 在写入记忆前需要判断该记忆是否具有全局性质(公司制度、技术文档等)
- 判断规则:
- 涉及"我们公司"、"公司规定" → 写全局
- 涉及个人任务、私人偏好 → 写用户私有
- 模糊场景 → 默认写用户私有,agent 可事后纠正
11.7 新用户自动创建目录
- 当
wecom/{user_id}/ 目录不存在时,在首次消息处理时自动创建
- 初始化
USER.md 和空 MEMORY.md,避免 404 文件错误
11.8 迁移现有全局记忆
- 现有
~/.hermes/memories/MEMORY.md 内容需要按用户拆分
- 策略:根据历史会话中用户的发言模式判断归属(模糊的保留全局)
- 建议:先不动,等用户自然积累新记忆后,全局文件中与具体用户相关的条目再逐步迁移
11.9 夏总(XiaGuoDong)的特殊处理
- 夏总使用
default profile(不走 wecom/XiaGuoDong/),始终拥有全局视角
- 夏总可选择性查看指定用户的私有记忆(如主动问"张家学最近在做什么")
11.10 state.db 暂不拆分
- 当前 190MB,拆分风险高
- 按
user_id 在 SQL 层面过滤即可实现逻辑隔离
- 后续如出现性能问题,再考虑按用户拆分到独立 DB 文件
十二、风险评估
| 风险 |
概率 |
影响 |
应对 |
| 企业微信重发消息导致重复处理 |
中 |
中 |
Redis 记录 msg_id 去重 |
| 并发写入记忆文件损坏 |
中 |
高 |
文件锁 fcntl.flock |
| Redis 连接池耗尽 |
低 |
高 |
监控 + 连接池上限 20 |
| 迁移记忆时用户归属错误 |
中 |
低 |
保守策略:模糊条目保留全局 |
| Bridge 崩溃影响 WeCom 服务 |
中 |
高 |
Systemd 自动重启 + Nginx 快速切回 Gateway |
| api_server thread pool 满载 |
低 |
中 |
队列排队 + 超时熔断 |
| 限流计数器被绕过(同一msg_id重放) |
低 |
中 |
msg_id 用 UUID,每次唯一 |
| 用户配额耗尽导致队列积压 |
中 |
中 |
监控队列长度 + 限流告警 |
| 日志写入失败导致消息丢失 |
低 |
高 |
写入失败时降级到文件日志,不阻塞主流程 |
| 日志量过大撑满磁盘 |
中 |
高 |
严格按 retention_days 执行清理,配监控告警 |
| 脱敏规则不全导致敏感信息泄露 |
低 |
高 |
定期审查正则表达式,上线前 review |
十三、后续扩展方向
- 技能权限隔离:
skills/shared/ + skills/private/{user_id}/,客服岗看不到服务器操作技能
- 工具权限分级:技术部全开,客服岗禁用高危工具(服务器命令等)
- 记忆可视化面板:用户可手动查看/编辑自己的私有记忆
- 跨用户共享上下文:显式
@mention 其他用户时,可临时读取对方记忆片段
- 多企业支持:通过
X-Tenant-ID 支持多个企业微信账号接入