数据库 MySQL CDC
版本说明
- Flink版本:当前仅支持 Flink 1.12 版本。
- MySQL 版本:
- Database: 5.7, 8.0.x
- JDBC Driver: 8.0.16
 
使用范围
MySQL CDC 连接器支持对 MySQL 数据库的全量和增量读取。底层使用了 Debezium 来做 CDC。
会先读取数据库快照,然后继续读取 binlog,即使发生故障,也可以保证 exactly-once。
MySQL CDC 源不能并行读取,因为只有一个任务可以接收 binlog 事件。
原理
当 MySQL CDC source 启动时,会获取一个全局读锁,以阻止其他数据库客户端的写入操作。然后读取当前的 binlog 位置以及数据库和表的 schema 信息。之后会释放全局读锁,允许其他数据库客户端对数据库进行写操作。扫描全表,并从 binlog 记录位置处开始读取。Flink 作业会周期性的执行 checkpoint 来记录 binlog 位置,当作业崩溃恢复时,便会从之前记录的 Binlog 点继续处理,从而保证 Exactly Once 语义。
注意
全局读锁被持有期间,会阻塞写,可能需要几秒钟,具体取决于表的数量,这段时间会影响在线业务。
前提条件
为 MySQL CDC 连接器创建用户,并配置适当的权限。
- 
创建 MySQL 用户。 mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';
- 
授予用户所需的权限。 mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
- 
确认用户的权限。 mysql> FLUSH PRIVILEGES;
详细信息请参考:权限说明。
DDL 定义
-- 在 Flink SQL 里注册 MySQL 表 'orders'
CREATE TABLE orders (
  order_id INT,
  order_date TIMESTAMP(0),
  customer_name STRING,
  price DECIMAL(10, 5),
  product_id INT,
  order_status BOOLEAN
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = '$username',
  'password' = '$password',
  'database-name' = 'mydb',
  'table-name' = 'orders'
);
MySQL CDC 源表参数
| 参数 | 是否必填 | 默认值 | 数据类型 | 描述 | 
|---|---|---|---|---|
| connector | 是 | 无 | String | 连接器,固定值为 mysql-cdc。 | 
| hostname | 是 | 无 | String | MySQL 数据库 IP 地址。 | 
| username | 是 | 无 | String | MySQL 数据库用户名。 | 
| password | 是 | 无 | String | MySQL 数据库密码。 | 
| database-name | 是 | 无 | String | MySQL 数据库名称,支持正则匹配多个数据库。 | 
| table-name | 是 | 无 | String | MySQL 表名,支持正则匹配多张表名。 | 
| port | 否 | 3306 | String | MySQL 数据库端口号。 | 
| server-id | 否 | 无 | String | 数据库客户端的一个数字ID,该ID必须是MySQL集群中全局唯一的,建议针对同一个数据库的每个作业都设置一个不同的 ID。默认会随机生成一个 5400~6400 的值。该参数也支持 ID 范围的格式,例如 5400-5408。 | 
| server-time-zone | 否 | UTC | String | 会话时区,例如:“Asia/Shanghai”。控制 MYSQL 中的 TIMESTAMP 类型如何转换为 STRING。详细请参考。 | 
| debezium.min.row.count.to.stream.results | 否 | 1000 | Integer | 首次读取数据时,当表的条数大于该值,则使用分批读取模式。设置为 ‘0’ 跳过检查始终用流式传输读取数据。 | 
| debezium.* | 否 | 无 | String | 传递 Debezium 的属性,如: 'debezium.snapshot.mode' = 'never'。更多信息请查看 Debezium 的 MySQL 连接器属性。 | 
类型映射
| MySQL 字段类型 | Flink SQL 字段类型 | 
|---|---|
| TINYINT | TINYINT | 
| SMALLINT TINYINT UNSIGNED | SMALLINT | 
| INT MEDIUMINT SMALLINT UNSIGNED | INT | 
| BIGINT INT UNSIGNED | BIGINT | 
| BIGINT UNSIGNED | DECIMAL(20, 0) | 
| BIGINT | BIGINT | 
| FLOAT | FLOAT | 
| DOUBLE DOUBLE PRECISION | DOUBLE | 
| NUMERIC(p, s) DECIMAL(p, s) | DECIMAL(p, s) | 
| BOOLEAN TINYINT(1) | BOOLEAN | 
| DATE | DATE | 
| TIME [(p)] | TIME [(p)] [WITHOUT TIMEZONE] | 
| DATETIME [(p)] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] | 
| CHAR(n) VARCHAR(n) TEXT | STRING | 
| BINARY VARBINARY BLOB | BYTES | 
代码示例
示例一
-- 用 Flink SQL 注册 MySQL 表 students,使用 CDC 机制流式获取 changelog
CREATE TABLE mysql_cdc_source (
    id INT,
    name STRING,
    score INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = '$username',
    'password' = '$password',
    'database-name' = 'detail',
    'table-name' = 'students'
)
 
-- 用 Flink SQL 注册 ElasticSearch index stu
CREATE TABLE elasticsearch_sink (
    id INT,
    name STRING,
    score INT,
    PRIMARY KEY (id) NOT ENFORCED              --定义主键则根据主键 upsert,否则是 append 模式
) WITH (
    'connector' = 'elasticsearch-7',           --输出到 ElasticSearch7
    'hosts' = 'http://192.168.100.19:9200',    --ElasticSearch 连接地址
    'index' = 'stu',                           --ElasticSearch 的 Index 名
    'sink.flush-on-checkpoint' = 'true',       --checkpoint 时批量写入
    'sink.bulk-flush.max-actions' = '50',      --每批次最多的操作数
    'sink.bulk-flush.max-size' = '10mb',       --每批次累计最大大小
    'sink.bulk-flush.interval' = '1000ms',     --批量写入的间隔
    'connection.max-retry-timeout' = '1000ms', --每次请求的最大超时时间
    'format' = 'json'                          --输出数据格式,目前只支持 'json'
);
 
-- 将 MySQL 表 students 的 upsert 流实时同步到 ElasticSearch 的 stu index 里
INSERT INTO elasticsearch_sink SELECT id, name, score FROM mysql_cdc_source;
示例二
-- MySQL CDC:订单表
CREATE TABLE orders (
    amount DECIMAL,
    currency STRING,
    order_time AS PROCTIME()
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'retail',
    'table-name' = 'orders'
)
 
-- MySQL :汇率表
CREATE TABLE currency_rates(
    currency STRING,
    rate DECIMAL,
    update_time TIMESTAMPE(3)
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/demo',
    'table-name' = 'cate_dim',
    'username' = 'root',
    'password' = 'password'
)
 
CREATE TABLE print_sink(
    amount DECIMAL
) WITH (
    'connector' = 'print'
)
 
SELECT
  SUM(amount * rate) AS amount
FROM
  orders,
  LATERAL TABLE (rates(order_time))
WHERE
  rates.currency = orders.currency