x

五金门店AI智能体分析体系(新架构版)

版本:v2.0(基于 MySQL + ClickHouse + Elasticsearch 三层架构)
日期:2026-04-30
概述:重构原有6个Agent,从单一ClickHouse数据湖升级为 MySQL主库(事务)+ ClickHouse(分析)+ Elasticsearch(搜索)的三层架构

一、整体架构设计

1.1 三层架构概览

┌─────────────────────────────────────────────────────────────────────────────┐
│                         数据流向与系统架构图                                 │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│   ┌──────────────┐                    ┌──────────────────────────────────┐ │
│   │   用户/店员   │                    │           业务写入                │ │
│   └──────┬───────┘                    └────────────────┬─────────────────┘ │
│          │                                       │                        │
│          ▼                                       ▼                        │
│   ┌──────────────┐                    ┌──────────────────────┐          │
│   │   前端应用    │──────────────────▶│   MySQL CDB 主库      │          │
│   │  (进销存系统) │      业务写入      │  (腾讯云·事务库)      │          │
│   └──────────────┘                    └──────────┬───────────┘          │
│                                                    │                       │
│          ┌────────────────────────────────────────┼───────────────────┐   │
│          │                                        │                    │   │
│          │                            CDC同步(腾讯云DTS)              │   │
│          │                                        │                    │   │
│          ▼                                        ▼                    │   │
│   ┌─────────────────┐                 ┌──────────────────────┐         │   │
│   │ Elasticsearch   │                 │   ClickHouse          │         │   │
│   │ TCO-ES          │                 │   TCHouse-C           │         │   │
│   │ (商品搜索层)    │                 │   (OLAP分析库)        │         │   │
│   │                 │                 │                      │         │   │
│   │ ·商品模糊搜索   │                 │ ·订单统计             │         │   │
│   │ ·拼音首字母搜索 │                 │ ·库存预测             │         │   │
│   │ ·自动补全      │                 │ ·财务月报             │         │   │
│   │ ·ELK日志      │                 │ ·供应商客户分析       │         │   │
│   └────────┬────────┘                 └──────────┬───────────┘         │   │
│            │                                     │                       │   │
│            │  查询路由                            │  查询路由              │   │
│            ▼                                     ▼                       │   │
│   ┌─────────────────┐                 ┌──────────────────────┐         │   │
│   │  搜索服务        │                 │   分析报表服务         │         │   │
│   │  ·商品查询      │                 │   ·AI智能体分析       │         │   │
│   │  ·订单检索      │                 │   ·Dashboard         │         │   │
│   └─────────────────┘                 └──────────────────────┘         │   │
│                                                                             │
│   ┌─────────────────────────────────────────────────────────────────────┐ │
│   │                     推送层(企业微信/钉钉/邮件)                      │ │
│   └─────────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘

1.2 三层职责分工

层级 数据库 职责 特点
事务层 MySQL CDB 全部业务读写、事务处理 ACID强一致、在线事务
分析层 ClickHouse TCHouse-C 订单统计、库存预测、财务月报 列式存储、聚合极快
搜索层 Elasticsearch TCO-ES 商品模糊搜索、订单快速检索 倒排索引、全文搜索

1.3 新架构优势

对比维度 旧架构(纯ClickHouse) 新架构(MySQL+CH+ES)
事务能力 ❌ 弱 ✅ MySQL完整支持
分析查询 ✅ 快 ✅ 更快(独立资源)
商品搜索 ⚠️ 一般 ✅ 极强(ES专用)
写入性能 ⚠️ CDC压力 ✅ 主库直写,CH/ES同步
运维复杂度 中(云托管降低压力)
数据一致性 ✅ 高 ✅ 事务层保证
成本 中(略高,但可接受)

二、CDC同步方案

2.1 腾讯云DTS配置

同步路径:MySQL CDB(源库)→ ClickHouse TCHouse-C(目标库)
      MySQL CDB(源库)→ Elasticsearch TCO-ES(目标库)

同步类型:全量 + 增量(实时同步,秒级延迟)

2.2 MySQL → ClickHouse DTS配置步骤

控制台路径:腾讯云 → 数据传输服务DTS → 创建迁移/同步任务

Step 1:选择任务类型
  └─ 数据同步(不是迁移,同步支持增量)

Step 2:配置源库
  └─ 接入类型:云数据库
  └─ 实例ID:选择MySQL CDB实例
  └─ 账号:具有REPLICATION权限的账号
  └─ 密码:***

Step 3:配置目标库
  └─ 目标库类型:ClickHouse
  └─ 接入类型:云数据库CH
  └─ 实例ID:选择TCHouse-C实例
  └─ 账号:默认root
  └─ 密码:***

Step 4:选择同步对象
  └─ 数据库:inventory_db, sales_db, finance_db
  └─ 表:支持正则批量选择,如 orders*, products*, inventory*
  └─ 列过滤:支持黑名单

Step 5:增量同步配置
  └─ Binlog格式:Row(推荐)
  └─ 保留时间:≥7天
  └─ 冲突处理:Overwrite(以源库为准)

Step 6:启动任务
  └─ 预检查:通过后启动
  └─ 增量延迟监控:配置告警阈值(>300秒)

2.3 MySQL → Elasticsearch DTS配置

Step 1:创建数据同步任务
  └─ 同步类型:MySQL → ES

Step 2:索引命名规则
  └─ 建议格式:{库名}_{表名}_v1
  └─ 示例:inventory_products_v1, sales_orders_v1

Step 3:字段映射
  └─ MySQL VARCHAR(长文本) → ES text(分词)
  └─ MySQL VARCHAR(短)   → ES keyword(不分词)
  └─ MySQL DECIMAL       → ES scaled_float
  └─ MySQL DATETIME      → ES date

Step 4:文档ID
  └─ 使用主键ID作为ES文档_id
  └─ 支持嵌套文档

2.4 同步账号权限

-- MySQL源库创建DTS专用账号
CREATE USER 'dts_sync'@'%' IDENTIFIED BY 'DtsPass@2026';
GRANT SELECT, LOCK TABLES ON *.* TO 'dts_sync'@'%';
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'dts_sync'@'%';
GRANT ALL PRIVILEGES ON `inventory_db`.* TO 'dts_sync'@'%';
GRANT ALL PRIVILEGES ON `sales_db`.* TO 'dts_sync'@'%';
FLUSH PRIVILEGES;

2.5 同步监控与告警

监控指标 告警阈值 处理建议
增量同步延迟 > 300秒 检查源库Binlog、目标库写入
同步速率 < 100条/秒 检查网络、目标库负载
任务状态 中断 自动重试,配置断点续传
源库Binlog保留 < 3天 立即延长保留时间

三、ClickHouse数据模型

3.1 数据分层设计(ODS / DWD / DWS)

MySQL主库 ──DTS同步──▶ ODS层(原始数据)──▶ DWD层(明细宽表)──▶ DWS层(汇总物化视图)

ODS层:原始数据层

-- 订单表ODS(保留CDC操作类型)
CREATE TABLE ods.orders
(
    order_id       String,
    store_id       String,
    customer_id    String,
    salesperson_id String,
    total_amount   Decimal(12,2),
    cost_amount    Decimal(12,2),
    discount_amount Decimal(12,2),
    payment_method Enum8('cash'=1,'wechat'=2,'alipay'=3,'credit'=4,'mixed'=5),
    order_status   Enum8('pending'=1,'confirmed'=2,'shipped'=3,'completed'=4,'cancelled'=5),
    order_date     Date,
    created_at     DateTime,
    updated_at     DateTime,
    _ts_ms         DateTime64(3) DEFAULT now64(3),
    _op            Enum8('insert'=1,'update'=2,'delete'=3)
)
ENGINE = ReplicatedReplacingMergeTree(_ts_ms)
PARTITION BY toYYYYMM(order_date)
ORDER BY (store_id, order_id, _ts_ms)
TTL order_date + INTERVAL 2 YEAR;

-- 订单明细ODS
CREATE TABLE ods.order_detail
(
    detail_id    String,
    order_id     String,
    product_id   String,
    product_code String,
    product_name String,
    category_id  String,
    warehouse_id String,
    quantity     UInt32,
    unit_price   Decimal(10,2),
    cost_price   Decimal(10,2),
    line_amount  Decimal(12,2),
    created_at   DateTime
)
ENGINE = ReplicatedReplacingMergeTree()
PARTITION BY toYYYYMM(created_at)
ORDER BY (order_id, detail_id)
TTL created_at + INTERVAL 2 YEAR;

-- 商品表ODS
CREATE TABLE ods.products
(
    product_id    String,
    product_code  String,
    product_name  String,
    category_id   String,
    category_name String,
    brand         String,
    spec          String,
    unit          String,
    price         Decimal(10,2),
    cost_price    Decimal(10,2),
    barcode       String,
    safe_stock    UInt32,
    _ts_ms        DateTime64(3) DEFAULT now64(3),
    _op           Enum8('insert'=1,'update'=2,'delete'=3)
)
ENGINE = ReplicatedReplacingMergeMergeTree(_ts_ms)
ORDER BY (category_id, product_id);

-- 库存表ODS
CREATE TABLE ods.inventory_stock
(
    warehouse_id   String,
    warehouse_name String,
    product_id     String,
    stock_qty      Int32,
    available_qty Int32,
    reserved_qty  Int32,
    update_time    DateTime,
    _ts_ms         DateTime64(3) DEFAULT now64(3)
)
ENGINE = ReplicatedReplacingMergeTree(_ts_ms)
PARTITION BY toYYYYMM(update_time)
ORDER BY (warehouse_id, product_id);

-- 客户表ODS
CREATE TABLE ods.customers
(
    customer_id   String,
    customer_code String,
    customer_name String,
    customer_type Enum8('retail'=1,'wholesale'=2,'engineering'=3),
    credit_limit  Decimal(12,2),
    contact_person String,
    phone         String,
    address       String,
    _ts_ms        DateTime64(3) DEFAULT now64(3)
)
ENGINE = ReplicatedReplacingMergeTree(_ts_ms)
ORDER BY (customer_type, customer_id);

-- 供应商表ODS
CREATE TABLE ods.suppliers
(
    supplier_id   String,
    supplier_code String,
    supplier_name String,
    contact_person String,
    phone         String,
    address       String,
    _ts_ms        DateTime64(3) DEFAULT now64(3)
)
ENGINE = ReplicatedReplacingMergeTree(_ts_ms)
ORDER BY (supplier_id);

DWD层:明细宽表层

-- 订单宽表(去删除标记,合并更新)
CREATE TABLE dwd.order_fact
(
    order_id         String,
    store_id         String,
    store_name       String,
    customer_id      String,
    customer_name    String,
    customer_type    String,
    salesperson_id   String,
    salesperson_name String,
    order_date       Date,
    order_hour       UInt8,
    weekday          UInt8,
    total_amount     Decimal(12,2),
    cost_amount      Decimal(12,2),
    profit_amount    Decimal(12,2),
    profit_margin    Decimal(5,2),
    discount_amount  Decimal(12,2),
    payment_method   String,
    order_status     String,
    is_credit        UInt8,
    created_at       DateTime,
    updated_at       DateTime
)
ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(order_date)
ORDER BY (store_id, order_date, order_id)
TTL order_date + INTERVAL 2 YEAR;

-- 库存宽表(最新余额)
CREATE TABLE dwd.inventory_fact
(
    warehouse_id   String,
    warehouse_name String,
    product_id     String,
    product_code   String,
    product_name   String,
    category_id    String,
    stock_qty      Int32,
    available_qty  Int32,
    reserved_qty   Int32,
    update_time    DateTime
)
ENGINE = ReplacingMergeTree(update_time)
ORDER BY (warehouse_id, product_id);

DWS层:汇总物化视图

-- 日粒度订单汇总(加速日报)
CREATE MATERIALIZED VIEW dws.mv_orders_daily
ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(order_date)
ORDER BY (store_id, category_id, order_date)
AS
SELECT
    store_id,
    category_id,
    order_date,
    count()            AS order_count,
    uniqExact(order_id) AS order_num,
    sum(total_amount)  AS daily_sales,
    sum(cost_amount)   AS daily_cost,
    sum(profit_amount)  AS daily_profit,
    sum(profit_amount) / NULLIF(sum(total_amount), 0) * 100 AS profit_margin_pct,
    uniqExact(customer_id) AS customer_count
FROM dwd.order_fact
WHERE order_status NOT IN ('cancelled')
GROUP BY store_id, category_id, order_date;

-- 月粒度订单汇总(加速月报)
CREATE MATERIALIZED VIEW dws.mv_orders_monthly
ENGINE = SummingMergeTree()
ORDER BY (store_id, category_id, order_year, order_month)
AS
SELECT
    store_id,
    category_id,
    toYear(order_date)    AS order_year,
    toMonth(order_date)   AS order_month,
    count()               AS order_count,
    sum(total_amount)     AS monthly_sales,
    sum(profit_amount)    AS monthly_profit
FROM dwd.order_fact
WHERE order_status NOT IN ('cancelled')
GROUP BY store_id, category_id, order_year, order_month;

-- 商品销售排行汇总(加速爆款分析)
CREATE MATERIALIZED VIEW dws.mv_product_sales_rank
ENGINE = AggregatingMergeTree()
ORDER BY (rank_date, category_id, product_id)
AS
SELECT
    toDate(created_at) AS rank_date,
    category_id,
    product_id,
    product_name,
    sum(line_amount)   AS total_sales,
    sum(quantity)      AS total_qty,
    count()            AS order_count
FROM ods.order_detail
GROUP BY rank_date, category_id, product_id, product_name;

-- 库存余额汇总
CREATE MATERIALIZED VIEW dws.mv_inventory_latest
ENGINE = ReplacingMergeTree(update_time)
ORDER BY (warehouse_id, product_id)
AS
SELECT
    warehouse_id,
    warehouse_name,
    product_id,
    product_code,
    product_name,
    category_id,
    stock_qty,
    available_qty,
    update_time
FROM ods.inventory_stock;

-- 客户应收账款汇总
CREATE MATERIALIZED VIEW dws.mv_customer_ar
ENGINE = SummingMergeTree()
ORDER BY (customer_id, stat_month)
AS
SELECT
    customer_id,
    toStartOfMonth(created_at) AS stat_month,
    sum(total_amount)     AS total_sales,
    sum(profit_amount)    AS total_profit,
    count()               AS order_count
FROM dwd.order_fact
WHERE is_credit = 1 AND order_status NOT IN ('cancelled')
GROUP BY customer_id, stat_month;

四、Elasticsearch搜索方案

4.1 商品索引设计

{
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 1,
    "refresh_interval": "5s",
    "analysis": {
      "analyzer": {
        "ik_max_word": {
          "type": "ik_max_word"
        },
        "ik_smart": {
          "type": "ik_smart"
        },
        "pinyin_analyzer": {
          "type": "custom",
          "tokenizer": "pinyin_tokenizer",
          "filter": ["lowercase"]
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "product_id":    { "type": "keyword" },
      "product_code":  { "type": "keyword" },
      "product_name": {
        "type": "text",
        "analyzer": "ik_max_word",
        "fields": {
          "keyword":   { "type": "keyword" },
          "pinyin":    { "type": "text", "analyzer": "pinyin_analyzer" }
        }
      },
      "category_id":   { "type": "keyword" },
      "category_name": { "type": "keyword" },
      "brand":         { "type": "keyword" },
      "spec": {
        "type": "text",
        "analyzer": "ik_smart"
      },
      "unit":          { "type": "keyword" },
      "price":         { "type": "scaled_float", "scaling_factor": 100 },
      "cost_price":   { "type": "scaled_float", "scaling_factor": 100 },
      "barcode":       { "type": "keyword" },
      "safe_stock":   { "type": "integer" },
      "stock_qty":    { "type": "integer" },
      "store_id":     { "type": "keyword" },
      "suggest": {
        "type": "completion",
        "analyzer": "simple",
        "preserve_separators": true,
        "max_input_length": 50
      },
      "updated_at":   { "type": "date" }
    }
  }
}

4.2 核心搜索查询

// 1. 商品模糊搜索(商品名+规格+拼音)
GET /products_v1/_search
{
  "query": {
    "bool": {
      "must": [
        {
          "multi_match": {
            "query": "六角螺丝刀",
            "fields": ["product_name^3", "product_name.pinyin^2", "spec"],
            "type": "best_fields",
            "fuzziness": "AUTO"
          }
        }
      ],
      "filter": [
        { "term": { "store_id": "STORE_001" }},
        { "range": { "stock_qty": { "gt": 0 }}}
      ]
    }
  },
  "highlight": {
    "fields": {
      "product_name": {
        "pre_tags": ["<em>"],
        "post_tags": ["</em>"]
      }
    }
  },
  "size": 20
}

// 2. 自动补全(用于搜索框)
GET /products_v1/_search
{
  "suggest": {
    "product_suggest": {
      "prefix": "活扳",
      "completion": {
        "field": "suggest",
        "size": 5,
        "skip_duplicates": true,
        "fuzzy": { "fuzziness": "AUTO" }
      }
    }
  }
}

// 3. 按分类筛选+价格排序
GET /products_v1/_search
{
  "query": {
    "bool": {
      "must": [
        { "term": { "category_id": "CATE_TOOLS" }}
      ],
      "filter": [
        { "range": { "price": { "gte": 10, "lte": 100 }}}
      ]
    }
  },
  "sort": [
    { "_score": "desc" },
    { "sales_monthly": "desc" },
    { "price": "asc" }
  ],
  "size": 50
}

五、Agent-01:数据采集与同步

职责:MySQL主库 → ClickHouse + ES 的CDC同步架构设计与配置

5.1 同步架构

MySQL CDB(主库)
    │
    ├── DTS同步链路1 ──▶ ClickHouse TCHouse-C(分析库)
    │                       ├── ods.*(原始层)
    │                       ├── dwd.*(宽表层)
    │                       └── dws.mv_*(物化视图)
    │
    └── DTS同步链路2 ──▶ Elasticsearch TCO-ES(搜索库)
                            ├── products_v1(商品索引)
                            └── orders_v1(订单索引,可选)

5.2 同步表配置

MySQL源表 ClickHouse目标 ES目标 同步模式
orders ods.orders orders_v1 增量
order_detail ods.order_detail - 增量
products ods.products products_v1 全量+增量
inventory_stock ods.inventory_stock - 增量
customers ods.customers - 全量+增量
suppliers ods.suppliers - 全量+增量

5.3 调度策略

同步类型 频率 说明
全量同步 每月1日 02:00 重建索引,保证数据一致
增量同步 实时(秒级) DTS持续监听Binlog
数据校验 每日 06:00 抽查表行数,确保一致

六、Agent-02:每日经营分析

职责:基于ClickHouse数据湖,生成每日经营分析日报

6.1 核心KPI计算(ClickHouse SQL)

-- 每日核心KPI(查询物化视图,秒级响应)
SELECT
    order_date,
    sum(order_num)      AS order_count,
    sum(daily_sales)    AS total_sales,
    sum(daily_profit)   AS total_profit,
    sum(daily_profit) / NULLIF(sum(daily_sales), 0) * 100 AS profit_margin_pct,
    sum(customer_count) AS customer_count
FROM dws.mv_orders_daily
WHERE order_date = today() - 1
GROUP BY order_date;

-- 环比计算
WITH today AS (
    SELECT sum(daily_sales) AS sales FROM dws.mv_orders_daily WHERE order_date = today() - 1
),
yesterday AS (
    SELECT sum(daily_sales) AS sales FROM dws.mv_orders_daily WHERE order_date = today() - 2
)
SELECT
    (t.sales - y.sales) / NULLIF(y.sales, 0) * 100 AS growth_pct
FROM today t, yesterday y;

6.2 爆款分析SQL

-- 近30天爆款商品TOP10(含库存可售天数)
SELECT
    product_id,
    product_name,
    sum(total_sales)    AS total_sales,
    sum(total_qty)      AS total_qty,
    argMax(stock_qty, update_time) AS current_stock,
    argMax(stock_qty, update_time) / NULLIF(sum(total_qty) / 30, 0) AS stock_days_remaining
FROM dws.mv_product_sales_rank d
LEFT JOIN dws.mv_inventory_latest i ON d.product_id = i.product_id
WHERE rank_date BETWEEN today() - 30 AND today() - 1
GROUP BY product_id, product_name
ORDER BY total_sales DESC
LIMIT 10;

6.3 库存预警SQL

-- 库存预警:缺货 + 临期 + 积压
SELECT
    i.product_id,
    i.product_name,
    i.stock_qty,
    p.safe_stock,
    CASE
        WHEN i.stock_qty = 0 THEN '🔴 缺货'
        WHEN i.stock_qty < p.safe_stock THEN '🟠 库存不足'
        WHEN i.stock_qty > p.safe_stock * 3 THEN '🟡 积压'
        ELSE '🟢 正常'
    END AS alert_status
FROM dws.mv_inventory_latest i
JOIN ods.products p ON i.product_id = p.product_id
WHERE i.warehouse_id = 'MAIN_WH'
  AND (
    i.stock_qty = 0
    OR i.stock_qty < p.safe_stock
    OR i.stock_qty > p.safe_stock * 3
  )
ORDER BY alert_status, i.stock_qty ASC;

6.4 员工绩效SQL

-- 员工当日绩效排名
SELECT
    e.employee_name,
    e.department,
    count()            AS order_count,
    sum(o.total_amount) AS total_sales,
    sum(o.profit_amount) AS total_profit,
    sum(o.profit_amount) / NULLIF(sum(o.total_amount), 0) * 100 AS profit_margin,
    uniqExact(o.customer_id) AS customer_count
FROM dwd.order_fact o
JOIN ods.employees e ON o.salesperson_id = e.employee_id
WHERE o.order_date = today() - 1
  AND o.order_status NOT IN ('cancelled')
GROUP BY e.employee_name, e.department
ORDER BY total_sales DESC
LIMIT 10;

七、Agent-03:库存预测与补货

职责:基于历史销量预测库存需求,生成智能补货建议

7.1 安全库存计算(ClickHouse SQL)

WITH daily_sales AS (
    SELECT
        product_id,
        toDate(created_at) AS sale_date,
        sum(quantity) AS daily_qty
    FROM ods.order_detail
    WHERE created_at >= today() - 90
    GROUP BY product_id, toDate(created_at)
),
sales_stats AS (
    SELECT
        product_id,
        avg(daily_qty)           AS avg_daily_sales,
        stddevPop(daily_qty)    AS demand_stddev
    FROM daily_sales
    GROUP BY product_id
),
lead_times AS (
    SELECT product_id, avg(lead_time_days) AS avg_lead_time
    FROM ods.supplier_lead_times
    GROUP BY product_id
),
safe_stock_calc AS (
    SELECT
        s.product_id,
        p.product_name,
        s.avg_daily_sales,
        s.demand_stddev,
        l.avg_lead_time,
        -- 安全库存 97.5%服务水平
        ROUND(1.96 * s.demand_stddev * sqrt(l.avg_lead_time)) AS safe_stock,
        -- 再订货点
        ROUND(s.avg_daily_sales * l.avg_lead_time + 1.96 * s.demand_stddev * sqrt(l.avg_lead_time)) AS reorder_point
    FROM sales_stats s
    LEFT JOIN lead_times l ON s.product_id = l.product_id
    JOIN ods.products p ON s.product_id = p.product_id
)
SELECT
    *,
    GREATEST(0, reorder_point + safe_stock - current_stock) AS suggested_order_qty
FROM safe_stock_calc
ORDER BY suggested_order_qty DESC;

7.2 缺货预测SQL

-- 7天/14天缺货风险预测
WITH future_demand AS (
    SELECT
        product_id,
        sum(quantity) / 7 AS avg_daily_7d,
        sum(quantity)     AS demand_7d
    FROM ods.order_detail
    WHERE created_at >= today() - 7
    GROUP BY product_id
)
SELECT
    p.product_name,
    i.stock_qty,
    f.avg_daily_7d,
    ROUND(i.stock_qty / NULLIF(f.avg_daily_7d, 0)) AS stock_days_remaining,
    CASE
        WHEN i.stock_qty <= f.demand_7d THEN '🔴 7天内缺货'
        WHEN i.stock_qty <= f.demand_7d * 2 THEN '🟠 14天内缺货'
        ELSE '🟢 库存充足'
    END AS stockout_risk
FROM ods.products p
LEFT JOIN dws.mv_inventory_latest i ON p.product_id = i.product_id
LEFT JOIN future_demand f ON p.product_id = f.product_id
WHERE i.stock_qty <= f.demand_7d * 2 OR i.stock_qty = 0
ORDER BY i.stock_qty ASC;

7.3 滞销识别SQL

SELECT
    p.product_name,
    i.stock_qty,
    today() - toDate(last_out_date) AS days_since_last_sale,
    today() - toDate(last_in_date)  AS days_in_stock,
    CASE
        WHEN days_since_last_sale > 180 THEN '🆕 死库存'
        WHEN days_in_stock > 365 THEN '⚠️ 超龄库存'
        WHEN days_since_last_sale > 90 THEN '🔴 长期滞销'
        WHEN days_in_stock > 90 THEN '🟡 积压'
        ELSE '🟢 正常'
    END AS slow_moving_type
FROM dws.mv_inventory_latest i
JOIN ods.products p ON i.product_id = p.product_id
WHERE i.stock_qty > 0
  AND (days_since_last_sale > 90 OR days_in_stock > 180)
ORDER BY days_since_last_sale DESC
LIMIT 50;

八、Agent-04:供应商与客户分析

职责:供应商绩效评估 + 客户分层管理与赊销分析

8.1 供应商对账单SQL

SELECT
    sp.supplier_id,
    si.supplier_name,
    sum(purchase_amount)     AS total_purchase,
    sum(return_amount)        AS total_return,
    sum(purchase_amount) - sum(return_amount) AS net_purchase,
    sum(CASE WHEN payment_status = 'unpaid' THEN purchase_amount ELSE 0 END) AS accounts_payable,
    max(due_date)             AS latest_due_date,
    today() - max(due_date)   AS aging_days,
    CASE
        WHEN aging_days <= 30 THEN '🟢 正常'
        WHEN aging_days <= 60 THEN '🟡 预警'
        WHEN aging_days <= 90 THEN '🟠 警告'
        ELSE '🔴 危险'
    END AS aging_level
FROM ods.supplier_purchases sp
JOIN ods.suppliers si ON sp.supplier_id = si.supplier_id
WHERE purchase_date BETWEEN '{start_date}' AND '{end_date}'
GROUP BY sp.supplier_id, si.supplier_name
ORDER BY net_purchase DESC;

8.2 供应商绩效评分SQL

WITH delivery_score AS (
    SELECT
        supplier_id,
        ROUND(sum(is_on_time) * 100.0 / count(), 2) AS on_time_rate
    FROM ods.supplier_delivery
    WHERE actual_delivery_date BETWEEN '{start_date}' AND '{end_date}'
    GROUP BY supplier_id
),
quality_score AS (
    SELECT
        supplier_id,
        ROUND((1 - sum(return_quantity) / sum(quantity)) * 100, 2) AS quality_rate
    FROM ods.supplier_quality
    WHERE return_date BETWEEN '{start_date}' AND '{end_date}'
    GROUP BY supplier_id
)
SELECT
    si.supplier_name,
    ds.on_time_rate,
    qs.quality_rate,
    ROUND(ds.on_time_rate * 0.3 + qs.quality_rate * 0.3 + coop_score * 0.4, 2) AS total_score,
    CASE WHEN total_score >= 90 THEN 'A' WHEN total_score >= 75 THEN 'B' WHEN total_score >= 60 THEN 'C' ELSE 'D' END AS grade
FROM ods.suppliers si
LEFT JOIN delivery_score ds ON si.supplier_id = ds.supplier_id
LEFT JOIN quality_score qs ON si.supplier_id = qs.supplier_id
ORDER BY total_score DESC;

8.3 客户ABC分类SQL

WITH yearly_sales AS (
    SELECT
        customer_id,
        sum(total_amount) AS annual_sales
    FROM dwd.order_fact
    WHERE order_date BETWEEN '{year}-01-01' AND '{year}-12-31'
      AND order_status NOT IN ('cancelled')
    GROUP BY customer_id
),
sales_with_cumsum AS (
    SELECT
        customer_id,
        annual_sales,
        sum(annual_sales) OVER() AS total_sales,
        sum(annual_sales) OVER(ORDER BY annual_sales DESC) AS cumsum_sales
    FROM yearly_sales
)
SELECT
    ci.customer_name,
    s.annual_sales,
    ROUND(s.annual_sales / s.total_sales * 100, 2) AS sales_pct,
    ROUND(s.cumsum_sales / s.total_sales * 100, 2) AS cumsum_pct,
    CASE WHEN cumsum_pct <= 80 THEN 'A' WHEN cumsum_pct <= 95 THEN 'B' ELSE 'C' END AS abc_class
FROM sales_with_cumsum s
JOIN ods.customers ci ON s.customer_id = ci.customer_id
ORDER BY s.annual_sales DESC;

8.4 客户流失预警SQL

SELECT
    ci.customer_name,
    ci.customer_type,
    max(o.order_date)  AS last_transaction_date,
    today() - max(o.order_date) AS days_since_transaction,
    CASE
        WHEN max(o.order_date) IS NULL THEN '🔴 严重预警'
        WHEN today() - max(o.order_date) > 90 THEN '🔴 严重预警'
        WHEN today() - max(o.order_date) > 60 THEN '🟠 中度预警'
        WHEN today() - max(o.order_date) > 30 THEN '🟡 轻度预警'
        ELSE '🟢 正常'
    END AS warning_level
FROM ods.customers ci
LEFT JOIN dwd.order_fact o ON ci.customer_id = o.customer_id
GROUP BY ci.customer_id, ci.customer_name, ci.customer_type
HAVING days_since_transaction > 30 OR days_since_transaction IS NULL
ORDER BY days_since_transaction DESC NULLS FIRST;

8.5 员工提成计算SQL

SELECT
    e.employee_name,
    e.department,
    sum(o.total_amount)   AS total_sales,
    sum(CASE WHEN o.payment_method != 'credit' THEN o.total_amount ELSE 0 END) AS cash_sales,
    sum(o.profit_amount)  AS total_profit,
    ROUND(sum(o.profit_amount) / NULLIF(sum(o.total_amount), 0) * 100, 2) AS profit_margin,
    -- 综合提成 = 销售额 × 回款率系数 × 毛利率系数 × 基础提成率
    ROUND(
        sum(o.total_amount)
        * (sum(CASE WHEN o.payment_method != 'credit' THEN o.total_amount ELSE 0 END) / sum(o.total_amount))
        * (sum(o.profit_amount) / sum(o.total_amount))
        * 0.15,
    2) AS commission_amount
FROM dwd.order_fact o
JOIN ods.employees e ON o.salesperson_id = e.employee_id
WHERE o.order_date BETWEEN '{start_date}' AND '{end_date}'
  AND o.order_status NOT IN ('cancelled')
GROUP BY e.employee_name, e.department
ORDER BY commission_amount DESC;

九、Agent-05:财务与月度分析

职责:月度财务分析、经营健康度诊断

9.1 月度收入分析SQL

WITH monthly_data AS (
    SELECT
        toStartOfMonth(order_date) AS month,
        sum(total_amount)  AS gross_sales,
        sum(profit_amount) AS gross_profit,
        sum(profit_amount) / NULLIF(sum(total_amount), 0) * 100 AS gross_margin_pct
    FROM dwd.order_fact
    WHERE order_date >= toStartOfMonth(addMonths(today(), -12))
      AND order_status NOT IN ('cancelled', 'returned')
    GROUP BY toStartOfMonth(order_date)
),
expense_data AS (
    SELECT
        toStartOfMonth(expense_date) AS month,
        sum(amount) AS total_expense
    FROM ods.expense_record
    WHERE expense_date >= toStartOfMonth(addMonths(today(), -12))
    GROUP BY toStartOfMonth(expense_date)
)
SELECT
    m.month,
    m.gross_sales,
    m.gross_profit,
    m.gross_margin_pct,
    (m.gross_profit - e.total_expense) AS net_profit,
    (m.gross_profit - e.total_expense) / NULLIF(m.gross_sales, 0) * 100 AS net_margin_pct,
    e.total_expense,
    e.total_expense / NULLIF(m.gross_sales, 0) * 100 AS expense_ratio_pct
FROM monthly_data m
LEFT JOIN expense_data e ON m.month = e.month
ORDER BY m.month DESC;

9.2 库存周转分析SQL

WITH sales_data AS (
    SELECT
        toStartOfMonth(created_at) AS month,
        sum(cost_amount) AS cogs
    FROM ods.order_detail
    WHERE created_at >= toStartOfMonth(addMonths(today(), -12))
    GROUP BY toStartOfMonth(created_at)
),
avg_inventory AS (
    SELECT
        toStartOfMonth(update_time) AS month,
        avg(stock_qty * cost_price) AS avg_inventory_value
    FROM dws.mv_inventory_latest i
    JOIN ods.products p ON i.product_id = p.product_id
    WHERE update_time >= toStartOfMonth(addMonths(today(), -12))
    GROUP BY toStartOfMonth(update_time)
)
SELECT
    s.month,
    a.avg_inventory_value,
    s.cogs,
    a.avg_inventory_value / NULLIF(s.cogs / 30, 0) AS inventory_turnover_days,
    s.cogs / NULLIF(a.avg_inventory_value, 0) AS inventory_turnover_rate
FROM sales_data s
LEFT JOIN avg_inventory a ON s.month = a.month
ORDER BY s.month DESC;

9.3 应收账款账龄SQL

SELECT
    c.customer_code,
    c.customer_name,
    sum(CASE WHEN aging_days <= 0 THEN outstanding_amount ELSE 0 END) AS not_due_amount,
    sum(CASE WHEN aging_days BETWEEN 1 AND 30 THEN outstanding_amount ELSE 0 END) AS overdue_1_30,
    sum(CASE WHEN aging_days BETWEEN 31 AND 60 THEN outstanding_amount ELSE 0 END) AS overdue_31_60,
    sum(CASE WHEN aging_days BETWEEN 61 AND 90 THEN outstanding_amount ELSE 0 END) AS overdue_61_90,
    sum(CASE WHEN aging_days > 90 THEN outstanding_amount ELSE 0 END) AS overdue_90_plus,
    sum(outstanding_amount) AS total_outstanding
FROM (
    SELECT
        ar.customer_id,
        ar.invoice_amount - ar.paid_amount AS outstanding_amount,
        toDate(ar.due_date) - today() AS aging_days
    FROM ods.ar_record ar
    WHERE ar.invoice_amount > ar.paid_amount
) t
JOIN ods.customers c ON t.customer_id = c.customer_id
GROUP BY c.customer_code, c.customer_name
ORDER BY total_outstanding DESC
LIMIT 50;

9.4 经营健康度仪表盘

维度 指标 健康区间 计算方式
盈利能力 毛利率 > 25% profit_amount / total_amount
盈利能力 净利率 > 10% net_profit / total_sales
运营效率 库存周转天数 < 60天 avg_inventory / cogs × 30
运营效率 应收周转天数 < 45天 avg_ar / sales × 30
财务安全 流动比率 > 1.5 current_assets / current_liabilities
费用控制 费用率 < 15% total_expense / total_sales
成长性 收入增长率 > 10% (本月 - 上月) / 上月 × 100

十、Agent-06:预警监控与报告推送

职责:预警规则引擎 + 多渠道推送 + 多Agent协作

10.1 系统架构

ClickHouse数据湖 ──▶ 规则引擎 ──▶ 预警分级处理器(🔴🟠🟡)
                            │
                    调度中心 ──▶ 推送渠道适配器(企业微信/钉钉/邮件)
                            │
                    订阅管理 ──▶ 报告渲染引擎

10.2 预警规则一览

库存类规则:

规则编码 规则名称 条件 级别
INV_001 库存为零预警 = 0 🔴紧急
INV_002 库存低于安全库存 < 安全库存 🟠重要
INV_003 库存周转率异常 > 90天 🟡关注
INV_004 超储预警 > 3倍安全库存 🟡关注

销售类规则:

规则编码 规则名称 条件 级别
SAL_001 日销售额为零 = 0 🔴紧急
SAL_002 销售额同比下降 < -20% 🟠重要
SAL_003 热销商品缺货 库存 < 3天销量 🔴紧急
SAL_004 毛利率异常 < 10% 🟠重要

财务类规则:

规则编码 规则名称 条件 级别
FIN_001 应收账款逾期 > 30天 🔴紧急
FIN_002 现金流为负 < 0 🔴紧急
FIN_003 毛利率低于警戒线 < 10% 🟠重要
FIN_004 费用超支 > 预算110% 🟠重要

客户类规则:

规则编码 规则名称 条件 级别
CUS_001 客户信用额度超限 > 100% 🔴紧急
CUS_002 客户流失预警 > 90天无交易 🟡关注
CUS_003 大客户风险预警 欠款 > 50万 🔴紧急

10.3 三级预警机制

级别 响应时效 通知范围 升级策略
🔴紧急 15分钟内 店长+区域经理+总部 30分钟未处理升级
🟠重要 1小时内 店长+区域经理 2小时未处理升级
🟡关注 当日内 相关负责人 无升级

10.4 调度策略

报告类型 执行时间 数据范围 推送渠道
日报 T+1 08:00 前一日数据 企业微信/邮件
周报 周一 09:00 上周数据 企业微信/钉钉
月报 月末 18:00 当月数据 企业微信/钉钉/邮件

10.5 推送渠道适配器

# 企业微信推送
class WeChatWorkAdapter:
    async def send(self, message, webhook_url):
        payload = {
            "msgtype": "markdown",
            "markdown": {"content": message['content']}
        }
        async with aiohttp.ClientSession() as session:
            async with session.post(webhook_url, json=payload) as resp:
                return resp.status == 200

# 钉钉推送
class DingTalkAdapter:
    async def send(self, message, webhook_url, secret):
        timestamp = str(int(time.time() * 1000))
        string_to_sign = f'{timestamp}\n{secret}'
        sign = urllib.parse.quote_plus(base64.b64encode(
            hmac.new(secret.encode(), string_to_sign.encode(),
                     digestmod=hashlib.sha256).digest()
        ).decode())
        url = f"{webhook_url}&timestamp={timestamp}&sign={sign}"
        async with aiohttp.ClientSession() as session:
            async with session.post(url, json={"msgtype": "markdown",
                                               "markdown": {"text": message['content']}}) as resp:
                return resp.status == 200

十一、部署方案与成本估算

11.1 腾讯云资源配置

组件 产品 推荐配置 月费用(参考)
主库 MySQL CDB 4核8G 200GB ~500元
只读实例 MySQL CDB RO 2核4G 100GB ~200元
分析库 TCHouse-C 4核16G 500GB SSD ~1500元
搜索服务 TCO-ES 2核4G 200GB ~800元
数据同步 DTS 按量付费 ~300元
合计 - - ~3300元/月

11.2 集群配置建议(TCHouse-C)

数据量级 分片数 副本数 节点规格 存储
<100GB 1 2 4核16G 500GB SSD
100GB-1TB 2 2 8核32G 1TB SSD

11.3 实施路线图

阶段 时间 任务
Phase 1 第1周 腾讯云CDB创建,DTS同步配置,TCHouse-C集群创建
Phase 2 第2周 ClickHouse DDL部署,物化视图创建,数据同步验证
Phase 3 第3周 ES索引设计,商品搜索功能开发
Phase 4 第4周 6个Agent功能开发,报表模板配置
Phase 5 第5周 企业微信/钉钉推送集成,全流程测试
Phase 6 第6周 上线试运行,数据一致性校验,告警规则调优

十二、总结

12.1 新架构核心价值

维度 改进说明
数据一致性 MySQL主库保证事务一致性,CDC同步保证分析库最终一致
查询性能 ClickHouse独立资源,复杂聚合查询秒级响应
搜索能力 ES专业搜索,商品模糊匹配、拼音搜索、补全推荐
架构解耦 三层独立扩缩容,故障隔离
运维简化 腾讯云托管服务,DTS一键同步

12.2 6个Agent职责回顾

Agent 核心职责 数据来源
Agent-01 CDC数据同步 MySQL → ClickHouse/ES
Agent-02 每日经营分析 ClickHouse DWS物化视图
Agent-03 库存预测补货 ClickHouse + 历史销量模型
Agent-04 供应商客户分析 ClickHouse ODS层
Agent-05 财务月报分析 ClickHouse DWS层
Agent-06 预警监控推送 ClickHouse规则引擎 + 企业微信

12.3 与旧架构对比

对比项 旧架构 新架构
搜索能力 一般 极强(ES专用)
事务支持 完整(MySQL主库)
数据同步 复杂CDC DTS托管(同云)
运维负担 中(云托管降低)
成本 中(略增)

文档版本:v2.0
更新日期:2026-04-30
主架构:MySQL + ClickHouse TCHouse-C + Elasticsearch TCO-ES + 腾讯云DTS

Left-click: follow link, Right-click: select node, Scroll: zoom
x