x

CDC(Change Data Capture)工具全景解析

CDC 是现代数据同步架构的核心组件,本文系统性介绍 CDC 的定义、工作原理、主流工具对比,以及工具之间的协作关系与典型数据流。


一、CDC 是什么

CDC(Change Data Capture,变更数据捕获) 是一种监听数据库数据变更(INSERT / UPDATE / DELETE),并将变更内容以事件流的形式向外推送的技术方案。

核心价值

传统方案的问题 CDC 带来的改变
定时批量轮询,全量同步,延迟高 监听 binlog/redo log,实时增量捕获,延迟低至毫秒级
对源库产生额外查询压力 读取数据库日志,不锁表,不影响源库业务
数据重复采集,浪费资源 仅捕获变更数据,精确按需同步
无法捕获删除操作和旧值 完整记录变更前后状态,支持回滚和审计

典型应用场景

业务库(MySQL) ──binlog──► CDC工具 ──► Kafka ──► 数据仓库/ES/Redis/消息队列
  • 实时数据同步:MySQL → Elasticsearch(搜索加速)
  • 数据仓库供给:业务库 → Kafka → ClickHouse / BigQuery / Snowflake
  • 缓存更新:数据库变更 → Redis 缓存同步
  • 微服务事件驱动:数据库变更 → 消息总线 → 下游服务响应
  • 数据库迁移:不停服迁移,CDC 做增量补偿

二、CDC 工作原理

CDC 工具的核心是读取数据库的变更日志,而非直接查询数据表。

MySQL 侧:binlog(二进制日志)

MySQL 在执行数据变更时会同时写入 binlog,记录了每条变更的详细信息:

# binlog 记录内容示例
Table: orders
Operation: UPDATE
Before: {id: 1, status: "pending", amount: 100}
After:  {id: 1, status: "paid",    amount: 100}
Timestamp: 2026-01-01 10:00:01
Server ID: 1
Binlog Position: 45678
binlog 格式 说明
ROW(推荐) 记录变更后的完整行数据,与 SQL 无关,解析简单
STATEMENT 记录执行的 SQL 语句,可能有函数不确定性问题
MIXED 混用,MySQL 自动选择更安全的格式

⚠️ CDC 工具必须使用 ROW 格式的 binlog,否则无法准确捕获所有变更。

CDC 工具侧:解析 + 转换 + 推送

binlog 日志流
    │
    ▼
┌─────────────────┐
│  CDC 工具        │  ← 伪装成 MySQL 从库,订阅 binlog
│  (解析 + 封装)   │
└────────┬────────┘
         │  封装为统一格式(JSON/Avro/Protobuf)
         ▼
┌─────────────────┐
│  Kafka / 消息队列 │  ← 变更事件进入消息总线
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│  下游消费者       │  ← ES / ClickHouse / Redis / 下游服务
└─────────────────┘

关键概念:伪装从库

CDC 工具(如 Debezium、Canal)并不是直接读取 binlog 文件,而是伪装成 MySQL 的一个从库(Replica),通过 MySQL 主从复制协议来接收 binlog 事件

# Debezium 连接配置示例
database.hostname=mysql-master
database.port=3306
database.user=cdc_user          # 需有 REPLICATION SLAVE 权限
database.password=xxx
database.serverId=85744          # 唯一 server ID,不能与从库冲突

三、主流 CDC 工具对比

3.1 工具全景图

开源生态
├── DebeziumRed Hat)────────── Kafka Connect 生态,知名度最高
├── Canal(阿里巴巴)──────────── 内部开源,最早在国内大规模使用
├── MaxwellZendesk)────────── 轻量级,MySQL  Kafka/Redis
├── DatabusLinkedIn)───────── 实时性最强,支持回溯
├── Alibaba Cloud DTS ─────────── 云服务,一站式数据传输
│
商业/企业级
├── Oracle GoldenGate ─────────── Oracle 官方,商业版最强
├── AWS DMS ──────────────────── AWS 托管,支持多种源
├── Fivetran ─────────────────── SaaS 模式,连接器最全
├── Striim ───────────────────── 实时流处理 + CDC 集成
├── Airbyte ──────────────────── 开源 ELTCDC 是其核心能力
└── HVR / Qlik Replicate ─────── 企业级数据集成

3.2 核心工具详解

① Debezium(推荐首选)

属性 说明
开发方 Red Hat(已被 IBM 收购),开源(Apache 2.0)
架构 基于 Kafka Connect,connector 模式运行
支持的数据库 MySQL、PostgreSQL、MongoDB、SQL Server、Oracle、DB2
消息格式 自定义 JSON(携带 schema)/ Avro / Protobuf
优势 社区活跃、文档完善、支持 Schema Evolution、连接器最全
输出 Kafka Topic(每个表一个 topic)
# Debezium 核心流程
MySQL binlog  Debezium MySQL Connector  Kafka Topic  下游消费者

Debezium 的独特能力——Schema Evolution
当源表结构发生变更(新增列 / 修改类型),Debezium 可以自动处理并告知下游消费者,Kafka Schema Registry 会验证兼容性,避免数据不一致。


② Canal(阿里巴巴)

属性 说明
开发方 阿里巴巴内部开源(GitHub: alibaba/canal)
架构 自成体系,不需要 Kafka(可直接推送到 MQ/ES/RDB)
支持的数据库 MySQL(主要)、MariaDB
消息格式 Protocol Buffers(轻量,高性能)
优势 轻量、对 MySQL 兼容性深、国内社区大、云产品成熟(DTS)
输出 Kafka / RocketMQ / Elasticsearch / RDBMS

Canal 的工作流程

Canal Server(伪装从库)→ Canal Parser(解析binlog)→ Canal Filter(过滤)→ Canal Adapter(输出到下游)

Canal 最早解决的是阿里巴巴内部"双11"大促期间数据库变更同步到计算引擎的问题,在淘宝/天猫大规模验证过。


③ Maxwell(Zendesk)

属性 说明
开发方 Zendesk 开源(GitHub: zendesk/maxwell)
架构 轻量级单一进程,MySQL → Kafka / Redis / RabbitMQ
支持的数据库 MySQL(主要)、MariaDB、Percona
消息格式 JSON(比 Canal 更简洁)
优势 配置极简、资源占用低、适合轻量级同步场景
输出 Kafka / Redis / RabbitMQ / stdout
# Maxwell 配置示例(最简单的启动)
./bin/maxwell --user='cdc_user' --password='xxx' \
  --host='mysql-master' \
  --producer=kafka \
  --kafka.bootstrap.servers=kafka:9092 \
  --kafka_topic=maxwell-events

④ Databus(LinkedIn)

属性 说明
开发方 LinkedIn 开源
架构 高度优化,SCN(System Change Number)游标,毫秒级延迟
支持的数据库 Oracle(主要)、MySQL
优势 真正的流式处理,支持从任意时间点回溯重放事件
劣势 项目活跃度低,Oracle 为主,MySQL 支持较弱

⑤ Alibaba Cloud DTS(数据传输服务)

属性 说明
开发方 阿里云商业服务
架构 云托管,开箱即用
支持的数据库 MySQL、PolarDB、PostgreSQL、Redis、MongoDB、SQL Server 等
优势 无需运维、云厂商深度集成、在线迁移不停服
劣势 供应商锁定、成本较高

3.3 横向对比表

维度 Debezium Canal Maxwell Databus DTS
生态 Kafka Connect 自有架构 自有架构 自有架构 云服务
MySQL 支持 ⭐⭐⭐⭐⭐ ⭐⭐⭐⭐⭐ ⭐⭐⭐⭐⭐ ⭐⭐ ⭐⭐⭐⭐⭐
PostgreSQL ⭐⭐⭐⭐⭐ ⭐⭐⭐⭐
Oracle ⭐⭐⭐⭐ ⭐⭐⭐⭐⭐ ⭐⭐⭐⭐
延迟 毫秒级 毫秒级 毫秒级 亚毫秒级 秒级
Schema Evolution ✅ 原生支持 ⚠️ 需额外处理 ⚠️ 部分支持
活跃度 非常活跃 较活跃 一般 云服务
学习成本 较高(Kafka生态) 中等
输出到 ES Kafka+Connector 原生支持 Kafka 自有协议 原生支持
开源协议 Apache 2.0 Apache 2.0 MIT Apache 2.0 商业

四、CDC 工具协作关系与数据流

4.1 核心数据流(Debezium + Kafka 为例)

┌─────────────────────────────────────────────────────────────────┐
                        MySQL 主库                                
                     (开启 binlog ROW 格式)                        
  ┌────────────┐                                                
    binlog     ◄── 记录所有 DML 变更                           
  └──────┬─────┘                                                
└─────────┼──────────────────────────────────────────────────────┘
           MySQL Replication Protocol(伪装从库)
          
┌─────────────────────────────────────────────────────────────────┐
  Kafka Connect 集群                                             
  ┌──────────────────────────────────────────────────────────┐  
   Debezium MySQL Source Connector                             
    ├── 读取 binlog position                                  
    ├── 解析 ROW 事件(parse binlog event                    
    ├── 封装为 Debezium Event(带 schema + payload           
    └── 发送到 Kafka Topic                                     
  └──────────────────────────────────────────────────────────┘  
└─────────────────────────────────────────────────────────────────┘
          
           Kafka Topic: dbserver1.inventory.products
┌─────────────────────────────────────────────────────────────────┐
  Kafka 消息总线                                                 
                                                                  
  Topic 分区策略:默认按表名分区,保证同一表事件有序               
  消息格式(JSON 示例):                                          
  {                                                              
    "before": {"id": 1, "name": "手机", "stock": 10},          
    "after":  {"id": 1, "name": "手机", "stock": 8},            
    "op": "UPDATE",          // create / read / update / delete 
    "ts_ms": 1735800000123,  // 时间戳                           
    "source": {               // 元信息                         
      "db": "ecommerce",      // 库名                           
      "table": "products"     // 表名                           
    }                                                            
  }                                                              
└─────────────────────────────────────────────────────────────────┘
          
          ├──► [消费者1] Elasticsearch Sink Connector  ES 索引
          ├──► [消费者2] JDBC Sink Connector  数据仓库
          ├──► [消费者3] 自定义 Kafka Consumer  Redis / 业务服务
          └──► [消费者4] Flink SQL  实时计算

4.2 Canal 输出链路(国内常见方案)

MySQL → Canal Server → Canal Parser → Canal Filter
                                      │
                    ┌─────────────────┼─────────────────┐
                    ▼                 ▼                 ▼
              Kafka/RocketMQ      Elasticsearch       RDBMS
              (消息队列)        (搜索)            (数据库)

Canal 相比 Debezium 更直接,不需要 Kafka Connect 层,Canal Server 直接推送消息,减少了架构层级,在国内电商/物流场景大量使用。

MySQL → Debezium → Kafka → Flink SQL CDC → 实时物化视图/计算结果

Flink CDC Connectors 直接消费 Debezium 格式的事件,在 Flink 内部完成:

  • 无损同步:增量快照(Incremental Snapshot)
  • 实时计算:对变更流做窗口聚合
  • 输出到多个 Sink:ClickHouse、Iceberg、Kafka

五、CDC + Elasticsearch 典型配置

方案:Debezium → Kafka → Elasticsearch Sink

第一步:MySQL 开启 binlog

-- my.cnf
log_bin = mysql-bin
binlog_format = ROW
binlog_row_image = FULL
expire_logs_days = 7
server_id = 1

第二步:创建 CDC 用户

CREATE USER 'debezium'@'%' IDENTIFIED BY 'password';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium';
GRANT ALL PRIVILEGES ON heartbeat.* TO 'debezium';  -- 心跳表

第三步:启动 Debezium MySQL Connector

{
  "name": "mysql-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql-master",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "password",
    "database.server.id": "85744",
    "topic.prefix": "dbserver1",
    "database.include.list": "ecommerce",
    "table.include.list": "ecommerce.products,ecommerce.orders",
    "snapshot.mode": "initial",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes"
  }
}

第四步:Elasticsearch Sink Connector 消费

{
  "name": "es-sink-connector",
  "config": {
    "connector.class": "ElasticsearchSinkConnector",
    "kafka.bootstrap.servers": "kafka:9092",
    "kafka.topic": "dbserver1.ecommerce.products",
    "elasticsearch.url": "http://elasticsearch:9200",
    "index.name": "products",
    "type.name": "_doc",
    "pk.mode": "record_key",
    "pk.fields": "id"
  }
}

六、CDC 常见问题与处理

Q1:binlog 格式选哪种?

ROW 格式是必须项。Statement 格式只记录 SQL,CDC 工具无法可靠地还原变更前后状态。

Q2:DDL 变更(ALTER TABLE)如何处理?

  • Debezium:支持 Schema Evolution,需要 Schema Registry 配合,DDL 事件会写入单独的 topic
  • Canal/Maxwell:DDL 事件会单独处理,需要下游消费者兼容处理
  • 建议:生产环境尽量避免高峰期执行 DDL,或使用在线 DDL 工具(pt-online-schema-change / gh-ost)

Q3:如何保证 Exactly-Once 语义?

CDC 工具本身提供至少一次(At-Least-Once) 语义保证:

  • MySQL 端记录 binlog position(可回溯)
  • 消费者端需要做幂等写入(利用主键或唯一键去重)
  • Debezium + Kafka Transactions 可实现端到端 Exactly-Once

Q4:大规模表的全量同步怎么做?

方案 说明
Snapshot(快照)模式 Debezium 首次启动时自动扫描全表,生成快照写入 Kafka
增量快照 Flink CDC 支持增量快照,不锁表,线上可用
外部快照 用 mydumper 导出 → 导入目标库,CDC 只同步增量

Q5:多表合并到同一个 ES 索引?

  • Debezium 天然按分发到不同 Topic,需要 Sink 端做跨 Topic join
  • Flink CDC 可以读取多个 Topic 并做实时 Join
  • 简单场景:应用层在写入 MySQL 时同时发消息到 Kafka,不走 CDC

本文档为 CDC 技术全景扫盲,如需深入某个工具的详细配置,可进一步展开。

Left-click: follow link, Right-click: select node, Scroll: zoom
x