菏泽兼职网站建设,读书网网站建设策划书,wordpress手机发布文章,绍兴网页设计目录 一、Kafka
二、发送端#xff08;生产者#xff09;
三、接收端#xff08;消费者#xff09;
四、其他操作 一、Kafka
Apache Kafka 是一个开源流处理平台#xff0c;由 LinkedIn 开发#xff0c;并于 2011 年成为 Apache 软件基金会的一部分。Kafka 广泛用于构…
目录 一、Kafka
二、发送端生产者
三、接收端消费者
四、其他操作 一、Kafka
Apache Kafka 是一个开源流处理平台由 LinkedIn 开发并于 2011 年成为 Apache 软件基金会的一部分。Kafka 广泛用于构建实时的数据流和流式处理应用程序它以高吞吐量、可扩展性和容错性著称。
kafka-python 是一个用 Python 编写的 Apache Kafka 客户端库。
安装命令如下
pip install kafka-python
二、发送端生产者
自动创建test主题并每隔一秒发送一条数据示例代码如下
from kafka import KafkaProducer
import json
import time# Kafka服务器地址
bootstrap_servers [localhost:9092]# 创建KafkaProducer实例
producer KafkaProducer(bootstrap_serversbootstrap_servers)# 发送消息的函数
def send_message(topic, message):# 将消息转换为字节producer.send(topic, json.dumps(message).encode(utf-8))producer.flush()if __name__ __main__:# 创建test主题topic test# 发送消息i 1while True:message {num: i, msg: fHello Kafka {i}}send_message(topic, message)i 1time.sleep(1)三、接收端消费者
代码如下
from kafka import KafkaConsumer
import json# Kafka服务器地址
bootstrap_servers [localhost:9092]# 创建KafkaConsumer实例
consumer KafkaConsumer(test,bootstrap_serversbootstrap_servers,auto_offset_resetlatest, # 从最新的消息开始消费# auto_offset_resetearliest, # 从最早的offset开始消费enable_auto_commitTrue, # 自动提交offsetgroup_idmy-group # 消费者组ID
)# 消费消息
for message in consumer:# 将接收到的消息解码并转换为字典message json.loads(message.value.decode(utf-8))print(fReceived message: {message}) 消费者参数如下
1、auto_offset_reset 该参数指定了当Kafka中没有初始偏移量或当前偏移量在服务器上不再存在时例如数据被删除了消费者应从何处开始读取数据。 可选值earliest从最早的记录开始消费即从分区日志的开始处开始。latest从最新的记录开始消费即从分区日志的末尾开始。默认none如果没有为消费者指定初始偏移量就抛出一个异常。
2、enable_auto_commit
该参数指定了消费者是否周期性地提交它所消费的偏移量。自动提交偏移量可以简化消费者的使用但可能有重复消费或数据丢失的风险。禁用自动提交可以更精确地控制偏移量的提交时机通常在确保消息处理成功后才提交偏移量。 可选值true自动提交偏移量。默认false不自动提交偏移量需要手动调用commitSync()或commitAsync()来提交偏移量。
3、group_id
该参数用于指定消费者所属的消费组。同一个消费组的消费者将共同消费一个主题的不同分区而不同消费组的消费者可以独立地消费消息互不影响。这对于实现负载均衡和故障转移很有用。 类型字符串必须指定
四、其他操作
list_topics()获取主题元数据。
create_topics()创建新主题。
delete_topics()删除主题。
from kafka.admin import KafkaAdminClient, NewTopic# 获取主题元数据
admin_client KafkaAdminClient(bootstrap_serverslocalhost:9092, client_idtest)
topics admin_client.list_topics()
print(topics)# 创建主题
new_topic NewTopic(nametest-topic, num_partitions3, replication_factor1)
admin_client.create_topics(new_topics[new_topic], validate_onlyFalse)# 删除主题
admin_client.delete_topics(topics[test-topic])