
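Agent-01 Data Collection and Synchronization

Version: v1.0
Overview: connects to the MySQL/SQL Server databases of the purchase, sales and inventory (PSI) system and syncs changes via CDC into the ClickHouse data lake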

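1. Agent Responsibilities

Aspect                 Description
Name                   data-sync-agent
Version                v1.0
Core responsibility    Connect to the PSI system (MySQL/SQL Server) and sync changes via CDC into the ClickHouse data lake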

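Core Capabilities

Capability           Description
MySQL CDC            Real-time sync from the binlog via Debezium
SQL Server CDC       Incremental sync via Change Tracking
HTTP API             Pulls data from third-party systems
Resumable sync       Automatic checkpointing; no data loss as long as recovery happens within the retention window
Data cleansing       Field mapping, type conversion, null handling
Materialized views   Automatic aggregation with ClickHouse SummingMergeTree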
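For the SQL Server path, whether a checkpoint can be resumed without loss depends on the Change Tracking retention window. A minimal sketch, assuming a single-column primary key named `id` and a pyodbc-style `?` placeholder; `CtCheckpoint`, `checkpoint_is_valid` and `build_incremental_query` are hypothetical helpers, while `CHANGETABLE(CHANGES ...)`, `SYS_CHANGE_VERSION`, `SYS_CHANGE_OPERATION` and `CHANGE_TRACKING_MIN_VALID_VERSION` are SQL Server built-ins.

```python
from dataclasses import dataclass
from typing import Tuple

# SYS_CHANGE_OPERATION codes returned by CHANGETABLE -> ODS _op enum values
CT_OP_TO_ODS = {"I": "insert", "U": "update", "D": "delete"}


@dataclass
class CtCheckpoint:
    table: str         # e.g. "dbo.products" (hypothetical table name)
    last_version: int  # last SYS_CHANGE_VERSION already written to ClickHouse


def checkpoint_is_valid(cp: CtCheckpoint, min_valid_version: int) -> bool:
    """Resuming is safe only while the checkpoint is not older than
    CHANGE_TRACKING_MIN_VALID_VERSION(OBJECT_ID(table)); otherwise the change
    history has been cleaned up and the table must be re-snapshotted."""
    return cp.last_version >= min_valid_version


def build_incremental_query(cp: CtCheckpoint, key_column: str = "id") -> Tuple[str, tuple]:
    """Return (sql, params) selecting rows changed after the checkpoint.

    The LEFT JOIN keeps deleted keys, whose source row no longer exists.
    Table and column names are interpolated, so they must come from trusted
    configuration; the version is bound as a '?' parameter (pyodbc style).
    """
    sql = (
        f"SELECT CT.SYS_CHANGE_VERSION, CT.SYS_CHANGE_OPERATION, CT.{key_column}, T.* "
        f"FROM CHANGETABLE(CHANGES {cp.table}, ?) AS CT "
        f"LEFT JOIN {cp.table} AS T ON T.{key_column} = CT.{key_column} "
        f"ORDER BY CT.SYS_CHANGE_VERSION"
    )
    return sql, (cp.last_version,)
```

In the pattern Microsoft documents, the version to store as the next checkpoint is read with `CHANGE_TRACKING_CURRENT_VERSION()` inside the same snapshot-isolation transaction as the change query, so no change can slip between the two reads.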

SLA Targets

  • Real-time sync latency < 1 s
  • Incremental sync latency < 5 min
  • Availability 99.9%
  • Automatic resume from the last checkpoint
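The 99.9% availability target translates into a concrete downtime budget. A small sketch of that arithmetic; `downtime_budget` is a hypothetical helper, and the 30-day and 365-day periods are choices made for illustration.

```python
from datetime import timedelta


def downtime_budget(availability: float, period: timedelta) -> timedelta:
    """Maximum allowed unavailability within `period` for an availability target."""
    if not 0.0 <= availability <= 1.0:
        raise ValueError("availability must be a fraction between 0 and 1")
    # timedelta * float is rounded to the nearest microsecond
    return period * (1.0 - availability)
```

At 99.9%, a 30-day month allows 43.2 minutes of downtime and a 365-day year about 8.76 hours.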

2. Data Flow Architecture

MySQL (Binlog) ──┐
SQL Server (CT) ─┼──▶ Kafka ──▶ Data Cleaner ──▶ ClickHouse ──▶ Data Lake
HTTP API ────────┘     └─▶ Checkpoint Manager (etcd/Redis)
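The Checkpoint Manager in the diagram only needs to persist one resume position per source. A minimal sketch, assuming a MySQL binlog file/offset position serialized as JSON; `BinlogPosition`, `CheckpointManager`, the key prefix and the plain `dict` store are hypothetical stand-ins, and a real deployment would wrap an etcd or Redis client behind the same mapping interface.

```python
import json
from dataclasses import asdict, dataclass
from typing import MutableMapping, Optional


@dataclass(frozen=True)
class BinlogPosition:
    """MySQL resume point: binlog file name plus byte offset."""
    file: str
    pos: int


class CheckpointManager:
    """Stores one JSON-encoded checkpoint per source under a key prefix.

    `store` is any dict-like object; here a plain dict stands in for etcd/Redis.
    """

    def __init__(self, store: MutableMapping[str, str],
                 prefix: str = "data-sync/checkpoint/"):
        self.store = store
        self.prefix = prefix

    def save_checkpoint(self, source_name: str, position: BinlogPosition) -> None:
        self.store[self.prefix + source_name] = json.dumps(asdict(position))

    def load_checkpoint(self, source_name: str) -> Optional[BinlogPosition]:
        raw = self.store.get(self.prefix + source_name)
        # None means "no checkpoint yet": the connector starts from a snapshot
        return None if raw is None else BinlogPosition(**json.loads(raw))
```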

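ODS layer → DWD layer → DWS layer

Layer  Description                                                      Example tables
ODS    Raw data layer; keeps the CDC operation type                     ods_products, ods_orders
DWD    Detail wide-table layer; rows marked as deleted are removed      dwd_inventory_stock_detail
DWS    Summary layer; aggregated automatically by materialized views    mv_dws_warehouse_stock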
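Moving from ODS to DWD amounts to keeping the latest version of each row (the rule ReplacingMergeTree applies with `_ts_ms` as its version column) and then dropping rows whose latest version is a delete. A plain-Python sketch over in-memory dicts; `ods_to_dwd` is hypothetical, a single key column is assumed, and in practice this step would run as ClickHouse SQL.

```python
from typing import Dict, Iterable, List


def ods_to_dwd(ods_rows: Iterable[dict], key: str = "id") -> List[dict]:
    """Collapse ODS change rows into current state for the DWD layer."""
    latest: Dict[object, dict] = {}
    for row in ods_rows:
        current = latest.get(row[key])
        # Keep the highest _ts_ms; on a tie the later-arriving row wins
        if current is None or row["_ts_ms"] >= current["_ts_ms"]:
            latest[row[key]] = row
    # Entities whose latest version is a delete disappear; _op is not carried over
    return [
        {k: v for k, v in row.items() if k != "_op"}
        for row in latest.values()
        if row["_op"] != "delete"
    ]
```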

3. ClickHouse DDL (Core Tables)

-- Product table (ODS)
CREATE TABLE inventory_dw.ods_products
(
    id UInt64,
    product_code String,
    product_name String,
    price Decimal(10,2),
    stock_qty Int32,
    warehouse_id UInt32,
    create_time DateTime,
    update_time DateTime,
    _ts_ms DateTime64(3) DEFAULT now64(3),
    _op Enum8('insert' = 1, 'update' = 2, 'delete' = 3),
    _row_key String DEFAULT concat(toString(id), '_', toString(_ts_ms))
)
ENGINE = ReplicatedReplacingMergeTree(_ts_ms)
PARTITION BY toYYYYMM(create_time)
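-- Rows sharing the ORDER BY key are collapsed to the one with the largest _ts_ms,
-- so the version column must not be part of the key
ORDER BY id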
TTL create_time + INTERVAL 90 DAY;

-- Inventory stock table (ODS, with warehouse dimension)
CREATE TABLE inventory_dw.ods_inventory_stock
(
    id UInt64,
    warehouse_id UInt32,
    warehouse_code String,
    warehouse_name String,
    product_id UInt64,
    product_code String,
    stock_qty Int32,
    available_qty Int32,
    reserved_qty Int32,
    update_time DateTime,
    _ts_ms DateTime64(3) DEFAULT now64(3),
    _op Enum8('insert' = 1, 'update' = 2, 'delete' = 3)
)
ENGINE = ReplicatedReplacingMergeTree(_ts_ms)
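-- ReplacingMergeTree deduplicates only within a partition, so versions of one row
-- that land in different update_time months are not collapsed by background merges
PARTITION BY toYYYYMM(update_time)
ORDER BY (warehouse_id, product_id);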

-- DWS layer: materialized view example
CREATE MATERIALIZED VIEW inventory_dw.mv_dws_warehouse_stock
ENGINE = ReplicatedSummingMergeTree()
PARTITION BY toYYYYMM(stat_date)
ORDER BY (stat_date, warehouse_id, product_id)
AS
SELECT 
    toDate(update_time) as stat_date,
    warehouse_id,
    warehouse_code,
    product_id,
    product_code,
    -- product_name omitted: ods_inventory_stock has no such column
    sum(stock_qty) as total_stock,
    sum(available_qty) as available_stock,
    max(_ts_ms) as last_update
FROM inventory_dw.ods_inventory_stock
WHERE _op != 'delete'
GROUP BY stat_date, warehouse_id, warehouse_code,
         product_id, product_code;
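A ClickHouse materialized view behaves like an insert trigger: it aggregates each block inserted into `ods_inventory_stock`, not the deduplicated table state. Under SummingMergeTree, every CDC version of a stock row therefore adds to `total_stock`. The simulation below contrasts that with the latest snapshot; `summing_mv`, `latest_snapshot` and the sample rows are hypothetical.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple

Key = Tuple[str, int, int]  # (stat_date, warehouse_id, product_id)


def summing_mv(inserted_rows: Iterable[dict]) -> Dict[Key, int]:
    """Mimics MV + SummingMergeTree: each inserted non-delete row adds its stock_qty."""
    totals: Dict[Key, int] = defaultdict(int)
    for r in inserted_rows:
        if r["_op"] != "delete":
            totals[(r["stat_date"], r["warehouse_id"], r["product_id"])] += r["stock_qty"]
    return dict(totals)


def latest_snapshot(inserted_rows: Iterable[dict]) -> Dict[Key, int]:
    """What a stock report usually expects: the most recent stock_qty per key."""
    latest: Dict[Key, Tuple[int, int]] = {}
    for r in inserted_rows:
        key = (r["stat_date"], r["warehouse_id"], r["product_id"])
        if key not in latest or r["_ts_ms"] >= latest[key][0]:
            latest[key] = (r["_ts_ms"], r["stock_qty"])
    return {k: qty for k, (_, qty) in latest.items()}
```

Summing is correct only if `stock_qty` carries per-event deltas. If it holds absolute stock levels, as the ODS table suggests, a version-aware aggregate such as `argMax` in an AggregatingMergeTree is the usual alternative.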

4. Key Configuration

MySQL CDC configuration

mysql_cdc:
  enabled: true
  host: "192.168.1.100"
  port: 3306
  database: "inventory_db"
  cdc:
    server_id: 85744
    snapshot_mode: "when_needed"
    tables:
      - schema: "inventory_db"
        table: "products"
        primary_key: ["id"]
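The `mysql_cdc` block eventually has to become Debezium connector properties. A sketch of one possible mapping, assuming Debezium 2.x property names (`topic.prefix` replaced `database.server.name` in 2.0); `to_debezium_mysql_config` and its `topic_prefix` argument are hypothetical, and credentials plus the schema-history settings the MySQL connector also needs are omitted.

```python
from typing import Dict


def to_debezium_mysql_config(mysql_cdc: dict, topic_prefix: str) -> Dict[str, str]:
    """Translate the mysql_cdc YAML block (as a dict) into connector properties."""
    cdc = mysql_cdc["cdc"]
    # table.include.list expects comma-separated "schema.table" patterns
    tables = ",".join(f'{t["schema"]}.{t["table"]}' for t in cdc["tables"])
    return {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": mysql_cdc["host"],
        "database.port": str(mysql_cdc["port"]),
        "database.server.id": str(cdc["server_id"]),
        "topic.prefix": topic_prefix,
        "snapshot.mode": cdc["snapshot_mode"],
        "database.include.list": mysql_cdc["database"],
        "table.include.list": tables,
    }
```

`primary_key` is not mapped because Debezium uses each table's primary key as the message key by default. Entries in `table.include.list` are regular expressions, so the `.` in `inventory_db.products` matches any character.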

ClickHouse writer configuration

clickhouse:
  writer:
    batch_size: 5000
    flush_interval_ms: 5000
    compression: "lz4"
  tables:
    - source: "products"
      target: "ods_products"
      sync_mode: "cdc"
      engine: "ReplicatedReplacingMergeTree"
      order_by: ["id"]
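The writer settings describe a size-or-time batching policy: flush when `batch_size` rows are buffered or when `flush_interval_ms` has passed, whichever comes first. A minimal sketch, assuming the flush callback performs the actual ClickHouse insert; `BatchBuffer` and its injectable clock (there only to make it testable) are hypothetical.

```python
import time
from typing import Callable, List, Optional


class BatchBuffer:
    """Flush when batch_size rows are buffered or when flush_interval_ms has
    elapsed since the oldest buffered row arrived."""

    def __init__(self, on_flush: Callable[[List[dict]], None], batch_size: int = 5000,
                 flush_interval_ms: int = 5000,
                 clock: Callable[[], float] = time.monotonic):
        self.on_flush = on_flush  # e.g. performs the ClickHouse INSERT
        self.batch_size = batch_size
        self.flush_interval_s = flush_interval_ms / 1000.0
        self.clock = clock
        self.rows: List[dict] = []
        self.oldest_at: Optional[float] = None

    def add(self, row: dict) -> None:
        if not self.rows:
            self.oldest_at = self.clock()
        self.rows.append(row)
        if len(self.rows) >= self.batch_size:
            self.flush()

    def tick(self) -> None:
        """Call periodically (e.g. from the sync loop) to flush aged partial batches."""
        if self.rows and self.clock() - self.oldest_at >= self.flush_interval_s:
            self.flush()

    def flush(self) -> None:
        if self.rows:
            # Rows are cleared only after on_flush returns, so a failed insert can be retried
            self.on_flush(self.rows)
            self.rows, self.oldest_at = [], None
```

With these defaults a row can wait up to 5 s in the buffer under light traffic, which is worth weighing against the < 1 s real-time latency target.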

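5. Core Sync Logic (Python sketch)

import time


class DataSyncAgent:
    def __init__(self, data_cleaner, clickhouse_writer, checkpoint_manager, idle_sleep_s=0.5):
        self.data_cleaner = data_cleaner
        self.clickhouse_writer = clickhouse_writer
        self.checkpoint_manager = checkpoint_manager
        self.idle_sleep_s = idle_sleep_s
        self.running = True

    def run_source_sync(self, source_name, connector, checkpoint):
        while self.running:
            # 1. Fetch change events after the last committed checkpoint
            events = connector.fetch_events(checkpoint)
            if not events:
                time.sleep(self.idle_sleep_s)  # nothing new; avoid a busy loop
                continue

            # 2. Clean the data; clean() returns None for events to discard
            cleaned_records = [r for r in map(self.data_cleaner.clean, events) if r is not None]

            # 3. Write to ClickHouse; write_batch() is expected to raise on failure,
            #    so the checkpoint is never advanced past data that was not written
            if cleaned_records:
                self.clickhouse_writer.write_batch(cleaned_records)

            # 4. Persist the new position and resume from it on the next iteration
            checkpoint = connector.get_current_position()
            self.checkpoint_manager.save_checkpoint(source_name, checkpoint)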
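The `clean` step in the loop above covers field mapping, type conversion and null handling. A minimal sketch of a cleaner for Debezium change events, assuming the standard envelope fields `op`, `before`, `after` and `ts_ms`, and `decimal.handling.mode=string` so DECIMAL columns arrive as strings; `DataCleaner`, its constructor arguments and the op mapping table are hypothetical.

```python
from datetime import datetime, timezone
from decimal import Decimal  # typical converter for DECIMAL columns such as price
from typing import Any, Callable, Dict, Optional

# Debezium op codes -> ODS _op enum ("r" = snapshot read, loaded as an insert)
DEBEZIUM_OP_TO_ODS = {"c": "insert", "r": "insert", "u": "update", "d": "delete"}


class DataCleaner:
    """Field mapping, type conversion and null handling for one source table."""

    def __init__(self, field_map: Dict[str, str],
                 converters: Dict[str, Callable[[Any], Any]],
                 defaults: Dict[str, Any]):
        self.field_map = field_map    # source column -> target column
        self.converters = converters  # target column -> type converter
        self.defaults = defaults      # target column -> replacement for NULL

    def clean(self, event: Optional[dict]) -> Optional[dict]:
        if not event:
            return None  # tombstone (null value) after a delete: nothing to store
        op = DEBEZIUM_OP_TO_ODS.get(event.get("op"))
        # Deletes carry the last row image in "before"; all other ops use "after"
        image = event.get("before") if op == "delete" else event.get("after")
        if op is None or image is None:
            return None  # unknown op or missing row image: drop the event
        record: Dict[str, Any] = {}
        for src, dst in self.field_map.items():
            value = image.get(src)
            if value is None:
                value = self.defaults.get(dst)  # stays None if no default is set
            elif dst in self.converters:
                value = self.converters[dst](value)
            record[dst] = value
        record["_op"] = op
        # Envelope ts_ms = time the connector processed the event, in epoch millis
        record["_ts_ms"] = datetime.fromtimestamp(event["ts_ms"] / 1000, tz=timezone.utc)
        return record
```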

6. File Listing

ai_agent_system/data_sync_agent/
├── SPEC.md                    # Main design specification
├── config/
│   ├── source.yml             # Data source configuration
│   ├── target.yml             # ClickHouse target configuration
│   └── sync_policy.yml        # Sync policy configuration
└── sql/
    └── ddl.sql                # ClickHouse DDL