上下游开发指南

 

数据库 MySQL CDC

更新时间 2023-09-06

版本说明

  • Flink版本:当前仅支持 Flink 1.12 版本。
  • MySQL 版本:
    • Database: 5.7, 8.0.x
    • JDBC Driver: 8.0.16

使用范围

MySQL CDC 连接器支持对 MySQL 数据库的全量和增量读取。底层使用了 Debezium 来做 CDC。

会先读取数据库快照,然后继续读取 binlog,即使发生故障,也可以保证 exactly-once。

MySQL CDC 源不能并行读取,因为只有一个任务可以接收 binlog 事件。

原理

当 MySQL CDC source 启动时,会获取一个全局读锁,以阻止其他数据库客户端的写入操作。然后读取当前的 binlog 位置以及数据库和表的 schema 信息。之后会释放全局读锁,允许其他数据库客户端对数据库进行写操作。扫描全表,并从 binlog 记录位置处开始读取。Flink 作业会周期性的执行 checkpoint 来记录 binlog 位置,当作业崩溃恢复时,便会从之前记录的 Binlog 点继续处理,从而保证 Exactly Once 语义。

注意

全局读锁被持有期间,会阻塞写,可能需要几秒钟,具体取决于表的数量,这段时间会影响在线业务。

前提条件

为 MySQL CDC 连接器创建用户,并配置适当的权限。

  1. 创建 MySQL 用户。

    mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';
    
  2. 授予用户所需的权限。

    mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
    
  3. 确认用户的权限。

    mysql> FLUSH PRIVILEGES;
    

详细信息请参考:权限说明

DDL 定义

-- 在 Flink SQL 里注册 MySQL 表 'orders'
CREATE TABLE orders (
  order_id INT,
  order_date TIMESTAMP(0),
  customer_name STRING,
  price DECIMAL(10, 5),
  product_id INT,
  order_status BOOLEAN
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = '$username',
  'password' = '$password',
  'database-name' = 'mydb',
  'table-name' = 'orders'
);

MySQL CDC 源表参数

参数 是否必填 默认值 数据类型 描述
connector String 连接器,固定值为 mysql-cdc
hostname String MySQL 数据库 IP 地址。
username String MySQL 数据库用户名。
password String MySQL 数据库密码。
database-name String MySQL 数据库名称,支持正则匹配多个数据库。
table-name String MySQL 表名,支持正则匹配多张表名。
port 3306 String MySQL 数据库端口号。
server-id String 数据库客户端的一个数字ID,该ID必须是MySQL集群中全局唯一的,建议针对同一个数据库的每个作业都设置一个不同的 ID。默认会随机生成一个 5400~6400 的值。该参数也支持 ID 范围的格式,例如 5400-5408。
server-time-zone UTC String 会话时区,例如:“Asia/Shanghai”。控制 MYSQL 中的 TIMESTAMP 类型如何转换为 STRING。详细请参考
debezium.min.row.count.to.stream.results 1000 Integer 首次读取数据时,当表的条数大于该值,则使用分批读取模式。设置为 ‘0’ 跳过检查始终用流式传输读取数据。
debezium.* String 传递 Debezium 的属性,如:'debezium.snapshot.mode' = 'never'。更多信息请查看 Debezium 的 MySQL 连接器属性

类型映射

MySQL 字段类型 Flink SQL 字段类型
TINYINT TINYINT
SMALLINT
TINYINT UNSIGNED
SMALLINT
INT
MEDIUMINT
SMALLINT UNSIGNED
INT
BIGINT
INT UNSIGNED
BIGINT
BIGINT UNSIGNED DECIMAL(20, 0)
BIGINT BIGINT
FLOAT FLOAT
DOUBLE
DOUBLE PRECISION
DOUBLE
NUMERIC(p, s)
DECIMAL(p, s)
DECIMAL(p, s)
BOOLEAN
TINYINT(1)
BOOLEAN
DATE DATE
TIME [(p)] TIME [(p)] [WITHOUT TIMEZONE]
DATETIME [(p)] TIMESTAMP [(p)] [WITHOUT TIMEZONE]
CHAR(n)
VARCHAR(n)
TEXT
STRING
BINARY
VARBINARY
BLOB
BYTES

代码示例

示例一

-- 用 Flink SQL 注册 MySQL 表 students,使用 CDC 机制流式获取 changelog
CREATE TABLE mysql_cdc_source (
    id INT,
    name STRING,
    score INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = '$username',
    'password' = '$password',
    'database-name' = 'detail',
    'table-name' = 'students'
)
 
-- 用 Flink SQL 注册 ElasticSearch index stu
CREATE TABLE elasticsearch_sink (
    id INT,
    name STRING,
    score INT,
    PRIMARY KEY (id) NOT ENFORCED              --定义主键则根据主键 upsert,否则是 append 模式
) WITH (
    'connector' = 'elasticsearch-7',           --输出到 ElasticSearch7
    'hosts' = 'http://192.168.100.19:9200',    --ElasticSearch 连接地址
    'index' = 'stu',                           --ElasticSearch 的 Index 名
    'sink.flush-on-checkpoint' = 'true',       --checkpoint 时批量写入
    'sink.bulk-flush.max-actions' = '50',      --每批次最多的操作数
    'sink.bulk-flush.max-size' = '10mb',       --每批次累计最大大小
    'sink.bulk-flush.interval' = '1000ms',     --批量写入的间隔
    'connection.max-retry-timeout' = '1000ms', --每次请求的最大超时时间
    'format' = 'json'                          --输出数据格式,目前只支持 'json'
);
 
-- 将 MySQL 表 students 的 upsert 流实时同步到 ElasticSearch 的 stu index 里
INSERT INTO elasticsearch_sink SELECT id, name, score FROM mysql_cdc_source;

示例二

-- MySQL CDC:订单表
CREATE TABLE orders (
    amount DECIMAL,
    currency STRING,
    order_time AS PROCTIME()
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'retail',
    'table-name' = 'orders'
)
 
-- MySQL :汇率表
CREATE TABLE currency_rates(
    currency STRING,
    rate DECIMAL,
    update_time TIMESTAMPE(3)
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/demo',
    'table-name' = 'cate_dim',
    'username' = 'root',
    'password' = 'password'
)
 
CREATE TABLE print_sink(
    amount DECIMAL
) WITH (
    'connector' = 'print'
)
 
SELECT
  SUM(amount * rate) AS amount
FROM
  orders,
  LATERAL TABLE (rates(order_time))
WHERE
  rates.currency = orders.currency
这篇文档解决了您的问题吗?
0
0