当前位置: 首页 > news >正文

网站seo什么意思软件推荐网站

网站seo什么意思,软件推荐网站,遵义市官网,西部数码网站管理助手 2008一、前提 kafka的版本是 2.6.2 一般我们消费kafka的时候是指定消费组#xff0c;是不会指定消费组内部消费kafka各个分区的分配策略#xff0c;但是我们也可以指定消费策略#xff0c;通过源码发现#xff0c;我们可以有三种分区策略#xff1a; RangeAssignor (默认是不会指定消费组内部消费kafka各个分区的分配策略但是我们也可以指定消费策略通过源码发现我们可以有三种分区策略 RangeAssignor (默认RoundRobinAssignorStickyAssignor 指定消费分区策略 props.put(partition.assignment.strategy, org.apache.kafka.clients.consumer.RoundRobinAssignor);kafka消费分区策略的分区入口类是ConsumerCoordinator的performAssignment方法 Overrideprotected MapString, ByteBuffer performAssignment(String leaderId,String assignmentStrategy,ListJoinGroupResponseData.JoinGroupResponseMember allSubscriptions) {//获取分区策略ConsumerPartitionAssignor assignor lookupAssignor(assignmentStrategy);//存储消费组订阅的所有topicSetString allSubscribedTopics new HashSet();//存储消费组内各个消费者对应的基本信息比如元数据MapString, Subscription subscriptions new HashMap();MapString, ListTopicPartition ownedPartitions new HashMap();for (JoinGroupResponseData.JoinGroupResponseMember memberSubscription : allSubscriptions) {Subscription subscription ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(memberSubscription.metadata()));subscription.setGroupInstanceId(Optional.ofNullable(memberSubscription.groupInstanceId()));subscriptions.put(memberSubscription.memberId(), subscription);allSubscribedTopics.addAll(subscription.topics());ownedPartitions.put(memberSubscription.memberId(), subscription.ownedPartitions());}//具体实现在类 AbstractPartitionAssignor 各个分区算法的抽象类MapString, Assignment assignments assignor.assign(metadata.fetch(), new GroupSubscription(subscriptions)).groupAssignment();...log.info(Finished assignment for group at generation {}: {}, generation().generationId, assignments);...return groupAssignment;}AbstractPartitionAssignor 的 assign() //各个分区策略具体的算法public abstract MapString, ListTopicPartition assign(MapString, Integer partitionsPerTopic,MapString, Subscription subscriptions);Overridepublic GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscription) {MapString, Subscription subscriptions groupSubscription.groupSubscription();SetString allSubscribedTopics new HashSet();for (Map.EntryString, Subscription subscriptionEntry : subscriptions.entrySet())allSubscribedTopics.addAll(subscriptionEntry.getValue().topics());MapString, Integer partitionsPerTopic new HashMap();for (String topic : allSubscribedTopics) {Integer numPartitions metadata.partitionCountForTopic(topic);if (numPartitions ! null numPartitions 0)partitionsPerTopic.put(topic, numPartitions);elselog.debug(Skipping assignment for topic {} since no metadata is available, topic);}/构建参数 partitionsPerTopicmap表示各个topic有多少个分区//subscriptions map表示消费者相关信息消费者id消费者对应的主题MapString, ListTopicPartition rawAssignments assign(partitionsPerTopic, subscriptions);// this class maintains no user data, so just wrap the resultsMapString, Assignment assignments new HashMap();for (Map.EntryString, ListTopicPartition assignmentEntry : rawAssignments.entrySet())assignments.put(assignmentEntry.getKey(), new Assignment(assignmentEntry.getValue()));return new GroupAssignment(assignments);}下面说明下RangeAssignor与RoundRobinAssignor两种分区策略的区别 二、RangeAssignor 分区策略 RangeAssignor是默认分配的策略 public class RangeAssignor extends AbstractPartitionAssignor {Overridepublic String name() {return range;}private MapString, ListMemberInfo consumersPerTopic(MapString, Subscription consumerMetadata) {MapString, ListMemberInfo topicToConsumers new HashMap();for (Map.EntryString, Subscription subscriptionEntry : consumerMetadata.entrySet()) {String consumerId subscriptionEntry.getKey();MemberInfo memberInfo new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());for (String topic : subscriptionEntry.getValue().topics()) {put(topicToConsumers, topic, memberInfo);}}return topicToConsumers;}Overridepublic MapString, ListTopicPartition assign(MapString, Integer partitionsPerTopic,MapString, Subscription subscriptions) {//获取主题对应的消费者列表//partitionsPerTopic 主题对应分区个数//subscriptions 消费者的信息消费者id消费者对应的主题消费者实例MapString, ListMemberInfo consumersPerTopic consumersPerTopic(subscriptions);//打印输出可以看到消费组group-one有两个消费者 consumer-group-one-1-504e90bc-c1cc-45d5-a687-5e4f98ee48c3 和 consumer-group-one-1-11580834-fc23-468e-ae11-edbc3c4a74bd//其中 consumer-group-one-1-11580834-fc23-468e-ae11-edbc3c4a74bd 消费了 test_topic_partition_one 和 test_topic_partition_two// consumer-group-one-1-504e90bc-c1cc-45d5-a687-5e4f98ee48c3 只消费了 test_topic_partition_one// consumersPerTopic: {test_topic_partition_one[MemberInfo [member.id: consumer-group-one-1-504e90bc-c1cc-45d5-a687-5e4f98ee48c3, group.instance.id: {}], MemberInfo [member.id: consumer-group-one-1-11580834-fc23-468e-ae11-edbc3c4a74bd, group.instance.id: {}]], test_topic_partition_two[MemberInfo [member.id: consumer-group-one-1-11580834-fc23-468e-ae11-edbc3c4a74bd, group.instance.id: {}]]}MapString, ListTopicPartition assignment new HashMap();for (String memberId : subscriptions.keySet())assignment.put(memberId, new ArrayList());for (Map.EntryString, ListMemberInfo topicEntry : consumersPerTopic.entrySet()) {//获取topicString topic topicEntry.getKey();//获取topic对应的消费者ListMemberInfo consumersForTopic topicEntry.getValue();//获取topic的分区数Integer numPartitionsForTopic partitionsPerTopic.get(topic);if (numPartitionsForTopic null)continue;Collections.sort(consumersForTopic);//计算每个消费者至少消费几个分区int numPartitionsPerConsumer numPartitionsForTopic / consumersForTopic.size();//计算剩余几个分区int consumersWithExtraPartition numPartitionsForTopic % consumersForTopic.size();//获取主题分区列表ListTopicPartition partitions AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);for (int i 0, n consumersForTopic.size(); i n; i) {int start numPartitionsPerConsumer * i Math.min(i, consumersWithExtraPartition);//可以看到前面的消费者会多分配一个分区int length numPartitionsPerConsumer (i 1 consumersWithExtraPartition ? 0 : 1);//计算每个消费者对应的分区列表可以看到前面的消费者会多分配一个分区assignment.get(consumersForTopic.get(i).memberId).addAll(partitions.subList(start, start length));}}return assignment;} }举例说明构建消费组下两个消费者 test_topic_partition_one和test_topic_partition_two都是9个分区 进程一 props.put(group.id, group-one);props.put(auto.offset.reset, latest);KafkaConsumerString, byte[] consumer new KafkaConsumer(props);consumer.subscribe(Arrays.asList(test_topic_partition_one, test_topic_partition_two));进程二 props.put(group.id, group-one);props.put(auto.offset.reset, latest);KafkaConsumerString, byte[] consumer new KafkaConsumer(props);consumer.subscribe(Arrays.asList(test_topic_partition_one));通过上面的分配算法可以得到 消费者consumer-group-one-1-11580834-fc23-468e-ae11-edbc3c4a74bd消费的分区为 test_topic_partition_one-0, test_topic_partition_one-1, test_topic_partition_one-2, test_topic_partition_one-3, test_topic_partition_one-4, test_topic_partition_two-0, test_topic_partition_two-1, test_topic_partition_two-2, test_topic_partition_two-3, test_topic_partition_two-4, test_topic_partition_two-5, test_topic_partition_two-6, test_topic_partition_two-7, test_topic_partition_two-8消费者consumer-group-one-1-504e90bc-c1cc-45d5-a687-5e4f98ee48c3消费的分区为 test_topic_partition_one-5, test_topic_partition_one-6, test_topic_partition_one-7, test_topic_partition_one-8如果进程二也消费两个主题则对应的关系变成 通过上面的分配算法可以得到 消费者consumer-group-one-1-11580834-fc23-468e-ae11-edbc3c4a74bd消费的分区为 test_topic_partition_one-0, test_topic_partition_one-1, test_topic_partition_one-2, test_topic_partition_one-3, test_topic_partition_one-4, test_topic_partition_two-0, test_topic_partition_two-1, test_topic_partition_two-2, test_topic_partition_two-3, test_topic_partition_two-4,消费者consumer-group-one-1-504e90bc-c1cc-45d5-a687-5e4f98ee48c3消费的分区为 test_topic_partition_one-5, test_topic_partition_one-6, test_topic_partition_one-7, test_topic_partition_one-8, test_topic_partition_two-5, test_topic_partition_two-6, test_topic_partition_two-7, test_topic_partition_two-8可以看到第一个消费者比第二个消费者多消费一个test_topic_partition_one的分区而且是连续的。同时可以看到分类是按照topic粒度区分的也就是每个消费者消费一个topic的分区与其他topic是无关的。可以会导致第一个实例运行压力较大的问题。 三、RoundRobinAssignor 分区策略 public class RoundRobinAssignor extends AbstractPartitionAssignor {Overridepublic MapString, ListTopicPartition assign(MapString, Integer partitionsPerTopic,MapString, Subscription subscriptions) {MapString, ListTopicPartition assignment new HashMap();//存储消费组下所有的消费者构建两个消费者// 其中一个consumer-group-one-1-6c946240-3ffc-4bba-806d-7d7a0ccc1ad5// 另一个consumer-group-one-1-d227d230-8adc-4d4e-a092-77b63c07855aListMemberInfo memberInfoList new ArrayList();for (Map.EntryString, Subscription memberSubscription : subscriptions.entrySet()) {assignment.put(memberSubscription.getKey(), new ArrayList());memberInfoList.add(new MemberInfo(memberSubscription.getKey(),memberSubscription.getValue().groupInstanceId()));}//排序后的消费者CircularIteratorMemberInfo assigner new CircularIterator(Utils.sorted(memberInfoList));for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) {final String topic partition.topic();//轮询指定消费者的分区while (!subscriptions.get(assigner.peek().memberId).topics().contains(topic)) {assigner.next();}assignment.get(assigner.next().memberId).add(partition);}return assignment;}//获取排序后的所有主题分区private ListTopicPartition allPartitionsSorted(MapString, Integer partitionsPerTopic,MapString, Subscription subscriptions) {SortedSetString topics new TreeSet();for (Subscription subscription : subscriptions.values())topics.addAll(subscription.topics());ListTopicPartition allPartitions new ArrayList();for (String topic : topics) {Integer numPartitionsForTopic partitionsPerTopic.get(topic);if (numPartitionsForTopic ! null)allPartitions.addAll(AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic));}return allPartitions;}Overridepublic String name() {return roundrobin;} }举例说明构建消费组下两个消费者 test_topic_partition_one和test_topic_partition_two都是9个分区 进程一 props.put(group.id, group-one);props.put(auto.offset.reset, latest);//指定轮询策略props.put(partition.assignment.strategy, org.apache.kafka.clients.consumer.RoundRobinAssignor);KafkaConsumerString, byte[] consumer new KafkaConsumer(props);consumer.subscribe(Arrays.asList(test_topic_partition_one, test_topic_partition_two));props.put(group.id, group-one);props.put(auto.offset.reset, latest);//指定轮询策略props.put(partition.assignment.strategy, org.apache.kafka.clients.consumer.RoundRobinAssignor);KafkaConsumerString, byte[] consumer new KafkaConsumer(props);consumer.subscribe(Arrays.asList(test_topic_partition_one));通过上面的分配算法可以得到 消费者consumer-group-one-1-6c946240-3ffc-4bba-806d-7d7a0ccc1ad5消费的分区为 test_topic_partition_one-0, test_topic_partition_one-2, test_topic_partition_one-4, test_topic_partition_one-6, test_topic_partition_one-8, test_topic_partition_two-0, test_topic_partition_two-1, test_topic_partition_two-2, test_topic_partition_two-3, test_topic_partition_two-4, test_topic_partition_two-5, test_topic_partition_two-6, test_topic_partition_two-7, test_topic_partition_two-8消费者consumer-group-one-1-504e90bc-c1cc-45d5-a687-5e4f98ee48c3消费的分区为 test_topic_partition_one-1, test_topic_partition_one-3, test_topic_partition_one-5, test_topic_partition_one-7可以看到test_topic_partition_one分区是轮流的分配给两个消费者的 对应的日志 2024-08-19 14:28:34 INFO [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator Line:626] [Consumer clientIdconsumer-group-one-1, groupIdgroup-one] Finished assignment for group at generation 44: {consumer-group-one-1-6c946240-3ffc-4bba-806d-7d7a0ccc1ad5Assignment(partitions[test_topic_partition_one-0, test_topic_partition_one-2, test_topic_partition_one-4, test_topic_partition_one-6, test_topic_partition_one-8, test_topic_partition_two-0, test_topic_partition_two-1, test_topic_partition_two-2, test_topic_partition_two-3, test_topic_partition_two-4, test_topic_partition_two-5, test_topic_partition_two-6, test_topic_partition_two-7, test_topic_partition_two-8]), consumer-group-one-1-d227d230-8adc-4d4e-a092-77b63c07855aAssignment(partitions[test_topic_partition_one-1, test_topic_partition_one-3, test_topic_partition_one-5, test_topic_partition_one-7])}如果进程二也消费两个主题则对应的关系变成 消费者consumer-group-one-1-6c946240-3ffc-4bba-806d-7d7a0ccc1ad5消费的分区为 test_topic_partition_one-0, test_topic_partition_one-2, test_topic_partition_one-4, test_topic_partition_one-6, test_topic_partition_one-8, test_topic_partition_two-1, test_topic_partition_two-3, test_topic_partition_two-5, test_topic_partition_two-7消费者consumer-group-one-1-504e90bc-c1cc-45d5-a687-5e4f98ee48c3消费的分区为 test_topic_partition_one-1, test_topic_partition_one-3, test_topic_partition_one-5, test_topic_partition_one-7 test_topic_partition_two-0, test_topic_partition_two-2, test_topic_partition_two-4, test_topic_partition_two-6, test_topic_partition_two-8也就是会把所有的分区轮流分给两个消费者所以这种模式就和主题个数与主题分区有关了。
http://www.hkea.cn/news/14564819/

相关文章:

  • 网站建设托管预算清单小贷做网站
  • 网站建设行业数据特别好的企业网站程序
  • 如何建一个公司网站怎么做辅助发卡网站
  • 移动论坛网站模板济宁做网站公司
  • nas怎么做自己的网站简历设计网官网
  • apple私人免费网站怎么下载群晖 wordpress加载慢
  • 花店网站建设毕设介绍全定制网站开发
  • 网站的pv uv域名服务器在哪个国家
  • 唐山网站建设哪家优惠广州购物网站建设
  • 微信做一元云购网站wordpress去除文章作者
  • 网站建设技术入股协议软件外包平台的服务商
  • 建设网站素材简历模板免费网页
  • 做h5网站设计游戏开发者大会
  • 学做美食看哪个网站产品互联网营销推广
  • 常德市建设局网站施工企业会计科目
  • 马蜂窝网络营销网站建设怎么注册一个企业邮箱
  • 网站泛解析线上购买链接
  • 网站怎么做架构图苏州网站建设集团
  • 大气的个人网站好的app设计网站
  • 网站页面统计代码是什么意思python 营销型网站建设
  • 番禺做网站要多少钱做一个静态网站要多少钱
  • 公司建站服务网站漏洞解决办法
  • 整站优化提升排名网络游戏排行榜前十名大型网络游戏
  • 最新免费下载ppt模板网站企业公示信息查询系统江西
  • 平度做网站公司网站技术解决方案
  • 一个网站 两个数据库网站横条广告
  • 网站建设 金疙瘩计划软件ui
  • 成都手机号码网站建设wordpress底部版权插件
  • asp网站源代码下载王烨晨
  • 襄阳建设网站河北住房和城乡建设厅网站首