县城乡建设局网站,中国建设信息化期刊官网,网站服务器排名,discuz可以做商城网站吗MQ异步通信
初始MQ
同步通信
优点#xff1a;时效性较强#xff0c;可以以及得到结果
Feign就属于同步方式–问题#xff1a;
耦合问题性能下降#xff08;中间的等待时间#xff09;资源浪费级联失败
异步通信
优点
耦合度低性能提升#xff0c;吞吐量高故障隔离…MQ异步通信
初始MQ
同步通信
优点时效性较强可以以及得到结果
Feign就属于同步方式–问题
耦合问题性能下降中间的等待时间资源浪费级联失败
异步通信
优点
耦合度低性能提升吞吐量高故障隔离服务无强依赖解决级联失败问题流量削峰
缺点
依赖于broker的可靠性安全性吞吐能力架构复杂了业务没有明显的流程先不好追踪管理
MQ常见框架
什么是MQ信息队列存放消息的队列也就是时间驱动架构中的broker
RabbitMQActiveMQRocketMQKafka公司/社区RabbitApache阿里Apache开发语言ErlangJavaJavaScalaJava协议支持AMQPXMPPSMTPSTOMPOpenWire,STOMPREST,XMPP,AMQP自定义协议自定义协议可用性高一般高高单机吞吐量一般差高非常高消息延迟微秒级毫秒级毫秒级毫秒以内消息可靠性高一般高一般
推荐RabbitMQ
RabbitMQ快速入门
案例
发布者
package cn.itcast.mq.helloworld;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class PublisherTest {Testpublic void testSendMessage() throws IOException, TimeoutException {// 1.建立连接ConnectionFactory factory new ConnectionFactory();// 1.1.设置连接参数分别是主机名、端口号、vhost、用户名、密码factory.setHost(xxx.xx.xxx.xxx);factory.setPort(5672);factory.setVirtualHost(/);factory.setUsername(xxx);factory.setPassword(xxxxx);// 1.2.建立连接Connection connection factory.newConnection();// 2.创建通道ChannelChannel channel connection.createChannel();// 3.创建队列String queueName simple.queue;channel.queueDeclare(queueName, false, false, false, null);// 4.发送消息String message hello, rabbitmq1!;channel.basicPublish(, queueName, null, message.getBytes());System.out.println(发送消息成功【 message 】);// 5.关闭通道和连接channel.close();connection.close();}
}消费者
package cn.itcast.mq.helloworld;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ConsumerTest {public static void main(String[] args) throws IOException, TimeoutException {// 1.建立连接ConnectionFactory factory new ConnectionFactory();// 1.1.设置连接参数分别是主机名、端口号、vhost、用户名、密码factory.setHost(xxx.xxx.xx.xxx);factory.setPort(5672);factory.setVirtualHost(/);factory.setUsername(xxx);factory.setPassword(xxx);// 1.2.建立连接Connection connection factory.newConnection();// 2.创建通道ChannelChannel channel connection.createChannel();// 3.创建队列String queueName simple.queue;channel.queueDeclare(queueName, false, false, false, null);// 4.订阅消息channel.basicConsume(queueName, true, new DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {// 5.处理消息String message new String(body);System.out.println(接收到消息【 message 】);}});System.out.println(等待接收消息。。。。);}
}运行消费者后运行发布者就可以接收到发布者的信息因为消费者的函数是异步所以发布者可以随时发消费者一直等着 SpringAMQP
什么是AMQP一种协议API规范
SpringAMQP底层是rabbitMQ
基础实现
使用SpringAMQP实现Helllo World中的基础消息队列功能
1引入依赖
!--AMQP依赖包含RabbitMQ--
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId
/dependency2基础配置
配置基础配置并且工具类并测试
基础配置
spring:rabbitmq:host: xxx.xx.xxx.xxport: 5672username: xxxpassword: xxxvirtual-host: /测试类
package cn.itcast.mq.spring;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
RunWith(SpringRunner.class)
SpringBootTest
public class SpringAmqpTest {Autowiredprivate RabbitTemplate rabbitTemplate;Testpublic void testSendMessage(){String queueName simple.queue;String message hello,spring amqp;rabbitTemplate.convertAndSend(queueName,message);}
}测试结果
前面几个是之前的第三个和后面的是我自己设置的 上述为发布者消费者需要
1配置yml文件
2定义一个类
这里的核心是使用Component自动装配到Spring,使用RabbitListener进行队列的监听得到返回值msg就打印
package cn.itcast.mq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
Component
public class SpringRabbitListener {RabbitListener(queues simple.queue)public void listenSimpleQueue(String msg){System.out.println(消费者接收到消息【msg】);}
}WorkQueue模型
当一个消费者能力足够强而另一个比较弱合理的处理方式应该是强的消费者处理更多的消息而弱的处理少的根据能力分工
所以WorkQueue模型就是为了让同一个队列的消费者能力强的处理更多的消息核心在于prefetch控制预提取的信息数量
生产者循环发送消息到simple.queue发送消息
Test
public void testWorkQueue() throws InterruptedException {
// 队列名称
String queueName simple.queue;
// 消息
String message hello, message__;
for (int i 0; i 50; i) {
// 发送消息
rabbitTemplate.convertAndSend(queueName, message i);// 避免发送太快Thread.sleep(20);}
}消费者监听两个消费者
RabbitListener(queues simple.queue)
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println(spring 消费者1接收到消息【 msg 】);Thread.sleep(25);
}RabbitListener(queues simple.queue)
public void listenSimpleQueueMessage2(String msg) throws InterruptedException {
System.err.println(spring 消费者2接收到消息【 msg 】);
Thread.sleep(100);
}
Work模型的使用 多个消费者绑定到一个队列同一条消息只会被一个消费者处理 通过设置prefetch来控制消费者预取的消息数量
交换机exchange
exchangeexchange负责消息路由而不是存储路由失败则消息丢失
FanoutExchange的使用
创建两个队列并进行绑定
package cn.itcast.mq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Configuration
public class FanoutConfig {//itcast.exchageBeanpublic FanoutExchange fanoutExchange() {return new FanoutExchange(ylw.fanout);}//fanout.queue1Beanpublic Queue fanoutQueue1() {return new Queue(fanout.queue1);}Beanpublic Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}//fanout.queue2Beanpublic Queue fanoutQueue2() {return new Queue(fanout.queue2);}Beanpublic Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}Consumer声明两个消费者
RabbitListener(queues fanout.queue1)public void listenFanoutQueue1(String msg)
{System.out.println(消费者1接收到Fanout消息【 msg 】);}
RabbitListener(queues fanout.queue2) public void listenFanoutQueue2(String msg)
{System.out.println(消费者2接收到Fanout消息【 msg 】);}发布者发送消息
Test
public void testFanoutExchange() { // 队列名称 String exchangeName itcast.fanout; // 消息String message hello, everyone!; // 发送消息参数分别是交互机名称、RoutingKey暂时为空、消息 rabbitTemplate.convertAndSend(exchangeName, , message);}