本地php网站搭建环境,怎么查看一个网站是不是伪静态,广东省建设厅官方网站网址,制作公司网站哪个好1、 订阅模型-Fanout
Fanout#xff0c;也称为广播。 流程图#xff1a;
在广播模式下#xff0c;消息发送流程是这样的#xff1a; 1#xff09; 可以有多个消费者 2#xff09; 每个消费者有自己的queue#xff08;队列#xff09; 3#xff09; 每个队列都要绑定…1、 订阅模型-Fanout
Fanout也称为广播。 流程图
在广播模式下消息发送流程是这样的 1 可以有多个消费者 2 每个消费者有自己的queue队列 3 每个队列都要绑定到Exchange交换机 4 生产者发送的消息只能发送到交换机交换机来决定要发给哪个队列生产者无法决定。 5 交换机把消息发送给绑定过的所有队列 6 队列的消费者都能拿到消息。实现一条消息被多个消费者消费
1.1、生产者
两个变化 1 声明Exchange不再声明Queue 2 发送消息到Exchange不再发送到Queue
public class Send {private final static String EXCHANGE_NAME fanout_exchange_test;public static void main(String[] argv) throws Exception {// 获取到连接Connection connection ConnectionUtil.getConnection();// 获取通道Channel channel connection.createChannel();// 声明exchange指定类型为fanoutchannel.exchangeDeclare(EXCHANGE_NAME, fanout);// 消息内容String message Hello everyone;// 发布消息到Exchangechannel.basicPublish(EXCHANGE_NAME, , null, message.getBytes());System.out.println( [生产者] Sent message );channel.close();connection.close();}
}1.2、消费者1
public class Recv {private final static String QUEUE_NAME fanout_exchange_queue_1;private final static String EXCHANGE_NAME fanout_exchange_test;public static void main(String[] argv) throws Exception {// 获取到连接Connection connection ConnectionUtil.getConnection();// 获取通道Channel channel connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, );// 定义队列的消费者DefaultConsumer consumer new DefaultConsumer(channel) {// 获取消息并且处理这个方法类似事件监听如果有消息的时候会被自动调用Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {// body 即消息体String msg new String(body);System.out.println( [消费者1] received : msg !);}};// 监听队列自动返回完成channel.basicConsume(QUEUE_NAME, true, consumer);}
}1.3、 消费者2
public class Recv2 {private final static String QUEUE_NAME fanout_exchange_queue_2;private final static String EXCHANGE_NAME fanout_exchange_test;public static void main(String[] argv) throws Exception {// 获取到连接Connection connection ConnectionUtil.getConnection();// 获取通道Channel channel connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 绑定队列到交换机channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, );// 定义队列的消费者DefaultConsumer consumer new DefaultConsumer(channel) {// 获取消息并且处理这个方法类似事件监听如果有消息的时候会被自动调用Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息体String msg new String(body);System.out.println( [消费者2] received : msg !);}};// 监听队列手动返回完成channel.basicConsume(QUEUE_NAME, true, consumer);}
}1.4、 测试
我们应该先启动生产者否则先启动消费者时由于要绑定交换机此时交换机并不存在所以会报错。 我们运行两个消费者然后发送1条消息