如何使用 Kafka Connector 操作 Kafka

我们向kafka发送数据,一般是使用 Kafka Client。我们如果使用 java 程序来发送 kafka 消息,这种做法基本是没有问题的。但是如果我们有其他系统也要集成 kafka,但是不一定用的是 java 语言,比如 ABAP、C++、C 语言、Python、Go等等,那怎么办呢?这里我们将介绍一种通用的方法,即:通过 HTTP 调用的方式向 Kafka 发送数据和读取数据。

Confluent 简介

      Confluent是一家专注于Apache Kafka的企业公司,提供了一整套的Kafka相关工具和服务。

      Confluent提供了企业版Kafka以及一些相关的工具和服务,例如Kafka Connect,Kafka Streams,Schema Registry等。这些工具和服务可以帮助企业更方便地构建、管理和监控Kafka集群,并且更容易地将Kafka与其他数据存储和处理系统集成。

      这里我们主要用到其中一种组件,即:Confluent Kafka Connector。其分为商业版本和开源版本,开源版本就可以满足我们一般场景下的需求。

Confluent Kafka Connector

      Confluent Kafka Connector 是一种插件,可将 Kafka 与其他数据存储和处理系统集成,例如数据库、文件系统、搜索引擎等。它允许我们将数据从 Kafka 流式传输到这些系统中,或者从这些系统中读取数据并将其发送到 Kafka。Confluent Kafka Connector 还提供了许多现成的连接器,可帮助我们快速地将Kafka与其他系统集成。

接下来让我们通过实际操作来揭秘如何使用它吧。

① 环境准备

      这里我们只需要配置好 jdk 即可,这里我们推荐的 jdk 版本为 jdk1.8 或者 jdk11。配置的过程这里就不做过多赘述了,还不知道的小伙伴可以去各大搜索引擎寻找答案。

② Confluent 安装

配置好 jdk 之后,就到了我们最重要的安装环节了。

下载 Confluent

      大家根据需要选择适合自己的版本,我这里用的是:5.2.4-2.11。5.2.4 表示 Confluent 平台的主版本号,2.11 表示 Confluent 平台所依赖的 Apache Kafka 版本号。

    浏览器访问这个链接地址就可以进入官网下载地址了:https://www.confluent.io/previous-versions/。选择如下图所示的进行下载就可以:

解压

我们将下载后的文件解压至 /soft 目录下,可以根据自己情况指定你自己的目录。

tar -zxvf confluent-5.2.4-2.11.tar.gz -C /soft

配置文件

我们需要修改几个配置文件,配置文件目录在:/soft/confluent-5.2.4/etc/

  • • connect-avro-distributed.properties

      用于在分布式模式下使用 Avro 格式来进行序列化和反序列化。Avro 是一种数据序列化格式,它提供了一种紧凑、快速、通用的数据交换格式,适用于各种编程语言和平台。这里我们主要配置 kafka 的地址。

bootstrap.servers=ip1:9092,ip2:9092,ip3:9092
集群模式下group.id必须是唯一的, 3台机器保持一致
group.id=connect-cluster-1
  • • schema-registry.properties

      Confluent Schema Registry 是一种用于管理和存储 Avro 模式的服务,用于在 Apache Kafka 之间传输数据。Schema Registry 服务允许我们注册、存储和检索 Avro 模式,以便在数据传输期间对数据进行序列化和反序列化。

Schema Registry 服务ip
host.name=127.0.0.1
Schema Registry 服务监听的地址和端口号
listeners=http://0.0.0.0:8081
kafka 元数据 zookeeper 地址
kafkastore.connection.url=ip1:2181,ip2:2181,ip3:2181
kafka 服务地址
kafkastore.bootstrap.servers=PLAINTEXT://ip1:9092,PLAINTEXT://ip2:9092,PLAINTEXT://ip3:9092

      集群模式下所有机器都必须配置,不一样的是host.name 要设置成 Schema Registry 服务所在的ip

  • • kafka-rest.properties

      Confluent Kafka REST 服务的配置文件。通过使用 Kafka REST 服务,我们可以使用 REST API 对 Apache Kafka 进行读取和写入操作。

集群唯一标识,每台机器需保持唯一
id=1001
Kafka Rest 服务ip
host.name=127.0.0.1
schema-registry 服务ip
schema.registry.url=http://ip1:8081,http://ip2:8081,http://ip3:8081
zookeeper 地址
zookeeper.connect=ip1:2181,ip2:2181,ip3:2181
kafka 服务地址
bootstrap.servers=PLAINTEXT://ip1:9092,PLAINTEXT://ip2:9092,PLAINTEXT://ip3:9092
指定发送的最大字节数
max.request.size=10485760
发送消息的批次大小
batch.size=32768
批次间隔
linger.ms=1

③ 启动

启动脚本如下:

1. 启动 Schema-registry-start
bin/schema-registry-start -daemon etc/schema-registry/schema-registry.properties

2. 启动 Connector
nohup bin/connect-distributed etc/schema-registry/connect-avro-distributed.properties >>logs/connect-avro-distributed.log 2>&1 &

3. 启动 kafka-rest
nohup bin/kafka-rest-start etc/kafka-rest/kafka-rest.properties >>logs/rest.log 2>&1 &

④ 负载均衡配置

      为了使我们的服务更加稳定,集群模式下我们通过 nginx 来做负载均衡。现在我们来看以下如何安装 nginx 以及如何配置负载均衡。

nginx 安装

防火墙配置

为了方便起见,我们直接关闭防火墙,生产环境中可以开启 nginx 端口

查看防火墙状态
systemctl status firewalld.service
关闭防火墙
systemctl stop firewalld.service
禁止防火墙自启动
systemctl disable firewalld.service

安装4个插件

yum install gcc-c++
yum install -y pcre pcre-devel
yum install -y zlib zlib-devel
yum install -y openssl openssl-devel
  • • gcc-c++

      gcc 可以编译 C,C++,Ada,Object C和Java等语言(安装nginx需要先将官网下载的源码进行编译,编译依赖 gcc 环境)

  • • pcre pcre-devel

      pcre是一个perl库,包括perl兼容的正则表达式库,nginx的http模块使用pcre来解析正则表达式,所以需要安装pcre库

  • • zlib zlib-devel

      zlib库提供了很多种压缩和解压缩方式nginx使用zlib对http包的内容进行gzip

  • • openssl openssl-devel

      OpenSSL 是一个强大的安全套接字层密码库,囊括主要的密码算法、常用的密钥和证书封装管理功能及 SSL 协议,并提供丰富的应用程序供测试或其它目的使用。nginx 不仅支持 http 协议,还支持 https(即在ssl协议上传输http),所以需要在 Centos 安装 OpenSSL 库

安装 nginx

下载nginx
wget https://nginx.org/download/nginx-1.21.6.tar.gz
解压
tar xvf nginx-1.21.6.tar.gz
配置
cd nginx-1.21.6
./configure --prefix=/usr/local/nginx --sbin-path=/usr/local/nginx/sbin/nginx --conf-path=/usr/local/nginx/conf/nginx.conf --error-log-path=/var/log/nginx/error.log --http-log-path=/var/log/nginx/access.log  --pid-path=/var/run/nginx/nginx.pid --lock-path=/var/lock/nginx.lock  --with-http_ssl_module --with-http_stub_status_module --with-http_gzip_static_module --http-client-body-temp-path=/var/tmp/nginx/client/ --http-proxy-temp-path=/var/tmp/nginx/proxy/ --http-fastcgi-temp-path=/var/tmp/nginx/fcgi/ --http-uwsgi-temp-path=/var/tmp/nginx/uwsgi --http-scgi-temp-path=/var/tmp/nginx/scgi --with-pcre --with-stream
编译安装
make && make install
查看安装路径
whereis nginx

设置 nginx 开机自启

vim /lib/systemd/system/nginx.service

[Unit]
Description=nginx service
After=network.target
 
[Service]
Type=forking
ExecStart=/usr/local/nginx/sbin/nginx
ExecReload=/usr/local/nginx/sbin/nginx -s reload
ExecStop=/usr/local/nginx/sbin/nginx -s stop
PrivateTmp=true
[Install]
WantedBy=multi-user.target
  • • Description:服务描述

  • • After:服务类别

  • • Type=forking:后台运行

  • • ExecStart:启动命令

  • • ExecReload:重启命令

  • • ExecStop:停止命令

  • • PrivateTmp=true:给服务分配独立的临时空间

  • • WantedBy:运行级别下服务安装的相关设置,设置为多用户

加入开机自启

systemctl enable nginx.service

负载均衡配置

cd /usr/local/nginx/ & vim conf/nginx.conf

#user  nobody;
worker_processes  10;

#
error_log  logs/error.log;
#error_log  logs/error.log  notice;
#error_log  logs/error.log  info;

#
pid        logs/nginx.pid;

events {
    worker_connections  4096;
}

stream {
    upstream backend {
        server ip1:8082;
        server ip2:8082;
    }

    log_format proxy '$remote_addr [$time_local] '
                     '$protocol $status $bytes_sent $bytes_received '
                     '$session_time "$upstream_addr" ';

    server {
        listen 8082;
        proxy_connect_timeout 5s;
        proxy_timeout 10s;

        proxy_pass backend;
        access_log /usr/local/nginx/logs/tcp_access.log proxy;
        error_log /usr/local/nginx/logs/tcp_error.log;
    }
}

最后再重新加载配置文件

nginx -s load

⑤ 使用示例

      以下列出了常用的一些 rest api,可以很方便的对 kafka 进行各种操作。当然了,你也可以通过 HTTP 去调用相关的接口。

# 获取topic列表
curl "http://localhost:8082/topics"
# 获取topic详情
curl "http://localhost:8082/topics/jsontest"
# 发送一条消息
curl -X POST \
     -H "Content-Type: application/vnd.kafka.json.v2+json" \
     --data '{"records":[{"value":{"name": "testUser"}}]}' \
     "http://nginx_ip:nginx_port/topics/jsontest"
 # 创建一个消费者,从头消费
 curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" -H "Accept: application/vnd.kafka.v2+json" \
    --data '{"name": "my_consumer_instance", "format": "json", "auto.offset.reset": "earliest"}' \
    http://localhost:8082/consumers/my_json_consumer
# 订阅topic
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["jsontest"]}' \
    http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/subscription
# 消费一些数据
curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
    http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/records
# 删除topic
curl -X DELETE -H "Accept: application/vnd.kafka.v2+json" \
          http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance


往期推荐

本地提交Flink任务到远程 Yarn 集群

数据质量系统设计与实现

Flink CDC零代码实现数据同步实践

kafka 数据快速接入 Apache Doris

Flink CDC数据接入Apache Doris实践

基于Flink和Doris构建的实时数仓方案实践

Flink自定义触发器

EasyRules规则引擎工具类

使用Doris Flink Connector进行数据接入

使用docker编译Doris

Spark 使用bulk load导入数据到Hbase中


请使用浏览器的分享功能分享到微信等