上下游开发指南

 

数据分析引擎 ElasticSearch

更新时间 2023-09-06

版本说明

当前仅支持 Flink 1.12 版本。

使用范围

ElasticSearch 仅支持作为 Sink,写入数据到 ElasticSearch 的 index 中。

DDL 定义

CREATE TABLE es_table(
    id INT,
    name STRING,
    score INT,
    PRIMARY KEY (id) NOT ENFORCED --定义主键则根据主键 upsert,否则是 append 模式
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://localhost:9200'
    'index' = 'student',
    'sink.flush-on-checkpoint' = 'true',
    'sink.bulk-flush.max-actions' = '50',
    'sink.bulk-flush.max-size' =. '10mb',
    'sink.bulk-flush.interval' = '1000ms',
    'connection.max-retry-timeout' = '1000',
    'format' = 'json'
)

ES 结果表参数

参数值 是否必填 默认值 数据类型 描述
connector String 连接器,目前支持 elasticesearch-7,连接 Elasticsearch 7.x 及更高版本的集群。
hosts String ElasticSearch 连接地址,如:http://host_name:9092;http://host_name:9093
index String ElasticSearch 索引名称,支持固定 Index(如:myIndex),也支持动态 Index(如:‘index-{log_ts|yyyy-MM-dd}’ 或 ‘{field_name}’ )。详细请参考动态索引
document-id.key-delimiter _ String 复合键的分隔符(默认为 “_")。例如有 a、b、c 三个主键,某条数据的 a 字段为 “1”,b 字段为 “2”,c 字段为 “3”,使用默认分隔符,则最终写入 Elasticsearch 的 _id 是 “1_2_3”。
username String 连接 ElasticSearch 集群的用户名。
password String 连接 ElasticSearch 集群的密码。如果定义了 username,则必须定义非空的 password。
failure-handler fall String ElasticSearch 请求失败时的故障处理策略。
  • fall:如果请求失败,则作业失败。
  • ignore:忽略失败并丢弃请求。
  • retry-rejected:重新添加由于队列容量满而失败的请求。
  • custom class name:使用 ActionRequestFailureHandler 子类进行故障处理(需要上传自定义程序包,并在作业的依赖资源处引用)。
  • sink.flush-on-checkpoint true Boolean 进行 checkpoint 时,是否等待现有记录完全写入 ElasticSearch 。如果设置为 false,则可能导致恢复时部分数据丢失或者重复。
    sink.bulk-flush.max-actions 1000 Integer 每个批量请求的最大缓冲操作数,设置为 0 禁用。
    sink.bulk-flush.max-size 2mb MemorySize 每个批量请求缓冲区内存大小,必须以 mb 为单位。设置为 ‘0’ 禁用。
    sink.bulk-flush.interval 1s Duration 周期性批量写入缓存数据到 ElasticSearch 的时间间隔。设置为 ‘0’ 禁用。
    sink.bulk-flush.backoff.strategy DISABLED String 批量写入操作失败时的重试策略。
  • DISABLED:不重试。
  • CONSTANT:常量等待时间重试,等待 sink.bulk-flush.backoff.delay 选项设置的时间后重试。
  • EXPONENTIAL:指数等待时间重试,首次失败等待 sink.bulk-flush.backoff.delay 选项设置的时间后重试,此后每次失败将指数增加下次的等待时间。
  • sink.bulk-flush.backoff.max-retries 8 Integer 批量写入时,最大重试次数。
    sink.bulk-flush.backoff.delay 50ms Duration 批量写入失败时,每次重试之间的等待间隔。
    connection.max-retry-timeout Duration 重试的最大超时时间。
    connection.path-prefix String 每个 REST 请求的前缀,如:’/v1’。
    format json String 输出格式。目前仅支持 json。详细信息, 请参考JSON 格式页面

    注意

    ElasticSearch sink 支持 upsert 模式或 append 模式。如果在 DDL 上定义了主键,则 sink 以 upsert 模式与外部系统交换 UPDATE/DELETE 消息,否则,它以 append 模式运行,仅支持 INSERT 消息。

    代码示例

    示例一

    -- 利用 datagen 随机生成数据源数据
    CREATE TABLE datagen_source (
        name STRING,
        age INT
    ) WITH (
        'connector' = 'datagen'
    )
     
    -- 用 Flink SQL 注册 ElasticSearch index my-index  
    CREATE TABLE elasticsearch7_sink (
        name STRING,
        age INT
    ) WITH (
        'connector' = 'elasticsearch-7',         -- 输出到 ElasticSearch7
        'username' = '$username',                -- ElasticSearch 用户名
        'password' = '$password',                -- ElasticSearch 密码
        'hosts' = 'http://192.168.100.19:9200',  -- ElasticSearch 连接地址
        'index' = 'my-index',                    -- ElasticSearch 的 Index 名
        'format' = 'json'                        -- 输出数据格式,目前只支持 'json'
    );
     
    INSERT INTO elasticsearch7_sink select * from datagen_source;
    

    示例二

    -- 用 Flink SQL 注册 MySQL 表 students,使用 CDC 机制流式获取 changelog
    CREATE TABLE students(
        id INT,
        name STRING,
        score INT,
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = 'localhost',
        'port' = '3306',
        'username' = '$username',
        'password' = '$password',
        'database-name' = 'detail',
        'table-name' = 'students'
    )
     
    -- 用 Flink SQL 注册 ElasticSearch index stu
    CREATE TABLE es_stu(
        id INT,
        name STRING,
        score INT,
        PRIMARY KEY (id) NOT ENFORCED --定义主键则根据主键 upsert,否则是 append 模式
    ) WITH(
        'connector' = 'elasticsearch-7',           --输出到 ElasticSearch7
        'hosts' = 'http://192.168.100.19:9200',    --ElasticSearch 连接地址
        'index' = 'stu',                           --ElasticSearch 的 Index 名
        'sink.flush-on-checkpoint' = 'true',       --checkpoint 时批量写入
        'sink.bulk-flush.max-actions' = '50',      --每批次最多的操作数
        'sink.bulk-flush.max-size' = '10mb',       --每批次累计最大大小
        'sink.bulk-flush.interval' = '1000ms',     --批量写入的间隔
        'connection.max-retry-timeout' = '1000ms', --每次请求的最大超时时间
        'format' = 'json'                          --输出数据格式,目前只支持 'json'
    );
     
    -- 将 MySQL 表 students 的 upsert 流实时同步到 ElasticSearch 的 stu index 里
    INSERT INTO es_stu SELECT id,name,score FROM students;
    
    这篇文档解决了您的问题吗?
    0
    0