Kafka3依赖Zookeeper进行元数据管理,分为Scala2.12和Scala2.13编写的两个版本。
而Kafka4则移除了对Zookeeper的依赖,采用KRaft协议确保一致性。
同时需要注意的是Kafka3的运行时环境(JRE)是Java8,而Kafka4的运行时环境是Java17。
参考资料:尚硅谷的B站课程
Zookeeper下载链接、
Kafka下载链接
Zookeeper搭建
下载Zookeeper安装包后解压缩,修改名字为zookeeper。上传到虚拟机或者服务器,准备安装(我上传到了/usr/local/
)。

创建数据存储目录
在zookeeper路径下创建zkData文件夹
1
2
|
cd /usr/local/zookeeper
mkdir zkData
|
创建serverid指定文件
在zkData目录下创建一个myid文件,填入一个与server对应的数字。这里注意三台机的数字一定要不同,我准备在1号机填入2,二号机填入3,3号机填入4。事实上填入1、2、3应该也是可以。这里先填入数字,后面会分发zookeeper,后续步骤会进行修改。
1
2
|
cd /usr/local/zookeeper/zkData
vim myid
|
修改zoo.cfg文件的配置信息
1
2
3
4
5
6
7
8
|
cd /usr/local/zookeeper/conf
mv zoo_sample.cfg zoo.cfg
vim zoo.cfg
# 填入如下内容
dataDir=/usr/local/zookeeper/zkData
server.2=hadoop1:2888:3888
server.3=hadoop2:2888:3888
server.4=hadoop3:2888:3888
|
注意上面配置信息里的dataDir就是zookeeper存储数据的文件夹。要修改为自己的路径。server.2就是1号机的myid里的数字,也要修改为自己填的数字。
分发zookeeper
分发zookeeper。xsync是一个文件分发脚本。如果没有这个脚本,可以参考我的Hadoop部署教程。
zookeeper分发完成后一定记得修改2号机和3号机的myid,填入serverid。
1
2
|
cd /usr/local/
xsync zookeeper
|
启动zookeeper
- 在三台机上分别执行如下zookeeper启动命令。然后查看zookeeper启动状态。
1
2
|
/usr/local/zookeeper/bin/zkServer.sh start
/usr/local/zookeeper/bin/zkServer.sh status
|
正确的状态应该如下:
1
2
3
4
5
|
rust@hadoop1:/usr/local/zookeeper$ bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower
|
- 在1号机上编写启停脚本。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
vim ~/bin/zk.sh
# 填入如下内容,注意把主机名改成自己的
#!/bin/bash
case $1 in
"start"){
for i in hadoop1 hadoop2 hadoop3
do
echo ---------- zookeeper $i 启动 ------------
ssh $i "/usr/local/zookeeper/bin/zkServer.sh start"
done
};;
"stop"){
for i in hadoop1 hadoop2 hadoop3
do
echo ---------- zookeeper $i 停止 ------------
ssh $i "/usr/local/zookeeper/bin/zkServer.sh stop"
done
};;
"status"){
for i in hadoop1 hadoop2 hadoop3
do
echo ---------- zookeeper $i 状态 ------------
ssh $i "/usr/local/zookeeper/bin/zkServer.sh status"
done
};;
esac
|
- 增加脚本执行权限
1
2
3
4
5
6
7
|
sudp chmod 777 ~/bin/zk.sh
# zookeeper集群启动命令
zk.sh start
# zookeeper集群关闭命令
zk.sh stop
# zookeeper集群查看状态
zk.sh status
|
Kafka3搭建
解压下载的Kafka3,并改名为kafka,并上传到虚拟机或服务器的/usr/local目录下。
修改配置文件
1
2
3
4
5
6
7
|
cd /usr/local/config
vim server.properties
# 填入如下内容
broker.id=0
advertised.listeners=PLAINTEXT://hadoop1:9092
log.dirs=/usr/local/kafka/datas
zookeeper.connect=hadoop1:2181,hadoop2:2181,hadoop3:2181/kafka
|
分发Kafka,修改另外两台机的broker.id(broker.id不能重复,2号机改成1,3号机改成2)和advertised.listeners。
配置环境变量
1
2
3
4
5
6
|
sudo vim ~/.bashrc
# 填入如下内容
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
xsync ~/.bashrc
|
启动Kafka集群
启动Kafka3集群前一定要先启动zookeeper集群,关闭则要先关闭Kafka集群再关闭zookeeper集群。
不然zookeeper关闭后Kafka就不能正常关闭,只能手动杀进程了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
zk.sh start
# 依次在三台机执行启动命令
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
# 查看进程
xcall jps
# 成功搭建后应如下
rust@hadoop1:/usr/local/kafka$ xcall jps
要执行的命令是jps
--------------------hadoop1--------------------
8388 Jps
8299 Kafka
6828 QuorumPeerMain
--------------------hadoop2--------------------
4464 Kafka
3721 QuorumPeerMain
4543 Jps
--------------------hadoop3--------------------
4491 QuorumPeerMain
5212 Jps
5132 Kafka
# 手动关闭Kafka集群,在三台机上分别执行如下命令
/usr/local/kafka//usr/local/kafka/
|
集群启停脚本
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
sudo vim ~/bin/kf.sh
# 填入如下内容
#!/bin/bash
case $1 in
"start")
for i in hadoop1 hadoop2 hadoop3
do
echo "---启动 $i kafka---"
ssh $i "/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties"
done
;;
"stop")
for i in hadoop1 hadoop2 hadoop3
do
echo "---停止 $i kafka---"
ssh $i "/usr/local/kafka/bin/kafka-server-stop.sh"
done
;;
esac
# 添加执行权限
sudo chmod 777 ~/bin/kf.sh
# kakfa集群启动
kf.sh strat
# kafka集群关闭
kf.sh stop
# kafka集群状态查看
kf.sh status
|
Kafka终端命令操作
- 主题(topic)相关命令
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
# 1)查看topic命令参数
/usr/local/bin/kafka-topics.sh
参数 描述
--bootstrap-server <String: server toconnect to> 连接的Kafka Broker主机名称和端口号。
--topic <String: topic> 操作的topic名称。
--create 创建主题。
--delete 删除主题。
--alter 修改主题。
--list 查看所有主题。
--describe 查看主题详细描述。
--partitions <Integer: # of partitions> 设置分区数。
--replication-factor<Integer: replication factor> 设置分区副本。
--config <String: name=value> 更新系统默认的配置。
# 2)查看当前服务器中的所有topic
/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server hadoop1:9092 --list
# 3)创建名字为first的topic
/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server hadoop1:9092 --create --partitions 1 --replication-factor 3 --topic first
# --topic 定义topic名
# --replication-factor 定义副本数
# --partitions 定义分区数
# 4)查看first主题的详情
/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server hadoop1:9092 --describe --topic first
# 5)修改分区数(注意:分区数只能增加,不能减少)
/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server hadoop1:9092 --alter --topic first --partitions 3
# 6)再次查看first主题的详情
/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server hadoop1:9092 --describe --topic first
# 7)删除topic
/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server hadoop1:9092 --delete --topic first
|
- 生产者(producer)相关命令
1
2
3
4
5
6
7
8
9
10
11
|
# 1)查看操作生产者命令参数
/usr/local/kafka/bin/kafka-console-producer.sh
# 参数 描述
# --bootstrap-server <String: server toconnect to> 连接的Kafka Broker主机名称和端口号。
# --topic <String: topic> 操作的topic名称。
# 2)发送消息
/usr/local/kafka/bin/kafka-console-producer.sh --bootstrap-server hadoop1:9092 --topic first
>hello world
>atguigu atguigu
|
- 消费者(consumer)相关命令
1
2
3
4
5
6
7
8
9
10
11
12
13
|
# 1)查看操作消费者命令参数
/usr/local/kafka/bin/kafka-console-consumer.sh
# 参数 描述
# --bootstrap-server <String: server toconnect to> 连接的Kafka Broker主机名称和端口号。
# --topic <String: topic> 操作的topic名称。
# --from-beginning 从头开始消费。
# --group <String: consumer group id> 指定消费者组名称。
# 2)消费消息
# (1)消费first主题中的数据
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --topic first
# (2)把主题中所有的数据都读取出来(包括历史数据)
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --from-beginning --topic first
|