-
概述
Kafka SQL Connector 分为 Kafka SQL Connector 和 Upsert Kafka SQL Connector -
功能
Upsert Kafka Connector支持以 upsert 方式从 Kafka topic 中读写数据
Kafka Connector支持从 Kafka topic 中读写数据 -
区别
a)建表语句的主键
i)Kafka Connector 要求表不能有主键,如果设置了主键,报错信息如下Caused by: org.apache.flink.table.api.ValidationException: The Kafka table 'default_catalog.default_database.normal_sink_topic' with 'json' format doesn't support defining PRIMARY KEY constraint on the table, because it can't guarantee the semantic of primary key.ii)而 Upsert Kafka Connector 要求表必须有主键,如果没有设置主键,报错信息如下
Caused by: org.apache.flink.table.api.ValidationException: 'upsert-kafka' tables require to define a PRIMARY KEY constraint. The PRIMARY KEY specifies which columns should be read from or write to the Kafka message key. The PRIMARY KEY also defines records in the 'upsert-kafka' table should update or delete on which keys.iii)语法: primary key(id) not enforced
注意:not enforced 表示不对来往数据做约束校验,Flink 并不是数据的主人,因此只支持 not enforced 模式
如果没有 not enforced,报错信息如下Exception in thread "main" org.apache.flink.table.api.ValidationException: Flink doesn't support ENFORCED mode for PRIMARY KEY constaint. ENFORCED/NOT ENFORCED controls if the constraint checks are performed on the incoming/outgoing data. Flink does not own the data therefore the only supported mode is the NOT ENFORCED modeb)对表中数据操作类型的要求
i)Kafka Connector 不能消费带有 Upsert/Delete 操作类型数据的表,如 left join 生成的动态表。如果对这类表进行消费,报错信息如下Exception in thread "main" org.apache.flink.table.api.TableException: Table sink 'default_catalog.default_database.normal_sink_topic' doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_9]], fields=[l_id, tag_left, tag_right])ii)Upsert Kafka Connector 将 INSERT/UPDATE_AFTER 数据作为正常的 Kafka 消息写入,并将 DELETE 数据以 value 为空的 Kafka 消息写入(表示对应 key 的消息被删除)。Flink 将根据主键列的值对数据进行分区,因此同一主键的更新/删除消息将落在同一分区,从而保证同一主键的消息有序。
-
参数配置
Kafka Connector的参数包括:
connector:指定使用的连接器,对于Kafka,使用’kafka’。
topic:主题。
properties.bootstrap.servers:以逗号分隔的Kafka broker列表。
properties.group.id:消费者组ID。
format:指定Kafka消息中value部分的序列化和反序列化方式。
scan.startup.mode:Kafka消费者启动模式,有四种取值:‘earliest-offset’,‘latest-offset’,‘group-offsets’和’timestamp’。
properties.*:可以通过properties. 的方式指定配置项,的位置用Kafka官方规定的配置项的key替代。并不是所有的配置都可以通过这种方式配置,因为Flink可能会将它们覆盖,如:key.deserializer和value.deserializer。
scan.startup.specific-offsets:当scan.startup.mode为specific-offsets时,可以使用此参数为每个分区提供偏移量。
Upsert Kafka Connector的参数包括:
connector:指定使用的连接器,对于Upsert Kafka,使用’upsert-kafka’。
topic:主题。
properties.bootstrap.servers:以逗号分隔的Kafka broker列表。
key.format:key的序列化和反序列化格式。
value.format:value的序列化和反序列化格式。
key.fields:指定哪些字段应该写入Kafka消息的key中。
value.fields-include:指定哪些字段应该写入Kafka消息的value中。