Get Started with Kafka in 8 Easy Steps

1. Prepare the environment

Kafka requires Java 8 or later. Check the installed version:

[root@localhost kafka]# java -version
openjdk version "1.8.0_262"
OpenJDK Runtime Environment (build 1.8.0_262-b10)
OpenJDK 64-Bit Server VM (build 25.262-b10, mixed mode)
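To run the same check from a script, the following minimal sketch parses the quoted version string out of the java -version output. The helper names java_major_version and require_java8 are hypothetical, and the parsing assumes the usual output format, where Java 8 reports itself as 1.8.x and Java 9 and later start with the major number (11.0.2, 17, ...).

```shell
# Hypothetical helper: print the Java major version parsed from
# `java -version` output read on stdin.
# Java 8 reports "1.8.0_262" (major 8); Java 9+ reports e.g. "11.0.2" or "17".
java_major_version() {
  local version first rest
  # Take the first quoted version string, e.g. openjdk version "1.8.0_262".
  version=$(sed -n 's/.*version "\([^"]*\)".*/\1/p' | head -n 1)
  first=${version%%.*}
  if [ "$first" = "1" ]; then
    rest=${version#1.}
    echo "${rest%%.*}"            # "1.8.0_262" -> "8"
  else
    echo "${first%%[!0-9]*}"      # "17-ea" -> "17", "11" -> "11"
  fi
}

# Hypothetical helper: fail with a message unless the java on PATH is 8+.
require_java8() {
  local major
  # java -version writes to stderr, so merge it into the pipe.
  major=$(java -version 2>&1 | java_major_version)
  if [ -z "$major" ] || [ "$major" -lt 8 ]; then
    echo "Kafka needs Java 8 or later (detected: ${major:-none})" >&2
    return 1
  fi
}
```

With the output shown above, java -version 2>&1 | java_major_version prints 8, so require_java8 succeeds.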

2. Download and extract the Kafka installation package

Kafka download page:

https://kafka.apache.org/downloads

Upload the archive to the server and extract it:

[root@localhost kafka]# ls
kafka_2.13-3.3.1.tgz
[root@localhost kafka]# tar -xzf ./kafka_2.13-3.3.1.tgz 
[root@localhost kafka]# ls
kafka_2.13-3.3.1  kafka_2.13-3.3.1.tgz
[root@localhost kafka]# cd ./kafka_2.13-3.3.1/
[root@localhost kafka_2.13-3.3.1]# ls
bin  config  libs  LICENSE  licenses  NOTICE  site-docs
[root@localhost kafka_2.13-3.3.1]#
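Instead of uploading the archive by hand, the server can also fetch it directly. This is a minimal sketch, assuming curl is installed and that the release is served under the Apache archive layout https://archive.apache.org/dist/kafka/<kafka version>/; kafka_archive_name, kafka_archive_url and fetch_kafka are hypothetical helpers.

```shell
# Hypothetical helper: archive file name for a Scala/Kafka version pair,
# e.g. kafka_2.13-3.3.1.tgz. $1 = Scala version, $2 = Kafka version.
kafka_archive_name() {
  echo "kafka_${1}-${2}.tgz"
}

# Hypothetical helper: download URL, assuming the archive.apache.org layout.
kafka_archive_url() {
  echo "https://archive.apache.org/dist/kafka/${2}/$(kafka_archive_name "$1" "$2")"
}

# Download the archive into the current directory unless it is already
# there, then extract it (same tar flags as in the session above).
fetch_kafka() {
  local scala=$1 kafka=$2 name
  name=$(kafka_archive_name "$scala" "$kafka")
  [ -f "$name" ] || curl -fSLO "$(kafka_archive_url "$scala" "$kafka")" || return 1
  tar -xzf "$name"
}
```

For the version used in this article: fetch_kafka 2.13 3.3.1 && cd kafka_2.13-3.3.1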

3. Start the services

The services must be started in the correct order: ZooKeeper first, then Kafka. The commands are as follows.

Start ZooKeeper:

[root@localhost kafka_2.13-3.3.1]# ./bin/zookeeper-server-start.sh ./config/zookeeper.properties
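Because the broker needs ZooKeeper to be reachable when it starts, a script can wait for ZooKeeper's port before launching Kafka. This sketch assumes the default ports of the shipped configs (clientPort=2181 in zookeeper.properties, 9092 for the broker), requires bash for its /dev/tcp redirection, and uses the -daemon option of the start scripts to run both processes in the background. wait_for_port and start_zk_then_kafka are hypothetical helpers; an open port only shows that the process is listening, not that it is fully ready.

```shell
# Hypothetical helper: return 0 as soon as host:port accepts a TCP
# connection, or 1 after about $3 attempts (one per second).
# Relies on bash's /dev/tcp pseudo-device.
wait_for_port() {
  local host=$1 port=$2 tries=${3:-30}
  while [ "$tries" -gt 0 ]; do
    if bash -c "exec 3<>/dev/tcp/$host/$port" 2>/dev/null; then
      return 0
    fi
    tries=$((tries - 1))
    sleep 1
  done
  return 1
}

# Hypothetical helper: start ZooKeeper, wait for port 2181, then start Kafka
# and wait for port 9092. KAFKA_HOME defaults to the current directory.
start_zk_then_kafka() {
  local home=${KAFKA_HOME:-.}
  "$home/bin/zookeeper-server-start.sh" -daemon "$home/config/zookeeper.properties"
  wait_for_port localhost 2181 30 || { echo "ZooKeeper is not listening on 2181" >&2; return 1; }
  "$home/bin/kafka-server-start.sh" -daemon "$home/config/server.properties"
  wait_for_port localhost 9092 60 || { echo "Kafka is not listening on 9092" >&2; return 1; }
}
```

After sourcing the functions, run KAFKA_HOME=/kafka/kafka_2.13-3.3.1 start_zk_then_kafka instead of using two terminal windows.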

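Start Kafka in a second terminal window. The broker finds ZooKeeper through the zookeeper.connect parameter in config/server.properties; its default value, localhost:2181, already points to the local machine, so it only needs changing if ZooKeeper runs elsewhere.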

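[root@localhost kafka_2.13-3.3.1]# ./bin/kafka-server-start.sh ./config/server.properties

4. Create a topic

Use kafka-topics.sh to create a topic named local-topic. In the command, localhost is the host name of the machine running Kafka and 9092 is the Kafka port; change both to match your environment.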

[root@localhost kafka_2.13-3.3.1]# ./bin/kafka-topics.sh --create --topic local-topic --bootstrap-server localhost:9092
Created topic local-topic.
[root@localhost kafka_2.13-3.3.1]#
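Running the create command a second time fails because the topic already exists. A minimal sketch of an idempotent variant uses the --if-not-exists, --partitions and --replication-factor options listed in the help output at the end of this article. create_topic is a hypothetical helper, and the KAFKA_HOME (default: current directory) and BOOTSTRAP (default: localhost:9092) variables are assumptions of this sketch.

```shell
# Hypothetical helper: create a topic unless it already exists.
# $1 = topic, $2 = partitions (default 1), $3 = replication factor (default 1).
create_topic() {
  local topic=$1 partitions=${2:-1} replication=${3:-1}
  "${KAFKA_HOME:-.}/bin/kafka-topics.sh" --create --if-not-exists \
    --topic "$topic" \
    --partitions "$partitions" \
    --replication-factor "$replication" \
    --bootstrap-server "${BOOTSTRAP:-localhost:9092}"
}
```

create_topic local-topic can be rerun safely. On a single-broker setup like this one, the replication factor cannot be larger than 1.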

5. Describe the topic

[root@localhost kafka_2.13-3.3.1]# ./bin/kafka-topics.sh --describe --topic local-topic --bootstrap-server localhost:9092
Topic: local-topic	TopicId: zlnMG_F9RO6VAeS2hj6rqw	PartitionCount: 1	ReplicationFactor: 1	Configs: 
	Topic: local-topic	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
[root@localhost kafka_2.13-3.3.1]#
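The describe output is tab-separated "Key: value" fields: one summary line per topic, followed by one indented line per partition. A minimal sketch that pulls the partition count out of it with awk; partition_counts is a hypothetical helper, and it assumes this tab-separated layout.

```shell
# Hypothetical helper: read `kafka-topics.sh --describe` output on stdin and
# print "<topic> <PartitionCount>" for every topic summary line.
# Per-partition lines start with a tab, so /^Topic: / skips them.
partition_counts() {
  awk -F '\t' '
    /^Topic: / {
      topic = ""; count = ""
      for (i = 1; i <= NF; i++) {
        if ($i ~ /^Topic: /)          topic = substr($i, 8)   # after "Topic: "
        if ($i ~ /^PartitionCount: /) count = substr($i, 17)  # after "PartitionCount: "
      }
      if (count != "") print topic, count
    }'
}
```

Usage: ./bin/kafka-topics.sh --describe --topic local-topic --bootstrap-server localhost:9092 | partition_counts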

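6. Use kafka-console-producer.sh to write events to local-topic. The tool opens an interactive prompt where each line you type becomes one event; press Ctrl-C when you are finished.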

[root@localhost kafka_2.13-3.3.1]# ./bin/kafka-console-producer.sh --topic local-topic --bootstrap-server localhost:9092
>local-topic 1 test  
>local-topic 2 test
>local-topic 3 test
>^C[root@localhost kafka_2.13-3.3.1]#
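The console producer reads events from standard input, so it can also be fed non-interactively: it sends one event per line and exits at the end of input without needing Ctrl-C. A minimal sketch; produce_lines is a hypothetical helper, and KAFKA_HOME and BOOTSTRAP are assumed variables with the defaults shown.

```shell
# Hypothetical helper: send every line of a file (or of stdin when the file
# argument is omitted or "-") to a topic as one event per line.
produce_lines() {
  local topic=$1 input=${2:--}
  cat -- "$input" | "${KAFKA_HOME:-.}/bin/kafka-console-producer.sh" \
    --topic "$topic" --bootstrap-server "${BOOTSTRAP:-localhost:9092}"
}
```

For example: printf 'local-topic 1 test\nlocal-topic 2 test\n' | produce_lines local-topic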

7. Use kafka-console-consumer.sh to read the events written to local-topic, and press Ctrl-C to exit.

[root@localhost kafka_2.13-3.3.1]# ./bin/kafka-console-consumer.sh --topic local-topic --from-beginning --bootstrap-server localhost:9092
local-topic 1 test
local-topic 2 test
local-topic 3 test
^CProcessed a total of 3 messages
[root@localhost kafka_2.13-3.3.1]#
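In scripts, Ctrl-C is not an option. The console consumer's --max-messages option stops it after a given number of messages, and --timeout-ms makes it exit when no message arrives for that long. A minimal sketch; consume_n is a hypothetical helper, and KAFKA_HOME and BOOTSTRAP are assumed variables with the defaults shown.

```shell
# Hypothetical helper: read at most N events from the beginning of a topic
# and exit on its own. $1 = topic, $2 = max messages (default 10),
# $3 = idle timeout in milliseconds (default 10000).
consume_n() {
  local topic=$1 n=${2:-10} timeout_ms=${3:-10000}
  "${KAFKA_HOME:-.}/bin/kafka-console-consumer.sh" \
    --topic "$topic" --from-beginning \
    --max-messages "$n" --timeout-ms "$timeout_ms" \
    --bootstrap-server "${BOOTSTRAP:-localhost:9092}"
}
```

consume_n local-topic 3 should print the three events written in step 6 and then exit.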

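8. Import data with Kafka Connect, using a file as an example

Edit connect-standalone.properties and point plugin.path at the jar that contains the connector Connect will use; here that is the file connector jar connect-file-3.3.1.jar in libs.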

[root@localhost kafka_2.13-3.3.1]# vim ./config/connect-standalone.properties
[root@localhost kafka_2.13-3.3.1]# cat ./config/connect-standalone.properties | grep plugin.path
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
#plugin.path=
plugin.path=/kafka/kafka_2.13-3.3.1/libs/connect-file-3.3.1.jar
[root@localhost kafka_2.13-3.3.1]#
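The same edit can be made without vim. This minimal sketch replaces an existing uncommented plugin.path line or appends one if none exists, leaving the commented-out examples alone. set_plugin_path is a hypothetical helper, and it assumes GNU sed (sed -i without a suffix argument).

```shell
# Hypothetical helper: set plugin.path in a Connect worker config file.
# $1 = properties file, $2 = plugin path (must not contain "|" or "&").
set_plugin_path() {
  local file=$1 path=$2
  if grep -q '^plugin\.path=' "$file"; then
    # Replace the active setting in place (GNU sed -i).
    sed -i "s|^plugin\.path=.*|plugin.path=$path|" "$file"
  else
    # No active setting yet: append one.
    echo "plugin.path=$path" >> "$file"
  fi
}
```

Usage: set_plugin_path ./config/connect-standalone.properties /kafka/kafka_2.13-3.3.1/libs/connect-file-3.3.1.jar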

Create the test file /kafka/test.txt and write some test data into it:

[root@localhost kafka_2.13-3.3.1]# echo -e "foo\nbar" > /kafka/test.txt

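In connect-file-source.properties and connect-file-sink.properties, set the file parameter: for the source, the path of the file whose data is imported; for the sink, the path of the file the data is written to. Adjust the other parameters as needed.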

[root@localhost kafka_2.13-3.3.1]# vim ./config/connect-file-source.properties 
[root@localhost kafka_2.13-3.3.1]# cat ./config/connect-file-source.properties 
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/kafka/test.txt
topic=connect-test
[root@localhost kafka_2.13-3.3.1]#
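The article shows only the source config. A sketch of a matching sink config, written with a heredoc, could look like the following. It assumes the sink writes to /kafka/test.sink.txt (the file checked later in this step) and reads from connect-test; write_sink_config is a hypothetical helper. Note that the sink connector takes the topic list in topics (plural), while the source uses topic.

```shell
# Hypothetical helper: write a FileStreamSink connector config.
# $1 = config file to write, $2 = output file for the sink
# (assumed default: /kafka/test.sink.txt).
write_sink_config() {
  local config=$1 sink_file=${2:-/kafka/test.sink.txt}
  cat > "$config" <<EOF
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=$sink_file
topics=connect-test
EOF
}
```

Usage: write_sink_config ./config/connect-file-sink.properties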

Start the Connect process in standalone mode, passing the worker config followed by the two connector configs:

[root@localhost kafka_2.13-3.3.1]# ./bin/connect-standalone.sh ./config/connect-standalone.properties ./config/connect-file-source.properties ./config/connect-file-sink.properties

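Check the results. The data has been read into Kafka, the connect-test topic has been created as configured in the source file, and test.sink.txt has the same content as test.txt. Keep the window with the last console consumer command open; it is used next to watch, in real time, what happens when data is written to test.txt.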

[root@localhost kafka_2.13-3.3.1]# ./bin/kafka-topics.sh --list --bootstrap-server localhost:9092
__consumer_offsets
connect-test
local-topic
[root@localhost kafka_2.13-3.3.1]# 
[root@localhost kafka_2.13-3.3.1]# cat /kafka/test.sink.txt
foo
bar
[root@localhost kafka_2.13-3.3.1]# ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
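Comparing test.txt and test.sink.txt by eye works for two lines; a script can do it with cmp instead. A minimal sketch; check_roundtrip is a hypothetical helper whose default paths are the ones used in this article.

```shell
# Hypothetical helper: succeed when the sink file is byte-for-byte identical
# to the source file, i.e. every line made the round trip through Kafka.
# $1 = source file, $2 = sink file.
check_roundtrip() {
  local src=${1:-/kafka/test.txt} sink=${2:-/kafka/test.sink.txt}
  if cmp -s "$src" "$sink"; then
    echo "OK: $sink matches $src"
  else
    echo "MISMATCH between $src and $sink:" >&2
    diff "$src" "$sink" >&2
    return 1
  fi
}
```

With the data above, check_roundtrip prints OK: /kafka/test.sink.txt matches /kafka/test.txt.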

Append more data to test.txt, for example the line test3, and then watch the window you kept open.

[root@localhost kafka_2.13-3.3.1]# echo "test3" >> /kafka/test.txt 
[root@localhost kafka_2.13-3.3.1]#

The newly appended test3 is printed automatically and is also written to test.sink.txt:

[root@localhost kafka_2.13-3.3.1]# ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
{"schema":{"type":"string","optional":false},"payload":"test3"}
[root@localhost kafka_2.13-3.3.1]# cat /kafka/test.sink.txt
foo
bar
test3
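Because the connector picks up appended lines asynchronously, a script that appends to test.txt has to wait before checking the sink. A minimal polling sketch; wait_for_line is a hypothetical helper that looks for an exact line match.

```shell
# Hypothetical helper: poll a file until it contains the exact line $2,
# giving up after about $3 seconds (default 30). $1 = file to watch.
wait_for_line() {
  local file=$1 line=$2 tries=${3:-30}
  while [ "$tries" -gt 0 ]; do
    # -x: whole-line match, -F: treat the line as a fixed string.
    grep -qxF -- "$line" "$file" 2>/dev/null && return 0
    tries=$((tries - 1))
    sleep 1
  done
  return 1
}
```

For example: echo "test3" >> /kafka/test.txt && wait_for_line /kafka/test.sink.txt test3 30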

Appendix: kafka-topics.sh help output (./bin/kafka-topics.sh --help)


This tool helps to create, delete, describe, or change a topic.
Option                                   Description                            
------                                   -----------                            
--alter                                  Alter the number of partitions,        
                                           replica assignment, and/or           
                                           configuration for the topic.         
--at-min-isr-partitions                  if set when describing topics, only    
                                           show partitions whose isr count is   
                                           equal to the configured minimum.     
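--bootstrap-server <String: server to    REQUIRED: The Kafka server to connect
  connect to>                              to.
--command-config <String: command        Property file containing configs to be
  config property file>                    passed to Admin Client. This is used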
                                           only with --bootstrap-server option  
                                           for describing and altering broker   
                                           configs.                             
--config <String: name=value>            A topic configuration override for the
                                           topic being created or altered. The  
                                           following is a list of valid         
                                           configurations:                      
                                         cleanup.policy                        
                                         compression.type                      
                                         delete.retention.ms                   
                                         file.delete.delay.ms                  
                                         flush.messages                        
                                         flush.ms                              
                                         follower.replication.throttled.       
                                           replicas                             
                                         index.interval.bytes                  
                                         leader.replication.throttled.replicas 
                                         local.retention.bytes                 
                                         local.retention.ms                    
                                         max.compaction.lag.ms                 
                                         max.message.bytes                     
                                         message.downconversion.enable         
                                         message.format.version                
                                         message.timestamp.difference.max.ms   
                                         message.timestamp.type                
                                         min.cleanable.dirty.ratio             
                                         min.compaction.lag.ms                 
                                         min.insync.replicas                   
                                         preallocate                           
                                         remote.storage.enable                 
                                         retention.bytes                       
                                         retention.ms                          
                                         segment.bytes                         
                                         segment.index.bytes                   
                                         segment.jitter.ms                     
                                         segment.ms                            
                                         unclean.leader.election.enable        
                                         See the Kafka documentation for full   
                                           details on the topic configs. It is  
                                           supported only in combination with --
                                           create if --bootstrap-server option  
                                           is used (the kafka-configs CLI       
                                           supports altering topic configs with 
                                           a --bootstrap-server option).        
--create                                 Create a new topic.                    
--delete                                 Delete a topic                         
--delete-config <String: name>           A topic configuration override to be
                                           removed for an existing topic (see   
                                           the list of configurations under the 
                                           --config option). Not supported with 
                                           the --bootstrap-server option.       
--describe                               List details for the given topics.     
--disable-rack-aware                     Disable rack aware replica assignment  
--exclude-internal                       exclude internal topics when running   
                                           list or describe command. The        
                                           internal topics will be listed by    
                                           default                              
--help                                   Print usage information.               
--if-exists                              if set when altering or deleting or    
                                           describing topics, the action will   
                                           only execute if the topic exists.    
--if-not-exists                          if set when creating topics, the       
                                           action will only execute if the      
                                           topic does not already exist.        
--list                                   List all available topics.             
--partitions <Integer: # of partitions>  The number of partitions for the topic
                                           being created or altered (WARNING:   
                                           If partitions are increased for a    
                                           topic that has a key, the partition  
                                           logic or ordering of the messages    
                                           will be affected). If not supplied   
                                           for create, defaults to the cluster  
                                           default.                             
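--replica-assignment <String:            A list of manual partition-to-broker
  broker_id_for_part1_replica1 :           assignments for the topic being
  broker_id_for_part1_replica2 ,           created or altered.
  broker_id_for_part2_replica1 :
  broker_id_for_part2_replica2 , ...>
--replication-factor <Integer:           The replication factor for each
  replication factor>                      partition in the topic being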
                                           created. If not supplied, defaults   
                                           to the cluster default.              
--topic <String: topic>                  The topic to create, alter, describe
                                           or delete. It also accepts a regular 
                                           expression, except for --create      
                                           option. Put topic name in double     
                                           quotes and use the '\' prefix to     
                                           escape regular expression symbols; e.
                                           g. "test\.topic".                    
--topic-id <String: topic-id>            The topic-id to describe.This is used
                                           only with --bootstrap-server option  
                                           for describing topics.               
--topics-with-overrides                  if set when describing topics, only    
                                           show topics that have overridden     
                                           configs                              
--unavailable-partitions                 if set when describing topics, only    
                                           show partitions whose leader is not  
                                           available                            
--under-min-isr-partitions               if set when describing topics, only    
                                           show partitions whose isr count is   
                                           less than the configured minimum.    
--under-replicated-partitions            if set when describing topics, only    
                                           show under replicated partitions     
--version                                Display Kafka version.                 
[root@localhost kafka_2.13-3.3.1]#

