当前位置: 首页 > news >正文

长春网络优化最好的公司seo网站推广方法

长春网络优化最好的公司,seo网站推广方法,网络服务的重要性,游戏排行榜前十名为什么要使用MQ? 在Spring Boot Event这篇文章中已经通过Guava或者SpringBoot自身的Listener实现了事件驱动,已经做到了对业务的解耦。为什么还要用到MQ来进行业务解耦呢? 首先无论是通过Guava还是Spring Boot自身提供的监听注解来实现的事…

为什么要使用MQ?

在Spring Boot Event这篇文章中已经通过Guava或者SpringBoot自身的Listener实现了事件驱动,已经做到了对业务的解耦。为什么还要用到MQ来进行业务解耦呢?

首先无论是通过Guava还是Spring Boot自身提供的监听注解来实现的事件驱动他都是处于同一进程中的,意思就是当前事件推送后只有当前的进程可以进行消费。通过MQ可以实现将事件推送到进程外的Broker中,在多实例/分布式环境下,其他的服务在订阅同一事件(Topic)时,可以在各自的服务中进行消费,最大化空闲服务的利用。

image-20231227120405997

源码地址:Gitee

整合RocketMQ

依赖版本

  • JDK 17
  • Spring Boot 3.2.0
  • RocketMQ-Client 5.0.4
  • RocketMQ-Starter 2.2.0

可以参考这篇进行RocketMQ安装

Spring Boot 3.0+ 取消了对spring.factories的支持。所以在导入时需要手动引入RocketMQ的配置类。

引入RocketMQ依赖

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client-java</artifactId><version>5.0.4</version>
</dependency>
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</version>
</dependency>

解决Spring Boot3+不兼容 spring.factories

rocketmq-spring-boot-starter:2.2.2版本中:
image-20231227062105302

参考配置文件

# RocketMQ 配置
rocketmq:name-server: 127.0.0.1:9876consumer:group: event-mq-group# 一次拉取消息最大值,注意是拉取消息的最大值而非消费最大值pull-batch-size: 1producer:# 发送同一类消息的设置为同一个group,保证唯一group: event-mq-group# 发送消息超时时间,默认3000sendMessageTimeout: 10000# 发送消息失败重试次数,默认2retryTimesWhenSendFailed: 2# 异步消息重试此处,默认2retryTimesWhenSendAsyncFailed: 2# 消息最大长度,默认1024 * 1024 * 4(默认4M)maxMessageSize: 4096# 压缩消息阈值,默认4k(1024 * 4)compressMessageBodyThreshold: 4096# 是否在内部发送失败时重试另一个broker,默认falseretryNextServer: false

参考Issue

  • 方法一 :通过@Import(RocketMQAutoConfiguration.class)在配置类中引入

  • 方法二:在resources资源目录下创建文件夹及文件META-INF/springorg.springframework.boot.autoconfigure.AutoConfiguration.imports
    文件内容为RocketMQ自动配置类路径:org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration

RocketMQ 使用

解决Spring Boot3+不支持spring.factories的问题

import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Import;/*** 启动类*/
@Import(RocketMQAutoConfiguration.class)
@SpringBootApplication
public class MQEventApplication {public static void main(String[] args) {SpringApplication.run(MQEventApplication.class, args);}
}

RocketMQ操作工具

RocketMQ Message实体

import cn.hutool.core.util.IdUtil;
import jakarta.validation.constraints.NotBlank;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;import java.io.Serializable;
import java.util.List;/*** RocketMQ 消息*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class RocketMQMessage<T> implements Serializable {/*** 消息队列主题*/@NotBlank(message = "MQ Topic 不能为空")private String topic;/*** 延迟级别*/@Builder.Defaultprivate DelayLevel delayLevel = DelayLevel.OFF;/*** 消息体*/private T message;/*** 消息体*/private List<T> messages;/*** 使用有序消息发送时,指定发送到队列*/private String hashKey;/*** 任务Id,用于日志打印相关信息*/@Builder.Defaultprivate String taskId = IdUtil.fastSimpleUUID();
}

RocketMQTemplate 二次封装

import com.yiyan.study.domain.RocketMQMessage;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;/*** RocketMQ 消息工具类*/
@Slf4j
@Component
public class RocketMQService {@Resourceprivate RocketMQTemplate rocketMQTemplate;@Value("${rocketmq.producer.sendMessageTimeout}")private int sendMessageTimeout;/*** 异步发送消息回调** @param taskId 任务Id* @param topic  消息主题* @return the send callback*/private static SendCallback asyncSendCallback(String taskId, String topic) {return new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("ROCKETMQ 异步消息发送成功 : [TaskId:{}] - [Topic:{}] - [SendStatus:{}]", taskId, topic, sendResult.getSendStatus());}@Overridepublic void onException(Throwable throwable) {log.error("ROCKETMQ 异步消息发送失败 : [TaskId:{}] - [Topic:{}] - [ErrorMessage:{}]", taskId, topic, throwable.getMessage());}};}/*** 发送同步消息,使用有序发送请设置HashKey** @param message 消息参数*/public <T> void syncSend(RocketMQMessage<T> message) {log.info("ROCKETMQ 同步消息发送 : [TaskId:{}] - [Topic:{}]", message.getTaskId(), message.getTopic());SendResult sendResult;if (StringUtils.isNotBlank(message.getHashKey())) {sendResult = rocketMQTemplate.syncSendOrderly(message.getTopic(), message.getMessage(), message.getHashKey());} else {sendResult = rocketMQTemplate.syncSend(message.getTopic(), message.getMessage(), sendMessageTimeout, message.getDelayLevel().getLevel());}log.info("ROCKETMQ 同步消息发送结果 : [TaskId:{}] - [Topic:{}] - [MessageId:{}] - [SendStatus:{}]",message.getTaskId(), message.getTopic(), sendResult.getMsgId(), sendResult.getSendStatus());}/*** 批量发送同步消息** @param message 消息参数*/public <T> void syncSendBatch(RocketMQMessage<T> message) {log.info("ROCKETMQ 同步消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount:{}]",message.getTaskId(), message.getTopic(), message.getMessages().size());SendResult sendResult;if (StringUtils.isNotBlank(message.getHashKey())) {sendResult = rocketMQTemplate.syncSendOrderly(message.getTopic(), message.getMessages(), message.getHashKey());} else {sendResult = rocketMQTemplate.syncSend(message.getTopic(), message.getMessages());}log.info("ROCKETMQ 同步消息-批量发送结果 : [TaskId:{}] - [Topic:{}] - [MessageId:{}] - [SendStatus:{}]",message.getTaskId(), message.getTopic(), sendResult.getMsgId(), sendResult.getSendStatus());}/*** 异步发送消息,异步返回消息结果** @param message 消息参数*/public <T> void asyncSend(RocketMQMessage<T> message) {log.info("ROCKETMQ 异步消息发送 : [TaskId:{}] - [Topic:{}]", message.getTaskId(), message.getTopic());if (StringUtils.isNotBlank(message.getHashKey())) {rocketMQTemplate.asyncSendOrderly(message.getTopic(), message.getMessage(), message.getHashKey(),asyncSendCallback(message.getTaskId(), message.getTopic()));} else {rocketMQTemplate.asyncSend(message.getTopic(), message.getMessage(),asyncSendCallback(message.getTaskId(), message.getTopic()), sendMessageTimeout, message.getDelayLevel().getLevel());}}/*** 批量异步发送消息** @param message 消息参数*/public <T> void asyncSendBatch(RocketMQMessage<T> message) {log.info("ROCKETMQ 异步消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount:{}]",message.getTaskId(), message.getTopic(), message.getMessages().size());if (StringUtils.isNotBlank(message.getHashKey())) {rocketMQTemplate.asyncSendOrderly(message.getTopic(), message.getMessages(), message.getHashKey(),asyncSendCallback(message.getTaskId(), message.getTopic()));} else {rocketMQTemplate.asyncSend(message.getTopic(), message.getMessages(),asyncSendCallback(message.getTaskId(), message.getTopic()));}}/*** 单向发送消息,不关心返回结果,容易消息丢失,适合日志收集、不精确统计等消息发送;** @param message 消息参数*/public <T> void sendOneWay(RocketMQMessage<T> message) {sendOneWay(message, false);}/*** 单向消息 - 批量发送** @param message 消息体* @param batch   是否为批量操作*/public <T> void sendOneWay(RocketMQMessage<T> message, boolean batch) {log.info((batch ? "ROCKETMQ 单向消息发送 : [TaskId:{}] - [Topic:{}]": "ROCKETMQ 单向消息-批量发送 : [TaskId:{}] - [Topic:{}] - [MessageCount{}]"),message.getTaskId(), message.getTopic(), message.getMessages().size());if (StringUtils.isNotBlank(message.getHashKey())) {if (batch) {message.getMessages().forEach(msg -> rocketMQTemplate.sendOneWayOrderly(message.getTopic(), msg, message.getHashKey()));} else {rocketMQTemplate.sendOneWayOrderly(message.getTopic(), message.getMessage(), message.getHashKey());}} else {if (batch) {message.getMessages().forEach(msg -> rocketMQTemplate.sendOneWay(message.getTopic(), msg));} else {rocketMQTemplate.sendOneWay(message.getTopic(), message.getMessage());}}}
}

定义RocketMQ消费者

import com.yiyan.study.constants.MQConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;/*** MQ消息监听*/
@Component
@Slf4j
@RocketMQMessageListener(topic = MQConfig.EVENT_TOPIC,consumerGroup = MQConfig.EVENT_CONSUMER_GROUP)
public class MQListener implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {log.info("MQListener 接收消息 : {}", message);}
}

定义测试类发送消息

import cn.hutool.core.thread.ThreadUtil;
import com.yiyan.study.constants.MQConfig;
import com.yiyan.study.domain.RocketMQMessage;
import com.yiyan.study.utils.RocketMQService;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;/*** MQ测试*/
@SpringBootTest
public class MQTest {@Resourceprivate RocketMQService rocketMQService;@Testpublic void sendMessage() {int count = 1;while (count <= 50) {rocketMQService.syncSend(RocketMQMessage.builder().topic(MQConfig.EVENT_TOPIC).message(count++).build());}// 休眠等待消费消息ThreadUtil.sleep(2000L);}
}

测试

springboot3-RocketMQ

http://www.hkea.cn/news/710217/

相关文章:

  • 2022网页设计尺寸规范和要求怎么做seo关键词优化
  • 北京大学两学一做网站十大收益最好的自媒体平台
  • 网站开发服务费企业网站建设的一般要素
  • 台州企业网站制作公司郴州网站推广
  • 如何做移动端网站邮件营销
  • 网站制作佛山crm管理系统
  • 网站综合营销方案设计网页设计教程
  • 东莞做网站制作宁波技术好的企业网站制作
  • 广州做网站公司哪家好如何注册一个网站
  • 网站备案协议书互联网营销师证书含金量
  • 广州企业网站建设报价免费推广网站大全
  • 宁波网站排名怎么提交网址让百度收录
  • 杭州 手机网站建设活动营销
  • 加网络网站建设工作室做一个企业网站大概需要多少钱
  • 张家港优化网站seo百度网盘下载
  • 烟台有没有做网站网站安全
  • 网站建设与制作设计公司惠州seo代理商
  • 东营新闻网今日头条常州网站seo
  • 东莞全网合一网站黄页引流推广网站软件免费
  • wordpress的数据库在那里百度seo如何快速排名
  • wordpress手机客服代码免费seo快速排名工具
  • web网站开发作品关键词歌词图片
  • 汕头行业网站seo培训公司
  • 网站背景图片优化关键词歌曲免费听
  • 郑州做网站哪家专业我要发布信息
  • 西安做网站优化的公司石家庄seo按天扣费
  • 2022年西安封城通知自动app优化下载
  • 无锡做网站哪家公司好一个公司可以做几个百度推广
  • 专题网站建设工作关键词林俊杰无损下载
  • adobe 网站开发软件软文写作兼职