上下游开发指南

 

数据仓库 ClickHouse

更新时间 2023-09-06

版本说明

当前仅支持 Flink 1.12 版本。

使用范围

ClickHouse 仅支持作为目的表(Sink)。

DDL 定义

CREATE TABLE t_user (
    `user_id` BIGINT,
    `user_type` INTEGER,
    `language` STRING,
    `country` STRING,
    `gender` STRING,
    `score` DOUBLE,
    `list` ARRAY<STRING>,
    `map` Map<STRING, BIGINT>,
    PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
    'connector' = 'clickhouse',
    'url' = 'clickhouse://{ip}:{port}',
    'database-name' = 'tutorial',
    'table-name' = 'users',
    'sink.batch-size' = '500',
    'sink.flush-interval' = '1000',
    'sink.max-retries' = '3'
);

ClickHouse 参数

参数值 是否必填 默认值 数据类型 描述
connector String 连接器,固定值为 clickhouse
url String ClickHouse 服务的URL地址,clickhouse://{ip}:{port}。
username String ClickHouse 服务的用户名。
password String ClickHouse 服务的密码。
database-name default String ClickHouse 数据库名称。
table-name String ClickHouse 服务的表名。
sink.batch-size 1000 Integer 刷新频率,数据量超过该大小会刷新写入数据。
sink.flush-interval 1s Duration 刷新频率,超过这个刷新间隔 异步线程会刷新写入数据。
sink.max-retries 3 Integer 写入失败重试最大次数,超过该次数任务会失败终止。
sink.write-local false Boolean 针对 distributed 表,会直接写入当前指定 url 的节点表中。
sink.partition-startegy balanced String 写入策略:balanced(轮询),hash(根据分区 key 的 hash 值写入),shuffle(随机写入)。
sink.partition-key String hash 策略中的分区 key。
sink.ignore-delete true Boolean 是否将 update 语句视为 insert 语句并忽略删除,默认为 true。

类型映射

ClickHouse 类型 Flink SQL 类型
STRING CHAR
String / IP / UUID VARCHAR
String / Enum STRING
UInt8 BOOLEAN
FixedString BYTES
Decimal / Int128 / Int256 / UInt64 / UInt128 / UInt256 DECIMAL
Int8 TINYINT
Int16 / UInt8 SMALLINT
Int32 / UInt16 / Interval INTEGER
Int64 / UInt32 BIGINT
Float32 FLOAT
Float64 DOUBLE
Date DATE
DateTime TIME
DateTime TIMESTAMP
DateTime TIMESTAMP_LTZ
Int32 INTERVAL_YEAR_MONTH
Int64 INTERVAL_DAY_TIME
Not supported ROW
Not supported MULTISET
Not supported RAW

代码示例

CREATE TABLE source(
    id INT,
    province STRING
) WITH (
    'connector' = 'faker',
    'fileds.id.expression' = '#{number.numberBetween ''1'',''100''}',
    'fields.province.expression'  = '#{regexify ''(河北省|山西省|辽宁省|吉林省|黑龙江省|江苏省|浙江省|安徽省|福建省|江西省|山东省|河南省|湖北省|湖南省|广东省|海南省|四川省|贵州省|云南省|陕西省|甘肃省|青海省|台湾省){1}''}',
    'rows-per-second'          = '1'
);
 
CREATE TABLE clickhouse_sink(
    id INT,
    province STRING
) WITH (
    'connector' = 'clickhouse',
    'url' = 'clickhouse://localhost:6379',
    'database-name' = 'tutorial',
    'username' = 'default',
    'password' = 'default',
    'table-name' = 'users',
    'sink.batch-size' = '500',
    'sink.flush-interval' = '1000',
    'sink.max-retries' = '3'
);
 
INSERT INTO clickhouse_sink SELECT * FROM source;
这篇文档解决了您的问题吗?
0
0