我们向kafka发送数据,一般是使用 Kafka Client。我们如果使用 java 程序来发送 kafka 消息,这种做法基本是没有问题的。但是如果我们有其他系统也要集成 kafka,但是不一定用的是 java 语言,比如 ABAP、C++、C 语言、Python、Go等等,那怎么办呢?这里我们将介绍一种通用的方法,即:通过 HTTP 调用的方式向 Kafka 发送数据和读取数据。
Confluent 简介
Confluent是一家专注于Apache Kafka的企业公司,提供了一整套的Kafka相关工具和服务。
Confluent提供了企业版Kafka以及一些相关的工具和服务,例如Kafka Connect,Kafka Streams,Schema Registry等。这些工具和服务可以帮助企业更方便地构建、管理和监控Kafka集群,并且更容易地将Kafka与其他数据存储和处理系统集成。
这里我们主要用到其中一种组件,即:Confluent Kafka Connector。其分为商业版本和开源版本,开源版本就可以满足我们一般场景下的需求。
Confluent Kafka Connector
Confluent Kafka Connector 是一种插件,可将 Kafka 与其他数据存储和处理系统集成,例如数据库、文件系统、搜索引擎等。它允许我们将数据从 Kafka 流式传输到这些系统中,或者从这些系统中读取数据并将其发送到 Kafka。Confluent Kafka Connector 还提供了许多现成的连接器,可帮助我们快速地将Kafka与其他系统集成。
接下来让我们通过实际操作来揭秘如何使用它吧。
① 环境准备
这里我们只需要配置好 jdk 即可,这里我们推荐的 jdk 版本为 jdk1.8 或者 jdk11。配置的过程这里就不做过多赘述了,还不知道的小伙伴可以去各大搜索引擎寻找答案。
② Confluent 安装
配置好 jdk 之后,就到了我们最重要的安装环节了。
下载 Confluent
大家根据需要选择适合自己的版本,我这里用的是:5.2.4-2.11。5.2.4 表示 Confluent 平台的主版本号,2.11 表示 Confluent 平台所依赖的 Apache Kafka 版本号。
浏览器访问这个链接地址就可以进入官网下载地址了:https://www.confluent.io/previous-versions/。选择如下图所示的进行下载就可以:
解压
我们将下载后的文件解压至 /soft 目录下,可以根据自己情况指定你自己的目录。
tar -zxvf confluent-5.2.4-2.11.tar.gz -C /soft
配置文件
我们需要修改几个配置文件,配置文件目录在:/soft/confluent-5.2.4/etc/
• connect-avro-distributed.properties
用于在分布式模式下使用 Avro 格式来进行序列化和反序列化。Avro 是一种数据序列化格式,它提供了一种紧凑、快速、通用的数据交换格式,适用于各种编程语言和平台。这里我们主要配置 kafka 的地址。
bootstrap.servers=ip1:9092,ip2:9092,ip3:9092
# 集群模式下group.id必须是唯一的, 3台机器保持一致
group.id=connect-cluster-1
• schema-registry.properties
Confluent Schema Registry 是一种用于管理和存储 Avro 模式的服务,用于在 Apache Kafka 之间传输数据。Schema Registry 服务允许我们注册、存储和检索 Avro 模式,以便在数据传输期间对数据进行序列化和反序列化。
# Schema Registry 服务ip
host.name=127.0.0.1
# Schema Registry 服务监听的地址和端口号
listeners=http://0.0.0.0:8081
# kafka 元数据 zookeeper 地址
kafkastore.connection.url=ip1:2181,ip2:2181,ip3:2181
# kafka 服务地址
kafkastore.bootstrap.servers=PLAINTEXT://ip1:9092,PLAINTEXT://ip2:9092,PLAINTEXT://ip3:9092
集群模式下所有机器都必须配置,不一样的是host.name 要设置成 Schema Registry 服务所在的ip
• kafka-rest.properties
Confluent Kafka REST 服务的配置文件。通过使用 Kafka REST 服务,我们可以使用 REST API 对 Apache Kafka 进行读取和写入操作。
# 集群唯一标识,每台机器需保持唯一
id=1001
# Kafka Rest 服务ip
host.name=127.0.0.1
# schema-registry 服务ip
schema.registry.url=http://ip1:8081,http://ip2:8081,http://ip3:8081
# zookeeper 地址
zookeeper.connect=ip1:2181,ip2:2181,ip3:2181
# kafka 服务地址
bootstrap.servers=PLAINTEXT://ip1:9092,PLAINTEXT://ip2:9092,PLAINTEXT://ip3:9092
# 指定发送的最大字节数
max.request.size=10485760
# 发送消息的批次大小
batch.size=32768
# 批次间隔
linger.ms=1
③ 启动
启动脚本如下:
# 1. 启动 Schema-registry-start
bin/schema-registry-start -daemon etc/schema-registry/schema-registry.properties
# 2. 启动 Connector
nohup bin/connect-distributed etc/schema-registry/connect-avro-distributed.properties >>logs/connect-avro-distributed.log 2>&1 &
# 3. 启动 kafka-rest
nohup bin/kafka-rest-start etc/kafka-rest/kafka-rest.properties >>logs/rest.log 2>&1 &
④ 负载均衡配置
为了使我们的服务更加稳定,集群模式下我们通过 nginx 来做负载均衡。现在我们来看以下如何安装 nginx 以及如何配置负载均衡。
nginx 安装
防火墙配置
为了方便起见,我们直接关闭防火墙,生产环境中可以开启 nginx 端口
# 查看防火墙状态
systemctl status firewalld.service
# 关闭防火墙
systemctl stop firewalld.service
# 禁止防火墙自启动
systemctl disable firewalld.service
安装4个插件
yum install gcc-c++
yum install -y pcre pcre-devel
yum install -y zlib zlib-devel
yum install -y openssl openssl-devel
•
gcc-c++
gcc 可以编译 C,C++,Ada,Object C和Java等语言(安装nginx需要先将官网下载的源码进行编译,编译依赖 gcc 环境)
•
pcre pcre-devel
pcre是一个perl库,包括perl兼容的正则表达式库,nginx的http模块使用pcre来解析正则表达式,所以需要安装pcre库
•
zlib zlib-devel
zlib库提供了很多种压缩和解压缩方式nginx使用zlib对http包的内容进行gzip
•
openssl openssl-devel
OpenSSL 是一个强大的安全套接字层密码库,囊括主要的密码算法、常用的密钥和证书封装管理功能及 SSL 协议,并提供丰富的应用程序供测试或其它目的使用。nginx 不仅支持 http 协议,还支持 https(即在ssl协议上传输http),所以需要在 Centos 安装 OpenSSL 库
安装 nginx
# 下载nginx
wget https://nginx.org/download/nginx-1.21.6.tar.gz
# 解压
tar xvf nginx-1.21.6.tar.gz
# 配置
cd nginx-1.21.6
./configure --prefix=/usr/local/nginx --sbin-path=/usr/local/nginx/sbin/nginx --conf-path=/usr/local/nginx/conf/nginx.conf --error-log-path=/var/log/nginx/error.log --http-log-path=/var/log/nginx/access.log --pid-path=/var/run/nginx/nginx.pid --lock-path=/var/lock/nginx.lock --with-http_ssl_module --with-http_stub_status_module --with-http_gzip_static_module --http-client-body-temp-path=/var/tmp/nginx/client/ --http-proxy-temp-path=/var/tmp/nginx/proxy/ --http-fastcgi-temp-path=/var/tmp/nginx/fcgi/ --http-uwsgi-temp-path=/var/tmp/nginx/uwsgi --http-scgi-temp-path=/var/tmp/nginx/scgi --with-pcre --with-stream
# 编译安装
make && make install
# 查看安装路径
whereis nginx
设置 nginx 开机自启
vim /lib/systemd/system/nginx.service
[Unit]
Description=nginx service
After=network.target
[Service]
Type=forking
ExecStart=/usr/local/nginx/sbin/nginx
ExecReload=/usr/local/nginx/sbin/nginx -s reload
ExecStop=/usr/local/nginx/sbin/nginx -s stop
PrivateTmp=true
[Install]
WantedBy=multi-user.target
•
Description:服务描述•
After:服务类别•
Type=forking:后台运行•
ExecStart:启动命令•
ExecReload:重启命令•
ExecStop:停止命令•
PrivateTmp=true:给服务分配独立的临时空间•
WantedBy:运行级别下服务安装的相关设置,设置为多用户
加入开机自启
systemctl enable nginx.service
负载均衡配置
cd /usr/local/nginx/ & vim conf/nginx.conf
#user nobody;
worker_processes 10;
#error_log logs/error.log;
#error_log logs/error.log notice;
#error_log logs/error.log info;
#pid logs/nginx.pid;
events {
worker_connections 4096;
}
stream {
upstream backend {
server ip1:8082;
server ip2:8082;
}
log_format proxy '$remote_addr [$time_local] '
'$protocol $status $bytes_sent $bytes_received '
'$session_time "$upstream_addr" ';
server {
listen 8082;
proxy_connect_timeout 5s;
proxy_timeout 10s;
proxy_pass backend;
access_log /usr/local/nginx/logs/tcp_access.log proxy;
error_log /usr/local/nginx/logs/tcp_error.log;
}
}
最后再重新加载配置文件
nginx -s load
⑤ 使用示例
以下列出了常用的一些 rest api,可以很方便的对 kafka 进行各种操作。当然了,你也可以通过 HTTP 去调用相关的接口。
# 获取topic列表
curl "http://localhost:8082/topics"
# 获取topic详情
curl "http://localhost:8082/topics/jsontest"
# 发送一条消息
curl -X POST \
-H "Content-Type: application/vnd.kafka.json.v2+json" \
--data '{"records":[{"value":{"name": "testUser"}}]}' \
"http://nginx_ip:nginx_port/topics/jsontest"
# 创建一个消费者,从头消费
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" -H "Accept: application/vnd.kafka.v2+json" \
--data '{"name": "my_consumer_instance", "format": "json", "auto.offset.reset": "earliest"}' \
http://localhost:8082/consumers/my_json_consumer
# 订阅topic
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["jsontest"]}' \
http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/subscription
# 消费一些数据
curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/records
# 删除topic
curl -X DELETE -H "Accept: application/vnd.kafka.v2+json" \
http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance
往期推荐