游戏网站设计论文,超全的开源建站系统大全,商城类网站如何做seo,dedecms 5.7 关闭网站目录 引出点对点(simple)Work queues 一对多发布订阅/fanout模式以登陆验证码为例pom文件导包application.yml文件rabbitmq的配置生产者生成验证码#xff0c;发送给交换机消费者消费验证码 topic模式配置类增加配置生产者发送信息进行发送控制台查看 rabbitmq回调确认配置类验… 目录 引出点对点(simple)Work queues 一对多发布订阅/fanout模式以登陆验证码为例pom文件导包application.yml文件rabbitmq的配置生产者生成验证码发送给交换机消费者消费验证码 topic模式配置类增加配置生产者发送信息进行发送控制台查看 rabbitmq回调确认配置类验证生产者发送是否成功 延迟队列死信设计java代码步骤创建正常死信队列配置类常量生产者到正常队列消费者进行延迟消费 延迟队列插件安装访问官网进入rabbitmq docker容器上传到linux服务器拷贝插件到容器中进入容器安装插件打开管理页面 总结 引出 1.rabbitmq队列方式的梳理点对点一对多 2.发布订阅模式交换机到消费者以邮箱和手机验证码为例 3.topic模式根据规则决定发送给哪个队列 4.rabbitmq回调确认setConfirmCallback和setReturnsCallback 5.死信队列延迟队列创建方法正常—死信设置延迟时间
点对点(simple) 点对对方式传输 Work queues 一对多 1个生产者多个消费者 发布订阅/fanout模式 生产者通过fanout扇出交换机群发消息给消费者同一条消息每一个消费者都可以收到。 以登陆验证码为例
pom文件导包
!-- qq邮箱--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-mail/artifactId/dependency!-- 阿里云短信验证码相关包--dependencygroupIdcom.aliyun/groupIdartifactIdaliyun-java-sdk-core/artifactIdversion4.5.3/version/dependency!-- queue的包--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependencyapplication.yml文件
server:port: 9099spring:# 模块的名字application:name: user-auth# 邮箱的配置mail:host: smtp.qq.comport: 587username: xxxxpassword: xxxxx# rabbitmq的配置rabbitmq:host: 192.168.111.130port: 5672username: adminpassword: 123logging:level:com.tianju.auth: debugrabbitmq的配置 需要用到的常量 package com.tianju.auth.util;/*** rabbitmq的常量*/
public interface RabbitMqConstants {String MQ_MAIL_QUEUEmq_email_queue;String MQ_PHONE_QUEUEmq_phone_queue;String MQ_FANOUT_EXCHANGEmq_fanout_exchange;// 参数 String name, boolean durable, boolean exclusive, boolean autoDeleteboolean durable true;boolean exclusive false;boolean autoDelete false;}RabbitMqConfig.java配置 邮箱队列电话队列交换机 邮箱绑定交换机电话绑定交换机 创建队列参数说明
参数说明name字符串值queue的名称。durable布尔值表示该 queue 是否持久化。 持久化意味着当 RabbitMQ 重启后该 queue 是否会恢复/仍存在。 另外需要注意的是queue 的持久化不等于其中的消息也会被持久化。exclusive布尔值表示该 queue 是否排它式使用。排它式使用意味着仅声明他的连接可见/可用其它连接不可见/不可用。autoDelete布尔值表示当该 queue 没“人”connection用时是否会被自动删除。
不指定 durable、exclusive 和 autoDelete 时默认为 true 、 false 和 false 。表示持久化、非排它、不用自动删除。
创建交换机参数说明
参数说明name字符串值exchange 的名称。durable布尔值表示该 exchage 是否持久化。 持久化意味着当 RabbitMQ 重启后该 exchange 是否会恢复/仍存在。autoDelete布尔值表示当该 exchange 没“人”queue用时是否会被自动删除。 不指定 durable 和 autoDelete 时默认为 true 和 false 。表示持久化、不用自动删除 package com.tianju.auth.config;import com.tianju.auth.util.RabbitMqConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class RabbitMqConfig {Bean // 邮箱的队列public Queue mailQueue(){return new Queue(RabbitMqConstants.MQ_MAIL_QUEUE,RabbitMqConstants.durable,RabbitMqConstants.exclusive,RabbitMqConstants.autoDelete);}Bean // 电话的队列public Queue phoneQueue(){return new Queue(RabbitMqConstants.MQ_PHONE_QUEUE,RabbitMqConstants.durable,RabbitMqConstants.exclusive,RabbitMqConstants.autoDelete);}Bean // 交换机public FanoutExchange fanoutExchange(){return new FanoutExchange(RabbitMqConstants.MQ_FANOUT_EXCHANGE,RabbitMqConstants.durable,RabbitMqConstants.autoDelete);}Beanpublic Binding mailBinding(){return BindingBuilder.bind(mailQueue()).to(fanoutExchange());}Beanpublic Binding phoneBinding(){return BindingBuilder.bind(phoneQueue()).to(fanoutExchange());}}
生产者生成验证码发送给交换机 接口 package com.tianju.auth.service;public interface IUserService {/*** 生产者生成信息发送给交换机* param msg 信息这里是验证码*/void sendCode(String msg);
}实现 package com.tianju.auth.service.impl;import com.tianju.auth.service.IUserService;
import com.tianju.auth.util.RabbitMqConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.ArrayList;
import java.util.List;Service
Slf4j
public class UserServiceImpl implements IUserService {Autowiredprivate RabbitTemplate rabbitTemplate;Overridepublic void sendCode(String msg) {rabbitTemplate.convertAndSend(RabbitMqConstants.MQ_FANOUT_EXCHANGE,routingkey.fanout,msg);log.debug([生产者向交换机] 发送一条信息{},msg);}}测试类生成验证码发给交换机 package com.tianju.auth.service.impl;import cn.hutool.core.lang.Snowflake;
import com.tianju.auth.service.IUserService;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;SpringBootTest
RunWith(SpringJUnit4ClassRunner.class)public class UserServiceImplTest {Autowiredprivate IUserService userService;Testpublic void sendCode() {String code new Snowflake().nextIdStr().substring(0, 6);System.out.println(code);userService.sendCode(code);}
}消费者消费验证码
package com.tianju.auth.consumer;import com.tianju.auth.service.IEmailService;
import com.tianju.auth.util.RabbitMqConstants;
import com.tianju.auth.util.SMSUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;Slf4j
Service
public class UserConsumer {Autowiredprivate IEmailService emailService;RabbitListener(queues RabbitMqConstants.MQ_MAIL_QUEUE)public void emailConsumer(String msg){log.debug([email消费者]消费{},msg);emailService.sendEmail(xxxxqq.com, 登陆验证码, msg);}RabbitListener(queues RabbitMqConstants.MQ_PHONE_QUEUE)public void phoneConsumer(String msg){log.debug([phone消费者]消费{},msg);SMSUtil.send(xxxx, msg);}}topic模式 例如 routingkey: my.orange.rabbit —- Q1,Q2 配置类增加配置
package com.tianju.auth.util;/*** rabbitmq的常量*/
public interface RabbitMqConstants {String MQ_MAIL_QUEUEmq_email_queue;String MQ_PHONE_QUEUEmq_phone_queue;String MQ_FANOUT_EXCHANGEmq_fanout_exchange;String MQ_TOPIC_EXCHANGEmq_topic_exchange;String MQ_TOPIC_QUEUE_A mq_topic_queue_a;String MQ_TOPIC_QUEUE_B mq_topic_queue_b;// 参数 String name, boolean durable, boolean exclusive, boolean autoDeleteboolean durable true;boolean exclusive false;boolean autoDelete false;}
package com.tianju.auth.config;import com.tianju.auth.util.RabbitMqConstants;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;Configuration
public class RabbitMqConfig {Bean // 邮箱的队列public Queue mailQueue(){return new Queue(RabbitMqConstants.MQ_MAIL_QUEUE,RabbitMqConstants.durable,RabbitMqConstants.exclusive,RabbitMqConstants.autoDelete);}Bean // 电话的队列public Queue phoneQueue(){return new Queue(RabbitMqConstants.MQ_PHONE_QUEUE,RabbitMqConstants.durable,RabbitMqConstants.exclusive,RabbitMqConstants.autoDelete);}Bean // 交换机public FanoutExchange fanoutExchange(){return new FanoutExchange(RabbitMqConstants.MQ_FANOUT_EXCHANGE,RabbitMqConstants.durable,RabbitMqConstants.autoDelete);}Beanpublic Binding mailBinding(){return BindingBuilder.bind(mailQueue()).to(fanoutExchange());}Beanpublic Binding phoneBinding(){return BindingBuilder.bind(phoneQueue()).to(fanoutExchange());}Bean // A队列public Queue topicAQueue(){return new Queue(RabbitMqConstants.MQ_TOPIC_QUEUE_A,RabbitMqConstants.durable,RabbitMqConstants.exclusive,RabbitMqConstants.autoDelete);}/*** topic模式相关配置*/Bean // B队列public Queue topicBQueue(){return new Queue(RabbitMqConstants.MQ_TOPIC_QUEUE_B,RabbitMqConstants.durable,RabbitMqConstants.exclusive,RabbitMqConstants.autoDelete);}Bean // topic的交换机public TopicExchange topicMyExchange(){return new TopicExchange(RabbitMqConstants.MQ_TOPIC_EXCHANGE,RabbitMqConstants.durable,RabbitMqConstants.autoDelete);}Beanpublic Binding topicAQueueBinding(){return BindingBuilder.bind(topicAQueue()).to(topicMyExchange()).with(topic.xxx); // 规则 topic.xxx}Beanpublic Binding topicBQueueBinding(){return BindingBuilder.bind(topicBQueue()).to(topicMyExchange()).with(topic.*); // 规则 topic.xxx}}
生产者发送信息 /*** topic模式下生产者发送信息给交换机可以决定给哪个队列发信息* param msg 发送的信息* param routingKey 类似正则表达式决定给谁发* .with(topic.xxx); // 规则 topic.xxx ---- A队列* .with(topic.*); // 规则 topic.xxx ---- B队列* 在配置类中如上所述配置则如果输入的routingKey为 topic.xxx则给A和B发* 如果输入的routingKey为 topic.yyy 则 只给B队列发*/void sendMsg(String msg,String routingKey); 实现 package com.tianju.auth.service.impl;import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.tianju.auth.entity.UserPrivs;
import com.tianju.auth.mapper.UserMapper;
import com.tianju.auth.service.IUserService;
import com.tianju.auth.util.RabbitMqConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.ArrayList;
import java.util.List;Service
Slf4j
public class UserServiceImpl implements IUserService {Autowiredprivate RabbitTemplate rabbitTemplate;Overridepublic void sendCode(String msg) {rabbitTemplate.convertAndSend(RabbitMqConstants.MQ_FANOUT_EXCHANGE,routingkey.fanout,msg);log.debug([生产者向交换机] 发送一条信息{},msg);}Overridepublic void sendMsg(String msg,String routingKey) {rabbitTemplate.convertAndSend(RabbitMqConstants.MQ_TOPIC_EXCHANGE,routingKey, // topic.yyy,此时只有B队列有信息msg);log.debug([生产者向交换机] 发送一条信息{},msg);}}
进行发送
package com.tianju.auth.service.impl;import cn.hutool.core.lang.Snowflake;
import com.tianju.auth.service.IUserService;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;SpringBootTest
RunWith(SpringJUnit4ClassRunner.class)public class UserServiceImplTest {Autowiredprivate IUserService userService;Testpublic void sendCode() {String code new Snowflake().nextIdStr().substring(0, 6);System.out.println(code);userService.sendCode(code);}Testpublic void sendTopic() {String code new Snowflake().nextIdStr().substring(0, 6);System.out.println(code);userService.sendMsg(code,topic.yyy);}
}控制台查看 rabbitmq回调确认
配置类
spring:# rabbitmq的配置rabbitmq:host: 192.168.111.130port: 5672username: adminpassword: 123# 确认收到publisher-confirm-type: correlatedpublisher-returns: true验证生产者发送是否成功 使用RabbitTemplate的回调方法。 先设置 setConfirmCallbacksetReturnsCallback Autowiredprivate RabbitTemplate rabbitTemplate;Overridepublic void sendCode(String msg) {rabbitTemplate.convertAndSend(RabbitMqConstants.MQ_FANOUT_EXCHANGE,routingkey.fanout,msg);log.debug([生产者向交换机] 发送一条信息{},msg);}Overridepublic void sendMsg(String msg,String routingKey) {// 如果发到交换机看一下有没有反馈rabbitTemplate.setConfirmCallback((c,ack,message)-{log.debug(***** setConfirmCallbackack--{}, ack); // 是否发送到交换机log.debug(***** setConfirmCallbackc--{},c);// channel error; protocol method: #methodchannel.close(reply-code404,// reply-textNOT_FOUND - no exchange aaaa in vhost /, class-id60, method-id40)log.debug(***** setConfirmCallbackm--{},message);if (ack){log.debug([生产者] 发送信息到交换机{},RabbitMqConstants.MQ_TOPIC_EXCHANGE);}else {log.debug(message);}});rabbitTemplate.setReturnsCallback(r-{log.debug(返回文字{}, r.getReplyText());log.debug(返回code{}, r.getReplyCode());log.debug(返回Exchange{}, r.getExchange());log.debug(返回RoutingKey{}, r.getRoutingKey());});rabbitTemplate.convertAndSend(RabbitMqConstants.MQ_TOPIC_EXCHANGE,
// aaaa,// 失败的情况routingKey, // topic.yyy,此时只有B队列有信息msg);log.debug([生产者向交换机] 发送一条信息{},msg);}rabbitTemplate.setConfirmCallback((c,ack,message)-{log.debug(******* setConfirmCallback:ack-{},ack);log.debug(******* setConfirmCallback:c-{},c);log.debug(******* setConfirmCallback:chanel-{},message);if(ack){log.debug([生产者]发送信息到达交换机{},RabbitMqConstants.MQ_TOPIC_EXCHANGE);}else {log.debug(message);}
});
rabbitTemplate.setReturnsCallback(r-{log.debug(返回文字:{},r.getReplyText());log.debug(返回code:{},r.getReplyCode());log.debug(返回Exchange:{},r.getExchange());log.debug(返回RoutingKey:{},r.getRoutingKey());
});
rabbitTemplate.convertAndSend(RabbitMqConstants.MQ_TOPIC_EXCHANGE,abc.xxx,msg
);Testpublic void sendTopic() {String code new Snowflake().nextIdStr().substring(0, 6);System.out.println(code);userService.sendMsg(code,topic.rrr);}延迟队列死信设计
Documentation: Table of Contents — RabbitMQ java代码步骤
创建正常死信队列
package com.tianju.mq.config;import com.tianju.mq.constants.RabbitMqConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;Configuration
public class RabbitMqConfig {Beanpublic DirectExchange normalExchange(){return new DirectExchange(RabbitMqConstants.MQ_NORMAL_EXCHANGE,RabbitMqConstants.durable,RabbitMqConstants.autoDelete);}Beanpublic Queue normalQueue(){MapString, Object map new HashMap(2);map.put(x-dead-letter-exchange,RabbitMqConstants.MQ_DELAY_EXCHANGE);map.put(x-dead-letter-routing-key,RabbitMqConstants.MQ_DELAY_ROUTING_KEY);return new Queue(RabbitMqConstants.MQ_NORMAL_QUEUE,RabbitMqConstants.durable,RabbitMqConstants.exclusive,RabbitMqConstants.autoDelete,map);}Beanpublic Binding normalBinding(){return BindingBuilder.bind(normalQueue()).to(normalExchange()).with(RabbitMqConstants.MQ_NORMAL_ROUTING_KEY);}//------------------死信队列设计--------------------------/*** 死信(延迟)队列* return*/Beanpublic Queue delayQueue(){return new Queue(RabbitMqConstants.MQ_DELAY_QUEUE,RabbitMqConstants.durable,RabbitMqConstants.exclusive,RabbitMqConstants.autoDelete);}/*** 死信交换机* return*/Beanpublic DirectExchange delayExchange(){return new DirectExchange(RabbitMqConstants.MQ_DELAY_EXCHANGE,RabbitMqConstants.durable,RabbitMqConstants.autoDelete);}/*** 死信交换机队列绑定* return*/Beanpublic Binding delayBinding(){return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(RabbitMqConstants.MQ_DELAY_ROUTING_KEY);}
}配置类常量
package com.tianju.mq.constants;public interface RabbitMqConstants {String MQ_DELAY_QUEUE mq_delay_queue; // 延迟队列死信队列String MQ_DELAY_EXCHANGE mq_delay_exchange; // 死信交换机String MQ_DELAY_ROUTING_KEY mq_delay_routing_key; // 死信路由// 正常的队列交换机路由String MQ_NORMAL_QUEUE mq_normal_queue;String MQ_NORMAL_EXCHANGE mq_normal_exchange;String MQ_NORMAL_ROUTING_KEY mq_normal_routing_key;// 参数boolean durable true;boolean exclusive false;boolean autoDelete false;
}
server:port: 9099spring:# 邮箱的配置mail:host: smtp.qq.comport: 587username: 826465890qq.compassword: sdxgilesroqbbbje# rabbitmq的配置rabbitmq:host: 192.168.111.130port: 5672username: adminpassword: 123# 确认收到publisher-confirm-type: correlatedpublisher-returns: truelogging:level:com.tianju.mq: debug
生产者到正常队列
package com.tianju.mq.service;public interface IUserService {/*** 延迟队列的生产者* param msg 发送的信息* param delayTime 延迟的时间毫秒*/void sendDelay(String msg,int delayTime);
}
package com.tianju.mq.service.impl;import com.tianju.mq.constants.RabbitMqConstants;
import com.tianju.mq.service.IUserService;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.Date;Service
Slf4j
public class UserServiceImpl implements IUserService {Autowiredprivate RabbitTemplate rabbitTemplate;Overridepublic void sendDelay(String msg, int delayTime) {rabbitTemplate.convertAndSend(RabbitMqConstants.MQ_NORMAL_EXCHANGE,RabbitMqConstants.MQ_NORMAL_ROUTING_KEY,msg,process-{process.getMessageProperties().setExpiration(String.valueOf(delayTime));return process;});log.debug([生产者]发送消息:{},时间{},延迟{}秒,msg,new Date(),delayTime/1000);}
} 消费者进行延迟消费
package com.tianju.mq.consumer;import com.tianju.mq.constants.RabbitMqConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;import java.util.Date;Service
Slf4j
public class UserConsumer {RabbitListener(queues RabbitMqConstants.MQ_DELAY_QUEUE)public void delayConsume(String msg){log.debug([消费者消费信息:{},时间:{},msg,new Date());}
} 延迟队列插件安装
访问官网
Community Plugins — RabbitMQ 进入rabbitmq docker容器
[rootlocalhost ~]# docker exec -it rabbitmq bash查询插件列表是否存在延迟插件
root6d2342d51b11:/plugins# rabbitmq-plugins list
Listing plugins with pattern .* ...Configured: E explicitly enabled; e implicitly enabled| Status: * running on rabbit6d2342d51b11|/
[ ] rabbitmq_amqp1_0 3.9.11
[ ] rabbitmq_auth_backend_cache 3.9.11
[ ] rabbitmq_auth_backend_http 3.9.11
[ ] rabbitmq_auth_backend_ldap 3.9.11
[ ] rabbitmq_auth_backend_oauth2 3.9.11
[ ] rabbitmq_auth_mechanism_ssl 3.9.11
[ ] rabbitmq_consistent_hash_exchange 3.9.11
[ ] rabbitmq_event_exchange 3.9.11
[ ] rabbitmq_federation 3.9.11
[ ] rabbitmq_federation_management 3.9.11
[ ] rabbitmq_jms_topic_exchange 3.9.11
[E*] rabbitmq_management 3.9.11
[e*] rabbitmq_management_agent 3.9.11
[ ] rabbitmq_mqtt 3.9.11
[ ] rabbitmq_peer_discovery_aws 3.9.11
[ ] rabbitmq_peer_discovery_common 3.9.11
[ ] rabbitmq_peer_discovery_consul 3.9.11
[ ] rabbitmq_peer_discovery_etcd 3.9.11
[ ] rabbitmq_peer_discovery_k8s 3.9.11
[E*] rabbitmq_prometheus 3.9.11
[ ] rabbitmq_random_exchange 3.9.11
[ ] rabbitmq_recent_history_exchange 3.9.11
[ ] rabbitmq_sharding 3.9.11
[ ] rabbitmq_shovel 3.9.11
[ ] rabbitmq_shovel_management 3.9.11
[ ] rabbitmq_stomp 3.9.11
[ ] rabbitmq_stream 3.9.11
[ ] rabbitmq_stream_management 3.9.11
[ ] rabbitmq_top 3.9.11
[ ] rabbitmq_tracing 3.9.11
[ ] rabbitmq_trust_store 3.9.11
[e*] rabbitmq_web_dispatch 3.9.11
[ ] rabbitmq_web_mqtt 3.9.11
[ ] rabbitmq_web_mqtt_examples 3.9.11
[ ] rabbitmq_web_stomp 3.9.11
[ ] rabbitmq_web_stomp_examples 3.9.11下载支持3.9.x的插件 退出容器
root6d2342d51b11:/plugins# exit
exit上传到linux服务器
在/usr/local/software/下创建文件夹rabbitmq/plugins
[rootlocalhost software]# mkdir -p rabbitmq/plugins拷贝插件到容器中
[rootlocalhost plugins]# docker cp ./rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq:/plugins进入容器安装插件
[rootlocalhost plugins]# docker exec -it rabbitmq bash
root6d2342d51b11:/# rabbitmq-plugins enable rabbitmq_delayed_message_exchange打开管理页面 进入Exchange页面下拉Type看是否已经安装成功。 总结
1.rabbitmq队列方式的梳理点对点一对多 2.发布订阅模式交换机到消费者以邮箱和手机验证码为例 3.topic模式根据规则决定发送给哪个队列 4.rabbitmq回调确认setConfirmCallback和setReturnsCallback 5.死信队列延迟队列创建方法正常—死信设置延迟时间