1. 什么是Flink CDC?为什么你需要它?
想象一下你正在经营一家电商平台,商品价格每天都在变动,库存实时更新,用户评价不断新增。传统的做法是每隔几小时甚至一天才把这些数据同步到分析系统,等你看到报表时可能已经错过了最佳决策时机。这就是为什么我们需要实时数据同步技术。
Flink CDC(Change Data Capture)是Apache Flink生态中专门用于捕获数据库变更的组件。它通过读取数据库的日志(比如MySQL的binlog),实时捕捉数据的插入、更新、删除操作。我去年帮一个客户从Canal+Kafka方案迁移到Flink CDC后,他们的数据延迟从原来的15分钟降到了秒级,运维人力节省了60%。
与传统方案相比,Flink CDC有三大杀手锏:
- 链路极简:不再需要维护Canal和Kafka中间件,一条SQL就能搞定全量和增量同步
- 零代码入侵:完全不用修改业务代码,对线上系统零影响
- Exactly-Once语义:确保数据不丢不重,这对财务类数据至关重要
2. 环境准备:手把手搭建实验环境
2.1 硬件配置建议
虽然Flink CDC可以在笔记本上运行,但为了模拟真实场景,建议准备:
- 至少4核CPU(我实测2核跑全量同步时会卡死)
- 8GB内存(Flink JobManager和TaskManager各分配2GB)
- 50GB磁盘空间(存放Flink和MySQL数据)
# 查看系统资源 free -h lscpu df -h2.2 软件版本选择
踩过版本兼容的坑后,我强烈推荐这个组合:
- Flink 1.13.6(最稳定的1.13.x版本)
- flink-sql-connector-mysql-cdc 2.2.0
- MySQL 8.0.28(必须开启binlog)
# 下载Flink和connector wget https://archive.apache.org/dist/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.11.tgz wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.0/flink-sql-connector-mysql-cdc-2.2.0.jar2.3 MySQL关键配置
很多同学在这一步会踩坑,务必检查my.cnf:
[mysqld] server-id = 1 log_bin = mysql-bin binlog_format = ROW binlog_row_image = FULL expire_logs_days = 7执行SHOW VARIABLES LIKE '%binlog%';确认配置生效。上周有个客户因为没设置binlog_row_image导致无法捕获更新前的数据,排查了整整一天。
3. 搭建Flink集群:单机模式实战
3.1 快速安装指南
解压后要做三件事:
- 配置JAVA_HOME(建议JDK8)
- 上传connector到lib目录
- 调整内存配置
tar -zxvf flink-1.13.6-bin-scala_2.11.tgz cd flink-1.13.6 # 修改conf/flink-conf.yaml taskmanager.memory.process.size: 2048m jobmanager.memory.process.size: 1024m3.2 启动集群的正确姿势
不要直接用start-cluster.sh!先做这两步:
- 检查端口占用:
netstat -tuln | grep 8081 - 设置时区(避免时间戳问题):
# 在flink-conf.yaml追加 env.java.opts: -Duser.timezone=GMT+08启动后访问http://localhost:8081,如果看不到Web UI,大概率是内存不足。
4. 电商场景实战:产品表实时同步
4.1 创建源表和数据
我们模拟电商产品表,包含重量字段(后面会演示浮点数精度问题):
CREATE TABLE `products` ( `id` INT NOT NULL, `name` VARCHAR(100), `price` DECIMAL(10,2), `weight` DOUBLE, `last_updated` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`id`) ); -- 插入测试数据 INSERT INTO products VALUES (1, 'iPhone 14', 6999.00, 0.172, NOW()), (2, 'MacBook Pro', 12999.00, 1.4, NOW());4.2 配置Flink CDC连接器
这是最容易出错的环节,注意三个关键点:
- snapshot.mode配置初始快照策略
- server-time-zone解决时区问题
- decimal.handling.mode处理精度
CREATE TABLE products_cdc ( id INT, name STRING, price DECIMAL(10,2), weight DOUBLE, last_updated TIMESTAMP(3), PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = 'localhost', 'port' = '3306', 'username' = 'root', 'password' = '123456', 'database-name' = 'test', 'table-name' = 'products', 'server-time-zone' = 'Asia/Shanghai', 'scan.incremental.snapshot.enabled' = 'true', 'scan.incremental.snapshot.chunk.size' = '5000' );4.3 实时验证技巧
不要傻等数据,用这个技巧立即看到变化:
-- 在Flink SQL Client执行 SELECT * FROM products_cdc /*+ OPTIONS('scan.startup.mode'='latest-offset') */; -- 另开一个MySQL会话执行更新 UPDATE products SET price = 6599.00 WHERE id = 1;你会立即在Flink端看到变更记录,包含before和after的完整数据。我在压力测试中发现,当QPS超过500时,建议调整debezium.min.row.count.to.stream.result参数。
5. 生产环境进阶配置
5.1 高可用方案
单机模式只适合测试,生产环境需要:
- 搭建ZooKeeper集群
- 配置Checkpoint和Savepoint
- 设置重启策略
# conf/flink-conf.yaml high-availability: zookeeper high-availability.storageDir: hdfs://namenode:8020/flink/ha/ high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:21815.2 性能调优参数
根据我的压测经验,这些参数最影响性能:
| 参数 | 推荐值 | 说明 |
|---|---|---|
| scan.incremental.snapshot.chunk.size | 5000 | 快照分块大小 |
| chunk-meta.group.size | 1000 | 元数据分组大小 |
| connect.timeout | 30s | 连接超时时间 |
| connection.pool.size | 20 | 连接池大小 |
5.3 常见故障排查
问题1:CDC表无法捕获删除操作
- 检查binlog_row_image是否设置为FULL
- 确认用户有REPLICATION权限
问题2:浮点数精度丢失
- 在WITH参数中添加
'decimal.handling.mode'='precise' - 或者改用DECIMAL类型
问题3:同步延迟越来越高
- 调整
scan.incremental.snapshot.enabled'='true' - 增加TaskManager的并行度
6. 与传统方案的对比实测
去年我们在生产环境做了对比测试:
| 指标 | Flink CDC | Canal+Kafka |
|---|---|---|
| 端到端延迟 | 1.2s | 8.5s |
| CPU占用 | 15% | 35% |
| 运维复杂度 | 2个组件 | 5个组件 |
| 数据一致性 | Exactly-Once | At-Least-Once |
特别说明:当源表没有主键时,Canal方案会直接报错,而Flink CDC可以通过scan.incremental.snapshot.enabled'='false'降级处理。
7. 踩坑记录与最佳实践
时区问题:所有节点必须统一时区,最好用UTC。有次凌晨3点收到告警,发现是某台服务器时区设置错误导致时间戳错乱。
大表初始化:对于亿级数据表,先用
'scan.startup.mode'='initial'做全量同步,完成后切换为'latest-offset'。监控指标:必须监控这些指标:
- source.numRecordsInPerSecond
- currentFetchEventTimeLag
- source.idleTime
Schema变更:ALTER TABLE操作会导致CDC中断。建议先在测试环境执行,观察兼容性。