做网站可以用电脑当服务器吗,wordpress+全局设定,科技魏玄成,吉林seo推广系统Spring Cloud Stream是用于构建与共享消息传递系统连接的高度可伸缩的事件驱动微服务框架#xff0c;该框架提供了一个灵活的编程模型#xff0c;它建立在已经建立和熟悉的Spring熟语和最佳实践上#xff0c;包括支持持久化的发布/订阅、消费组以及消息分区这三个核心概念。…Spring Cloud Stream是用于构建与共享消息传递系统连接的高度可伸缩的事件驱动微服务框架该框架提供了一个灵活的编程模型它建立在已经建立和熟悉的Spring熟语和最佳实践上包括支持持久化的发布/订阅、消费组以及消息分区这三个核心概念。
【1】概念介绍
① 什么是Spring Cloud Stream
官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。为一些供应商的消息中间件产品提供了个性化的自动化配置实现引用了发布-订阅、消费组、分区的三个核心概念。简单来讲就是屏蔽了底层XXMQ应用层不用关注底层是RabbitMQ还是Kafka 。类似于Spring Data抽离持久层屏蔽底层各种数据库的概念。
应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream中binder对象交互。通过我们配置来binding(绑定) 而 Spring Cloud Stream 的 binder对象负责与消息中间件交互。所以我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。 通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。 官网文档https://spring.io/projects/spring-cloud-stream#overview ② stream如何统一底层差异
在没有绑定器这个概念的情况下我们的SpringBoot应用要直接与消息中间件进行信息交互的时候由于各消息中间件构建的初衷不同它们的实现细节上会有较大的差异性。 Binder可以生成BindingBinding用来绑定消息容器的生产者和消费者它有两种类型INPUT和OUTPUTINPUT对应于消费者OUTPUT对应于生产者。 通过定义绑定器作为中间层完美地实现了应用程序与消息中间件细节之间的隔离。Stream对消息中间件的进一步封装可以做到代码层面对中间件的无感知甚至于动态的切换中间件(rabbitmq切换为kafka)使得微服务开发的高度解耦服务可以关注更多自己的业务流程。 通过定义绑定器作为中间层完美地实现了应用程序与消息中间件细节之间的隔离。 通过向应用程序暴露统一的Channel通道使得应用程序不需要再考虑各种不同的消息中间件实现。 通过定义绑定器Binder作为中间层实现了应用程序与消息中间件细节之间的隔离。 ③ Spring Cloud Stream标准流程设计
Stream中的消息通信方式遵循了发布-订阅模式。
Binder很方便的连接中间件屏蔽差异Channel通道是队列Queue的一种抽象在消息通讯系统中就是实现存储和转发的媒介通过Channel对队列进行配置Source和Sink简单的可理解为参照对象是Spring Cloud Stream自身从Stream发布消息就是输出接受消息就是输入。 ④ 几个API注解
EnableBinding指信道channel和exchange绑定在一起。
StreamListener监听队列用于消费者的队列的消息接收
Input注解标识输入通道通过该输入通道接收到的消息进入应用程序。
Output注解标识输出通道发布的消息将通过该通道离开应用程序
下面以RabbitMQ为底层MQ来说明如何使用Stream当然同样要先安装好RabbitMQ。
【2】消息生产者
① pom依赖
dependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-starter-stream-rabbit/artifactId/dependency② yml配置
server:port: 8801spring:application:name: cloud-stream-providercloud:stream:binders: #在此配置要绑定的 rabbitmq的服务信息defaultRabbit: # 表示定义的名称用于 binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq相关的环境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理output: # 输出通道的名称destination: studyExchange #表示要使用的 Exchange 名称定义content-type: application/json # 消息类型binder: defaultRabbit
eureka:client:service-url:defaultZone: http://localhost:7001/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳时间间隔默认30slease-expiration-duration-in-seconds: 5 # 如果超过了5秒的间隔默认90sinstance-id: send-8001.com #信息列表显示主机名称prefer-ip-address: true # 访问路径变为ip地址
③ 消息服务类
public interface IMessageProvider {public String send();
}EnableBinding(Source.class)//定义消息推送管道
Slf4j
public class IMessageProviderImpl implements IMessageProvider {Resourceprivate MessageChannel output;//消息发送通道Overridepublic String send() {String serial UUID.randomUUID().toString();output.send(MessageBuilder.withPayload(serial).build());log.info(serial***********************);return serial;}
}编写控制器发送消息
RestController
public class IMessageController {Resourceprivate IMessageProvider provider;GetMapping(/sendMessage)public String send(){return provider.send();}
}
【3】消息消费者
① pom依赖
dependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-starter-stream-rabbit/artifactId/dependency② yml配置
server:port: 8802spring:application:name: cloud-stream-consumercloud:stream:binders: #在此配置要绑定的 rabbitmq的服务信息defaultRabbit: # 表示定义的名称用于 binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq相关的环境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理input: # 输出通道的名称destination: studyExchange #表示要使用的 Exchange 名称定义content-type: application/json # 消息类型binder: defaultRabbitgroup: group1
eureka:client:service-url:defaultZone: http://localhost:7001/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳时间间隔默认30slease-expiration-duration-in-seconds: 5 # 如果超过了5秒的间隔默认90sinstance-id: receive-8002.com #信息列表显示主机名称prefer-ip-address: true # 访问路径变为ip地址③ 消息接收服务
Component
Slf4j
EnableBinding(Sink.class)
public class StreamController {Value(${server.port})private String serverPort;StreamListener(Sink.INPUT)public void input(MessageStringmessage){log.info(消费者1号接收到消息message.getPayload()\t port:serverPort);}}上面【2】【3】即可实现消息的发送和接收但是假设有多个消费者我们还要考虑两个问题消息的重复消费和消息的持久化。
【4】group分组解决重复消费问题
比如在如下场景中订单系统我们做集群部署都会从RabbitMQ中获取订单信息那如果一个订单同时被两个服务获取到那么就会造成数据错误我们得避免这种情况。这时我们就可以使用Stream中的消息分组来解决。
在Stream中处于同一个group中的多个消费者是竞争关系就能够保证消息只会被其中一个应用消费一次。
不同组是可以全面消费的(重复消费)同一组内会发生竞争关系只有其中一个可以消费。
也就是说两个消费者微服务的group定义为同一个即可以解决重复消费问题。
【5】group分组解决消息丢失问题
即在你服务停机重启期间消息在不断发送而服务启动后并没有接收到发送的消息。这是由于你没有配置group属性导致的。
解决方案配置group属性。
spring:application:name: cloud-stream-consumercloud:stream:binders: #在此配置要绑定的 rabbitmq的服务信息defaultRabbit: # 表示定义的名称用于 binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq相关的环境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理input: # 输出通道的名称destination: studyExchange #表示要使用的 Exchange 名称定义content-type: application/json # 消息类型binder: defaultRabbitgroup: group1 # 这个很重要