x

Agent-06 预警监控与报告推送

版本: v1.0
概述: 基于ClickHouse数据湖的实时预警监控与定时报告推送系统

1. 系统架构

ClickHouse数据湖 ──▶ 规则引擎 ──▶ 预警分级处理器(🔴🟠🟡)
                            │
                    调度中心 ──▶ 推送渠道适配器(企业微信/钉钉/邮件)
                            │
                    订阅管理 ──▶ 报告渲染引擎

核心组件

组件 职责 技术选型
规则引擎 定义和管理预警规则 Python Rule Engine
调度中心 定时任务触发 APScheduler
推送适配器 多渠道消息推送 企业微信SDK / 钉钉SDK
报告渲染 Markdown模板 + 数据填充 Jinja2
订阅管理 用户-报告-渠道映射 MySQL/ClickHouse

2. 预警规则一览

库存类规则

规则编码 规则名称 条件 级别
INV_001 库存为零预警 = 0 🔴紧急
INV_002 库存低于安全库存 < 安全库存 🟠重要
INV_003 库存周转率异常 > 90天 🟡关注
INV_004 临期库存预警 到期 < 30天 🟠重要
INV_005 超储预警 > 3倍安全库存 🟡关注

销售类规则

规则编码 规则名称 条件 级别
SAL_001 日销售额为零 = 0 🔴紧急
SAL_002 销售额同比下降 < -20% 🟠重要
SAL_004 毛利率异常 < 10% 🟠重要
SAL_006 热销商品缺货 库存 < 3天销量 🔴紧急

财务类规则

规则编码 规则名称 条件 级别
FIN_001 应收账款逾期 > 30天 🔴紧急
FIN_002 现金流为负 < 0 🔴紧急
FIN_003 毛利率低于警戒线 < 10% 🟠重要
FIN_004 费用超支 > 预算110% 🟠重要

客户类规则

规则编码 规则名称 条件 级别
CUS_001 客户信用额度超限 > 100% 🔴紧急
CUS_005 客户流失预警 > 90天无交易 🟡关注
CUS_007 大客户风险预警 欠款 > 50万 🔴紧急

3. 三级预警机制

级别 响应时效 通知范围 升级策略
🔴紧急 15分钟内 店长+区域经理+总部 30分钟未处理升级
🟠重要 1小时内 店长+区域经理 2小时未处理升级
🟡关注 当日内 相关负责人 无升级

4. 调度策略

报告类型 执行时间 数据范围 推送渠道
日报 T+1 08:00 前一日数据 企业微信/邮件
周报 周一 09:00 上周数据 企业微信/邮件/钉钉
月报 月末 18:00 当月数据 企业微信/邮件/钉钉
# APScheduler配置
scheduler.add_job(
    func='report_tasks:daily_business_report',
    trigger=CronTrigger(hour=8, minute=0),
    id='daily_business_report',
    name='每日经营日报'
)

scheduler.add_job(
    func='report_tasks:weekly_summary_report',
    trigger=CronTrigger(day_of_week='mon', hour=9, minute=0),
    id='weekly_summary_report',
    name='周报-经营汇总'
)

5. 推送渠道适配器

class WeChatWorkAdapter:
    async def send(self, message, recipients):
        payload = {
            "msgtype": "markdown",
            "markdown": {"content": message['content']}
        }
        async with aiohttp.ClientSession() as session:
            async with session.post(self.webhook_url, json=payload) as resp:
                return resp.status == 200

class DingTalkAdapter:
    async def send(self, message, recipients):
        import hmac, hashlib, base64, time, urllib.parse
        timestamp = str(int(time.time() * 1000))
        secret_enc = self.secret.encode('utf-8')
        string_to_sign = f'{timestamp}\n{self.secret}'
        sign = urllib.parse.quote_plus(base64.b64encode(
            hmac.new(secret_enc, string_to_sign.encode('utf-8'), digestmod=hashlib.sha256).digest()
        ).decode('utf-8'))
        url = f"{self.webhook_url}&timestamp={timestamp}&sign={sign}"
        async with aiohttp.ClientSession() as session:
            async with session.post(url, json={"msgtype": "markdown", "markdown": {"text": message['content']}}) as resp:
                return resp.status == 200

6. 订阅管理

-- 订阅配置表
CREATE TABLE subscription_config (
    id              UInt64,
    user_id         String,
    user_name       String,
    department      String,
    report_type     Enum('daily','weekly','monthly'),
    report_code     String,
    channels        Array(String),
    schedule_time   String,
    severity_filter Array(String),
    store_filter    Array(String),
    enabled         UInt8
) ENGINE = MergeTree() ORDER BY (user_id, report_type);

7. 告警阈值快速参考卡

【库存类】
├─ 🔴库存为零: = 0件
├─ 🟠低于安全库存: < 安全库存量
├─ 🟠临期预警: 到期 < 30天
└─ 🟡超储预警: > 3倍安全库存

【销售类】
├─ 🔴热销缺货: 库存 < 3天销量
├─ 🔴销售为零: 日销售额 = 0
├─ 🟠同比下降: 增长率 < -20%
└─ 🟠毛利率低: < 10%

【财务类】
├─ 🔴应收逾期: 逾期 > 30天
├─ 🔴现金流负: 余额 < 0
├─ 🟠费用超支: > 预算110%
└─ 🟠应付逾期: > 60天

【客户类】
├─ 🔴信用超限: 使用率 > 100%
├─ 🟠等级下降: 等级下滑1级
└─ 🟡流失预警: 90天无交易
Left-click: follow link, Right-click: select node, Scroll: zoom
x