数据库 HBase
版本说明
当前仅支持 Flink 1.12 版本。
使用范围
支持读取和写入 HBase 集群。
DDL 定义
注意
- HBase 表所有列簇需要声明为 ROW 类型,内部定义列名和类型,可定义多组。
- 非 ROW 类型的字段会被识别为 rowkey,rowkey 可以定义为任意名字。
-- 在 Flink SQL 里注册 HBase 表 'mytable'
CREATE TABLE hTable (
 rowkey INT,
 family1 ROW<q1 INT>,
 family2 ROW<q2 STRING, q3 BIGINT>,
 family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>,
 PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
 'connector' = 'hbase-2.2',
 'table-name' = 'mytable',
 'zookeeper.quorum' = 'localhost:2181'
 'zookeeper.znode.parent' = '/hbase/cl-8wry3tmz'
);
HBase 源表 WITH 参数
| 参数值 | 是否必填 | 默认值 | 数据类型 | 描述 | 
|---|---|---|---|---|
| connector | 是 | 无 | String | 连接器,目前支持 hbase-2.2。 | 
| table-name | 是 | 无 | String | HBase 表名。 | 
| zookeeper.quorum | 是 | 无 | String | HBase 的 zookeeper 地址。 | 
| zookeeper.znode.parent | 是 | /hbase | String | HBase 在 zookeeper 中的根目录,请根据实际情况进行配置。例如: /hbase/{集群id}。 | 
| null-string-literal | 否 | null | String | HBase 字段类型为 String 时,遇到空值则将该字段赋值为当前参数值。 | 
HBase 维表 WITH 参数
| 参数值 | 是否必填 | 默认值 | 数据类型 | 描述 | 
|---|---|---|---|---|
| connector | 是 | 无 | String | 连接器,目前支持 hbase-2.2。 | 
| table-name | 是 | 无 | String | HBase 表名。 | 
| zookeeper.quorum | 是 | 无 | String | HBase 的 zookeeper 地址。 | 
| zookeeper.znode.parent | 是 | /hbase | String | HBase 在 zookeeper 中的根目录,请根据实际情况进行配置。例如: /hbase/{集群id}。 | 
| null-string-literal | 否 | null | String | HBase 字段类型为 String 时,遇到空值则将该字段赋值为当前参数值。 | 
HBase 结果表 WITH 参数
| 参数值 | 是否必填 | 默认值 | 数据类型 | 描述 | 
|---|---|---|---|---|
| connector | 是 | 无 | String | 连接器,目前支持 hbase-2.2。 | 
| table-name | 是 | 无 | String | HBase 表名。 | 
| zookeeper.quorum | 是 | 无 | String | HBase 的 zookeeper 地址。 | 
| zookeeper.znode.parent | 是 | /hbase | String | HBase 在 zookeeper 中的根目录,请根据实际情况进行配置。例如: /hbase/{集群id}。 | 
| null-string-literal | 否 | null | String | HBase 字段类型为 String 时,遇到空值则将该字段赋值为当前参数值。 | 
| sink.buffer-flush.max-size | 否 | 2mb | MemorySize | 每次写入请求在内存中缓存的数据量。调大该值有利于提高 HBase 写入性能,但会增加写入延迟。设置为 0禁用。 | 
| sink.buffer-flush.max-rows | 否 | 1000 | Integer | 每次写入请求在内存中缓存的数据行数。调大该值有利于提高 HBase 写入性能,但会增加写入延迟。设置为 0禁用。 | 
| sink.buffer-flush.interval | 否 | 1s | Duration | 周期性 flush 缓存数据到 HBase 的时间间隔。可以提高 HBase 写入性能,但会增加写入延迟。设置为 0’ 禁用。 | 
| sink.parallelism | 否 | 无 | Integer | HBase sink 算子的并行度,默认情况下,框架使用上游算 子相同的平行度。 | 
类型映射
HBase 将所有数据存储为字节数组。
| Flink 字段类型 | HBase 字段类型 | 
|---|---|
| CHAR / VARCHAR / STRING | byte[] toBytes(String s) String toString(byte[] b) | 
| BOOLEAN | byte[] toBytes(boolean b) boolean toBoolean(byte[] b) | 
| BINARY / VARBINARY | byte[] 按原样返回。 | 
| DECIMAL | byte[] toBytes(BigDecimal v) BigDecimal toBigDecimal(byte[] b) | 
| TINYINT | new byte[] { val } bytes[0] // returns first and only byte from bytes | 
| SMALLINT | byte[] toBytes(short val) short toShort(byte[] bytes) | 
| INT | byte[] toBytes(int val) int toInt(byte[] bytes) | 
| BIGINT | byte[] toBytes(long val) long toLong(byte[] bytes) | 
| FLOAT | byte[] toBytes(float val) float toFloat(byte[] bytes) | 
| DOUBLE | byte[] toBytes(double val) double toDouble(byte[] bytes) | 
| DATE | 将日期转换成自 1970.01.01 以来的天数,用 int 表示。 | 
| TIME | 将时间转换成自 00:00:00 以来的毫秒数,用 int 表示。 | 
| TIMESTAMP | 将时间戳转换成自 1970-01-01 00:00:00 以来的毫秒数,用 long 表示。 | 
| ARRAY | 不支持 | 
| MAP / ULTISET | 不支持 | 
| ROW | 不支持 | 
代码示例
-- 使用 datagen 连接器生成随机数据
CREATE TEMPORARY TABLE datagen_source (
  rowkey INT,
  f1q1 INT,
  f2q1 STRING,
  f2q2 BIGINT,
  f3q1 DOUBLE,
  f3q2 BOOLEAN,
  f3q3 STRING
) with (
  'connector'='datagen'
);
-- 在 Flink SQL 里注册 HBase 表 'demo'
CREATE TEMPORARY TABLE hbase_sink (
  rowkey INT,
  family1 ROW<q1 INT>,
  family2 ROW<q1 STRING, q2 BIGINT>,
  family3 ROW<q1 DOUBLE, q2 BOOLEAN, q3 STRING>,
  PRIMARY KEY (rowkey) NOT ENFORCE
) with (
  'connector'='hbase-2.2',
  'table-name'='demo',
  'zookeeper.quorum'='localhost:2181'
  'zookeeper.znode.parent' = '/hbase/cl-8wry3tmz'
);
-- 从 datagen_source 中读取数据并写入 hbase_sink 中
INSERT INTO hbase_sink
SELECT rowkey, ROW(f1q1), ROW(f2q1, f2q2), ROW(f3q1, f3q2, f3q3) FROM datagen_source;