数据库 MySQL
版本说明
当前仅支持 Flink 1.12 版本。
使用范围
使用 JDBC 驱动程序从 MySQL 中读取数据或将数据写入其中。 
支持作为数据源、维表、数据目的。
DDL 定义
CREATE TABLE mysql_table (
  id BIGINT,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://localhost:3306/demo',
   'table-name' = 'users',
   'username' = 'root',
   'password' = '123456'
);
源表 WITH 参数
| 参数 | 是否必填 | 默认值 | 数据类型 | 描述 | 
|---|---|---|---|---|
| connector | 是 | 无 | String | 连接器,固定值为 jdbc。 | 
| url | 是 | 无 | String | MySQL 数据库 JDBC URL。 | 
| table-name | 是 | 无 | String | MySQL 表名。 | 
| driver | 否 | 无 | String | JDBC 驱动程序的类名,如果未设置,将自动从 url 获取。 | 
| username | 是 | 无 | String | MySQL 数据库用户名。 | 
| password | 是 | 无 | String | MySQL 数据库密码。 | 
| scan.partition.column | 否 | 无 | Integer | 指定对输入数据进行分区扫描的列名。该列必须是数值类型、日期类型或时间戳类型。 | 
| scan.partition.num | 否 | 无 | Integer | 分区扫描启用后,指定分区数。 | 
| scan.partition.lower-bound | 否 | 无 | Integer | 分区扫描启用后,指定第一个分区的最小值。 | 
| scan.partition.upper-bound | 否 | 无 | Integer | 分区扫描启用后,指定最后一个分区的最大值。 | 
| scan.fetch-size | 否 | 0 | Integer | 每次从数据库读取时,批量获取的行数。 | 
| scan.auto-commit | 否 | true | Boolean | 自动提交标志,决定每个语句是否在事务中自动提交。 | 
说明
scan.partition.lower-bound和scan.partition.upper-bound仅用于决定分区步长,而不是用于过滤表中的行。所以表中的所有行都会被分区并返回。- 分区扫描功能可以加速读取数据,每个子任务可以读取自己的分区。使用该功能时,四个
scan.partition开头的参数都必须指定。
维表 WITH 参数
| 参数 | 是否必填 | 默认值 | 数据类型 | 描述 | 
|---|---|---|---|---|
| connector | 是 | 无 | String | 连接器,固定值为 jdbc。 | 
| url | 是 | 无 | String | MySQL 数据库 JDBC URL。 | 
| table-name | 是 | 无 | String | MySQL 表名。 | 
| driver | 否 | 无 | String | JDBC 驱动程序的类名,如果未设置,将自动从 url 获取。 | 
| username | 是 | 无 | String | MySQL 数据库用户名。 | 
| password | 是 | 无 | String | MySQL 数据库密码。 | 
| lookup.cache.max-rows | 否 | 无 | Integer | Lookup 缓存中最多缓存的数据行数。超过此值,最旧的行将过期。默认情况下禁用查找缓存。 | 
| lookup.cache.ttl | 否 | 无 | Duration | Lookup 缓存中每条记录最长的缓存时间。超过此时间,最旧的行将过期。默认情况下禁用 Lookup 缓存。 | 
| lookup.max-retries | 否 | 3 | Integer | 数据库查询失败时,最多重试的次数。 | 
说明
- Lookup 缓存提升维表读取性能,目前仅支持同步读取模式。
- 默认情况下,Lookup 缓存未启用,所有请求需要请求数据库。
- 通过设置
lookup.cache.max-rows和lookup.cache.ttl可以启用 Lookup 缓存,这时每个进程(即 TaskManager)都会持有一份缓存。Flink 会先查找缓存,缓存未命中时会向数据库发送请求,并根据返回的值更新缓存。
结果表 WITH 参数
| 参数 | 是否必填 | 默认值 | 数据类型 | 描述 | 
|---|---|---|---|---|
| connector | 是 | 无 | String | 连接器,固定值为 jdbc。 | 
| url | 是 | 无 | String | MySQL 数据库 JDBC URL。 | 
| table-name | 是 | 无 | String | MySQL 表名。 | 
| driver | 否 | 无 | String | JDBC 驱动程序的类名,如果未设置,将自动从 url 获取。 | 
| username | 是 | 无 | String | MySQL 数据库用户名。 | 
| password | 是 | 无 | String | MySQL 数据库密码。 | 
| sink.buffer-flush.max-rows | 否 | 100 | Integer | 批量输出时,最多缓存的数据行数。设置为 0 表示禁用输出缓存。 | 
| sink.buffer-flush.interval | 否 | 1s | Duration | 每隔多久异步线程自动批量输出数据。设置为 0 表示禁用自动异步输出。 | 
| sink.max-retries | 否 | 3 | Integer | 数据库写入失败时,最多重试的次数。 | 
说明
- 如果在 DDL 上定义了主键,则 sink 以 upsert 模式与外部系统交换 UPDATE/DELETE 消息,否则,它以 append 模式运行,不支持消费 UPDATE/DELETE 消息。
- 对于 MySQL 表,Upsert 功能的实现采用
INSERT .. ON DUPLICATE KEY UPDATE ..语法。
类型映射
| MySQL 字段类型 | Flink SQL 字段类型 | 
|---|---|
| TINYINT | TINYINT | 
| SMALLINT TINYINT UNSIGNED | SMALLINT | 
| INT MEDIUMINT SMALLINT UNSIGNED | INT | 
| BIGINT INT UNSIGNED | BIGINT | 
| BIGINT UNSIGNED | DECIMAL(20, 0) | 
| BIGINT | BIGINT | 
| FLOAT | FLOAT | 
| DOUBLE DOUBLE PRECISION | DOUBLE | 
| NUMERIC(p, s) DECIMAL(p, s) | DECIMAL(p, s) | 
| BOOLEAN TINYINT(1) | BOOLEAN | 
| DATE | DATE | 
| TIME [(p)] | TIME [(p)] [WITHOUT TIMEZONE] | 
| DATETIME [(p)] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] | 
| CHAR(n) VARCHAR(n) TEXT | STRING | 
| BINARY VARBINARY BLOB | BYTES | 
代码示例
示例一
-- 用 Flink SQL 注册 MySQL 表 users
CREATE TABLE MyUserTable (
  id BIGINT,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://localhost:3306/mydatabase',
   'table-name' = 'users'
);
 
CREATE TABLE MY_DIM (
    key BIGINT,
    desc STRING,
    rt AS PROCTIME(),
    PRIMARY KEY (key) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/mydatabase',
    'table-name' = 'user_extr'
);
-- 从另一张表 T 迁移数据到 MyUserTable 表
INSERT INTO MyUserTable
SELECT id, name, age, status FROM T;
 
-- 查看数据
SELECT id, name, age, status FROM MyUserTable;
 
-- 将 MyUserTable 作为维表与其他流式数据源进行 temporal join
SELECT * FROM MyUserTable LEFT JOIN MY_DIM FOR SYSTEM_TIME AS OF MY_DIM.rt
ON MY_DIM.key = MyUserTable.id;
示例二
-- 利用 datagen 随机生成数据源数据
CREATE TEMPORARY TABLE datagen_source(
    name STRING,
    age INT
) WITH (
    'connector' = 'datagen'
)
 
-- 用 Flink SQL 注册 MySQL 表 students
CREATE TEMPORARY TABLE mysql_sink(
    name VARCHAR,
    age INT
) WITH (
    'connector' = 'jdbc'
    'url' = 'jdbc://mysql://localhost:3306/mydb',
    'table-name' = 'students',
    'username' = 'root',
    'password' = '123456'
)
 
INSERT INTO mysql_sink
SELECT * FROM datagen_source;