Ubuntu22.4搭建Kafka3及Zookeeper

Kafka3依赖Zookeeper进行元数据管理,分为Scala2.12和Scala2.13编写的两个版本。 而Kafka4则移除了对Zookeeper的依赖,采用KRaft协议确保一致性。 同时需要注意的是Kafka3的运行时环境(JRE)是Java8,而Kafka4的运行时环境是Java17。

参考资料:尚硅谷的B站课程

Zookeeper下载链接Kafka下载链接

Zookeeper搭建

下载Zookeeper安装包后解压缩,修改名字为zookeeper。上传到虚拟机或者服务器,准备安装(我上传到了/usr/local/)。

创建数据存储目录

在zookeeper路径下创建zkData文件夹

1
2
cd /usr/local/zookeeper
mkdir zkData

创建serverid指定文件

在zkData目录下创建一个myid文件,填入一个与server对应的数字。这里注意三台机的数字一定要不同,我准备在1号机填入2,二号机填入3,3号机填入4。事实上填入1、2、3应该也是可以。这里先填入数字,后面会分发zookeeper,后续步骤会进行修改。

1
2
cd /usr/local/zookeeper/zkData
vim myid

修改zoo.cfg文件的配置信息

1
2
3
4
5
6
7
8
cd /usr/local/zookeeper/conf
mv zoo_sample.cfg zoo.cfg
vim zoo.cfg
# 填入如下内容
dataDir=/usr/local/zookeeper/zkData
server.2=hadoop1:2888:3888
server.3=hadoop2:2888:3888
server.4=hadoop3:2888:3888

注意上面配置信息里的dataDir就是zookeeper存储数据的文件夹。要修改为自己的路径。server.2就是1号机的myid里的数字,也要修改为自己填的数字。

分发zookeeper

分发zookeeper。xsync是一个文件分发脚本。如果没有这个脚本,可以参考我的Hadoop部署教程zookeeper分发完成后一定记得修改2号机和3号机的myid,填入serverid。

1
2
cd /usr/local/
xsync zookeeper

启动zookeeper

  1. 在三台机上分别执行如下zookeeper启动命令。然后查看zookeeper启动状态。
1
2
/usr/local/zookeeper/bin/zkServer.sh start
/usr/local/zookeeper/bin/zkServer.sh status

正确的状态应该如下:

1
2
3
4
5
rust@hadoop1:/usr/local/zookeeper$ bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower
  1. 在1号机上编写启停脚本。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
vim ~/bin/zk.sh 
# 填入如下内容,注意把主机名改成自己的
#!/bin/bash

case $1 in
"start"){
	for i in hadoop1 hadoop2 hadoop3
	do
        echo ---------- zookeeper $i 启动 ------------
		ssh $i "/usr/local/zookeeper/bin/zkServer.sh start"
	done
};;
"stop"){
	for i in hadoop1 hadoop2 hadoop3
	do
        echo ---------- zookeeper $i 停止 ------------    
		ssh $i "/usr/local/zookeeper/bin/zkServer.sh stop"
	done
};;
"status"){
	for i in hadoop1 hadoop2 hadoop3
	do
        echo ---------- zookeeper $i 状态 ------------    
		ssh $i "/usr/local/zookeeper/bin/zkServer.sh status"
	done
};;
esac
  1. 增加脚本执行权限
1
2
3
4
5
6
7
sudp chmod 777 ~/bin/zk.sh
# zookeeper集群启动命令
zk.sh start
# zookeeper集群关闭命令
zk.sh stop
# zookeeper集群查看状态
zk.sh status

Kafka3搭建

解压下载的Kafka3,并改名为kafka,并上传到虚拟机或服务器的/usr/local目录下。

修改配置文件

1
2
3
4
5
6
7
cd /usr/local/config
vim server.properties
# 填入如下内容
broker.id=0
advertised.listeners=PLAINTEXT://hadoop1:9092
log.dirs=/usr/local/kafka/datas
zookeeper.connect=hadoop1:2181,hadoop2:2181,hadoop3:2181/kafka

分发Kafka,修改另外两台机的broker.id(broker.id不能重复,2号机改成1,3号机改成2)和advertised.listeners。

配置环境变量

1
2
3
4
5
6
sudo vim ~/.bashrc
# 填入如下内容
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin

xsync ~/.bashrc

启动Kafka集群

启动Kafka3集群前一定要先启动zookeeper集群,关闭则要先关闭Kafka集群再关闭zookeeper集群。 不然zookeeper关闭后Kafka就不能正常关闭,只能手动杀进程了。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
zk.sh start

# 依次在三台机执行启动命令
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

# 查看进程
xcall jps

# 成功搭建后应如下
rust@hadoop1:/usr/local/kafka$ xcall jps
要执行的命令是jps
--------------------hadoop1--------------------
8388 Jps
8299 Kafka
6828 QuorumPeerMain
--------------------hadoop2--------------------
4464 Kafka
3721 QuorumPeerMain
4543 Jps
--------------------hadoop3--------------------
4491 QuorumPeerMain
5212 Jps
5132 Kafka


# 手动关闭Kafka集群,在三台机上分别执行如下命令
/usr/local/kafka//usr/local/kafka/

集群启停脚本

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
sudo vim ~/bin/kf.sh
# 填入如下内容
#!/bin/bash

case $1 in
"start")
    for i in hadoop1 hadoop2 hadoop3
    do 
        echo "---启动 $i kafka---"
        ssh $i "/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties"
    done
;;
"stop")
    for i in hadoop1 hadoop2 hadoop3
    do 
        echo "---停止 $i kafka---"
        ssh $i "/usr/local/kafka/bin/kafka-server-stop.sh"
    done
;;
esac

# 添加执行权限
sudo chmod 777 ~/bin/kf.sh
# kakfa集群启动
kf.sh strat
# kafka集群关闭
kf.sh stop
# kafka集群状态查看
kf.sh status

Kafka终端命令操作

  1. 主题(topic)相关命令
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# 1)查看topic命令参数

/usr/local/bin/kafka-topics.sh

参数	描述
--bootstrap-server <String: server toconnect to>	连接的Kafka Broker主机名称和端口号。
--topic <String: topic>	操作的topic名称。
--create	创建主题。
--delete	删除主题。
--alter	修改主题。
--list	查看所有主题。
--describe	查看主题详细描述。
--partitions <Integer: # of partitions>	设置分区数。
--replication-factor<Integer: replication factor>	设置分区副本。
--config <String: name=value>	更新系统默认的配置。

# 2)查看当前服务器中的所有topic
/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server hadoop1:9092 --list

# 3)创建名字为first的topic
/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server hadoop1:9092 --create --partitions 1 --replication-factor 3 --topic first

# --topic 定义topic名
# --replication-factor  定义副本数
# --partitions  定义分区数
# 4)查看first主题的详情
/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server hadoop1:9092 --describe --topic first
# 5)修改分区数(注意:分区数只能增加,不能减少)
/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server hadoop1:9092 --alter --topic first --partitions 3
# 6)再次查看first主题的详情
/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server hadoop1:9092 --describe --topic first
# 7)删除topic
/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server hadoop1:9092 --delete --topic first
  1. 生产者(producer)相关命令
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
# 1)查看操作生产者命令参数
/usr/local/kafka/bin/kafka-console-producer.sh

# 参数	描述
# --bootstrap-server <String: server toconnect to>	连接的Kafka Broker主机名称和端口号。
# --topic <String: topic>	操作的topic名称。
# 2)发送消息
/usr/local/kafka/bin/kafka-console-producer.sh --bootstrap-server hadoop1:9092 --topic first

>hello world
>atguigu  atguigu
  1. 消费者(consumer)相关命令
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
# 1)查看操作消费者命令参数
/usr/local/kafka/bin/kafka-console-consumer.sh
# 参数	描述
# --bootstrap-server <String: server toconnect to>	连接的Kafka Broker主机名称和端口号。
# --topic <String: topic>	操作的topic名称。
# --from-beginning	从头开始消费。
# --group <String: consumer group id>	指定消费者组名称。

# 2)消费消息
# (1)消费first主题中的数据
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --topic first
# (2)把主题中所有的数据都读取出来(包括历史数据)
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --from-beginning --topic first
网站总访客数:Loading
网站总访问量:Loading
使用 Hugo 构建
主题 StackJimmy 设计