美的集团网站建设方案书,建网站seo,单页面网站模板,中小企业网站建设济南兴田德润o厉害吗目录
1. 依赖Zookeeper搭建集群
1. 下载Kafka二进制文件
2. 更改kafka配置
3. 启动Zookeeper集群和Kafka集群
4. 验证集群
1.创建主题
2. 检查主题是否存在
3. 创建生产者生产数据
4. 创建消费者消费数据
5. 检查Zookeeper中Kafka集群的元数据
2. 不依赖Zookeeper搭…目录
1. 依赖Zookeeper搭建集群
1. 下载Kafka二进制文件
2. 更改kafka配置
3. 启动Zookeeper集群和Kafka集群
4. 验证集群
1.创建主题
2. 检查主题是否存在
3. 创建生产者生产数据
4. 创建消费者消费数据
5. 检查Zookeeper中Kafka集群的元数据
2. 不依赖Zookeeper搭建Kafka集群
1.下载Kafka二进制文件
2.更改Kafka配置
3. 生成meta.properties文件 4. 启动kafka集群
5. 验证集群
1. 创建主题并验证主题存在
2. 生产者生产数据
3. 消费者消费数据 1. 依赖Zookeeper搭建集群
在2.8.0版本以前Kafka集群的搭建依赖于Zookeeper环境需要将kafka集群节点ip及主题等Kafka相关的元数据存入Zookeeper服务中。所以搭建Kafka集群前需要先搭建Zookeeper集群。
zookeeper多主机集群搭建https://blog.csdn.net/dxh9231028/article/details/141052933?spm1001.2014.3001.5501
1. 下载Kafka二进制文件
搭建完Zookeeper集群后从Kafka官网下载Kafka二进制文件这里我们下载最新的3.8.0版本。 然后创建/kafka文件夹将文件传入该文件夹解压并将解压后的文件改名为kafka 。
#解压文件
tar -xf ./kafka_2.13-3.8.0.tgz
#更改名字
mv ./kafka_2.13-3.8.0 ./kafka
这里我们搭建三个节点的Kafka集群所以要再三个主机都进行如上操作。
2. 更改kafka配置
修改kafka/conf/server.properties文件中的如下配置
#集群中kafka节点的唯一id其他的节点要配置别的id
broker.id0 #Kafka启动时会向Zookeeper集群进行服务注册这个配置项记录所有Zookeeper节点的ip和端口只记录一个localhost也能成功启动不过为了集群的高可用性全部配置更好。
zookeeper.connectip1:2181,ip2:2181,ip3:2181 #Kafka监听的IP端口由于现在的主题通常是多网卡环境所以需要指定ip早期版本好像可以使用0.0.0.0监听全部的ip不过这个版本不可以必须指定明确的ip。
listenersPLAINTEXT://ip1:9092#kafka日志的存储位置实际上是Kafka的全部数据包括Kafka内部存储的消息以及Kafka集群第一次启动时从Zookeeper中获取的集群元数据。
log.dirs/path/to/kafka/logs
启动一个简单的集群Kafka不需要太多的配置不过实际生产环境中Kafka的部署需要考虑大量因素需要大量的调整配置运维难度较大。
3. 启动Zookeeper集群和Kafka集群
如果在Zookeeper集群搭建后已经启动了这里就不需要在启动了如果没有启动不仅可以通过Zookeeper提供的zkServer.sh可执行文件启动也可以通过Kafka的可执行文件启动。
#启动Zookeeper如果之前没启动
bin/zookeeper-server-start.sh config/zookeeper.properties#启动kafka
bin/kafka-server-start.sh config/server.properties#后台启动Kafka不占用当前终端
nohup bin/kafka-server-start.sh config/server.properties kafka.out 21
4. 验证集群
集群启动成功后我们可以通过创建生产者生产数据并在另一个Kafka服务进行消费来验证集群创建是否成功。
1.创建主题
通过调用Kafka的bin目录下的kafka-topic.sh文件创建主题命令如下。
./bin/kafka-topics.sh --create --topic test-topic --bootstrap-server 192.168.142.20:9092 --replication-factor 3 --partitions 3
#create代表当前执行的创建主题操作
#topic指定创建主题名称
#bootstrap-server指定从哪个kafka服务实例连接集群可以指定多个防止某个节点宕机造成指令执行失败。
#replication-factor指定当前主题要复制几份分给其他Kafka服务实例当前是三节点集群我们创建三份保证每个节点都有该主题数据。
#当复制份数少于集群节点数时会有部分节点没有该主题数据不过不会影响kafka集群正常消费数据。
创建成功后结果如下图 2. 检查主题是否存在
依旧是调用Kafka-topic.sh文件检查主题是否存在
./bin/kafka-topics.sh --list --bootstrap-server 192.168.142.21:9092
#list指的是当前是要执行查看主题列表操作
#bootstrap-server指定接入集群的kafka服务地址
这里我们通过第二台主机连接第二个kafka节点获取主题列表结果如下 3. 创建生产者生产数据
确定主题存在后我们再用这台主机和这个kafka节点服务创建生产者生产数据。
bin/kafka-console-producer.sh --topic test-topic --bootstrap-server 192.168.142.21:9092 仔细阅读上面的配置解释这里的配置就不解释了应该很容易看懂结果如下 可以看到执行完可执行文件后会出现特殊的命令行在这个命令行填写数据并回车则可以向指定的主题消费数据这里我生产了三条数据。
4. 创建消费者消费数据
这里我们再用第三台主机和第三个kafka节点服务来消费数据通过可执行文件kafka-console-consumer.sh执行以下命令
bin/kafka-console-consumer.sh --topic test-topic --from-beginning --bootstrap-server 192.168.142.22:9092
结果如下 可以看到成功消费到生产者生产的数据
5. 检查Zookeeper中Kafka集群的元数据
通过zkCli.sh执行如下命令连接zookeeper。
./zkCli.sh -server 192.168.142.20
连接后通过调用ls -R / 查看zookeeper中所有的节点信息内容过长这里无法截图直接复制到代码块中并且删除掉__consumer_offset节点下的所有子节点信息方便大家查看。
从如下结果可以看到除了/zookeeper节点外其他节点均为kafka集群元数据信息其中/brokers/ids节点由012三个子节点分别记录了Kafka集群中每个节点的ip端口等信息而/brokers/topics下保存了所有主题信息其中可以看到除了我们创建的test-topic主题外还有__consumer_offsets主题这个主题中记录了所有消费者组和其负责的主题分区的对应关系内容极多这里为了方便大家观看结果将其删除想了解更多可以看我的另一篇文章
Kafka基本概念及消费流程https://blog.csdn.net/dxh9231028/article/details/141270920?spm1001.2014.3001.5501
/
/admin
/brokers
/cluster
/config
/consumers
/controller
/controller_epoch
/feature
/isr_change_notification
/latest_producer_id_block
/log_dir_event_notification
/zookeeper
/admin/delete_topics
/brokers/ids
/brokers/seqid
/brokers/topics
/brokers/ids/0
/brokers/ids/1
/brokers/ids/2
/brokers/topics/__consumer_offsets
/brokers/topics/test-topic
/brokers/topics/test-topic/partitions
/brokers/topics/test-topic/partitions/0
/brokers/topics/test-topic/partitions/1
/brokers/topics/test-topic/partitions/2
/brokers/topics/test-topic/partitions/0/state
/brokers/topics/test-topic/partitions/1/state
/brokers/topics/test-topic/partitions/2/state
/cluster/id
/config/brokers
/config/changes
/config/clients
/config/ips
/config/topics
/config/users
/config/topics/__consumer_offsets
/config/topics/test-topic
/zookeeper/config
/zookeeper/quota2. 不依赖Zookeeper搭建Kafka集群
Kafka 从版本 2.8.0 开始引入了新的 KRaft 模式这使得 Kafka 可以在不依赖 ZooKeeper 的情况下部署集群。在传统的 Kafka 架构中ZooKeeper 用于管理和协调 Kafka 集群中的元数据例如主题、分区、副本状态等。KRaft 模式则将这些管理功能集成到 Kafka 自身的控制器节点中消除了对 ZooKeeper 的依赖。
1.下载Kafka二进制文件
第一步仍然是下载解压文件和之前一样
2.更改Kafka配置
Kafka默认配置是基于Zookeeper的所以我们首先要删除Zookeeper相关的配置通过vim命令中/zookeeper查找内容注释或删除掉相关配置然后删除掉broker.id排除了zookeeper模式相关配置的影响后在进行如下配置不删除可能也不影响启动不过最好删除防止其他不可预料的行为。
#指定 Kafka 节点的角色可以是broker、controller或两者兼具。
process.rolesbroker,controller#当前节点的唯一id
node.id1#kafka集群中担任controller角色的kafka服务地址
controller.quorum.voters1192.168.142.30:9093,2192.168.142.31:9093,3192.168.142.32:9093#当前的节点中由controller角色则需要配置一个name标识必须大写
controller.listener.namesCONTROLLER#为上面设置的controller标识设置一个ip地址这个地址也是controller.quorum.voters中配置的地址
listenersPLAINTEXT://192.168.142.30:9092,CONTROLLER://192.168.142.30:9093#存储之前存储在Zookeeper中的元数据内容
metadata.log.dir/var/lib/kafka/meta#配置初始化broker角色的kafka实例的超时时间默认是30s这里配置高一点下面会说原因
initial.broker.registration.timeout.ms240000
在传统的zookeeper模式下zookeeper担任着选举leader及各种集群元数据管理的工作而在新版本的kraft模式下controller角色的kafka服务实例接替了这一工作代替zookeeper处理Leader选举以及元数据管理的工作并将元数据存入metadata.log.dir配置的路径中。
3. 生成meta.properties文件
在传统的zookeeper模式下kafka集群信息保存在zookeeper中也就是说只要Kafka实例·将自己的信息注册在zookeeper中它就是集群的一部分区分不同集群的根据是不同的zookeeper集群。
而在kraft模式下kafka并没有像redis或zookeeper那样将集群中所有节点的ip地址配置到配置文件中而是通过kafka-storage.sh可执行文件生成meta.properties文件命令如下
bin/kafka-storage.sh format --config conf/server.properties --cluster-id 123
#config选项指定配置文件路径获取配置文件中的nodeid
#cluster-id指定一个集群id实际生产过程中使用Kafka提供的bin/kafka-storage.sh random-uuid生成这两个idbin/kafka-storage.sh random-uuid可以生成一个唯一id用于nodeid或clusterid这里我们定义简单的nodeid和clusterid用于测试Kafka集群搭建过程。 每个Kafka实例都要执行这个命令生成一个meta.properties文件文件内容如下
#
#Sat Aug 17 02:55:59 PDT 2024
cluster.id123
version1
directory.idAABGMd7o4oHy2t-qzVzhcQ
node.id1其中cluster.id就是集群标识一个集群中的所有Kafka节点实例的meta.properties文件中的cluster.id必须相同并且node.id必须不同。 Kafka没有像其他中间件那样将节点ip写在配置中可能的原因是想增加可扩展性。如果将所有ip写在配置中意味着如果我想新增一个节点则需要修改全部kafka实例的配置文件而如果像Kafka这样利用一个文件中的cluster-id选项区分是否是一个集群这样当想要新增一个Kafka实例时只需要让新增的Kafka实例生成的meta.properties中的集群id相同即可。 4. 启动kafka集群
启动kafka集群命令和传统模式一样都是通过kafka-server-start.sh可执行文件启动Kafka服务。
bin/kafka-server-start.sh config/server.properties#后台启动
nohup bin/kafka-server-start.sh /path/to/kraft-server.properties kafka.out 21
不同的时在传统模式下集群的元数据的处理是靠已经启动的zookeeper这是各个Kafka实例可以随时启动。而在kraft模式下集群的元数据处理依赖于controller节点所以在启动broker角色的Kafka实例之前必须将controller全部启动完成然后broker才会正常启动。
但如果节点既是controller又是broker呢那么此时这个节点的controller角色将会正常启动不过由于controller没有全部启动broker角色无法初始化成功在这种情况下该Kafka实例会不断地循环初始化直到controller节点全部启动后broker角色才会初始化成功。一旦brocker角色循环初始化的时间超过了超时时间默认是30sKafka服务就会启动失败。
所以在配置项中我们新增一项initial.broker.registration.timeout.ms240000将broker初始化的超时时间设置为240s让我们由充足的时间启动全部的controller角色的Kafka实例。
5. 验证集群
验证集群和之前一样通过在不同主机和Kafka实例上操作生产者消费者在主题中生产或消费信息
1. 创建主题并验证主题存在 2. 生产者生产数据 3. 消费者消费数据 与之前不同的是kraft模式下集群相关元数据存储controller角色在 metadata.log.dir 配置项配置的文件夹下内容如下 具体内容是什么我也不知道也没必要深究。