Agent-06 预警监控与报告推送
版本: v1.0
概述: 基于ClickHouse数据湖的实时预警监控与定时报告推送系统
1. 系统架构
ClickHouse数据湖 ──▶ 规则引擎 ──▶ 预警分级处理器(🔴🟠🟡)
│
调度中心 ──▶ 推送渠道适配器(企业微信/钉钉/邮件)
│
订阅管理 ──▶ 报告渲染引擎
核心组件
| 组件 |
职责 |
技术选型 |
| 规则引擎 |
定义和管理预警规则 |
Python Rule Engine |
| 调度中心 |
定时任务触发 |
APScheduler |
| 推送适配器 |
多渠道消息推送 |
企业微信SDK / 钉钉SDK |
| 报告渲染 |
Markdown模板 + 数据填充 |
Jinja2 |
| 订阅管理 |
用户-报告-渠道映射 |
MySQL/ClickHouse |
2. 预警规则一览
库存类规则
| 规则编码 |
规则名称 |
条件 |
级别 |
| INV_001 |
库存为零预警 |
= 0 |
🔴紧急 |
| INV_002 |
库存低于安全库存 |
< 安全库存 |
🟠重要 |
| INV_003 |
库存周转率异常 |
> 90天 |
🟡关注 |
| INV_004 |
临期库存预警 |
到期 < 30天 |
🟠重要 |
| INV_005 |
超储预警 |
> 3倍安全库存 |
🟡关注 |
销售类规则
| 规则编码 |
规则名称 |
条件 |
级别 |
| SAL_001 |
日销售额为零 |
= 0 |
🔴紧急 |
| SAL_002 |
销售额同比下降 |
< -20% |
🟠重要 |
| SAL_004 |
毛利率异常 |
< 10% |
🟠重要 |
| SAL_006 |
热销商品缺货 |
库存 < 3天销量 |
🔴紧急 |
财务类规则
| 规则编码 |
规则名称 |
条件 |
级别 |
| FIN_001 |
应收账款逾期 |
> 30天 |
🔴紧急 |
| FIN_002 |
现金流为负 |
< 0 |
🔴紧急 |
| FIN_003 |
毛利率低于警戒线 |
< 10% |
🟠重要 |
| FIN_004 |
费用超支 |
> 预算110% |
🟠重要 |
客户类规则
| 规则编码 |
规则名称 |
条件 |
级别 |
| CUS_001 |
客户信用额度超限 |
> 100% |
🔴紧急 |
| CUS_005 |
客户流失预警 |
> 90天无交易 |
🟡关注 |
| CUS_007 |
大客户风险预警 |
欠款 > 50万 |
🔴紧急 |
3. 三级预警机制
| 级别 |
响应时效 |
通知范围 |
升级策略 |
| 🔴紧急 |
15分钟内 |
店长+区域经理+总部 |
30分钟未处理升级 |
| 🟠重要 |
1小时内 |
店长+区域经理 |
2小时未处理升级 |
| 🟡关注 |
当日内 |
相关负责人 |
无升级 |
4. 调度策略
| 报告类型 |
执行时间 |
数据范围 |
推送渠道 |
| 日报 |
T+1 08:00 |
前一日数据 |
企业微信/邮件 |
| 周报 |
周一 09:00 |
上周数据 |
企业微信/邮件/钉钉 |
| 月报 |
月末 18:00 |
当月数据 |
企业微信/邮件/钉钉 |
# APScheduler配置
scheduler.add_job(
func='report_tasks:daily_business_report',
trigger=CronTrigger(hour=8, minute=0),
id='daily_business_report',
name='每日经营日报'
)
scheduler.add_job(
func='report_tasks:weekly_summary_report',
trigger=CronTrigger(day_of_week='mon', hour=9, minute=0),
id='weekly_summary_report',
name='周报-经营汇总'
)
5. 推送渠道适配器
class WeChatWorkAdapter:
async def send(self, message, recipients):
payload = {
"msgtype": "markdown",
"markdown": {"content": message['content']}
}
async with aiohttp.ClientSession() as session:
async with session.post(self.webhook_url, json=payload) as resp:
return resp.status == 200
class DingTalkAdapter:
async def send(self, message, recipients):
import hmac, hashlib, base64, time, urllib.parse
timestamp = str(int(time.time() * 1000))
secret_enc = self.secret.encode('utf-8')
string_to_sign = f'{timestamp}\n{self.secret}'
sign = urllib.parse.quote_plus(base64.b64encode(
hmac.new(secret_enc, string_to_sign.encode('utf-8'), digestmod=hashlib.sha256).digest()
).decode('utf-8'))
url = f"{self.webhook_url}×tamp={timestamp}&sign={sign}"
async with aiohttp.ClientSession() as session:
async with session.post(url, json={"msgtype": "markdown", "markdown": {"text": message['content']}}) as resp:
return resp.status == 200
6. 订阅管理
-- 订阅配置表
CREATE TABLE subscription_config (
id UInt64,
user_id String,
user_name String,
department String,
report_type Enum('daily','weekly','monthly'),
report_code String,
channels Array(String),
schedule_time String,
severity_filter Array(String),
store_filter Array(String),
enabled UInt8
) ENGINE = MergeTree() ORDER BY (user_id, report_type);
7. 告警阈值快速参考卡
【库存类】
├─ 🔴库存为零: = 0件
├─ 🟠低于安全库存: < 安全库存量
├─ 🟠临期预警: 到期 < 30天
└─ 🟡超储预警: > 3倍安全库存
【销售类】
├─ 🔴热销缺货: 库存 < 3天销量
├─ 🔴销售为零: 日销售额 = 0
├─ 🟠同比下降: 增长率 < -20%
└─ 🟠毛利率低: < 10%
【财务类】
├─ 🔴应收逾期: 逾期 > 30天
├─ 🔴现金流负: 余额 < 0
├─ 🟠费用超支: > 预算110%
└─ 🟠应付逾期: > 60天
【客户类】
├─ 🔴信用超限: 使用率 > 100%
├─ 🟠等级下降: 等级下滑1级
└─ 🟡流失预警: 90天无交易