python网站开发架构,怎么免费建商城网站吗,网络优化工资一般多少,中铁建设集团有限公司贵州分公司Broker
异步调用中用Broker进行事件订阅和调用#xff0c;完成解耦
没有强依赖#xff0c;不用担心级联失败
流量削峰 MQ 的下载
1.可以使用命令拉取镜像
docker pull rabbitmq:3-management
2.也可以直接去官网下载tar包#xff0c;然后上传到虚拟机上面 spring AMQP…Broker
异步调用中用Broker进行事件订阅和调用完成解耦
没有强依赖不用担心级联失败
流量削峰 MQ 的下载
1.可以使用命令拉取镜像
docker pull rabbitmq:3-management
2.也可以直接去官网下载tar包然后上传到虚拟机上面 spring AMQP 消息队列
Basic Queue 简单队列模型
只需要简单的引入amqp依赖
!--AMQP依赖包含RabbitMQ--
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId
/dependency
然后配置接收和发送端的地址
spring:rabbitmq:host: 192.168.xxx.xxx # 自己主机名port: 5672 # 端口virtual-host: / # 虚拟主机username: xxxxxx # 用户名password: xxxxxx # 密码
然后调用方法发送或结合搜消息即可
发送
package cn.itcast.mq.spring;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;RunWith(SpringRunner.class)
SpringBootTest
public class SpringAmqpTest {Autowiredprivate RabbitTemplate rabbitTemplate;Testpublic void testSimpleQueue() {// 队列名称String queueName simple.queue;// 消息String message hello, spring amqp!;// 发送消息rabbitTemplate.convertAndSend(queueName, message);}
}
接收
package cn.itcast.mq.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;Component
public class SpringRabbitListener {RabbitListener(queues simple.queue)public void listenSimpleQueueMessage(String msg) throws InterruptedException {System.out.println(spring 消费者接收到消息【 msg 】);}
}
当然了如果不是在父工程里面配置的依赖则需要在单个项目里面单独配置 Work Queue 一队列多消费者
假设编辑五百条消息
/*** workQueue* 向队列中不停发送消息模拟消息堆积。*/
Test
public void testWorkQueue() throws InterruptedException {// 队列名称String queueName simple.queue;// 消息String message hello, message_;for (int i 0; i 500; i) {// 发送消息rabbitTemplate.convertAndSend(queueName, message i);Thread.sleep(20);}
}
定义两个接受者用不同效率接收
RabbitListener(queues simple.queue)
public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println(消费者1接收到消息【 msg 】 LocalTime.now());Thread.sleep(20);
}RabbitListener(queues simple.queue)
public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println(消费者2........接收到消息【 msg 】 LocalTime.now());Thread.sleep(200);
}
然后可以看出接受者接收消息并未在预定时间被消费原因是被队列平均分配了只要重新定制规则即可
spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能获取一条消息处理完成才能获取下一个消息
交换机
Fanout
创建FanoutExchange交换机和Queue队列然后交换机和队列绑定
package cn.itcast.mq.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class FanoutConfig {/*** 声明交换机* return Fanout类型交换机*/Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange(itcast.fanout);}/*** 第1个队列*/Beanpublic Queue fanoutQueue1(){return new Queue(fanout.queue1);}/*** 绑定队列和交换机*/Beanpublic Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}/*** 第2个队列*/Beanpublic Queue fanoutQueue2(){return new Queue(fanout.queue2);}/*** 绑定队列和交换机*/Beanpublic Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}
绑定完成后就可以写消费者和生产者代码了
生产者
Test
public void testFanoutExchange() {// 队列名称String exchangeName itcast.fanout;// 消息String message hello, everyone!;rabbitTemplate.convertAndSend(exchangeName, , message);
}
消费者
RabbitListener(queues fanout.queue1)
public void listenFanoutQueue1(String msg) {System.out.println(消费者1接收到Fanout消息【 msg 】);
}RabbitListener(queues fanout.queue2)
public void listenFanoutQueue2(String msg) {System.out.println(消费者2接收到Fanout消息【 msg 】);
}
交换机的作用是什么
* 接收publisher发送的消息 * 将消息按照规则路由到与之绑定的队列 * 不能缓存消息路由失败消息丢失 * FanoutExchange的会将消息路由到每个绑定的队列
声明队列、交换机、绑定关系的Bean是什么
* Queue * FanoutExchange * Binding