深圳网站设计哪家好,企业网站服务器多少钱,广州网站搭建多少钱,wordpress访问密码消费者获取消息是从Master Broker还是Slave Broker获取#xff1f;
Master Broker宕机#xff0c;Slave Broker会自动切换为Master Broker吗#xff1f;
这种Master-Slave模式不是彻底的高可用模式#xff0c;他没法实现自动把Slave切换为Master。在RocketMQ 4.5之后
Master Broker宕机Slave Broker会自动切换为Master Broker吗
这种Master-Slave模式不是彻底的高可用模式他没法实现自动把Slave切换为Master。在RocketMQ 4.5之后这种情况得到了改变因为RocketMQ支持了一种新的机制叫做Dledger基于Raft协议
老齐
当同一类消息被送入不同队列且这些消息在处理上并不需要按时序消费时可以考虑使用并发消费模式。并发消费模式生产者会将消息轮询发送到不同的队列当中这些队列会和消费者实例建立多个连接(线程)将消息并发送入到不同的消费者。因为消费者处理速度有快慢所以并不能保证物流数据会按0~9的顺序依次消费。并发消费模式处理效率很高但无法保证有序性。
有序消费
有序消息是指生产者在产生数据的时候根据Hash规则指定让消息放入哪个队列在消费者消费时会保证不同消费者针对每一个队列只有唯一的连接(线程)用于消费指定队列。
有序消费模式可以保证消息按队列FIFO顺序依次被消费但因此失去并发性能有序消费模式只有在业务要求必须按顺序消费的场景下才允许使用。
RocketMQ如何实现有序消息
要实现RocketMQ有序消息需要两点调整: 生产者端要求按id等唯一标识分配消息队列消费者端采用专用的监听器保证对队列的单线程应用下面咱们来看一下代码
生产者端
com.itlaogi.rocketmg.seqmsg.SequenceMessageProvider 核心代码是在向Broker发送消息时附加MessageQueueSelector对象在实现select方法时指定存放到哪个队列中。
public class RocketMQProducerExample {public static void main(String[] args) throws MQClientException {// 创建生产者实例DefaultMQProducer producer new DefaultMQProducer(producer_group_name);// 设置NameServer地址这里需要替换为实际的地址producer.setNamesrvAddr(localhost:9876);producer.start();try {for (Integer orderId 1; orderId 10; orderId) {for (int i 0; i 3; i) {String data ;switch (i % 3) {case 0:data orderId 号创建订单;break;case 1:data orderId 号订单减库存;break;case 2:data orderId 号订单加积分;break;}// 创建消息对象 topicorder, tagsorder, keyorderIdMessage message new Message(order, order, orderId.toString(), data.getBytes(UTF-8));// 发送消息实现MessageQueueSelector接口SendResult result producer.send(message, new MessageQueueSelector() {// select方法决定向broker哪一个队列发送消息Overridepublic MessageQueue select(ListMessageQueue mqs, Message msg, Object arg) {int currentOrderId Integer.parseInt(msg.getKeys());int index currentOrderId % mqs.size();MessageQueue messageQueue mqs.get(index);Logger.getLogger(RocketMQProducerExample.class.getName()).log(Level.INFO,id:{0},data:{1},queue:{2}, new Object[]{currentOrderId, new String(msg.getBody()), messageQueue});return messageQueue;}}, null);}}} catch (Exception e) {e.printStackTrace();} finally {producer.shutdown();}}
}public class RocketMQConsumerExample {public static void main(String[] args) throws Exception {// 创建消费者实例并指定消费者组名DefaultMQPushConsumer consumer new DefaultMQPushConsumer(consumer_group_name);// 设置NameServer地址需替换为实际地址consumer.setNamesrvAddr(localhost:9876);// 设置从哪里开始消费消息比如从上次消费的位置开始consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);// 订阅主题和标签consumer.subscribe(order, *);// 注册顺序消费的监听器用于实现有序队列consumer.registerMessageListener(new MessageListenerOrderly() {Overridepublic ConsumeOrderlyStatus consumeMessage(ListMessageExt msgs, ConsumeOrderlyContext context) {try {for (MessageExt msg : msgs) {// 在这里处理具体的消息按照顺序来处理每个消息System.out.println(new String(msg.getBody()));}return ConsumeOrderlyStatus.SUCCESS;} catch (Exception e) {e.printStackTrace();return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}}});consumer.start();System.out.println(Consumer started.);}
}运行结果
如何实现消息全局顺序消费?
只需要在生产者固定将所有消息发送到0号队列即可保证全局有序这也意味着全局采用单线程消费执行效率极差
Override
public MessageQueue select(ListMessageQueue mgs, Message msg, object arg){MessageQueue messageQueue mgs.get(0);return messageQueue;
}有序消费有什么使用限制吗?
有序消费模式只支持集群模式(CLUSTERING)不支持广播模式(BROADCASTING):采用广播模式会无法接收到数据
//设置为集群模式
consumer.setMessageModel(MessageMode1.CLUSTERING)://支持有序消息默认模式。
consumer.setMessageModel(MessageMode1.BROADCASTING)://不支持有序消息