一、部署准备

1.1 环境准备

我们这里使用vagrant创建了三台centos虚拟机,具体可参 《win10搭建Vagrant+VirtualBox环境》 。我们这里部署计划如下:

主机名IP地址集群节点ID是否部署zookeeper是否部署kafka
node01192.168.33.111
node02192.168.33.122
node03192.168.33.133

1.2 关闭防护墙

由于centos默认使用firewalld,使用如下命令关闭。

1
2
systemctl stop firewalld
systemctl disable firewalld

具体可参考 《firewalld常用命令总结》

如果使用iptables,可以在配置文件**/etc/sysconfig/iptables**增加如下内容:

1
2
3
-A INPUT -s 192.168.33.11/32 -p tcp -m multiport --dports 0:65535 -j ACCEPT
-A INPUT -s 192.168.33.12/32 -p tcp -m multiport --dports 0:65535 -j ACCEPT
-A INPUT -s 192.168.33.13/32 -p tcp -m multiport --dports 0:65535 -j ACCEPT

然后执行下面命令使iptables生效

1
service  iptables  restart

具体可参考 《iptables配置与常用示例》

1.3 编辑host文件

在**/etc/hosts**文件增加如下内容

1
2
3
192.168.33.11	node01
192.168.33.12 node02
192.168.33.13 node03

注意:三个节点都要操作

二、部署zookeeper集群

2.1 下载zookeeper

执行下面命令下载:

1
wget  https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.9.1/apache-zookeeper-3.9.1-bin.tar.gz

也可以访问官网 [https://zookeeper.apache.org/releases.html] 进行下载。

2.2 解压并修改文件名

1
2
tar  xvf  apache-zookeeper-3.9.1-bin.tar.gz
mv apache-zookeeper-3.9.1-bin zookeeper

2.3 修改配置文件

1
2
mv  zookeeper/conf/zoo_sample.cfg  zookeeper/conf/zoo.cfg
vi zoo.cfg

修改内容如下

1
2
3
4
5
6
dataDir=/opt/zookeeper/data
dataLogDir=/opt/zookeeper/log

server.1=node01:2888:3888
server.2=node02:2888:3888
server.3=node03:2888:3888

配置参数解读

Server.A=B:C:D。
A是一个数字,表示这个是第几号服务器;
B是这个服务器的ip地址;
C是这个服务器与集群中的Leader服务器交换信息的端口;
D是万一集群中的Leader服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就是用来执行选举时服务器相互通信的端口。

2.4 新增myid

这个时候我们需要创建数据和日志目录,在zookeeper下执行

1
mkdir  data  log

集群模式下配置一个文件myid,这个文件在dataDir目录下,这个文件里面有一个数据就是A的值,Zookeeper启动时读取此文件,拿到里面的数据与zoo.cfg里面的配置信息比较从而判断到底是哪个server。

1
vi data/myid

分别按照《1.1 环境准备》中规划的节点ID进行配置。

2.5 启动zookeeper

分别在三台主机上执行以上操作,注意myid的区别。然后在zookeeper目录执行下面命令启动

1
./bin/zkServer.sh   start

最后分别查看启动状态

三、部署kafka集群

3.1 下载kafka

1
wget  https://downloads.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz

3.2 解压并修改文件名

1
2
tar  xvf  kafka_2.13-3.6.0.tgz
mv kafka_2.13-3.6.0 kafka

3.3 修改配置文件

1
2
cd  kafka
vi config/server.properties

修改如下内容:

1
2
3
broker.id=1
log.dirs=/opt/kafka/kafka-logs
zookeeper.connect=node01:2181,node02:2181,node03:2181/kafka

注意: 定义broker id,不能重复,只能为数字,这里我们和每台节点上zookeeper的myid保持一致。

3.4 创建日志目录

1
mkdir kafka-logs

3.5 启动kafka

分别在三台节点执行下面命令启动

1
./bin/kafka-server-start.sh   ./config/server.properties   &

四、测试验证

在其中一台上执行下面命令

4.1 查看主题

1
./bin/kafka-topics.sh --list --bootstrap-server  localhost:9092

4.2 创建主题

1
./bin/kafka-topics.sh --bootstrap-server  localhost:9092  --create  --topic test-topic  --partitions 1

再次查看主题如下所示

4.3 增加topic分区数

1
./bin/kafka-topics.sh --bootstrap-server  localhost:9092  --alter --topic test-topic --partitions 2

4.4 添加配置

1
./bin/kafka-configs.sh --bootstrap-server  localhost:9092  --entity-type topics --entity-name  test-topic  --alter --add-config x=y

4.5 删除配置

1
./bin/kafka-configs.sh --bootstrap-server  localhost:9092  --entity-type topics --entity-name  test-topic  --alter --delete-config x

4.6 检查消费组offset

1
./bin/kafka-consumer-groups.sh --bootstrap-server  localhost:9092  --describe --group my-group

4.7 列出消费组

1
./bin/kafka-consumer-groups.sh --bootstrap-server  localhost:9092  --list

4.8 要手动删除一个或多个消费者组

1
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group my-group --group my-other-group

4.9 将消费组的offset 重置为最新

1
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group consumergroup1 --topic topic1 --to-latest

4.10 删除主题

1
./bin/kafka-topics.sh --bootstrap-server  localhost:9092  --delete --topic  test-topic