当前位置: 首页 > news >正文

网站集约化平台建设专注赣州网站建设

网站集约化平台建设,专注赣州网站建设,少儿编程课程,暴富建站 网址文章目录介绍RocketMQ特点Spring Cloud StreamWindow搭建部署RocketMQ下载启动NameServer服务启动Broker服务示例创建 RocketMQ 消息生产者创建 RocketMQ 消息消费者使用示例示例关联项目运行示例测试介绍 RocketMQ 是一款开源的分布式消息系统#xff0c;基于高可用分布式集… 文章目录介绍RocketMQ特点Spring Cloud StreamWindow搭建部署RocketMQ下载启动NameServer服务启动Broker服务示例创建 RocketMQ 消息生产者创建 RocketMQ 消息消费者使用示例示例关联项目运行示例测试介绍 RocketMQ 是一款开源的分布式消息系统基于高可用分布式集群技术提供低延时的、高可靠的消息发布与订阅服务广泛应用于多个领域包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。 RocketMQ 是阿里巴巴在2012年开源的分布式消息中间件目前已经捐赠给 Apache 软件基金会并于2017年9月25日成为 Apache 的顶级项目。作为经历过多次阿里巴巴双十一这种“超级工程”的洗礼并有稳定出色表现的国产中间件以其高性能、低延时和高可靠等特性近年来已经也被越来越多的国内企业使用。 RocketMQ特点 是一个队列模型的消息中间件具有高性能、高可靠、高实时、分布式等特点Producer、Consumer、队列都可以分布式Producer 向一些队列轮流发送消息队列集合称为 TopicConsumer 如果做广播消费则一个 Consumer 实例消费这个 Topic 对应的所有队列如果做集群消费则多个 Consumer 实例平均消费这个 Topic 对应的队列集合能够保证严格的消息顺序支持拉pull和推push两种消息模式高效的订阅者水平扩展能力实时的消息订阅机制亿级消息堆积能力支持多种消息协议如 JMS、OpenMessaging 等较少的依赖 Spring Cloud Stream Spring Cloud Stream 是一个构建消息驱动微服务的框架。 Spring Cloud Stream 提供了消息中间件配置的统一抽象推出了 pub/subconsumer groupssemanticsstateful partition 这些统一的模型支持。 Spring Cloud Stream 核心构件有Binders、Bindings和Message应用程序通过 inputs 或者 outputs 来与 binder 交互通过我们配置来 binding 而 binder 负责与中间件交互Message为数据交换的统一数据规范格式。 Binding: 包括 Input Binding 和 Output Binding。 Binding 在消息中间件与应用程序提供的 Provider 和 Consumer 之间提供了一个桥梁实现了开发者只需使用应用程序的 Provider 或 Consumer 生产或消费数据即可屏蔽了开发者与底层消息中间件的接触。 Binder: 跟外部消息中间件集成的组件用来创建 Binding各消息中间件都有自己的 Binder 实现。 比如 Kafka 的实现 KafkaMessageChannelBinderRabbitMQ 的实现 RabbitMessageChannelBinder 以及 RocketMQ 的实现 RocketMQMessageChannelBinder。 Message是 Spring Framework 中的一个模块其作用就是统一消息的编程模型。 比如消息 Messaging 对应的模型就包括一个消息体 Payload 和消息头 Header。 spring-cloud-stream 官网 Window搭建部署RocketMQ 下载 当前最新版本为4.6.0 下载出来解压到D:\rocketmq 目录目录最好不要带空格和太深否则服务运行可能会报错 启动NameServer服务 在启动之前需要配置系统环境不然会报错。 Please set the ROCKETMQ_HOME variable in your environment! 系统环境变量名ROCKETMQ_HOME 根据你解压的目录配置环境变量比如我的变量值为D:\rocketmq 进入window命令窗口进入D:\rocketmq\bin目录下执行 start mqnamesrv.cmd 如上则NameServer启动成功。使用期间窗口不要关闭。 启动Broker服务 进入bin目录下输入 start mqbroker.cmd -n localhost:9876 如上的 ipport 是rocketmq的服务地址和端口。 运行如上命令可能会报如下错误。找不到或无法加载主类 如果出此情况打开bin–runbroker.cmd修改%CLASSPATH%成%CLASSPATH% 保存再次执行如上命令。执行成功后提示boot success 代表成功。 示例 本示例实现三种消息的发布以及订阅接收。 创建 RocketMQ 消息生产者 创建 ali-rocketmq-producer 工程端口为28081 pom.xml添加依赖 ?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0 xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsdparentartifactIdcloud-alibaba/artifactIdgroupIdcom.easy/groupIdversion1.0.0/version/parentmodelVersion4.0.0/modelVersionartifactIdali-rocketmq-producer/artifactIdpackagingjar/packagingdependencies!--rocketmq依赖--dependencygroupIdcom.alibaba.cloud/groupIdartifactIdspring-cloud-starter-stream-rocketmq/artifactId/dependency!--web依赖--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-actuator/artifactId/dependency/dependenciesbuildpluginsplugingroupIdorg.springframework.boot/groupIdartifactIdspring-boot-maven-plugin/artifactId/plugin/plugins/build/project配置 Output 的 Binding 信息并配合 EnableBinding 注解使其生效 application.yml配置 server:port: 28081spring:application:name: ali-rocketmq-producercloud:stream:rocketmq:binder:# RocketMQ 服务器地址name-server: 127.0.0.1:9876bindings:output1: {destination: test-topic1, content-type: application/json}output2: {destination: test-topic2, content-type: application/json}management:endpoints:web:exposure:include: *endpoint:health:show-details: alwaysArProduceApplication.java SpringBootApplication EnableBinding({MySource.class}) public class ArProduceApplication {public static void main(String[] args) {SpringApplication.run(ArProduceApplication.class, args);} }消息生产者服务 MySource.java package com.easy.arProduce;import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel;public interface MySource {Output(output1)MessageChannel output1();Output(output2)MessageChannel output2(); }SenderService.java package com.easy.arProduce;import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Service; import org.springframework.util.MimeTypeUtils;Service public class SenderService {Autowiredprivate MySource source;/*** 发送字符串** param msg*/public void send(String msg) {Message message MessageBuilder.withPayload(msg).build();source.output1().send(message);}/*** 发送带tag的字符串** param msg* param tag*/public void sendWithTags(String msg, String tag) {Message message MessageBuilder.withPayload(msg).setHeader(RocketMQHeaders.TAGS, tag).build();source.output1().send(message);}/*** 发送对象** param msg* param tag* param T*/public T void sendObject(T msg, String tag) {Message message MessageBuilder.withPayload(msg).setHeader(RocketMQHeaders.TAGS, tag).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON).build();source.output2().send(message);} }编写 TestController.java 控制器方便测试 package com.easy.arProduce;import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController;RestController RequestMapping(value test) public class TestController {AutowiredSenderService senderService;RequestMapping(value /send, method RequestMethod.GET)public String send(String msg) {senderService.send(msg);return 字符串消息发送成功!;}RequestMapping(value /sendWithTags, method RequestMethod.GET)public String sendWithTags(String msg) {senderService.sendWithTags(msg, tagStr);return 带tag字符串消息发送成功!;}RequestMapping(value /sendObject, method RequestMethod.GET)public String sendObject(int index) {senderService.sendObject(new Foo(index, foo), tagObj);return Object对象消息发送成功!;} }创建 RocketMQ 消息消费者 创建 ali-rocketmq-consumer 工程端口为28082 pom.xml添加依赖 ?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0 xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsdparentartifactIdcloud-alibaba/artifactIdgroupIdcom.easy/groupIdversion1.0.0/version/parentmodelVersion4.0.0/modelVersionartifactIdali-rocketmq-consumer/artifactIdpackagingjar/packagingdependenciesdependencygroupIdcom.alibaba.cloud/groupIdartifactIdspring-cloud-starter-stream-rocketmq/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-actuator/artifactId/dependency/dependenciesbuildpluginsplugingroupIdorg.springframework.boot/groupIdartifactIdspring-boot-maven-plugin/artifactId/plugin/plugins/build /project-配置 Input 的 Binding 信息并配合 EnableBinding 注解使其生效 application.yml配置 server:port: 28082spring:application:name: ali-rocketmq-consumercloud:stream:rocketmq:binder:name-server: 127.0.0.1:9876 #rocketmq 服务地址bindings:input1: {consumer.orderly: true} #是否排序input2: {consumer.tags: tagStr} #订阅 带tag值为tagStr的字符串input3: {consumer.tags: tagObj} #订阅 带tag值为tabObj的字符串bindings:input1: {destination: test-topic1, content-type: text/plain, group: test-group1, consumer.maxAttempts: 1}input2: {destination: test-topic1, content-type: application/plain, group: test-group2, consumer.maxAttempts: 1}input3: {destination: test-topic2, content-type: application/plain, group: test-group3, consumer.maxAttempts: 1}management:endpoints:web:exposure:include: *endpoint:health:show-details: alwaysArConsumerApplication.java package com.easy.arConsumer;import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding;SpringBootApplication EnableBinding({MySource.class}) public class ArConsumerApplication {public static void main(String[] args) {SpringApplication.run(ArConsumerApplication.class, args);} }消息消费者服务 MySource.java package com.easy.arConsumer;import org.springframework.cloud.stream.annotation.Input; import org.springframework.messaging.SubscribableChannel;public interface MySource {Input(input1)SubscribableChannel input1();Input(input2)SubscribableChannel input2();Input(input3)SubscribableChannel input3(); }ReceiveService.java package com.easy.arConsumer; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Service;Service Slf4j public class ReceiveService {StreamListener(input1)public void receiveInput1(String receiveMsg) {log.info(input1 接收到了消息 receiveMsg);}StreamListener(input2)public void receiveInput2(String receiveMsg) {log.info(input2 接收到了消息 receiveMsg);}StreamListener(input3)public void receiveInput3(Payload Foo foo) {log.info(input3 接收到了消息 foo);} }使用示例 示例关联项目 本示例我们创建了两个项目实现 ali-rocketmq-producerRocketMQ 消息服务生产者服务名ali-rocketmq-producer端口28081 ali-rocketmq-consumerRocketMQ 消息服务消费者服务名ali-rocketmq-producer端口28082 运行示例测试 首先要启动ali-rocketmq-producer服务及ali-rocketmq-consumer服务 访问消息服务生产者地址 http://localhost:28081/test/send?msgyuntian 查看服务消费者控制台输出 2019-12-04 15:37:47.859 INFO 6356 --- [MessageThread_1] com.easy.arConsumer.ReceiveService : input1 接收到了消息yuntian 2019-12-04 15:37:47.859 INFO 6356 --- [MessageThread_1] s.b.r.c.RocketMQListenerBindingContainer : consume C0A8096E200818B4AAC212CDA70E0014 cost: 1 ms表示字符串消费成功被input1消费了 访问消息服务生产者地址 http://localhost:28081/test/sendWithTags?msgtagyuntian 查看服务消费者控制台输出 2019-12-04 15:38:09.586 INFO 6356 --- [MessageThread_1] com.easy.arConsumer.ReceiveService : input2 接收到了消息tagyuntian 2019-12-04 15:38:09.592 INFO 6356 --- [MessageThread_1] com.easy.arConsumer.ReceiveService : input1 接收到了消息tagyuntian 2019-12-04 15:38:09.592 INFO 6356 --- [MessageThread_1] s.b.r.c.RocketMQListenerBindingContainer : consume C0A8096E200818B4AAC212CDFCD30015 cost: 6 ms表示带tag的字符串成功被input2和input1消费了因为input1也订阅了test-topic1并且没有我们没有加tag过滤默认表示接收所有消息所以也能成功接收tagyuntian字符串 访问消息服务生产者地址 http://localhost:28081/test/sendObject?index1 查看服务消费者控制台输出 2019-12-04 15:41:15.285 INFO 6356 --- [MessageThread_1] com.easy.arConsumer.ReceiveService : input3 接收到了消息Foo{id1, barfoo}表示input3成功接收到了tag带tagObj的对象消息了而input1却没有输出消息这是因为sendObject发布的消息走的是test-topic2消息管道所以不会发布给input1及input2订阅者
http://www.hkea.cn/news/14266903/

相关文章:

  • 网站建设运维情况2019做网站需要营业执照吗
  • 怎么在自己做的网站上发视频教程做家装网站源码
  • 上海网站建设口碑好大同网络公司
  • 淮安新港建设有限公司网站wordpress主题公园
  • 阿里云服务器 放多个网站装饰工程施工流程步骤
  • 电子商务网站建设实训报告文章孩子学编程一年要多少钱
  • 西乡专业做网站公司给客户做一个网站ppt怎么做
  • redis做网站保定网站建设浩森宇特
  • 那个网站的域名便宜营销自己的网站
  • 石家庄建设一个网站多少钱网页设计心得体会100
  • 网站的基本知识怎样做一个公司网站
  • 沈阳点金网站建设网站后台用什么
  • 网站优化哪里可以做爱有声小说网站捡个校花做老婆
  • 利用wordpress做api提供者电商网站产品设计优化技术主要是
  • 网站开发外包维护合同范本营销策划书怎么写格式
  • 网站开发页面静态化技术谷歌seo外贸推广
  • 如何做视频教程网站惠州seo外包平台
  • 镇江网站建设优化案例分析杭州设计公司装修
  • 在服务器上部署网站企业信息
  • 苏州建设网站平台WordPress海报封面主题
  • 网站建设 站内页面连接如何搭建一个自己的服务器
  • 阿里云网站地图是怎么做的wordpress友链插件
  • 百度投诉电话24小时巩义自助建站优化
  • 未成年人做网站多少钱能注册500万公司
  • 温州网站建设价格技术山西太原网络推广
  • 馆陶网站建设公司体育用品东莞网站建设
  • 做毕业设计免费网站建设用什么程序做资讯类网站
  • 百度数据网站贵阳市白云区官方网站
  • 网站服务器租用怎样收费西安wordpress
  • 河北省电力建设第一工程公司网站做网站需要学多久