Kafka SQL Connector的介绍

  • 概述
    Kafka SQL Connector 分为 Kafka SQL Connector 和 Upsert Kafka SQL Connector

  • 功能
    Upsert Kafka Connector支持以 upsert 方式从 Kafka topic 中读写数据
    Kafka Connector支持从 Kafka topic 中读写数据

  • 区别
    a)建表语句的主键
    i)Kafka Connector 要求表不能有主键,如果设置了主键,报错信息如下

    Caused by: org.apache.flink.table.api.ValidationException: The Kafka table 'default_catalog.default_database.normal_sink_topic' with 'json' format doesn't support defining PRIMARY KEY constraint on the table, because it can't guarantee the semantic of primary key.
    

    ii)而 Upsert Kafka Connector 要求表必须有主键,如果没有设置主键,报错信息如下

    Caused by: org.apache.flink.table.api.ValidationException: 'upsert-kafka' tables require to define a PRIMARY KEY constraint. The PRIMARY KEY specifies which columns should be read from or write to the Kafka message key. The PRIMARY KEY also defines records in the 'upsert-kafka' table should update or delete on which keys.
    

    iii)语法: primary key(id) not enforced
    注意:not enforced 表示不对来往数据做约束校验,Flink 并不是数据的主人,因此只支持 not enforced 模式
    如果没有 not enforced,报错信息如下

    Exception in thread "main" org.apache.flink.table.api.ValidationException: Flink doesn't support ENFORCED mode for PRIMARY KEY constaint. ENFORCED/NOT ENFORCED  controls if the constraint checks are performed on the incoming/outgoing data. Flink does not own the data therefore the only supported mode is the NOT ENFORCED mode
    

    b)对表中数据操作类型的要求
    i)Kafka Connector 不能消费带有 Upsert/Delete 操作类型数据的表,如 left join 生成的动态表。如果对这类表进行消费,报错信息如下

    Exception in thread "main" org.apache.flink.table.api.TableException: Table sink 'default_catalog.default_database.normal_sink_topic' doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_9]], fields=[l_id, tag_left, tag_right])
    

    ii)Upsert Kafka Connector 将 INSERT/UPDATE_AFTER 数据作为正常的 Kafka 消息写入,并将 DELETE 数据以 value 为空的 Kafka 消息写入(表示对应 key 的消息被删除)。Flink 将根据主键列的值对数据进行分区,因此同一主键的更新/删除消息将落在同一分区,从而保证同一主键的消息有序。

  • 参数配置
    Kafka Connector的参数包括:
    connector:指定使用的连接器,对于Kafka,使用’kafka’。
    topic:主题。
    properties.bootstrap.servers:以逗号分隔的Kafka broker列表。
    properties.group.id:消费者组ID。
    format:指定Kafka消息中value部分的序列化和反序列化方式。
    scan.startup.mode:Kafka消费者启动模式,有四种取值:‘earliest-offset’,‘latest-offset’,‘group-offsets’和’timestamp’。
    properties.*:可以通过properties. 的方式指定配置项,的位置用Kafka官方规定的配置项的key替代。并不是所有的配置都可以通过这种方式配置,因为Flink可能会将它们覆盖,如:key.deserializer和value.deserializer。
    scan.startup.specific-offsets:当scan.startup.mode为specific-offsets时,可以使用此参数为每个分区提供偏移量。
    Upsert Kafka Connector的参数包括:
    connector:指定使用的连接器,对于Upsert Kafka,使用’upsert-kafka’。
    topic:主题。
    properties.bootstrap.servers:以逗号分隔的Kafka broker列表。
    key.format:key的序列化和反序列化格式。
    value.format:value的序列化和反序列化格式。
    key.fields:指定哪些字段应该写入Kafka消息的key中。
    value.fields-include:指定哪些字段应该写入Kafka消息的value中。

请使用浏览器的分享功能分享到微信等