五金门店AI智能体分析体系(新架构版)
版本:v2.0(基于 MySQL + ClickHouse + Elasticsearch 三层架构)
日期:2026-04-30
概述:重构原有6个Agent,从单一ClickHouse数据湖升级为 MySQL主库(事务)+ ClickHouse(分析)+ Elasticsearch(搜索)的三层架构
一、整体架构设计
1.1 三层架构概览
┌─────────────────────────────────────────────────────────────────────────────┐
│ 数据流向与系统架构图 │
├─────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────────────────────────┐ │
│ │ 用户/店员 │ │ 业务写入 │ │
│ └──────┬───────┘ └────────────────┬─────────────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────────────┐ │
│ │ 前端应用 │──────────────────▶│ MySQL CDB 主库 │ │
│ │ (进销存系统) │ 业务写入 │ (腾讯云·事务库) │ │
│ └──────────────┘ └──────────┬───────────┘ │
│ │ │
│ ┌────────────────────────────────────────┼───────────────────┐ │
│ │ │ │ │
│ │ CDC同步(腾讯云DTS) │ │
│ │ │ │ │
│ ▼ ▼ │ │
│ ┌─────────────────┐ ┌──────────────────────┐ │ │
│ │ Elasticsearch │ │ ClickHouse │ │ │
│ │ TCO-ES │ │ TCHouse-C │ │ │
│ │ (商品搜索层) │ │ (OLAP分析库) │ │ │
│ │ │ │ │ │ │
│ │ ·商品模糊搜索 │ │ ·订单统计 │ │ │
│ │ ·拼音首字母搜索 │ │ ·库存预测 │ │ │
│ │ ·自动补全 │ │ ·财务月报 │ │ │
│ │ ·ELK日志 │ │ ·供应商客户分析 │ │ │
│ └────────┬────────┘ └──────────┬───────────┘ │ │
│ │ │ │ │
│ │ 查询路由 │ 查询路由 │ │
│ ▼ ▼ │ │
│ ┌─────────────────┐ ┌──────────────────────┐ │ │
│ │ 搜索服务 │ │ 分析报表服务 │ │ │
│ │ ·商品查询 │ │ ·AI智能体分析 │ │ │
│ │ ·订单检索 │ │ ·Dashboard │ │ │
│ └─────────────────┘ └──────────────────────┘ │ │
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ 推送层(企业微信/钉钉/邮件) │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘
1.2 三层职责分工
| 层级 | 数据库 | 职责 | 特点 |
|---|---|---|---|
| 事务层 | MySQL CDB | 全部业务读写、事务处理 | ACID强一致、在线事务 |
| 分析层 | ClickHouse TCHouse-C | 订单统计、库存预测、财务月报 | 列式存储、聚合极快 |
| 搜索层 | Elasticsearch TCO-ES | 商品模糊搜索、订单快速检索 | 倒排索引、全文搜索 |
1.3 新架构优势
| 对比维度 | 旧架构(纯ClickHouse) | 新架构(MySQL+CH+ES) |
|---|---|---|
| 事务能力 | ❌ 弱 | ✅ MySQL完整支持 |
| 分析查询 | ✅ 快 | ✅ 更快(独立资源) |
| 商品搜索 | ⚠️ 一般 | ✅ 极强(ES专用) |
| 写入性能 | ⚠️ CDC压力 | ✅ 主库直写,CH/ES同步 |
| 运维复杂度 | 中 | 中(云托管降低压力) |
| 数据一致性 | ✅ 高 | ✅ 事务层保证 |
| 成本 | 中 | 中(略高,但可接受) |
二、CDC同步方案
2.1 腾讯云DTS配置
同步路径:MySQL CDB(源库)→ ClickHouse TCHouse-C(目标库)
MySQL CDB(源库)→ Elasticsearch TCO-ES(目标库)
同步类型:全量 + 增量(实时同步,秒级延迟)
2.2 MySQL → ClickHouse DTS配置步骤
控制台路径:腾讯云 → 数据传输服务DTS → 创建迁移/同步任务
Step 1:选择任务类型
└─ 数据同步(不是迁移,同步支持增量)
Step 2:配置源库
└─ 接入类型:云数据库
└─ 实例ID:选择MySQL CDB实例
└─ 账号:具有REPLICATION权限的账号
└─ 密码:***
Step 3:配置目标库
└─ 目标库类型:ClickHouse
└─ 接入类型:云数据库CH
└─ 实例ID:选择TCHouse-C实例
└─ 账号:默认root
└─ 密码:***
Step 4:选择同步对象
└─ 数据库:inventory_db, sales_db, finance_db
└─ 表:支持正则批量选择,如 orders*, products*, inventory*
└─ 列过滤:支持黑名单
Step 5:增量同步配置
└─ Binlog格式:Row(推荐)
└─ 保留时间:≥7天
└─ 冲突处理:Overwrite(以源库为准)
Step 6:启动任务
└─ 预检查:通过后启动
└─ 增量延迟监控:配置告警阈值(>300秒)
2.3 MySQL → Elasticsearch DTS配置
Step 1:创建数据同步任务
└─ 同步类型:MySQL → ES
Step 2:索引命名规则
└─ 建议格式:{库名}_{表名}_v1
└─ 示例:inventory_products_v1, sales_orders_v1
Step 3:字段映射
└─ MySQL VARCHAR(长文本) → ES text(分词)
└─ MySQL VARCHAR(短) → ES keyword(不分词)
└─ MySQL DECIMAL → ES scaled_float
└─ MySQL DATETIME → ES date
Step 4:文档ID
└─ 使用主键ID作为ES文档_id
└─ 支持嵌套文档
2.4 同步账号权限
-- MySQL源库创建DTS专用账号
CREATE USER 'dts_sync'@'%' IDENTIFIED BY 'DtsPass@2026';
GRANT SELECT, LOCK TABLES ON *.* TO 'dts_sync'@'%';
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'dts_sync'@'%';
GRANT ALL PRIVILEGES ON `inventory_db`.* TO 'dts_sync'@'%';
GRANT ALL PRIVILEGES ON `sales_db`.* TO 'dts_sync'@'%';
FLUSH PRIVILEGES;
2.5 同步监控与告警
| 监控指标 | 告警阈值 | 处理建议 |
|---|---|---|
| 增量同步延迟 | > 300秒 | 检查源库Binlog、目标库写入 |
| 同步速率 | < 100条/秒 | 检查网络、目标库负载 |
| 任务状态 | 中断 | 自动重试,配置断点续传 |
| 源库Binlog保留 | < 3天 | 立即延长保留时间 |
三、ClickHouse数据模型
3.1 数据分层设计(ODS / DWD / DWS)
MySQL主库 ──DTS同步──▶ ODS层(原始数据)──▶ DWD层(明细宽表)──▶ DWS层(汇总物化视图)
ODS层:原始数据层
-- 订单表ODS(保留CDC操作类型)
CREATE TABLE ods.orders
(
order_id String,
store_id String,
customer_id String,
salesperson_id String,
total_amount Decimal(12,2),
cost_amount Decimal(12,2),
discount_amount Decimal(12,2),
payment_method Enum8('cash'=1,'wechat'=2,'alipay'=3,'credit'=4,'mixed'=5),
order_status Enum8('pending'=1,'confirmed'=2,'shipped'=3,'completed'=4,'cancelled'=5),
order_date Date,
created_at DateTime,
updated_at DateTime,
_ts_ms DateTime64(3) DEFAULT now64(3),
_op Enum8('insert'=1,'update'=2,'delete'=3)
)
ENGINE = ReplicatedReplacingMergeTree(_ts_ms)
PARTITION BY toYYYYMM(order_date)
ORDER BY (store_id, order_id, _ts_ms)
TTL order_date + INTERVAL 2 YEAR;
-- 订单明细ODS
CREATE TABLE ods.order_detail
(
detail_id String,
order_id String,
product_id String,
product_code String,
product_name String,
category_id String,
warehouse_id String,
quantity UInt32,
unit_price Decimal(10,2),
cost_price Decimal(10,2),
line_amount Decimal(12,2),
created_at DateTime
)
ENGINE = ReplicatedReplacingMergeTree()
PARTITION BY toYYYYMM(created_at)
ORDER BY (order_id, detail_id)
TTL created_at + INTERVAL 2 YEAR;
-- 商品表ODS
CREATE TABLE ods.products
(
product_id String,
product_code String,
product_name String,
category_id String,
category_name String,
brand String,
spec String,
unit String,
price Decimal(10,2),
cost_price Decimal(10,2),
barcode String,
safe_stock UInt32,
_ts_ms DateTime64(3) DEFAULT now64(3),
_op Enum8('insert'=1,'update'=2,'delete'=3)
)
ENGINE = ReplicatedReplacingMergeMergeTree(_ts_ms)
ORDER BY (category_id, product_id);
-- 库存表ODS
CREATE TABLE ods.inventory_stock
(
warehouse_id String,
warehouse_name String,
product_id String,
stock_qty Int32,
available_qty Int32,
reserved_qty Int32,
update_time DateTime,
_ts_ms DateTime64(3) DEFAULT now64(3)
)
ENGINE = ReplicatedReplacingMergeTree(_ts_ms)
PARTITION BY toYYYYMM(update_time)
ORDER BY (warehouse_id, product_id);
-- 客户表ODS
CREATE TABLE ods.customers
(
customer_id String,
customer_code String,
customer_name String,
customer_type Enum8('retail'=1,'wholesale'=2,'engineering'=3),
credit_limit Decimal(12,2),
contact_person String,
phone String,
address String,
_ts_ms DateTime64(3) DEFAULT now64(3)
)
ENGINE = ReplicatedReplacingMergeTree(_ts_ms)
ORDER BY (customer_type, customer_id);
-- 供应商表ODS
CREATE TABLE ods.suppliers
(
supplier_id String,
supplier_code String,
supplier_name String,
contact_person String,
phone String,
address String,
_ts_ms DateTime64(3) DEFAULT now64(3)
)
ENGINE = ReplicatedReplacingMergeTree(_ts_ms)
ORDER BY (supplier_id);
DWD层:明细宽表层
-- 订单宽表(去删除标记,合并更新)
CREATE TABLE dwd.order_fact
(
order_id String,
store_id String,
store_name String,
customer_id String,
customer_name String,
customer_type String,
salesperson_id String,
salesperson_name String,
order_date Date,
order_hour UInt8,
weekday UInt8,
total_amount Decimal(12,2),
cost_amount Decimal(12,2),
profit_amount Decimal(12,2),
profit_margin Decimal(5,2),
discount_amount Decimal(12,2),
payment_method String,
order_status String,
is_credit UInt8,
created_at DateTime,
updated_at DateTime
)
ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(order_date)
ORDER BY (store_id, order_date, order_id)
TTL order_date + INTERVAL 2 YEAR;
-- 库存宽表(最新余额)
CREATE TABLE dwd.inventory_fact
(
warehouse_id String,
warehouse_name String,
product_id String,
product_code String,
product_name String,
category_id String,
stock_qty Int32,
available_qty Int32,
reserved_qty Int32,
update_time DateTime
)
ENGINE = ReplacingMergeTree(update_time)
ORDER BY (warehouse_id, product_id);
DWS层:汇总物化视图
-- 日粒度订单汇总(加速日报)
CREATE MATERIALIZED VIEW dws.mv_orders_daily
ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(order_date)
ORDER BY (store_id, category_id, order_date)
AS
SELECT
store_id,
category_id,
order_date,
count() AS order_count,
uniqExact(order_id) AS order_num,
sum(total_amount) AS daily_sales,
sum(cost_amount) AS daily_cost,
sum(profit_amount) AS daily_profit,
sum(profit_amount) / NULLIF(sum(total_amount), 0) * 100 AS profit_margin_pct,
uniqExact(customer_id) AS customer_count
FROM dwd.order_fact
WHERE order_status NOT IN ('cancelled')
GROUP BY store_id, category_id, order_date;
-- 月粒度订单汇总(加速月报)
CREATE MATERIALIZED VIEW dws.mv_orders_monthly
ENGINE = SummingMergeTree()
ORDER BY (store_id, category_id, order_year, order_month)
AS
SELECT
store_id,
category_id,
toYear(order_date) AS order_year,
toMonth(order_date) AS order_month,
count() AS order_count,
sum(total_amount) AS monthly_sales,
sum(profit_amount) AS monthly_profit
FROM dwd.order_fact
WHERE order_status NOT IN ('cancelled')
GROUP BY store_id, category_id, order_year, order_month;
-- 商品销售排行汇总(加速爆款分析)
CREATE MATERIALIZED VIEW dws.mv_product_sales_rank
ENGINE = AggregatingMergeTree()
ORDER BY (rank_date, category_id, product_id)
AS
SELECT
toDate(created_at) AS rank_date,
category_id,
product_id,
product_name,
sum(line_amount) AS total_sales,
sum(quantity) AS total_qty,
count() AS order_count
FROM ods.order_detail
GROUP BY rank_date, category_id, product_id, product_name;
-- 库存余额汇总
CREATE MATERIALIZED VIEW dws.mv_inventory_latest
ENGINE = ReplacingMergeTree(update_time)
ORDER BY (warehouse_id, product_id)
AS
SELECT
warehouse_id,
warehouse_name,
product_id,
product_code,
product_name,
category_id,
stock_qty,
available_qty,
update_time
FROM ods.inventory_stock;
-- 客户应收账款汇总
CREATE MATERIALIZED VIEW dws.mv_customer_ar
ENGINE = SummingMergeTree()
ORDER BY (customer_id, stat_month)
AS
SELECT
customer_id,
toStartOfMonth(created_at) AS stat_month,
sum(total_amount) AS total_sales,
sum(profit_amount) AS total_profit,
count() AS order_count
FROM dwd.order_fact
WHERE is_credit = 1 AND order_status NOT IN ('cancelled')
GROUP BY customer_id, stat_month;
四、Elasticsearch搜索方案
4.1 商品索引设计
{
"settings": {
"number_of_shards": 3,
"number_of_replicas": 1,
"refresh_interval": "5s",
"analysis": {
"analyzer": {
"ik_max_word": {
"type": "ik_max_word"
},
"ik_smart": {
"type": "ik_smart"
},
"pinyin_analyzer": {
"type": "custom",
"tokenizer": "pinyin_tokenizer",
"filter": ["lowercase"]
}
}
}
},
"mappings": {
"properties": {
"product_id": { "type": "keyword" },
"product_code": { "type": "keyword" },
"product_name": {
"type": "text",
"analyzer": "ik_max_word",
"fields": {
"keyword": { "type": "keyword" },
"pinyin": { "type": "text", "analyzer": "pinyin_analyzer" }
}
},
"category_id": { "type": "keyword" },
"category_name": { "type": "keyword" },
"brand": { "type": "keyword" },
"spec": {
"type": "text",
"analyzer": "ik_smart"
},
"unit": { "type": "keyword" },
"price": { "type": "scaled_float", "scaling_factor": 100 },
"cost_price": { "type": "scaled_float", "scaling_factor": 100 },
"barcode": { "type": "keyword" },
"safe_stock": { "type": "integer" },
"stock_qty": { "type": "integer" },
"store_id": { "type": "keyword" },
"suggest": {
"type": "completion",
"analyzer": "simple",
"preserve_separators": true,
"max_input_length": 50
},
"updated_at": { "type": "date" }
}
}
}
4.2 核心搜索查询
// 1. 商品模糊搜索(商品名+规格+拼音)
GET /products_v1/_search
{
"query": {
"bool": {
"must": [
{
"multi_match": {
"query": "六角螺丝刀",
"fields": ["product_name^3", "product_name.pinyin^2", "spec"],
"type": "best_fields",
"fuzziness": "AUTO"
}
}
],
"filter": [
{ "term": { "store_id": "STORE_001" }},
{ "range": { "stock_qty": { "gt": 0 }}}
]
}
},
"highlight": {
"fields": {
"product_name": {
"pre_tags": ["<em>"],
"post_tags": ["</em>"]
}
}
},
"size": 20
}
// 2. 自动补全(用于搜索框)
GET /products_v1/_search
{
"suggest": {
"product_suggest": {
"prefix": "活扳",
"completion": {
"field": "suggest",
"size": 5,
"skip_duplicates": true,
"fuzzy": { "fuzziness": "AUTO" }
}
}
}
}
// 3. 按分类筛选+价格排序
GET /products_v1/_search
{
"query": {
"bool": {
"must": [
{ "term": { "category_id": "CATE_TOOLS" }}
],
"filter": [
{ "range": { "price": { "gte": 10, "lte": 100 }}}
]
}
},
"sort": [
{ "_score": "desc" },
{ "sales_monthly": "desc" },
{ "price": "asc" }
],
"size": 50
}
五、Agent-01:数据采集与同步
职责:MySQL主库 → ClickHouse + ES 的CDC同步架构设计与配置
5.1 同步架构
MySQL CDB(主库)
│
├── DTS同步链路1 ──▶ ClickHouse TCHouse-C(分析库)
│ ├── ods.*(原始层)
│ ├── dwd.*(宽表层)
│ └── dws.mv_*(物化视图)
│
└── DTS同步链路2 ──▶ Elasticsearch TCO-ES(搜索库)
├── products_v1(商品索引)
└── orders_v1(订单索引,可选)
5.2 同步表配置
| MySQL源表 | ClickHouse目标 | ES目标 | 同步模式 |
|---|---|---|---|
| orders | ods.orders | orders_v1 | 增量 |
| order_detail | ods.order_detail | - | 增量 |
| products | ods.products | products_v1 | 全量+增量 |
| inventory_stock | ods.inventory_stock | - | 增量 |
| customers | ods.customers | - | 全量+增量 |
| suppliers | ods.suppliers | - | 全量+增量 |
5.3 调度策略
| 同步类型 | 频率 | 说明 |
|---|---|---|
| 全量同步 | 每月1日 02:00 | 重建索引,保证数据一致 |
| 增量同步 | 实时(秒级) | DTS持续监听Binlog |
| 数据校验 | 每日 06:00 | 抽查表行数,确保一致 |
六、Agent-02:每日经营分析
职责:基于ClickHouse数据湖,生成每日经营分析日报
6.1 核心KPI计算(ClickHouse SQL)
-- 每日核心KPI(查询物化视图,秒级响应)
SELECT
order_date,
sum(order_num) AS order_count,
sum(daily_sales) AS total_sales,
sum(daily_profit) AS total_profit,
sum(daily_profit) / NULLIF(sum(daily_sales), 0) * 100 AS profit_margin_pct,
sum(customer_count) AS customer_count
FROM dws.mv_orders_daily
WHERE order_date = today() - 1
GROUP BY order_date;
-- 环比计算
WITH today AS (
SELECT sum(daily_sales) AS sales FROM dws.mv_orders_daily WHERE order_date = today() - 1
),
yesterday AS (
SELECT sum(daily_sales) AS sales FROM dws.mv_orders_daily WHERE order_date = today() - 2
)
SELECT
(t.sales - y.sales) / NULLIF(y.sales, 0) * 100 AS growth_pct
FROM today t, yesterday y;
6.2 爆款分析SQL
-- 近30天爆款商品TOP10(含库存可售天数)
SELECT
product_id,
product_name,
sum(total_sales) AS total_sales,
sum(total_qty) AS total_qty,
argMax(stock_qty, update_time) AS current_stock,
argMax(stock_qty, update_time) / NULLIF(sum(total_qty) / 30, 0) AS stock_days_remaining
FROM dws.mv_product_sales_rank d
LEFT JOIN dws.mv_inventory_latest i ON d.product_id = i.product_id
WHERE rank_date BETWEEN today() - 30 AND today() - 1
GROUP BY product_id, product_name
ORDER BY total_sales DESC
LIMIT 10;
6.3 库存预警SQL
-- 库存预警:缺货 + 临期 + 积压
SELECT
i.product_id,
i.product_name,
i.stock_qty,
p.safe_stock,
CASE
WHEN i.stock_qty = 0 THEN '🔴 缺货'
WHEN i.stock_qty < p.safe_stock THEN '🟠 库存不足'
WHEN i.stock_qty > p.safe_stock * 3 THEN '🟡 积压'
ELSE '🟢 正常'
END AS alert_status
FROM dws.mv_inventory_latest i
JOIN ods.products p ON i.product_id = p.product_id
WHERE i.warehouse_id = 'MAIN_WH'
AND (
i.stock_qty = 0
OR i.stock_qty < p.safe_stock
OR i.stock_qty > p.safe_stock * 3
)
ORDER BY alert_status, i.stock_qty ASC;
6.4 员工绩效SQL
-- 员工当日绩效排名
SELECT
e.employee_name,
e.department,
count() AS order_count,
sum(o.total_amount) AS total_sales,
sum(o.profit_amount) AS total_profit,
sum(o.profit_amount) / NULLIF(sum(o.total_amount), 0) * 100 AS profit_margin,
uniqExact(o.customer_id) AS customer_count
FROM dwd.order_fact o
JOIN ods.employees e ON o.salesperson_id = e.employee_id
WHERE o.order_date = today() - 1
AND o.order_status NOT IN ('cancelled')
GROUP BY e.employee_name, e.department
ORDER BY total_sales DESC
LIMIT 10;
七、Agent-03:库存预测与补货
职责:基于历史销量预测库存需求,生成智能补货建议
7.1 安全库存计算(ClickHouse SQL)
WITH daily_sales AS (
SELECT
product_id,
toDate(created_at) AS sale_date,
sum(quantity) AS daily_qty
FROM ods.order_detail
WHERE created_at >= today() - 90
GROUP BY product_id, toDate(created_at)
),
sales_stats AS (
SELECT
product_id,
avg(daily_qty) AS avg_daily_sales,
stddevPop(daily_qty) AS demand_stddev
FROM daily_sales
GROUP BY product_id
),
lead_times AS (
SELECT product_id, avg(lead_time_days) AS avg_lead_time
FROM ods.supplier_lead_times
GROUP BY product_id
),
safe_stock_calc AS (
SELECT
s.product_id,
p.product_name,
s.avg_daily_sales,
s.demand_stddev,
l.avg_lead_time,
-- 安全库存 97.5%服务水平
ROUND(1.96 * s.demand_stddev * sqrt(l.avg_lead_time)) AS safe_stock,
-- 再订货点
ROUND(s.avg_daily_sales * l.avg_lead_time + 1.96 * s.demand_stddev * sqrt(l.avg_lead_time)) AS reorder_point
FROM sales_stats s
LEFT JOIN lead_times l ON s.product_id = l.product_id
JOIN ods.products p ON s.product_id = p.product_id
)
SELECT
*,
GREATEST(0, reorder_point + safe_stock - current_stock) AS suggested_order_qty
FROM safe_stock_calc
ORDER BY suggested_order_qty DESC;
7.2 缺货预测SQL
-- 7天/14天缺货风险预测
WITH future_demand AS (
SELECT
product_id,
sum(quantity) / 7 AS avg_daily_7d,
sum(quantity) AS demand_7d
FROM ods.order_detail
WHERE created_at >= today() - 7
GROUP BY product_id
)
SELECT
p.product_name,
i.stock_qty,
f.avg_daily_7d,
ROUND(i.stock_qty / NULLIF(f.avg_daily_7d, 0)) AS stock_days_remaining,
CASE
WHEN i.stock_qty <= f.demand_7d THEN '🔴 7天内缺货'
WHEN i.stock_qty <= f.demand_7d * 2 THEN '🟠 14天内缺货'
ELSE '🟢 库存充足'
END AS stockout_risk
FROM ods.products p
LEFT JOIN dws.mv_inventory_latest i ON p.product_id = i.product_id
LEFT JOIN future_demand f ON p.product_id = f.product_id
WHERE i.stock_qty <= f.demand_7d * 2 OR i.stock_qty = 0
ORDER BY i.stock_qty ASC;
7.3 滞销识别SQL
SELECT
p.product_name,
i.stock_qty,
today() - toDate(last_out_date) AS days_since_last_sale,
today() - toDate(last_in_date) AS days_in_stock,
CASE
WHEN days_since_last_sale > 180 THEN '🆕 死库存'
WHEN days_in_stock > 365 THEN '⚠️ 超龄库存'
WHEN days_since_last_sale > 90 THEN '🔴 长期滞销'
WHEN days_in_stock > 90 THEN '🟡 积压'
ELSE '🟢 正常'
END AS slow_moving_type
FROM dws.mv_inventory_latest i
JOIN ods.products p ON i.product_id = p.product_id
WHERE i.stock_qty > 0
AND (days_since_last_sale > 90 OR days_in_stock > 180)
ORDER BY days_since_last_sale DESC
LIMIT 50;
八、Agent-04:供应商与客户分析
职责:供应商绩效评估 + 客户分层管理与赊销分析
8.1 供应商对账单SQL
SELECT
sp.supplier_id,
si.supplier_name,
sum(purchase_amount) AS total_purchase,
sum(return_amount) AS total_return,
sum(purchase_amount) - sum(return_amount) AS net_purchase,
sum(CASE WHEN payment_status = 'unpaid' THEN purchase_amount ELSE 0 END) AS accounts_payable,
max(due_date) AS latest_due_date,
today() - max(due_date) AS aging_days,
CASE
WHEN aging_days <= 30 THEN '🟢 正常'
WHEN aging_days <= 60 THEN '🟡 预警'
WHEN aging_days <= 90 THEN '🟠 警告'
ELSE '🔴 危险'
END AS aging_level
FROM ods.supplier_purchases sp
JOIN ods.suppliers si ON sp.supplier_id = si.supplier_id
WHERE purchase_date BETWEEN '{start_date}' AND '{end_date}'
GROUP BY sp.supplier_id, si.supplier_name
ORDER BY net_purchase DESC;
8.2 供应商绩效评分SQL
WITH delivery_score AS (
SELECT
supplier_id,
ROUND(sum(is_on_time) * 100.0 / count(), 2) AS on_time_rate
FROM ods.supplier_delivery
WHERE actual_delivery_date BETWEEN '{start_date}' AND '{end_date}'
GROUP BY supplier_id
),
quality_score AS (
SELECT
supplier_id,
ROUND((1 - sum(return_quantity) / sum(quantity)) * 100, 2) AS quality_rate
FROM ods.supplier_quality
WHERE return_date BETWEEN '{start_date}' AND '{end_date}'
GROUP BY supplier_id
)
SELECT
si.supplier_name,
ds.on_time_rate,
qs.quality_rate,
ROUND(ds.on_time_rate * 0.3 + qs.quality_rate * 0.3 + coop_score * 0.4, 2) AS total_score,
CASE WHEN total_score >= 90 THEN 'A' WHEN total_score >= 75 THEN 'B' WHEN total_score >= 60 THEN 'C' ELSE 'D' END AS grade
FROM ods.suppliers si
LEFT JOIN delivery_score ds ON si.supplier_id = ds.supplier_id
LEFT JOIN quality_score qs ON si.supplier_id = qs.supplier_id
ORDER BY total_score DESC;
8.3 客户ABC分类SQL
WITH yearly_sales AS (
SELECT
customer_id,
sum(total_amount) AS annual_sales
FROM dwd.order_fact
WHERE order_date BETWEEN '{year}-01-01' AND '{year}-12-31'
AND order_status NOT IN ('cancelled')
GROUP BY customer_id
),
sales_with_cumsum AS (
SELECT
customer_id,
annual_sales,
sum(annual_sales) OVER() AS total_sales,
sum(annual_sales) OVER(ORDER BY annual_sales DESC) AS cumsum_sales
FROM yearly_sales
)
SELECT
ci.customer_name,
s.annual_sales,
ROUND(s.annual_sales / s.total_sales * 100, 2) AS sales_pct,
ROUND(s.cumsum_sales / s.total_sales * 100, 2) AS cumsum_pct,
CASE WHEN cumsum_pct <= 80 THEN 'A' WHEN cumsum_pct <= 95 THEN 'B' ELSE 'C' END AS abc_class
FROM sales_with_cumsum s
JOIN ods.customers ci ON s.customer_id = ci.customer_id
ORDER BY s.annual_sales DESC;
8.4 客户流失预警SQL
SELECT
ci.customer_name,
ci.customer_type,
max(o.order_date) AS last_transaction_date,
today() - max(o.order_date) AS days_since_transaction,
CASE
WHEN max(o.order_date) IS NULL THEN '🔴 严重预警'
WHEN today() - max(o.order_date) > 90 THEN '🔴 严重预警'
WHEN today() - max(o.order_date) > 60 THEN '🟠 中度预警'
WHEN today() - max(o.order_date) > 30 THEN '🟡 轻度预警'
ELSE '🟢 正常'
END AS warning_level
FROM ods.customers ci
LEFT JOIN dwd.order_fact o ON ci.customer_id = o.customer_id
GROUP BY ci.customer_id, ci.customer_name, ci.customer_type
HAVING days_since_transaction > 30 OR days_since_transaction IS NULL
ORDER BY days_since_transaction DESC NULLS FIRST;
8.5 员工提成计算SQL
SELECT
e.employee_name,
e.department,
sum(o.total_amount) AS total_sales,
sum(CASE WHEN o.payment_method != 'credit' THEN o.total_amount ELSE 0 END) AS cash_sales,
sum(o.profit_amount) AS total_profit,
ROUND(sum(o.profit_amount) / NULLIF(sum(o.total_amount), 0) * 100, 2) AS profit_margin,
-- 综合提成 = 销售额 × 回款率系数 × 毛利率系数 × 基础提成率
ROUND(
sum(o.total_amount)
* (sum(CASE WHEN o.payment_method != 'credit' THEN o.total_amount ELSE 0 END) / sum(o.total_amount))
* (sum(o.profit_amount) / sum(o.total_amount))
* 0.15,
2) AS commission_amount
FROM dwd.order_fact o
JOIN ods.employees e ON o.salesperson_id = e.employee_id
WHERE o.order_date BETWEEN '{start_date}' AND '{end_date}'
AND o.order_status NOT IN ('cancelled')
GROUP BY e.employee_name, e.department
ORDER BY commission_amount DESC;
九、Agent-05:财务与月度分析
职责:月度财务分析、经营健康度诊断
9.1 月度收入分析SQL
WITH monthly_data AS (
SELECT
toStartOfMonth(order_date) AS month,
sum(total_amount) AS gross_sales,
sum(profit_amount) AS gross_profit,
sum(profit_amount) / NULLIF(sum(total_amount), 0) * 100 AS gross_margin_pct
FROM dwd.order_fact
WHERE order_date >= toStartOfMonth(addMonths(today(), -12))
AND order_status NOT IN ('cancelled', 'returned')
GROUP BY toStartOfMonth(order_date)
),
expense_data AS (
SELECT
toStartOfMonth(expense_date) AS month,
sum(amount) AS total_expense
FROM ods.expense_record
WHERE expense_date >= toStartOfMonth(addMonths(today(), -12))
GROUP BY toStartOfMonth(expense_date)
)
SELECT
m.month,
m.gross_sales,
m.gross_profit,
m.gross_margin_pct,
(m.gross_profit - e.total_expense) AS net_profit,
(m.gross_profit - e.total_expense) / NULLIF(m.gross_sales, 0) * 100 AS net_margin_pct,
e.total_expense,
e.total_expense / NULLIF(m.gross_sales, 0) * 100 AS expense_ratio_pct
FROM monthly_data m
LEFT JOIN expense_data e ON m.month = e.month
ORDER BY m.month DESC;
9.2 库存周转分析SQL
WITH sales_data AS (
SELECT
toStartOfMonth(created_at) AS month,
sum(cost_amount) AS cogs
FROM ods.order_detail
WHERE created_at >= toStartOfMonth(addMonths(today(), -12))
GROUP BY toStartOfMonth(created_at)
),
avg_inventory AS (
SELECT
toStartOfMonth(update_time) AS month,
avg(stock_qty * cost_price) AS avg_inventory_value
FROM dws.mv_inventory_latest i
JOIN ods.products p ON i.product_id = p.product_id
WHERE update_time >= toStartOfMonth(addMonths(today(), -12))
GROUP BY toStartOfMonth(update_time)
)
SELECT
s.month,
a.avg_inventory_value,
s.cogs,
a.avg_inventory_value / NULLIF(s.cogs / 30, 0) AS inventory_turnover_days,
s.cogs / NULLIF(a.avg_inventory_value, 0) AS inventory_turnover_rate
FROM sales_data s
LEFT JOIN avg_inventory a ON s.month = a.month
ORDER BY s.month DESC;
9.3 应收账款账龄SQL
SELECT
c.customer_code,
c.customer_name,
sum(CASE WHEN aging_days <= 0 THEN outstanding_amount ELSE 0 END) AS not_due_amount,
sum(CASE WHEN aging_days BETWEEN 1 AND 30 THEN outstanding_amount ELSE 0 END) AS overdue_1_30,
sum(CASE WHEN aging_days BETWEEN 31 AND 60 THEN outstanding_amount ELSE 0 END) AS overdue_31_60,
sum(CASE WHEN aging_days BETWEEN 61 AND 90 THEN outstanding_amount ELSE 0 END) AS overdue_61_90,
sum(CASE WHEN aging_days > 90 THEN outstanding_amount ELSE 0 END) AS overdue_90_plus,
sum(outstanding_amount) AS total_outstanding
FROM (
SELECT
ar.customer_id,
ar.invoice_amount - ar.paid_amount AS outstanding_amount,
toDate(ar.due_date) - today() AS aging_days
FROM ods.ar_record ar
WHERE ar.invoice_amount > ar.paid_amount
) t
JOIN ods.customers c ON t.customer_id = c.customer_id
GROUP BY c.customer_code, c.customer_name
ORDER BY total_outstanding DESC
LIMIT 50;
9.4 经营健康度仪表盘
| 维度 | 指标 | 健康区间 | 计算方式 |
|---|---|---|---|
| 盈利能力 | 毛利率 | > 25% | profit_amount / total_amount |
| 盈利能力 | 净利率 | > 10% | net_profit / total_sales |
| 运营效率 | 库存周转天数 | < 60天 | avg_inventory / cogs × 30 |
| 运营效率 | 应收周转天数 | < 45天 | avg_ar / sales × 30 |
| 财务安全 | 流动比率 | > 1.5 | current_assets / current_liabilities |
| 费用控制 | 费用率 | < 15% | total_expense / total_sales |
| 成长性 | 收入增长率 | > 10% | (本月 - 上月) / 上月 × 100 |
十、Agent-06:预警监控与报告推送
职责:预警规则引擎 + 多渠道推送 + 多Agent协作
10.1 系统架构
ClickHouse数据湖 ──▶ 规则引擎 ──▶ 预警分级处理器(🔴🟠🟡)
│
调度中心 ──▶ 推送渠道适配器(企业微信/钉钉/邮件)
│
订阅管理 ──▶ 报告渲染引擎
10.2 预警规则一览
库存类规则:
| 规则编码 | 规则名称 | 条件 | 级别 |
|---|---|---|---|
| INV_001 | 库存为零预警 | = 0 | 🔴紧急 |
| INV_002 | 库存低于安全库存 | < 安全库存 | 🟠重要 |
| INV_003 | 库存周转率异常 | > 90天 | 🟡关注 |
| INV_004 | 超储预警 | > 3倍安全库存 | 🟡关注 |
销售类规则:
| 规则编码 | 规则名称 | 条件 | 级别 |
|---|---|---|---|
| SAL_001 | 日销售额为零 | = 0 | 🔴紧急 |
| SAL_002 | 销售额同比下降 | < -20% | 🟠重要 |
| SAL_003 | 热销商品缺货 | 库存 < 3天销量 | 🔴紧急 |
| SAL_004 | 毛利率异常 | < 10% | 🟠重要 |
财务类规则:
| 规则编码 | 规则名称 | 条件 | 级别 |
|---|---|---|---|
| FIN_001 | 应收账款逾期 | > 30天 | 🔴紧急 |
| FIN_002 | 现金流为负 | < 0 | 🔴紧急 |
| FIN_003 | 毛利率低于警戒线 | < 10% | 🟠重要 |
| FIN_004 | 费用超支 | > 预算110% | 🟠重要 |
客户类规则:
| 规则编码 | 规则名称 | 条件 | 级别 |
|---|---|---|---|
| CUS_001 | 客户信用额度超限 | > 100% | 🔴紧急 |
| CUS_002 | 客户流失预警 | > 90天无交易 | 🟡关注 |
| CUS_003 | 大客户风险预警 | 欠款 > 50万 | 🔴紧急 |
10.3 三级预警机制
| 级别 | 响应时效 | 通知范围 | 升级策略 |
|---|---|---|---|
| 🔴紧急 | 15分钟内 | 店长+区域经理+总部 | 30分钟未处理升级 |
| 🟠重要 | 1小时内 | 店长+区域经理 | 2小时未处理升级 |
| 🟡关注 | 当日内 | 相关负责人 | 无升级 |
10.4 调度策略
| 报告类型 | 执行时间 | 数据范围 | 推送渠道 |
|---|---|---|---|
| 日报 | T+1 08:00 | 前一日数据 | 企业微信/邮件 |
| 周报 | 周一 09:00 | 上周数据 | 企业微信/钉钉 |
| 月报 | 月末 18:00 | 当月数据 | 企业微信/钉钉/邮件 |
10.5 推送渠道适配器
# 企业微信推送
class WeChatWorkAdapter:
async def send(self, message, webhook_url):
payload = {
"msgtype": "markdown",
"markdown": {"content": message['content']}
}
async with aiohttp.ClientSession() as session:
async with session.post(webhook_url, json=payload) as resp:
return resp.status == 200
# 钉钉推送
class DingTalkAdapter:
async def send(self, message, webhook_url, secret):
timestamp = str(int(time.time() * 1000))
string_to_sign = f'{timestamp}\n{secret}'
sign = urllib.parse.quote_plus(base64.b64encode(
hmac.new(secret.encode(), string_to_sign.encode(),
digestmod=hashlib.sha256).digest()
).decode())
url = f"{webhook_url}×tamp={timestamp}&sign={sign}"
async with aiohttp.ClientSession() as session:
async with session.post(url, json={"msgtype": "markdown",
"markdown": {"text": message['content']}}) as resp:
return resp.status == 200
十一、部署方案与成本估算
11.1 腾讯云资源配置
| 组件 | 产品 | 推荐配置 | 月费用(参考) |
|---|---|---|---|
| 主库 | MySQL CDB | 4核8G 200GB | ~500元 |
| 只读实例 | MySQL CDB RO | 2核4G 100GB | ~200元 |
| 分析库 | TCHouse-C | 4核16G 500GB SSD | ~1500元 |
| 搜索服务 | TCO-ES | 2核4G 200GB | ~800元 |
| 数据同步 | DTS | 按量付费 | ~300元 |
| 合计 | - | - | ~3300元/月 |
11.2 集群配置建议(TCHouse-C)
| 数据量级 | 分片数 | 副本数 | 节点规格 | 存储 |
|---|---|---|---|---|
| <100GB | 1 | 2 | 4核16G | 500GB SSD |
| 100GB-1TB | 2 | 2 | 8核32G | 1TB SSD |
11.3 实施路线图
| 阶段 | 时间 | 任务 |
|---|---|---|
| Phase 1 | 第1周 | 腾讯云CDB创建,DTS同步配置,TCHouse-C集群创建 |
| Phase 2 | 第2周 | ClickHouse DDL部署,物化视图创建,数据同步验证 |
| Phase 3 | 第3周 | ES索引设计,商品搜索功能开发 |
| Phase 4 | 第4周 | 6个Agent功能开发,报表模板配置 |
| Phase 5 | 第5周 | 企业微信/钉钉推送集成,全流程测试 |
| Phase 6 | 第6周 | 上线试运行,数据一致性校验,告警规则调优 |
十二、总结
12.1 新架构核心价值
| 维度 | 改进说明 |
|---|---|
| 数据一致性 | MySQL主库保证事务一致性,CDC同步保证分析库最终一致 |
| 查询性能 | ClickHouse独立资源,复杂聚合查询秒级响应 |
| 搜索能力 | ES专业搜索,商品模糊匹配、拼音搜索、补全推荐 |
| 架构解耦 | 三层独立扩缩容,故障隔离 |
| 运维简化 | 腾讯云托管服务,DTS一键同步 |
12.2 6个Agent职责回顾
| Agent | 核心职责 | 数据来源 |
|---|---|---|
| Agent-01 | CDC数据同步 | MySQL → ClickHouse/ES |
| Agent-02 | 每日经营分析 | ClickHouse DWS物化视图 |
| Agent-03 | 库存预测补货 | ClickHouse + 历史销量模型 |
| Agent-04 | 供应商客户分析 | ClickHouse ODS层 |
| Agent-05 | 财务月报分析 | ClickHouse DWS层 |
| Agent-06 | 预警监控推送 | ClickHouse规则引擎 + 企业微信 |
12.3 与旧架构对比
| 对比项 | 旧架构 | 新架构 |
|---|---|---|
| 搜索能力 | 一般 | 极强(ES专用) |
| 事务支持 | 弱 | 完整(MySQL主库) |
| 数据同步 | 复杂CDC | DTS托管(同云) |
| 运维负担 | 高 | 中(云托管降低) |
| 成本 | 中 | 中(略增) |
文档版本:v2.0
更新日期:2026-04-30
主架构:MySQL + ClickHouse TCHouse-C + Elasticsearch TCO-ES + 腾讯云DTS