数据库 PostgreSQL CDC
版本说明
- Flink版本:当前仅支持 Flink 1.12 版本。
- PostgreSQL 版本:
- Database: 9.6, 10, 11, 12
- JDBC Driver: 42.2.12
 
使用范围
PostgreSQL CDC 连接器支持对 PostgreSQL 数据库的全量和增量读取。
会先读取数据库快照,然后继续读取 binlog,即使发生故障,也可以保证 exactly-once。
PostgreSQL CDC 源不能并行读取,因为只有一个任务可以接收 binlog 事件。
DDL 定义
-- 在 Flink SQL 里注册 PostgreSQL 表 'shipments'
CREATE TABLE shipments (
  shipment_id INT,
  order_id INT,
  origin STRING,
  destination STRING,
  is_arrived BOOLEAN
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'localhost',
  'port' = '5432',
  'username' = 'postgres',
  'password' = 'postgres',
  'database-name' = 'postgres',
  'schema-name' = 'public',
  'table-name' = 'shipments'
);  
PostgreSQL CDC 源表 WITH 参数
| 参数值 | 是否必填 | 默认值 | 数据类型 | 描述 | 
|---|---|---|---|---|
| connector | 是 | 无 | String | 连接器,固定值为 jdbc。 | 
| hostname | 是 | 无 | String | PostgreSQL 数据库 IP 地址。 | 
| username | 是 | 无 | String | PostgreSQL 数据库用户名。 | 
| password | 是 | 无 | String | PostgreSQL 数据库密码。 | 
| database-name | 是 | 无 | String | PostgreSQL 数据库名称。 | 
| schema-name | 是 | 无 | String | PostgreSQL 数据库 schema 名称。 | 
| table-name | 是 | 无 | String | PostgreSQL 表名。 | 
| port | 否 | 5432 | Integer | PostgreSQL 数据库端口号。 | 
| decoding.plugin.name | 否 | decoderbufs | String | PostgresSQL 逻辑解码插件名,可选值:decoderbufs、wal2json、wal2json_rds、wal2json_streaming、wal2json_rds_streaming 和 pgoutput。 | 
| slot.name | 否 | flink | String | PostgreSQL Replication Slot 名称,必须符合 PostgreSQL Replication Slot 命名规则。 | 
| debezium.* | 否 | 无 | String | 传递 Debezium 的属性,如: 'debezium.snapshot.mode' = 'never'。更多信息请查看 Debezium 的 PostgreSQL 连接器属性。 | 
说明
slot.name 建议针对不同的表设置不同的值,以避免类似的错误:PSQLException: ERROR: replication slot “flink” is active for PID 974。
类型映射
| PostgreSQL CDC 字段类型 | Flink SQL 字段类型 | 
|---|---|
| SMALLINT INT2 SMALLSERIAL SERIAL2 | SMALLINT | 
| INTEGER SERIAL | INT | 
| BIGINT BIGSERIAL | BIGINT | 
| BIGINT | BIGINT | 
| REAL FLOAT4 | FLOAT | 
| FLOAT8 DOUBLE PRECISION | DOUBLE | 
| NUMERIC(p, s) DECIMAL(p, s) | DECIMAL(p, s) | 
| BOOLEAN | BOOLEAN | 
| DATE | DATE | 
| TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE] | 
| TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] | 
| CHAR(n) CHARACTER(n) VARCHAR(n) CHARACTER VARYING(n) TEXT | STRING | 
| BYTEA | BYTES | 
| ARRAY | ARRAY | 
代码示例
-- 用 Flink SQL 注册 PostgreSQL 表 students,使用 CDC 机制流式获取 changelog
CREATE TABLE postgresql_cdc_source (
    id INT,
    name STRING,
    score INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = 'localhost',
    'port' = '5432',
    'username' = '$username',
    'password' = '$password',
    'database-name' = 'postgres',
    'schema-name' = 'public',
    'table-name' = 'students'
)
 
-- 用 Flink SQL 注册 ElasticSearch index stu
CREATE TABLE elasticsearch_sink (
    id INT,
    name STRING,
    score INT,
    PRIMARY KEY (id) NOT ENFORCED              --定义主键则根据主键 upsert,否则是 append 模式
) WITH (
    'connector' = 'elasticsearch-7',           --输出到 ElasticSearch7
    'hosts' = 'http://192.168.100.19:9200',    --ElasticSearch 连接地址
    'index' = 'stu',                           --ElasticSearch 的 Index 名
    'sink.flush-on-checkpoint' = 'true',       --checkpoint 时批量写入
    'sink.bulk-flush.max-actions' = '50',      --每批次最多的操作数
    'sink.bulk-flush.max-size' = '10mb',       --每批次累计最大大小
    'sink.bulk-flush.interval' = '1000ms',     --批量写入的间隔
    'connection.max-retry-timeout' = '1000ms', --每次请求的最大超时时间
    'format' = 'json'                          --输出数据格式,目前只支持 'json'
);
 
-- 将 PostgreSQL 表 students 的 upsert 流实时同步到 ElasticSearch 的 stu index 里
INSERT INTO elasticsearch_sink SELECT id, name, score FROM postgresql_cdc_source;