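Agent-01 Data Collection and Synchronization
Version: v1.0
Overview: connects to the purchase-sales-inventory system's MySQL/SQL Server databases and syncs changes via CDC into the ClickHouse data lake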
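1. Agent Responsibilities
| Item | Description |
|---|---|
| Name | data-sync-agent |
| Version | v1.0 |
| Core responsibility | Connect to the purchase-sales-inventory system (MySQL/SQL Server) and sync changes via CDC into the ClickHouse data lake |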
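Core capabilities
| Capability | Description |
|---|---|
| MySQL CDC | Real-time sync from the binlog via Debezium |
| SQL Server CDC | Incremental sync via Change Tracking (records which rows changed, not their intermediate values) |
| HTTP API | Pull data from third-party systems |
| Resume from checkpoint | Automatic checkpointing; no data loss as long as the sync resumes within the source's retention window |
| Data cleaning | Field mapping, type conversion, null handling |
| Materialized views | Automatic aggregation with ClickHouse SummingMergeTree |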
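The "Data cleaning" capability (field mapping, type conversion, null handling) can be sketched as a function that turns one Debezium change event into an `ods_products` row. This is a minimal sketch under several assumptions. It assumes the Debezium JSON converter with schemas disabled, so each event is the bare envelope (`op`, `before`, `after`, `ts_ms`). It assumes decimals arrive as strings or numbers (for example with `decimal.handling.mode=string`). `FIELD_MAP`, its source column names, and the choice to treat snapshot reads (`r`) as inserts are all hypothetical.

```python
from datetime import datetime, timezone
from decimal import Decimal
from typing import Any, Dict, Optional

# Debezium op codes -> ODS _op values. Treating snapshot reads ("r") as inserts is an assumption.
OP_MAP = {"c": "insert", "r": "insert", "u": "update", "d": "delete"}

# Hypothetical mapping from source column names to ODS column names.
FIELD_MAP = {"id": "id", "code": "product_code", "name": "product_name", "price": "price"}


def clean(event: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Convert one Debezium change event into an ods_products row, or None to drop it."""
    op = OP_MAP.get(event.get("op"))
    if op is None:
        return None  # e.g. truncate ("t") or malformed events are skipped
    # Debezium puts the row image in "before" for deletes and in "after" otherwise.
    image = event.get("before") if op == "delete" else event.get("after")
    if not image or image.get("id") is None:
        return None  # no usable row image, or the primary key is missing

    # Field mapping
    row = {dst: image.get(src) for src, dst in FIELD_MAP.items()}
    # Null handling: empty string for a missing name
    row["product_name"] = (row["product_name"] or "").strip()
    # Type conversion: price -> Decimal with 2 places (Decimal(10,2) in ClickHouse); null -> 0.00
    raw_price = row["price"] if row["price"] is not None else 0
    row["price"] = Decimal(str(raw_price)).quantize(Decimal("0.01"))
    # Debezium ts_ms is epoch milliseconds; the ODS _ts_ms column is DateTime64(3)
    row["_ts_ms"] = datetime.fromtimestamp(event["ts_ms"] / 1000, tz=timezone.utc)
    row["_op"] = op
    return row
```

Returning `None` for unusable events matches the `if record:` filter in the sync loop in section 5.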
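SLA targets
- Real-time sync latency (MySQL binlog): < 1 s
- Incremental sync latency (SQL Server Change Tracking): < 5 min
- Availability: 99.9%
- Automatic resume from the last checkpoint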
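The short calculation below converts the 99.9% availability target into a downtime budget. The 30-day month is an assumption.

```python
MINUTES_PER_DAY = 24 * 60


def downtime_budget_minutes(availability: float, period_days: float) -> float:
    """Maximum downtime (in minutes) allowed by an availability target over a period."""
    return period_days * MINUTES_PER_DAY * (1 - availability)


monthly = downtime_budget_minutes(0.999, 30)   # assumes a 30-day month
yearly = downtime_budget_minutes(0.999, 365)
print(f"{monthly:.1f} min/month, {yearly / 60:.2f} h/year")  # prints: 43.2 min/month, 8.76 h/year
```

At 99.9%, roughly 43 minutes of outage per month is the entire budget. Checkpoint resume therefore has to recover without manual intervention.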
2. Data Flow Architecture
```
MySQL (Binlog) ──┐
SQL Server (CT) ─┼──▶ Kafka ──▶ Data Cleaner ──▶ ClickHouse ──▶ Data Lake
HTTP API ────────┘                               └─▶ Checkpoint Manager (etcd/Redis)
```
Warehouse layers: ODS → DWD → DWS
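| Layer | Description | Example tables |
|---|---|---|
| ODS | Raw data layer; keeps the CDC operation type | ods_products, ods_orders |
| DWD | Detail (wide-table) layer; rows with delete markers removed | dwd_inventory_stock_detail |
| DWS | Summary layer; aggregated automatically by materialized views | mv_dws_warehouse_stock |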
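The ODS → DWD step in the table above can be sketched in plain Python: keep only the newest version of each key, then drop keys whose newest version is a delete. This sketch assumes each ODS row carries `id`, `_ts_ms` and `_op`, as in the ODS DDL in section 3. It also assumes that every column prefixed with `_` is CDC metadata to strip. It mirrors what a deduplicated read plus an `_op != 'delete'` filter would produce. It is not the actual DWD job.

```python
from typing import Any, Dict, List


def ods_to_dwd(rows: List[Dict[str, Any]], key: str = "id") -> List[Dict[str, Any]]:
    """Reduce ODS change rows to the current state of each key, without delete markers."""
    latest: Dict[Any, Dict[str, Any]] = {}
    for row in rows:
        k = row[key]
        # Newest version wins (largest _ts_ms); on ties the later row wins
        if k not in latest or row["_ts_ms"] >= latest[k]["_ts_ms"]:
            latest[k] = row
    # Keys whose newest version is a delete disappear; CDC metadata columns (_*) are dropped
    return [
        {col: val for col, val in row.items() if not col.startswith("_")}
        for row in latest.values()
        if row["_op"] != "delete"
    ]


ods_rows = [
    {"id": 1, "stock_qty": 10, "_ts_ms": 1, "_op": "insert"},
    {"id": 1, "stock_qty": 7, "_ts_ms": 2, "_op": "update"},
    {"id": 2, "stock_qty": 5, "_ts_ms": 1, "_op": "insert"},
    {"id": 2, "stock_qty": 5, "_ts_ms": 3, "_op": "delete"},
]
print(ods_to_dwd(ods_rows))  # [{'id': 1, 'stock_qty': 7}]
```

The delete filter must run after deduplication. Filtering deletes first would resurrect the previous version of a deleted row.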
3. ClickHouse DDL (Core Tables)
```sql
-- Product table (ODS)
CREATE TABLE inventory_dw.ods_products
(
    id UInt64,
    product_code String,
    product_name String,
    price Decimal(10,2),
    stock_qty Int32,
    warehouse_id UInt32,
    create_time DateTime,
    update_time DateTime,
    _ts_ms DateTime64(3) DEFAULT now64(3),
    _op Enum8('insert' = 1, 'update' = 2, 'delete' = 3),
    _row_key String DEFAULT concat(toString(id), '_', toString(_ts_ms))
)
ENGINE = ReplicatedReplacingMergeTree(_ts_ms)
PARTITION BY toYYYYMM(create_time)
-- The sorting key is the deduplication key. The version column _ts_ms must not be part of it;
-- otherwise every version is a distinct key and ReplacingMergeTree never collapses rows.
ORDER BY id
TTL create_time + INTERVAL 90 DAY;
```
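To see why `_ts_ms` must stay out of `ORDER BY`, the sketch below mimics a fully merged ReplacingMergeTree partition in Python. Rows with the same sorting key collapse into one, and the highest version wins. It is a simplification: real merges run in the background and are eventual, so queries that need exact results read with `FINAL`. The sample rows are made up.

```python
from typing import Any, Dict, List, Sequence, Tuple


def replacing_merge(rows: List[Dict[str, Any]], order_by: Sequence[str],
                    version: str) -> List[Dict[str, Any]]:
    """Mimic a fully merged ReplacingMergeTree partition: one row per sorting key."""
    kept: Dict[Tuple[Any, ...], Dict[str, Any]] = {}
    for row in rows:
        key = tuple(row[col] for col in order_by)
        # Highest version wins; on equal versions the last inserted row is kept, hence >=
        if key not in kept or row[version] >= kept[key][version]:
            kept[key] = row
    return list(kept.values())


versions = [
    {"id": 1, "stock_qty": 10, "_ts_ms": 1000},
    {"id": 1, "stock_qty": 7, "_ts_ms": 2000},
]
print(len(replacing_merge(versions, ["id", "_ts_ms"], "_ts_ms")))  # 2: every version is its own key
print(len(replacing_merge(versions, ["id"], "_ts_ms")))            # 1: only the newest version survives
```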
```sql
-- Inventory stock table (ODS, with warehouse dimension)
CREATE TABLE inventory_dw.ods_inventory_stock
(
    id UInt64,
    warehouse_id UInt32,
    warehouse_code String,
    warehouse_name String,
    product_id UInt64,
    product_code String,
    stock_qty Int32,
    available_qty Int32,
    reserved_qty Int32,
    update_time DateTime,
    _ts_ms DateTime64(3) DEFAULT now64(3),
    _op Enum8('insert' = 1, 'update' = 2, 'delete' = 3)
)
ENGINE = ReplicatedReplacingMergeTree(_ts_ms)
-- Background merges deduplicate only within a partition, and update_time differs between
-- versions of a row, so read this table with FINAL when exact current state is needed.
PARTITION BY toYYYYMM(update_time)
ORDER BY (warehouse_id, product_id);
```
```sql
-- DWS materialized view example
-- Note: the view fires on every block inserted into ods_inventory_stock, so each CDC
-- version of a stock row is added to the sums (suits movement quantities, not snapshots).
CREATE MATERIALIZED VIEW inventory_dw.mv_dws_warehouse_stock
ENGINE = ReplicatedSummingMergeTree((total_stock, available_stock))
PARTITION BY toYYYYMM(stat_date)
ORDER BY (stat_date, warehouse_id, product_id)
AS
SELECT
    toDate(update_time) AS stat_date,
    warehouse_id,
    warehouse_code,
    product_id,
    product_code,
    sum(stock_qty) AS total_stock,
    sum(available_qty) AS available_stock,
    max(_ts_ms) AS last_update
FROM inventory_dw.ods_inventory_stock
WHERE _op != 'delete'
GROUP BY stat_date, warehouse_id, warehouse_code, product_id, product_code;
```
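The effect of summing CDC versions can be checked with a small Python simulation of SummingMergeTree. Rows sharing the sorting key collapse into one row, and the listed columns are summed. The data is hypothetical, and each ODS insert is simplified to one row per version. One possible alternative is to feed the view stock movements (deltas) instead of absolute stock levels. Another is to compute snapshot totals at query time from the deduplicated ODS table.

```python
from collections import defaultdict
from typing import Any, Dict, List, Sequence, Tuple


def summing_merge(rows: List[Dict[str, Any]], key_cols: Sequence[str],
                  sum_cols: Sequence[str]) -> Dict[Tuple[Any, ...], Dict[str, int]]:
    """Mimic SummingMergeTree: rows sharing the sorting key collapse into one with summed columns."""
    totals: Dict[Tuple[Any, ...], Dict[str, int]] = defaultdict(lambda: {c: 0 for c in sum_cols})
    for row in rows:
        acc = totals[tuple(row[c] for c in key_cols)]
        for c in sum_cols:
            acc[c] += row[c]
    return dict(totals)


# Two CDC versions of the same stock row on the same day: insert (100), then update (90).
view_rows = [
    {"stat_date": "2024-05-01", "warehouse_id": 1, "product_id": 42, "total_stock": 100},
    {"stat_date": "2024-05-01", "warehouse_id": 1, "product_id": 42, "total_stock": 90},
]
merged = summing_merge(view_rows, ["stat_date", "warehouse_id", "product_id"], ["total_stock"])
print(merged[("2024-05-01", 1, 42)]["total_stock"])  # 190, although the current stock is 90
```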
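4. Key Configuration
MySQL CDC configuration
```yaml
mysql_cdc:
  enabled: true
  host: "192.168.1.100"
  port: 3306
  database: "inventory_db"
  cdc:
    server_id: 85744               # must be unique among all replication clients of this MySQL server
    snapshot_mode: "when_needed"   # snapshot only if no offset exists or the stored binlog position is gone
    tables:
      - schema: "inventory_db"
        table: "products"
        primary_key: ["id"]
```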
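If the `mysql_cdc` block drives a Debezium MySQL connector, its keys map onto standard connector properties roughly as follows. This is a partial sketch. The function name is hypothetical, and it assumes `tables` sits under `cdc` as in the config above. Properties a real deployment also needs are omitted here: credentials, `topic.prefix`, and the schema-history settings. `primary_key` is not mapped, because Debezium reads the key from the table definition.

```python
from typing import Any, Dict


def to_debezium_mysql_props(cfg: Dict[str, Any]) -> Dict[str, str]:
    """Map the spec's mysql_cdc block onto Debezium MySQL connector properties (partial)."""
    cdc = cfg["cdc"]
    # table.include.list takes comma-separated, fully qualified databaseName.tableName entries
    tables = ",".join(f'{t["schema"]}.{t["table"]}' for t in cdc["tables"])
    return {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": cfg["host"],
        "database.port": str(cfg["port"]),
        "database.server.id": str(cdc["server_id"]),
        "database.include.list": cfg["database"],
        "table.include.list": tables,
        "snapshot.mode": cdc["snapshot_mode"],
    }


mysql_cdc = {
    "enabled": True,
    "host": "192.168.1.100",
    "port": 3306,
    "database": "inventory_db",
    "cdc": {
        "server_id": 85744,
        "snapshot_mode": "when_needed",
        "tables": [{"schema": "inventory_db", "table": "products", "primary_key": ["id"]}],
    },
}
props = to_debezium_mysql_props(mysql_cdc)
```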
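ClickHouse writer configuration
```yaml
clickhouse:
  writer:
    batch_size: 5000
    flush_interval_ms: 5000
    compression: "lz4"
  tables:
    - source: "products"
      target: "ods_products"
      sync_mode: "cdc"
      engine: "ReplicatedReplacingMergeTree"   # same engine as the DDL in section 3
      order_by: ["id"]                         # the version column _ts_ms is not part of the key
```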
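The config does not say how `batch_size` and `flush_interval_ms` interact. The sketch below assumes whichever limit is reached first triggers an insert. `BatchBuffer`, `flush_fn` and `tick()` are hypothetical names, and the clock is injectable so the timing can be tested. Under this assumed policy, a row that arrives during low traffic can wait up to about 5 s (the configured `flush_interval_ms`) before it is written. That is well above the < 1 s real-time latency target, so the interval would need tuning for the binlog path.

```python
import time
from typing import Any, Callable, List, Optional


class BatchBuffer:
    """Collects rows and hands them to flush_fn when either the size or the age limit is reached."""

    def __init__(self, flush_fn: Callable[[List[Any]], None], batch_size: int = 5000,
                 flush_interval_ms: int = 5000, clock: Callable[[], float] = time.monotonic):
        self._flush_fn = flush_fn
        self.batch_size = batch_size
        self.flush_interval_s = flush_interval_ms / 1000
        self._clock = clock  # injectable so timing can be tested deterministically
        self._rows: List[Any] = []
        self._oldest_at: Optional[float] = None

    def add(self, row: Any) -> None:
        if not self._rows:
            self._oldest_at = self._clock()  # a batch's age counts from its first row
        self._rows.append(row)
        if len(self._rows) >= self.batch_size:
            self.flush()  # size limit reached

    def tick(self) -> None:
        """Call periodically; flushes a partial batch once its oldest row is old enough."""
        if self._rows and self._clock() - self._oldest_at >= self.flush_interval_s:
            self.flush()

    def flush(self) -> None:
        if self._rows:
            batch, self._rows, self._oldest_at = self._rows, [], None
            self._flush_fn(batch)
```

For example, `BatchBuffer(writer.insert_rows)` would forward each batch to a hypothetical `insert_rows` method, with `tick()` called from the sync loop or a timer.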
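5. Core Logic (Pseudocode)
```python
import time


class DataSyncAgent:
    # Assumes data_cleaner, clickhouse_writer, checkpoint_manager, running and
    # poll_interval_s (hypothetical setting) are initialized elsewhere in the agent.

    def run_source_sync(self, source_name, connector, checkpoint):
        while self.running:
            # 1. Fetch change events after the last committed checkpoint
            events = connector.fetch_events(checkpoint)
            if not events:
                time.sleep(self.poll_interval_s)  # nothing new: back off instead of busy-looping
                continue

            # 2. Clean each event (field mapping, type conversion, null handling);
            #    clean() returns None for records that should be dropped
            cleaned_records = [r for r in map(self.data_cleaner.clean, events) if r]

            # 3. Write to ClickHouse; write_batch is expected to raise on failure,
            #    so the checkpoint is never advanced past data that was not written
            if cleaned_records:
                self.clickhouse_writer.write_batch(cleaned_records)

            # 4. Persist the new position (it must correspond to the last fetched event)
            #    and resume from it on the next iteration
            checkpoint = connector.get_current_position()
            self.checkpoint_manager.save_checkpoint(source_name, checkpoint)
```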
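The loop persists progress through `checkpoint_manager.save_checkpoint(source_name, position)`. Below is a minimal, single-process stand-in for the etcd/Redis-backed manager that stores checkpoints in a local JSON file. The class name, `load_checkpoint` and the file layout are hypothetical, and the position values in the usage lines are illustrative only. The temp-file-then-`os.replace` pattern keeps a crash during a write from leaving a truncated checkpoint. Concurrent writers from several processes are not handled.

```python
import json
import os
import tempfile
from typing import Any, Dict, Optional


class FileCheckpointManager:
    """One JSON file holding the latest checkpoint per source (local stand-in for etcd/Redis)."""

    def __init__(self, path: str):
        self.path = path

    def _load_all(self) -> Dict[str, Any]:
        try:
            with open(self.path, encoding="utf-8") as f:
                return json.load(f)
        except FileNotFoundError:
            return {}  # first run: no checkpoints yet

    def load_checkpoint(self, source_name: str) -> Optional[Any]:
        return self._load_all().get(source_name)

    def save_checkpoint(self, source_name: str, position: Any) -> None:
        data = self._load_all()
        data[source_name] = position
        # Write a temp file in the same directory, fsync it, then atomically replace the old file
        fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(os.path.abspath(self.path)))
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as f:
                json.dump(data, f)
                f.flush()
                os.fsync(f.fileno())
            os.replace(tmp_path, self.path)
        except BaseException:
            os.unlink(tmp_path)  # do not leave stray temp files behind
            raise


# Illustrative positions: a binlog file/offset for MySQL, a Change Tracking version for SQL Server
workdir = tempfile.mkdtemp()
manager = FileCheckpointManager(os.path.join(workdir, "checkpoints.json"))
manager.save_checkpoint("mysql", {"file": "mysql-bin.000003", "pos": 154})
manager.save_checkpoint("sqlserver", {"ct_version": 42})
```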
6. File Layout
```
ai_agent_system/data_sync_agent/
├── SPEC.md              # Main design specification
├── config/
│   ├── source.yml       # Data source configuration
│   ├── target.yml       # ClickHouse target configuration
│   └── sync_policy.yml  # Sync policy configuration
└── sql/
    └── ddl.sql          # ClickHouse DDL
```