上下游开发指南

 

数据库 PostgreSQL

更新时间 2023-09-06

版本说明

当前仅支持 Flink 1.12 版本。

使用范围

使用 JDBC 驱动程序从 PostgreSQL 中读取数据或将数据写入其中。

支持作为数据源、维表、数据目的。

DDL 定义

CREATE TABLE pg_table (
  id BIGINT,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:postgresql://localhost:5433/demo',
   'table-name' = 'users',
   'username' = 'root',
   'password' = '123456'
);

PostgreSQL 源表 WITH 参数

参数值 是否必填 默认值 数据类型 描述
connector String 连接器,固定值为 jdbc
url String PostgreSQL 数据库 JDBC URL。
table-name String PostgreSQL 表名。
driver String JDBC 驱动程序的类名,如果未设置,将自动从 url 获取。
username String PostgreSQL 数据库用户名。
password String PostgreSQL 数据库密码。
scan.partition.column Integer 指定对输入数据进行分区扫描的列名。该列必须是数值类型、日期类型或时间戳类型。
scan.partition.num Integer 分区扫描启用后,指定分区数。
scan.partition.lower-bound Integer 分区扫描启用后,指定第一个分区的最小值。
scan.partition.upper-bound Integer 分区扫描启用后,指定最后一个分区的最大值。
scan.fetch-size 0 Integer 每次从数据库读取时,批量获取的行数。
scan.auto-commit true Boolean 自动提交标志,决定每个语句是否在事务中自动提交。

说明

  1. scan.partition.lower-boundscan.partition.upper-bound 仅用于决定分区步长,而不是用于过滤表中的行。所以表中的所有行都会被分区并返回。
  2. 分区扫描功能可以加速读取数据,每个子任务可以读取自己的分区。使用该功能时,四个 scan.partition 开头的参数都必须指定。

PostgreSQL 维表 WITH 参数

参数值 是否必填 默认值 数据类型 描述
connector String 连接器,固定值为 jdbc
url String PostgreSQL 数据库 JDBC URL。
table-name String PostgreSQL 表名。
driver String JDBC 驱动程序的类名,如果未设置,将自动从 url 获取。
username String PostgreSQL 数据库用户名。
password String PostgreSQL 数据库密码。
lookup.cache.max-rows Integer Lookup 缓存中最多缓存的数据行数。超过此值,最旧的行将过期。默认情况下禁用查找缓存。
lookup.cache.ttl Duration Lookup 缓存中每条记录最长的缓存时间。超过此时间,最旧的行将过期。默认情况下禁用 Lookup 缓存。
lookup.max-retries 3 Integer 数据库查询失败时,最多重试的次数。

说明

  1. Lookup 缓存提升维表读取性能,目前仅支持同步读取模式。
  2. 默认情况下,Lookup 缓存未启用,所有请求需要请求数据库。
  3. 通过设置 lookup.cache.max-rows 和 lookup.cache.ttl 可以启用 Lookup 缓存,这时每个进程(即 TaskManager)都会持有一份缓存。Flink 会先查找缓存,缓存未命中时会向数据库发送请求,并根据返回的值更新缓存。

PostgreSQL 结果表 WITH 参数

参数值 必填 默认值 数据类型 描述
connector String 连接器,固定值为 jdbc
url String PostgreSQL 数据库 JDBC URL。
table-name String PostgreSQL 表名。
driver String JDBC 驱动程序的类名,如果未设置,将自动从 url 获取。
username String PostgreSQL 数据库用户名。
password String PostgreSQL 数据库密码。
sink.buffer-flush.max-rows 100 Integer 批量输出时,最多缓存的数据行数。设置为 0 表示禁用输出缓存。
sink.buffer-flush.interval 1s Duration 每隔多久异步线程自动批量输出数据。设置为 0 表示禁用自动异步输出。
sink.max-retries 3 Integer 数据库写入失败时,最多重试的次数。

说明

  • 如果在 DDL 上定义了主键,则 sink 以 upsert 模式与外部系统交换 UPDATE/DELETE 消息,否则,它以 append 模式运行,不支持消费 UPDATE/DELETE 消息。
  • 对于 PostgreSQL 表,Upsert 功能的实现采用 INSERT .. ON CONFLICT .. DO UPDATE SET .. 语法。

类型映射

PostgreSQL 字段类型 Flink SQL 字段类型
SMALLINT
INT2
SMALLSERIAL
SERIAL2
SMALLINT
INTEGER
SERIAL
INT
BIGINT
BIGSERIAL
BIGINT
BIGINT BIGINT
REAL
FLOAT4
FLOAT
FLOAT8
DOUBLE PRECISION
DOUBLE
NUMERIC(p, s)
DECIMAL(p, s)
DECIMAL(p, s)
BOOLEAN BOOLEAN
DATE DATE
TIME [(p)] [WITHOUT TIMEZONE] TIME [(p)] [WITHOUT TIMEZONE]
TIMESTAMP [(p)] [WITHOUT TIMEZONE] TIMESTAMP [(p)] [WITHOUT TIMEZONE]

CHAR(n)
CHARACTER(n)
VARCHAR(n)
CHARACTER VARYING(n)
TEXT
STRING
BYTEA BYTES
ARRAY ARRAY

代码示例

-- 利用 datagen 随机生成数据源数据
CREATE TEMPORARY TABLE datagen_source(
    name STRING,
    age INT
) WITH (
    'connector' = 'datagen'
)
 
-- 用 Flink SQL 注册 PostgreSQL 表 students
CREATE TEMPORARY TABLE pg_sink(
    name VARCHAR,
    age INT
) WITH (
    'connector' = 'jdbc'
    'url' = 'jdbc://postgresql://localhost:5433/mydb',
    'table-name' = 'students',
    'username' = 'root',
    'password' = '123456'
)
 
INSERT INTO pg_sink
SELECT * FROM datagen_source;
这篇文档解决了您的问题吗?
0
0