昆明城乡建设网站,宁波外贸公司联系方式,网站数据库模板,优秀网站要素Kafka与ZooKeeper
Apache ZooKeeper是一个基于观察者模式的分布式服务管理框架#xff0c;即服务注册中心。同时ZooKeeper还具有存储数据的能力。Kafka的每台服务器作为一个broker注册到ZooKeeper#xff0c;多个broker借助ZooKeeper形成了Kafka集群。同时ZooKeeper会保存一…Kafka与ZooKeeper
Apache ZooKeeper是一个基于观察者模式的分布式服务管理框架即服务注册中心。同时ZooKeeper还具有存储数据的能力。Kafka的每台服务器作为一个broker注册到ZooKeeper多个broker借助ZooKeeper形成了Kafka集群。同时ZooKeeper会保存一些信息例如消息消费的进度Offset等。另外ZooKeeper还负责Kafka集群中Leader的选举。
使用
下载
从官方下载最新版Kafka https://kafka.apache.org/downloads.html 当前最新版本为kafka_2.13-3.4.0。 下载后解压。注意解压路径不能有空格及其他特殊字符。 Kafka中默认自带了ZooKeeper因此不需要单独下载ZooKeeper。 当然也可不使用Kafka自带的ZooKeeper注意配置文件保持一致即可。
配置文件
进入Kafka根目录新建一个 tmp 文件夹然后在其下创建 zookeeper 和 kafka-logs 文件夹分别用于存放ZooKeeper的数据和Kafka的日志。 打开 /config/zookeeper.properties 找到 dataDir/tmp/zookeeper 将其修改为新创建的 zookeeper 文件夹。然后在最后添加配置audit.enabletrue。 打开 /config/server.properties 找到 log.dirs/tmp/kafka-logs 将其修改为新创建的 kafka-logs 文件夹。 注意路径必须使用/不能使用\。 若不修改上述目录则会在磁盘的根目录下自动创建 zookeeper 和 kafka-logs 文件夹。
配置的一致性
ZooKeeper默认的端口为2181Kafka默认的端口为9092。 若要更改Kafka的端口则将listenersPLAINTEXT://:9092配置开放修改其端口值同时要加上hostname即改为listenersPLAINTEXT://localhost:9092。 Kafka配置中默认指定zookeeper.connectlocalhost:2181若使用非默认配置需修改该属性。若有多个ZooKeeper地址可使用,隔开。
运行和关闭
运行
Kafka的运行基于ZooKeeper因此需要在ZooKeeper服务启动后再运行Kafka。 进入Kafka根目录。 首先要启动ZooKeeper。在根目录下打开cmd命令行输入 .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties 然后启动Kafka。在根目录下打开cmd命令行输入 .\bin\windows\kafka-server-start.bat .\config\server.properties Kafka运行后会在根目录下生成 logs 文件夹。
命令行
进入Kafka根目录打开cmd命令行。
topic
新建 .\bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic1 查看topic列表 .\bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092 查看单个topic详情: .\bin\windows\kafka-topics.bat --describe --bootstrap-server localhost:9092 --topic topic1 删除topic: .\bin\windows\kafka-topics.bat --delete --bootstrap-server localhost:9092 -topic topic1 group
查看group列表 .\bin\windows\kafka-consumer-groups.bat --list --bootstrap-server localhost:9092 查看group详情: .\bin\windows\kafka-consumer-groups.bat --describe --bootstrap-server localhost:9092 --group test-consumer-group ZooKeeper命令行
若运行的是Kafka自带的ZooKeeper则要进入其命令行
进入Kafka根目录打开cmd命令行。输入命令: .\bin\windows\zookeeper-shell.bat localhost:2181 。
注意地址和端口要正确否则会提示JLine support is disabled。 执行后会打印
Connecting to localhost:2181
Welcome to ZooKeeper!
JLine support is disabledWATCHER::WatchedEvent state:SyncConnected type:None path:null此时即可输入ZooKeeper命令:
# 查看服务
ls /# 查看0号broker
get /brokers/ids/0关闭
关闭时要先关闭Kafka再关闭ZooKeeper。 关闭Kafka时不能直接关闭命令行窗口这样可能会导致Kafka无法完成对日志文件的解锁于是下次启动Kafka时就会因为日志文件被锁而无法启动成功。正确的关闭方式是运行 /bin/windows/kafka-server-stop.bat 。 关闭ZooKeeper时同样要运行 /bin/windows/zookeeper-server-stop.bat 。
暴力关闭导致异常
如果暴力关闭了Kafka再次启动时遇到提示 ERROR Fatal error during KafkaServer startup. Prepare to shutdown 此时删除根目录下的 logs 文件夹即可。
Spring Boot集成
首先要启动ZooKeeper和Kafka服务。
依赖
dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId
/dependency配置文件
spring:kafka:bootstrap-servers: localhost:9092 #Kafka的地址producer: # 生产者key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer: # 消费者key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializergroup-id: test-consumer-group #在Kafka的/config/consumer.properties中查看和修改以上为最基础的配置。可以根据具体需求进行更加详细的配置。
生产者和消费者
对于Kafka必须设置生产者和消费者。
生产者: 生产者负责将消息发送给Kafka。有异步和同步两种方式。消费者: 消费者使用监听的方式接收Kafka的消息。当存在消息时会被及时消费。
生产者调用命令push来发送数据而消费者调用命令pull来拉取数据。注意消费者的数据不是Kafka主动推送的。
生产者
为了方便测试定义一个Controller来接收触发信号并将信息发送给Kafka。
RequestMapping(/producer)
RestController
public class ProducerController {AutowiredKafkaTemplateString, String kafkaTemplate;// 异步发送RequestMapping(/register)public String register(User user) {String message JSON.toJSONString(user);System.out.println(接收到用户信息 message);kafkaTemplate.send(register, message);return OK;}// 同步发送RequestMapping(/register/sync)public String registerSync(User user) throws Exception {String message JSON.toJSONString(user);System.out.println(接收到用户信息 message);ListenableFutureSendResultString, String future kafkaTemplate.send(register, message);// 设置等待时间超出后不再等待返回SendResultString, String result future.get(3, TimeUnit.SECONDS);return result.getProducerRecord().value();}
}使用 KafkaTemplateK, V 来发送消息。其中send()方法第一个参数指定了topics的名称。 若要使用同步发送可对kafkaTemplate.send()的返回结果调用get()方法并在其中设置等待时间。 对于异步发送发送后会立即收到返回结果对于同步发送会一直等待发送结果并返回直到设置的等待时间耗尽。 对于异步发送若想得知发送的最终结果则需要注册一个监听器KafkaTemplate.setProducerListener()来等待回调:
Configuration
public class KafkaListener {private final static Logger logger LoggerFactory.getLogger(KafkaListener.class);AutowiredKafkaTemplate kafkaTemplate;// 配置监听PostConstructprivate void listener() {kafkaTemplate.setProducerListener(new ProducerListenerString, Object() {Overridepublic void onSuccess(ProducerRecordString, Object producerRecord, RecordMetadata recordMetadata) {logger.info(ok,message{}, producerRecord.value());}Overridepublic void onError(ProducerRecordString, Object producerRecord, Exception exception) {logger.error(error!message{}, producerRecord.value());}});}
}消费者
对于消费者需设置一个监听器来监听指定的topics
Component
public class Consumer {KafkaListener(topics register)public void consume(String message) {System.out.println(收到消息 message);User user JSON.parseObject(message, User.class);System.out.println(为 user.getName() 进行账号注册);}
}当Kafka收到消息后监听者的回调方法被触发。
指定分区
Producer发送时指定分区和key值
// 发送时指定0号分区key为test
RequestMapping(/register)
public String register(User user) {String message JSON.toJSONString(user);System.out.println(收到用户信息 message);kafkaTemplate.send(register, 0, test, message);return OK;
}Consumer接收时指定分区:
KafkaListener(topics {register}, topicPattern 0)
public void onMessage(ConsumerRecord?, ? consumerRecord) {Optional? optional Optional.ofNullable(consumerRecord.value());if (optional.isPresent()) {String message (String)optional.get();User user JSON.parseObject(message, User.class);System.out.println(为 user.getName() 进行账号注册);}
}编码器和解码器
在yml中配置了编码器和解码器
spring:kafka:...producer: # 生产者key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer: # 消费者key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer...其中:
key-serializer: 生产者key的编码器。value-serializer: 生产者value的编码器。key-deserializer: 消费者key的解码器。value-deserializer: 消费者value的解码器。
编码器和解码器统称序列化其作用是生产者将消息编码为字节流发送消费者接收到字节流再进行解码。 这里使用的是Kafka提供的字符串编码器StringSerializer和字符串解码器StringDeserializer。 Kafka提供了多种编码器包含: StringSerializer、JsonSerializer、BytesSerializer、IntegerSerializer、LongSerializer、ListSerializer、StringOrBytesSerializer等等。对应地也提供了多种解码器包含StringDeserializer、JsonDeserializer、BytesDeserializer、IntegerDeserializer、LongDeserializer、ListDeserializer、StringOrBytesDeserializer等等。 Kafka默认提供的序列化类可满足绝大多数场景。用户也可自定义编码器和解码器。
编码器
编码器需要从Serializer类派生并实现serialize()方法。 以下为StringSerializer源码
package org.apache.kafka.common.serialization;public class StringSerializer implements SerializerString {private String encoding StandardCharsets.UTF_8.name();Overridepublic void configure(MapString, ? configs, boolean isKey) {String propertyName isKey ? key.serializer.encoding : value.serializer.encoding;Object encodingValue configs.get(propertyName);if (encodingValue null)encodingValue configs.get(serializer.encoding);if (encodingValue instanceof String)encoding (String) encodingValue;}Overridepublic byte[] serialize(String topic, String data) {try {if (data null)return null;elsereturn data.getBytes(encoding);} catch (UnsupportedEncodingException e) {throw new SerializationException(Error when serializing string to byte[] due to unsupported encoding encoding);}}
}解码器
编码器需要从Deserializer类派生并实现deserialize()方法。 以下为StringDeserializer源码
package org.apache.kafka.common.serialization;public class StringDeserializer implements DeserializerString {private String encoding StandardCharsets.UTF_8.name();Overridepublic void configure(MapString, ? configs, boolean isKey) {String propertyName isKey ? key.deserializer.encoding : value.deserializer.encoding;Object encodingValue configs.get(propertyName);if (encodingValue null)encodingValue configs.get(deserializer.encoding);if (encodingValue instanceof String)encoding (String) encodingValue;}Overridepublic String deserialize(String topic, byte[] data) {try {if (data null)return null;elsereturn new String(data, encoding);} catch (UnsupportedEncodingException e) {throw new SerializationException(Error when deserializing byte[] to string due to unsupported encoding encoding);}}
}自定义编码器和解码器
现在希望生产者和消费者以Object类型来处理消息
// 编码器类
public class TestSerializer implements Serializer {Overridepublic byte[] serialize(String topic, Object data) {String json JSON.toJSONString(data);return json.getBytes();}
}// 解码器类
public class TestDeserializer implements Deserializer {Overridepublic Object deserialize(String topic, byte[] data) {try {String json new String(data,utf-8);return JSON.parse(json);} catch (UnsupportedEncodingException e) {e.printStackTrace();}return null;}}然后在yml中为value配置自定义的编码器和解码器
spring:kafka:...producer: # 生产者key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: com.example.test.component.TestSerializerconsumer: # 消费者key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: com.example.test.component.TestDeserializer...这样就可以使用自定义解码器了。
topic内分区
每一个topic实际是分成多个区(partition)的这些topic都存储在Kafka内。无论是生产者将消息发送到Kafka还是消费者从Kafka中获取消息都需要明确告知Kafka分区信息是什么。 生产者发送消息时消息要发往哪个分区是根据条件来确定的
若指定分区号则消息直接发到Kafka的指定分区。若未指定分区号但给定了数据key值则消息可对key值取Hashcode自动计算分区。若未指定分区号且未给定数据key值则直接轮循分区。默认方案自定义分区策略。
使用指定分区号及Key值的方式只需要在调用KafkaTemplate.send()时传入对应的参数即可。
自定义分区策略
自定义分区策略即生产者在将消息发给Kafka前先经过自定义的分区器进行分区计算计算出目标分区后再发给Kafka。故而自定义分区器应添加在生产者工程中。 使用自定义分区的流程为
从Partitioner派生重载其partition()方法。在这里自定义分区逻辑。添加一个Configuration类在其中更新KafkaTemplate的属性将自定义的分区器设置进来。正常发送消息。
自定义分区器
public class MyPartitioner implements Partitioner {Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 自定义分区策略。若key以0开头则放入分区0其他放入分区1String keyStr String.valueOf(key);return keyStr.startsWith(0) ? 0 : 1;}Overridepublic void close() {}Overridepublic void configure(MapString, ? map) {}
}添加一个配置类来更新KafkaTemplate的属性:
Configuration
public class KafkaProducerConfig {Value(${spring.kafka.bootstrap-servers})private String bootstrapServers;KafkaTemplate kafkaTemplate;PostConstructpublic void setKafkaTemplate() {MapString, Object props new HashMap();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);// 将自定义的分区器设置进来props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);this.kafkaTemplate new KafkaTemplateString, String(new DefaultKafkaProducerFactory(props));}public KafkaTemplate getKafkaTemplate(){return kafkaTemplate;}
}然后正常发送消息即可。
SpringBoot手动提交offset
offset默认是自动计算自动提交的。若要对offset进行手动提交流程为
修改配置关闭自动提交。consumer在处理结束时手动提交。
在application.yml中配置了offset自动提交
enable-auto-commit: true # 是否自动提交offset
auto-commit-interval: 100 # 提交offset延时(接收到消息后多久提交offset默认单位为ms)现在添加一个配置类来覆盖yml中的配置
Configuration
public class KafkaConsumerConfig {Value(${spring.kafka.bootstrap-servers})private String bootstrapServers;Beanpublic KafkaListenerContainerFactory? manualKafkaListenerContainerFactory() {MapString, Object configProps new HashMap();configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);// 关闭自动提交configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);ConcurrentKafkaListenerContainerFactoryString, String factory new ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(new DefaultKafkaConsumerFactory(configProps));/*** AckMode 在ENABLE_AUTO_COMMIT_CONFIG false时生效。所有取值为* RECORD: 每处理一条commit一次* BATCH(默认): 每次poll的时候批量提交一次频率取决于每次poll的调用频率* TIME: 每次间隔ackTime的时间去commit* COUNT: 累积达到ackCount次的ack去commit* COUNT_TIME: ackTime或ackCount哪个条件先满足就commit* MANUAL: listener负责ack但是背后也是批量上去* MANUAL_IMMEDIATE: listner负责ack每调用一次就立即commit*/factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}
}然后在consumer的监听方法中调用Consumer.commitSync()来手动提交offset
/**
* 消费者1
*/
KafkaListener(topics {register})
public void consumer1(Payload String message, ConsumerRecord record, Consumer consumer) {System.out.println(consumer1收到消息并消费 message);// 消息消费结束同步提交偏移量默认为offset1consumer.commitSync();System.out.println(consumer1提交位移);}/**
* 消费者2
*/
KafkaListener(topics {register})
public void consumer2(Payload String message, ConsumerRecord record, Consumer consumer) {System.out.println(consumer2收到消息并消费 message);// 消息消费结束同步提交偏移量手动更改为offset2Maporg.apache.kafka.common.TopicPartition, OffsetAndMetadata currentOffset new HashMap();currentOffset.put(new org.apache.kafka.common.TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() 2));consumer.commitSync(currentOffset);System.out.println(consumer2提交位移);}注意消费者2中手动更改偏移量使用的类为org.apache.kafka.common.TopicPartition。该类这里直接手动引入而没有放在import中是因为该类与注解TopicPartition的类org.springframework.kafka.annotation.TopicPartition同名。如果放在import中则会造成冲突。 上述消费者2为了说明手动更改偏移量的操作而令offset2这样会导致偏移量与实际不符实际开发应按实际情况处理。 如果关闭了偏移量自动提交且在消费者逻辑中没有提交偏移量则会导致偏移量始终不变于是每次消费者拉取的消息都是同一条从而造成消息重复消费。