当前位置: 首页 > news >正文

建站 哪个网站系统好用淮安企业网站制作

建站 哪个网站系统好用,淮安企业网站制作,二维码生成器网站源码,作图软件免费先看有哪些send方法 首先说红圈的 有3个红圈。归类成3种发送方式。假设前提条件#xff0c;发送的topic#xff0c;有3个broker#xff0c;每个broker总共4个write队列#xff0c;总共有12个队列。 普通发送。负载均衡12个队列。指定超时时间指定MessageQueue,发送#…先看有哪些send方法 首先说红圈的 有3个红圈。归类成3种发送方式。假设前提条件发送的topic有3个broker每个broker总共4个write队列总共有12个队列。 普通发送。负载均衡12个队列。指定超时时间指定MessageQueue,发送指定超时时间指定selector器指定特定参数指定超时时间。一般用于局部有序比如相同userId的到同一个队列 默认超时时间时3秒 再说蓝圈 sendDefaultImpl 负载均衡的方式选择队列。然后调sendKernelImplsendSelectImpl 指定队列selector和arg的方式选择队列。然后调sendKernelImplsendKernelImpl 最核心的方式。这里已经明确队列做真实的消息发送 很明显只需要简单解读sendDefaultImpl和sendSelectImpl如何选择队列。然后重点在于查看sendKernelImpl方法实现 sendDefaultImpl选择队列分析 先看源码 private SendResult sendDefaultImpl(Message msg,final CommunicationMode communicationMode,final SendCallback sendCallback,final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {this.makeSureStateOK();Validators.checkMessage(msg, this.defaultMQProducer);final long invokeID random.nextLong();long beginTimestampFirst System.currentTimeMillis();long beginTimestampPrev beginTimestampFirst;long endTimestamp beginTimestampFirst;TopicPublishInfo topicPublishInfo this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo ! null topicPublishInfo.ok()) {boolean callTimeout false;MessageQueue mq null;Exception exception null;SendResult sendResult null;int timesTotal communicationMode CommunicationMode.SYNC ? 1 this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;int times 0;String[] brokersSent new String[timesTotal];for (; times timesTotal; times) {String lastBrokerName null mq ? null : mq.getBrokerName();MessageQueue mqSelected this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);if (mqSelected ! null) {mq mqSelected;brokersSent[times] mq.getBrokerName();try {beginTimestampPrev System.currentTimeMillis();if (times 0) {//Reset topic with namespace during resend.msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));}long costTime beginTimestampPrev - beginTimestampFirst;if (timeout costTime) {callTimeout true;break;}sendResult this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);endTimestamp System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);switch (communicationMode) {case ASYNC:return null;case ONEWAY:return null;case SYNC:if (sendResult.getSendStatus() ! SendStatus.SEND_OK) {if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {continue;}}return sendResult;default:break;}org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl 第一步通过topic查找路由信息tryToFindTopicPublishInfo 先从内存中获取。内存是DefaultMQProducerImpl#topicPublishInfoTable 如果内存没有则从nameserver获取 org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer(java.lang.String) 内存是什么时候添加的呢是有定时器任务更新的。详情看我写的文章rocketmq-push模式-消费侧重平衡-类流程图分析 第二步、设定默认重试3次包含首次,选择topic的其中一个队列 org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue public MessageQueue selectOneMessageQueue(final String lastBrokerName) {if (lastBrokerName null) {return selectOneMessageQueue();} else {for (int i 0; i this.messageQueueList.size(); i) {int index this.sendWhichQueue.incrementAndGet();int pos Math.abs(index) % this.messageQueueList.size();if (pos 0)pos 0;MessageQueue mq this.messageQueueList.get(pos);if (!mq.getBrokerName().equals(lastBrokerName)) {return mq;}}return selectOneMessageQueue();} }可以发现topic对应的TopicPublishInfo维护者一个ThreadLocalIndex对象。 每个线程先会获取一个index然后对index取模得到某一个队列。 这意味着sendDefaultImpl中队列的负载均衡是线程独立的。每个线程维护着自己的index每发送一次index1。 public int incrementAndGet() {Integer index this.threadLocalIndex.get();if (null index) {index Math.abs(random.nextInt());this.threadLocalIndex.set(index);}this.threadLocalIndex.set(index);return Math.abs(index POSITIVE_MASK);}第三步、选择完MessageQueue后调用sendKernelImpl发送消息 sendSelectImpl选择队列分析 先看源码 private SendResult sendSelectImpl(Message msg,MessageQueueSelector selector,Object arg,final CommunicationMode communicationMode,final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {long beginStartTime System.currentTimeMillis();this.makeSureStateOK();Validators.checkMessage(msg, this.defaultMQProducer);TopicPublishInfo topicPublishInfo this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo ! null topicPublishInfo.ok()) {MessageQueue mq null;try {ListMessageQueue messageQueueList mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList());Message userMessage MessageAccessor.cloneMessage(msg);String userTopic NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace());userMessage.setTopic(userTopic);mq mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg));} catch (Throwable e) {throw new MQClientException(select message queue threw exception., e);}long costTime System.currentTimeMillis() - beginStartTime;if (timeout costTime) {throw new RemotingTooMuchRequestException(sendSelectImpl call timeout);}if (mq ! null) {return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);} else {throw new MQClientException(select message queue return null., null);}}validateNameServerSetting();throw new MQClientException(No route info for this topic, msg.getTopic(), null);}org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendSelectImpl 第一步通过topic查找路由信息tryToFindTopicPublishInfo。分析同上 第二步通过MessageQueueSelector找出发送的MessageQueue MessageQueueSelector的实现方式可以自定义。提供了2种 SelectMessageQueueByRandom 随机一个 SelectMessageQueueByHash 根据arg的hashcode取模一个。适合局部有序 public class SelectMessageQueueByHash implements MessageQueueSelector {Overridepublic MessageQueue select(ListMessageQueue mqs, Message msg, Object arg) {int value arg.hashCode() % mqs.size();if (value 0) {value Math.abs(value);}return mqs.get(value);} }第三步、选择完MessageQueue后调用sendKernelImpl发送消息 sendKernelImpl发送分析 org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl 第一步、通过MessageQueue获取对应的master节点地址 第二步、设置消息的唯一id。详情看以下实现。明显是客户端生成的由于不是分布式唯一ID的创建方式有点怀疑会重复。后续查看 org.apache.rocketmq.common.message.MessageClientIDSetter#createUniqID 第三步、对消息body做消息压缩 第四步、判断该消息是否是事务消息。给sysFlag位标志变量加标志 第五步、发送前可做一些自定义的检查CheckForbiddenHook、SendMessageHook 第六步、构建SendMessageRequestHeader requestHeader将msg的一些内容设置到header上 第七部、根据发送模式communicationMode调用不同的sendMessage方法 org.apache.rocketmq.client.impl.MQClientAPIImpl#sendMessage switch (communicationMode) {case ASYNC:Message tmpMessage msg;boolean messageCloned false;if (msgBodyCompressed) {//If msg body was compressed, msgbody should be reset using prevBody.//Clone new message using commpressed message body and recover origin massage.//Fix bug:https://github.com/apache/rocketmq-externals/issues/66tmpMessage MessageAccessor.cloneMessage(msg);messageCloned true;msg.setBody(prevBody);}if (topicWithNamespace) {if (!messageCloned) {tmpMessage MessageAccessor.cloneMessage(msg);messageCloned true;}msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));}long costTimeAsync System.currentTimeMillis() - beginStartTime;if (timeout costTimeAsync) {throw new RemotingTooMuchRequestException(sendKernelImpl call timeout);}sendResult this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),tmpMessage,requestHeader,timeout - costTimeAsync,communicationMode,sendCallback,topicPublishInfo,this.mQClientFactory,this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),context,this);break;case ONEWAY:case SYNC:long costTimeSync System.currentTimeMillis() - beginStartTime;if (timeout costTimeSync) {throw new RemotingTooMuchRequestException(sendKernelImpl call timeout);}sendResult this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),msg,requestHeader,timeout - costTimeSync,communicationMode,context,this);break;default:assert false;break; }第八步、最终会调用NettyRemotingClient的发送方法 SYNC org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeSync ONEWAY: org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeOneway ASYNC: org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeAsync 总结 product的发送有几种API模式其实目的都是为了选择MessageQueue 默认的发送是根据topic的队列做负载均衡的方式topicPublishInfo内部维护着ThreadLocalIndex对象做线程级别的负载均衡。而且默认都3次重试机会意味可以选择不同队列做发送指定messageQueue是调用方明确知道发送的MessageQueue这种失败不会做重试指定MessageQueueSelector等这种是通过传入的参数计算出对应的MessageQueue这种失败不会做重试适合作为局部有序的发送方式 选择好队列后就会调用org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl方法主要是构建SendMessageRequestHeader执行自定义的发送before和after的处理。 sendKernelImpl最终会调用NettyRemotingClient提供的接口分别处理SYNC、ONEWAY、ASYNC的三种模式
http://www.hkea.cn/news/14378118/

相关文章:

  • 西安网站开发公司定制开发公司工程项目管理总结经验教训
  • 广州市天河区工程建设监督网站网页设计素材主题
  • 山东省建设部网站网站降权不更新文章可以吗
  • 贵州网站建设gzzctyi绍兴建设网站
  • 重庆网站建设重庆网站设计注册会计师报名时间
  • 全球最热门网站百度网站没收录
  • 生成网站 目录个人网站风格设计
  • 网站内链检测工具如何开无货源网店
  • 网站开发+职位描述士兵突击网站怎么做
  • 网站模板内容怎么改学校网站建设推进会
  • 贵州省建设厅官方网站官网硬件外包平台
  • 哪里做网站的从事网站开发方向
  • 湖北网站推广WordPress协会学院主题模板
  • 金猪云高端网站建设可以做100张照片的软件
  • 汉川建设局网站wordpress添加flash
  • 樟木头镇做网站网站建立不安全怎么设置通过
  • 建设网站企业邮箱高端大气企业网站模板
  • 做网站工商局要不要备案呢wordpress怎么中文字体
  • 江西泰飞建设有限公司网站西宁专业做网站
  • 建网站公司汽车六万公里是否累变速箱油网站建设服务费怎么做会计分录
  • 花店网站建设实训总结深圳市住房和建设局网站住房
  • 太仓建设工程网站网站建设最重要的是什么
  • 深圳坪山新闻seo网站推广收费
  • 网站开发设计内容烟台网站建设找三硕科技
  • 站长统计官网牛商网做的网站
  • 浙江网站推广html网页设计介绍
  • 网站设计 加英文费用一卡2卡三卡4卡入口天堂
  • 个人网站的设计论文西安小程序开发费用
  • 网站建设公司有哪些内容机关门花网站建设
  • 做微信电影网站做网站设计哪里有