单位门户网站建设的请示,网站做哪种推广好,做网页引用别的网站的视频,怎么注册wordpress账号分区算法分类
kafka在生产者投递消息时#xff0c;会根据是否有key采取不用策略来获取分区。
存在key时会根据key计算一个hash值#xff0c;然后采用hash%分区数的方式获取对应的分区。
而不存在key时采用随机算法选取分区#xff0c;然后将所有的消息封装到这个batch上直…分区算法分类
kafka在生产者投递消息时会根据是否有key采取不用策略来获取分区。
存在key时会根据key计算一个hash值然后采用hash%分区数的方式获取对应的分区。
而不存在key时采用随机算法选取分区然后将所有的消息封装到这个batch上直到达到限定数量然后才发送出去。
如下图6条消息采用key可能分三次发送到三个不同的分区需要3次网络请求。如果没有key将封住成一个批次发送。这样一次网路请求就可以发送多条消息大大提高了效率。 源码分析
producer根据keyBytes是否有值采用不同的分区策略。有key的计算hash % numPartitions得到分区。 public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,int numPartitions) {if (keyBytes null) {return stickyPartitionCache.partition(topic, cluster);}// hash the keyBytes to choose a partitionreturn Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;} 并且kafka在这里做了缓存如果第一次获取到了粘性分区后面会缓存起来。 public int partition(String topic, Cluster cluster) {Integer part indexCache.get(topic);if (part null) {return nextPartition(topic, cluster, -1);}return part;}
没有key的采用stickyPartitionCache的策略这里是分区算法的主要代码。获取所有的availablePartitions然后如果availablePartitions大于1获取一个随机数random然后通过random % availablePartitions.size()的方式获取分区。 ListPartitionInfo availablePartitions cluster.availablePartitionsForTopic(topic);if (availablePartitions.size() 1) {Integer random Utils.toPositive(ThreadLocalRandom.current().nextInt());newPart random % partitions.size();} else if (availablePartitions.size() 1) {newPart availablePartitions.get(0).partition();} else {while (newPart null || newPart.equals(oldPart)) {int random Utils.toPositive(ThreadLocalRandom.current().nextInt());newPart availablePartitions.get(random % availablePartitions.size()).partition();}}
abortForNewBatch表示需要发送到新的批次然后调用onNewBatch获取新的分区。 if (result.abortForNewBatch) {int prevPartition partition;partitioner.onNewBatch(record.topic(), cluster, prevPartition);partition partition(record, serializedKey, serializedValue, cluster);tp new TopicPartition(record.topic(), partition);...public void onNewBatch(String topic, Cluster cluster, int prevPartition) {stickyPartitionCache.nextPartition(topic, cluster, prevPartition);}
在下一个批次发送时会检测是否和上一个分区相同如果相同将会缓存一个新的分区。 // Check that the current sticky partition for the topic is either not set or that the partition that // triggered the new batch matches the sticky partition that needs to be changed.if (oldPart null || oldPart prevPartition) { 总结
为了提升kafka发送消息的速率在对消息顺序没有特殊的要求情况下应该尽量避免设置消息的key这样可以提交发送消息的吞吐量。