数据仓库 ClickHouse
版本说明
当前仅支持 Flink 1.12 版本。
使用范围
ClickHouse 仅支持作为目的表(Sink)。
DDL 定义
CREATE TABLE t_user (
`user_id` BIGINT,
`user_type` INTEGER,
`language` STRING,
`country` STRING,
`gender` STRING,
`score` DOUBLE,
`list` ARRAY<STRING>,
`map` Map<STRING, BIGINT>,
PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
'connector' = 'clickhouse',
'url' = 'clickhouse://{ip}:{port}',
'database-name' = 'tutorial',
'table-name' = 'users',
'sink.batch-size' = '500',
'sink.flush-interval' = '1000',
'sink.max-retries' = '3'
);
ClickHouse 参数
参数值 | 是否必填 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
connector | 是 | 无 | String | 连接器,固定值为 clickhouse 。 |
url | 是 | 无 | String | ClickHouse 服务的URL地址,clickhouse://{ip}:{port}。 |
username | 是 | 无 | String | ClickHouse 服务的用户名。 |
password | 是 | 无 | String | ClickHouse 服务的密码。 |
database-name | 否 | default | String | ClickHouse 数据库名称。 |
table-name | 是 | 无 | String | ClickHouse 服务的表名。 |
sink.batch-size | 否 | 1000 | Integer | 刷新频率,数据量超过该大小会刷新写入数据。 |
sink.flush-interval | 否 | 1s | Duration | 刷新频率,超过这个刷新间隔 异步线程会刷新写入数据。 |
sink.max-retries | 否 | 3 | Integer | 写入失败重试最大次数,超过该次数任务会失败终止。 |
sink.write-local | 否 | false | Boolean | 针对 distributed 表,会直接写入当前指定 url 的节点表中。 |
sink.partition-startegy | 否 | balanced | String | 写入策略:balanced(轮询),hash(根据分区 key 的 hash 值写入),shuffle(随机写入)。 |
sink.partition-key | 否 | 无 | String | hash 策略中的分区 key。 |
sink.ignore-delete | 否 | true | Boolean | 是否将 update 语句视为 insert 语句并忽略删除,默认为 true。 |
类型映射
ClickHouse 类型 | Flink SQL 类型 |
---|---|
STRING | CHAR |
String / IP / UUID | VARCHAR |
String / Enum | STRING |
UInt8 | BOOLEAN |
FixedString | BYTES |
Decimal / Int128 / Int256 / UInt64 / UInt128 / UInt256 | DECIMAL |
Int8 | TINYINT |
Int16 / UInt8 | SMALLINT |
Int32 / UInt16 / Interval | INTEGER |
Int64 / UInt32 | BIGINT |
Float32 | FLOAT |
Float64 | DOUBLE |
Date | DATE |
DateTime | TIME |
DateTime | TIMESTAMP |
DateTime | TIMESTAMP_LTZ |
Int32 | INTERVAL_YEAR_MONTH |
Int64 | INTERVAL_DAY_TIME |
Not supported | ROW |
Not supported | MULTISET |
Not supported | RAW |
代码示例
CREATE TABLE source(
id INT,
province STRING
) WITH (
'connector' = 'faker',
'fileds.id.expression' = '#{number.numberBetween ''1'',''100''}',
'fields.province.expression' = '#{regexify ''(河北省|山西省|辽宁省|吉林省|黑龙江省|江苏省|浙江省|安徽省|福建省|江西省|山东省|河南省|湖北省|湖南省|广东省|海南省|四川省|贵州省|云南省|陕西省|甘肃省|青海省|台湾省){1}''}',
'rows-per-second' = '1'
);
CREATE TABLE clickhouse_sink(
id INT,
province STRING
) WITH (
'connector' = 'clickhouse',
'url' = 'clickhouse://localhost:6379',
'database-name' = 'tutorial',
'username' = 'default',
'password' = 'default',
'table-name' = 'users',
'sink.batch-size' = '500',
'sink.flush-interval' = '1000',
'sink.max-retries' = '3'
);
INSERT INTO clickhouse_sink SELECT * FROM source;