学生求职网站的需求分析怎么做,个人网页背景图片,网站建设电话着么打,怎么设计图片文章目录 1. Partitioner分区器2. 自定义分区器3. RecordAccumulator数据收集器 1. Partitioner分区器 clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#xff0c;中doSend方法#xff0c;记录了生产者将消息发送的流程#xff0c;其中有一步… 文章目录 1. Partitioner分区器2. 自定义分区器3. RecordAccumulator数据收集器 1. Partitioner分区器 clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java中doSend方法记录了生产者将消息发送的流程其中有一步就是计算当前消息应该发送往对应Topic哪一个分区
int partition partition(record, serializedKey, serializedValue, cluster);private final Partitioner partitioner;private int partition(ProducerRecordK, V record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {//当record的分区已存在则直接返回这对应了创建Record时可以手动传入partition参数if (record.partition() ! null)return record.partition();// 如果存在partitioner分区器则使用Partitioner.partition方法计算分区数据if (partitioner ! null) {int customPartition partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);if (customPartition 0) {throw new IllegalArgumentException(String.format(The partitioner generated an invalid partition number: %d. Partition number should always be non-negative., customPartition));}return customPartition;}// 如果没有分区器的情况if (serializedKey ! null !partitionerIgnoreKeys) {// hash the keyBytes to choose a partitionreturn BuiltInPartitioner.partitionForKey(serializedKey, cluster.partitionsForTopic(record.topic()).size());} else {return RecordMetadata.UNKNOWN_PARTITION;}}// 利用键的哈希值来选择分区
public static int partitionForKey(final byte[] serializedKey, final int numPartitions) {return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;}2. 自定义分区器
新建类实现Partitioner接口key是字符串数字奇数送到分区0偶数送到分区1 。
public class MyKafkaPartitioner implements Partitioner {Overridepublic int partition(String s, Object key, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {// Ensure the key is a non-null stringif (key null || !(key instanceof String)) {throw new IllegalArgumentException(Key must be a non-null String);}// Parse the key as an integerint keyInt;try {keyInt Integer.parseInt((String) key);} catch (NumberFormatException e) {throw new IllegalArgumentException(Key must be a numeric string, e);}// Determine the partition based on the keys odd/even natureif (keyInt % 2 0) {return 1; // Even keys go to partition 2} else {return 0; // Odd keys go to partition 0}}Overridepublic void close() {}Overridepublic void configure(MapString, ? map) {}
}新建一个存在多分区的Topic。 public class KafkaProducerPartitionorTest {public static void main(String[] args) throws InterruptedException {//创建producerHashMapString, Object config new HashMap();config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:19092);config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//指定拦截器config.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ValueInterceptorTest.class.getName());//指定分区器config.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyKafkaPartitioner.class.getName());KafkaProducerString, String producer new KafkaProducerString, String(config);for (int i 0; i 10; i) {//创建recordProducerRecordString, String record new ProducerRecordString, String(test1,keyi,我是你爹i);//发送recordproducer.send(record);Thread.sleep(500);}//关闭producerproducer.close();}
}配置好PARTITIONER_CLASS_CONFIG后发送消息。
可以分区器成功起作用了。
3. RecordAccumulator数据收集器
通过数据校验后数据从分区器来到数据收集器。
数据收集器的工作机制 队列缓存RecordAccumulator为每个分区维护一个队列。默认情况下每个队列的批次大小buffer size是16KB这个大小可以通过配置参数batch.size来调整。 缓冲区管理 每个分区都有一个或多个批次每个批次包含多条消息。当一个批次填满即达到batch.size或者达到发送条件如linger.ms时间窗口即发送消息前等待的时间时批次会被标记为可发送状态并被传递给Sender线程。 满批次处理 当某个分区的队列中的某个批次大小超过了16KB默认值或满足linger.ms的时间条件RecordAccumulator会将该批次加入到一个待发送的队列中。Sender线程会从待发送队列中获取这些满批次并将其发送到Kafka集群。