建设一个普通的网站需要多少钱,信息服务平台有哪些网站,erp软件是干嘛的,html5网站正在建设中1、基本概念 1. 消息#xff1a; Kafka是一个分布式流处理平台#xff0c;它通过消息进行数据的传输和存储。消息是Kafka中的基本单元#xff0c;可以包含任意类型的数据。
2. 生产者#xff08;Producer#xff09;#xff1a; 生产者负责向Kafka主题发送消息。它将消息…1、基本概念 1. 消息 Kafka是一个分布式流处理平台它通过消息进行数据的传输和存储。消息是Kafka中的基本单元可以包含任意类型的数据。
2. 生产者Producer 生产者负责向Kafka主题发送消息。它将消息发布到指定的主题可以按照自定义的逻辑生成消息并决定消息发送的频率和顺序。
3. 消费者Consumer 消费者从Kafka主题订阅并接收消息。它可以以不同的方式消费消息如批量拉取、实时流式处理或订阅特定的消息主题。
4. 主题Topic 主题是Kafka中消息的分类标签用于组织消息。每个主题可以有多个生产者和多个消费者。主题通常与特定的业务领域或数据类型相关联。
5. 分区Partition 主题可以被分割成多个分区每个分区都是一个有序且持久化的消息队列。分区允许Kafka对消息进行水平扩展并提供了并行处理和负载均衡的能力。
6. 偏移量Offset 偏移量是消息在分区中的唯一标识符用于表示消息在分区内的顺序位置。消费者可以跟踪偏移量来记录已经读取的消息以便实现精确的消费位置控制。
7. 消费者组Consumer Group 消费者组是一组具有相同逻辑的消费者它们共同消费一个或多个主题中的消息。消费者组允许Kafka进行水平扩展和负载均衡在该组内的每个消费者负责处理不同的分区。
8. 副本Replication Kafka使用副本机制来提供数据冗余和高可用性。每个分区都可以配置多个副本这些副本保持分区数据的一致性并可以替代主副本以提供故障恢复功能。
2、安装部署
参考 https://juejin.cn/post/7158663198411849741
https://www.cnblogs.com/linjiqin/p/13196347.html
3、常用命令
配置文件解析cat server.properties
#broker 的全局唯一编号不能重复
broker.id0
#删除 topic 功能使能
delete.topic.enabletrue
#处理网络请求的线程数量
num.network.threads3
#用来处理磁盘 IO 的现成数量
num.io.threads8
#发送套接字的缓冲区大小
socket.send.buffer.bytes102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes102400
#请求套接字的缓冲区大小
socket.request.max.bytes104857600 #kafka 运行日志存放的路径
log.dirs/opt/module/kafka/logs
#topic 在当前 broker 上的分区个数
num.partitions1
#用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir1
#segment 文件保留的最长时间超时将被删除
log.retention.hours168
#配置连接 Zookeeper 集群地址
zookeeper.connecthadoop102:2181,hadoop103:2181,hadoop104:2181
启动/关闭 kafka
cd /usr/local/kafka/kafka_2.12-3.5.0/bin/
bin/kafka-server-start.sh config/server.propertiesbin/kafka-server-stop.sh stop验证kafka是否可以使用仍在bin目录下
运行kafka生产者发送消息
./kafka-console-producer.sh --broker-list localhost:9092 --topic sun运行kafka消费者接收消息
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sun --from-beginning4、常用操作API
创建生产者并发送消息
from kafka import KafkaProducer
import time
# 创建生产者
producer KafkaProducer(bootstrap_serverslocalhost:9092)# 发送单条消息
producer.send(my_topic, bHello, Kafka!)# Kafka的发送实际上是异步的
# 生产者在发送消息之后并不会等待确认消息是否已经成功到达Kafka broker
# 而是立即继续执行下一行代码或退出程序
# 在生产者发送完消息后给消费者足够的时间来连接到Kafka broker并订阅主题# 等待消费者订阅主题
time.sleep(2) # 延迟2秒钟给消费者足够的时间连接到Kafka并订阅主题# 发送多条消息
messages [bMessage 1, bMessage 2, bMessage 3]
for message in messages:producer.send(my_topic, message)
time.sleep(2) # 延迟2秒钟给消费者足够的时间连接到Kafka并订阅主题
创建消费者并订阅主题并消费消息
from kafka import KafkaConsumer# 创建消费者
consumer KafkaConsumer(my_topic, bootstrap_serverslocalhost:9092)# 消费消息
for message in consumer:print(message.value.decode())
指定消费者组和自动提交偏移量
from kafka import KafkaConsumer# 创建消费者并指定消费者组和自动提交偏移量
consumer KafkaConsumer(my_topic, group_idmy_consumer_group,bootstrap_serverslocalhost:9092,enable_auto_commitTrue)# 消费消息
for message in consumer:print(message.value.decode())
指定消费者组和自动提交偏移量
为什么需要指定消费者组呢
在Kafka中消费者组是一组消费者的逻辑名称它们共同协作来消费一个或多个主题中的消息。通过将消费者组绑定到特定主题上Kafka能够提供高可用性、负载均衡和容错能力。
指定消费者组有以下几个原因
负载均衡 当多个消费者以相同的消费者组订阅同一个主题时Kafka会自动分配分区给每个消费者从而实现负载均衡。每个消费者只处理被分配的分区这样可以确保所有分区被均匀地消费。容错能力 如果有消费者发生故障或离线指定消费者组可以确保其他消费者接管该消费者组失去的分区从而实现容错能力。这意味着即使某些消费者不可用消息仍然可以被处理。消费者协作 消费者组允许多个消费者协同工作以实现更高的消费并行度。每个消费者可以独立地处理其分配的分区并且可以扩展系统的整体处理能力。
需要注意的是如果您没有为消费者指定消费者组则它将成为一个独立的消费者。这种情况下每个消费者将独立地消费所有分区中的消息而不会共享负载或具备容错能力。
因此在大多数情况下为了实现负载均衡、容错和提高处理能力您应该指定消费者组尤其是在需要同时处理大量消息或要求高可用性的场景中。如果您只需要简单地消费主题中的消息而不关注这些特性那么可以选择不指定消费者组。
手动提交偏移量
from kafka import KafkaConsumer# 创建消费者并禁用自动提交偏移量
consumer KafkaConsumer(my_topic, group_idmy_consumer_group,bootstrap_serverslocalhost:9092,enable_auto_commitFalse)# 消费消息并手动提交偏移量
for message in consumer:print(message.value.decode())consumer.commit()
自动提交偏移量和手动提交偏移量有什么区别呢
自动提交偏移量Auto Commit Offset和手动提交偏移量Manual Commit Offset是两种不同的消费者偏移量管理方式。
自动提交偏移量
在自动提交模式下消费者会定期自动将已消费的消息偏移量提交给Kafka。消费者无需显式调用提交偏移量的方法Kafka会在后台自动处理。自动提交偏移量可以简化代码减少了手动提交的复杂性。然而自动提交偏移量可能会导致一些问题。例如如果消费者在处理消息之前发生故障那么已经消费但尚未提交的偏移量将丢失造成消息重复或丢失。
手动提交偏移量
在手动提交模式下消费者需要显式地调用提交偏移量的方法将已消费的消息偏移量提交给Kafka。手动提交偏移量提供了更好的控制能力可以确保消息的准确处理和可靠提交。消费者可以在适当的时机调用commit()方法来提交偏移量。通常在成功处理消息后再进行提交是一个常见的模式。手动提交偏移量需要额外的代码来管理和处理偏移量的提交但它提供了更高的灵活性和可靠性。
选择使用自动提交偏移量还是手动提交偏移量取决于具体的使用场景和需求。如果您的应用程序对消息处理的准确性和可靠性要求较高或者需要更精细的控制以避免重复消费或消息丢失那么手动提交偏移量可能更适合。否则自动提交偏移量可以提供一种简化的方式来管理偏移量尤其在简单的消费者应用中很常见。
手动提交偏移量与自动提交偏移量在性能方面可能存在一些差异但这取决于具体的使用情况和配置。
性能方面的考虑
提交频率 自动提交偏移量会定期提交偏移量到Kafka服务器默认情况下是每隔一段时间提交一次。相比之下手动提交偏移量可以根据应用程序的需求选择何时提交可以控制提交的频率。如果手动提交偏移量过于频繁可能会影响性能。网络延迟 手动提交偏移量需要与Kafka服务器进行通信来提交偏移量。如果手动提交偏移量的操作导致频繁的网络调用而且网络延迟较高可能会对性能产生一定的影响。消息处理时间 如果消息处理时间很长手动提交偏移量可能会在处理消息之前进行提交以保证消息处理的可靠性。然而这样也会增加提交偏移量的开销可能降低整体性能。
需要注意的是性能差异通常是微小的并且在大多数情况下不会成为主要限制因素。如果性能是一个关键问题可以根据实际情况进行测试和优化。
此外可以通过调整参数来改善性能例如增加自动提交的间隔时间、批量提交偏移量等。使用合适的配置和优化技术可以平衡性能和可靠性之间的权衡。
总而言之手动提交偏移量可能会稍微影响性能但仍然取决于具体的使用情况和配置。对于大多数应用程序而言差异通常是可以接受的并且可以根据实际需求进行调整和优化。
查看当前有哪些topic
from kafka import KafkaAdminClient# 创建AdminClient连接到Kafka集群
admin_client KafkaAdminClient(bootstrap_serverslocalhost:9092)# 获取主题列表
topic_list admin_client.list_topics()# 打印主题列表
print(topic_list)# [my_topic, sun, __consumer_offsets]
# __consumer_offsets是Kafka中的一个系统内置主题
# 这个特殊的主题用于存储消费者组的偏移量offsets
# 以跟踪消费者在每个分区中读取消息的位置
# __consumer_offsets主题的目的是为了支持Kafka的消费者组功能
# 当消费者组启用自动提交偏移量时Kafka会将消费者组的偏移量信息存储在__consumer_offsets主题中
# 以便能够在重平衡、故障恢复等情况下为消费者提供正确的偏移量。