如何在网站做淘宝页面,saas系统是什么意思啊,百度开户需要什么条件,永久免费虚拟主机申请大纲 Pom.xml监听队列实时返回消息测试完整代码工程代码 在之前的案例中#xff0c;我们在管理后台收发消息都是通过短连接的形式。本文我们将探索对队列中消息的实时读取#xff0c;并通过流式数据返回给客户端。 webflux是反应式Web框架#xff0c;客户端可以通过一个长连… 大纲 Pom.xml监听队列实时返回消息测试完整代码工程代码 在之前的案例中我们在管理后台收发消息都是通过短连接的形式。本文我们将探索对队列中消息的实时读取并通过流式数据返回给客户端。 webflux是反应式Web框架客户端可以通过一个长连接和服务端相连后续服务端可以通过该连接持续给客户端发送消息。可以达到发送一次多次接收的效果。
Pom.xml
由于我们要使用Rabbitmq所以要新增如下依赖 dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependencydependencygroupIdorg.springframework.amqp/groupIdartifactIdspring-rabbit-stream/artifactId/dependencywebflux的依赖如下 dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-webflux/artifactId/dependencydependencygroupIdio.projectreactor/groupIdartifactIdreactor-core/artifactIdversion3.6.7/version/dependency监听队列
下面代码会返回一个监听队列的Container private SimpleMessageListenerContainer getListener(String queueName, MessageListener messageListener) {lock.lock();try {SimpleMessageListenerContainer listener listeners.get(queueName);if (listener null messageListener ! null) {listener new SimpleMessageListenerContainer();listener.setConnectionFactory(connectionFactory);listener.setQueueNames(queueName);listener.setMessageListener(messageListener);listeners.put(queueName, listener);}return listener;} finally {lock.unlock();}}实时返回消息
一旦消费者读取到消息onMessage方法会被调用。然后Flux的消费者会将消息投递到流上。 public FluxString listen(String queueName) {return Flux.create(emitter - {SimpleMessageListenerContainer container getListener(queueName, new MessageListener() {Overridepublic void onMessage(Message message) {String msg new String(message.getBody());System.out.println(listen function Received message: msg);emitter.next(msg);}});container.start();});}测试
由于OpenApi不能支持实时展现流式数据所以我们采用Postman来测试。 发送请求后该页面一直处于滚动状态。 在管理后台发送一条消息 可以看到Postman收到了该消息 然后在发一条Postman又会收到一条 这样我们就完成了“请求一次多次返回”的效果。
完整代码
需要注意的是返回的格式需要标记为produces “text/event-stream”。
// controller
package com.rabbitmq.consumer.controller;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;import com.rabbitmq.consumer.service.ConsumerService;import reactor.core.publisher.Flux;RestController
RequestMapping(/consumer)
public class ConsumerController {Autowiredprivate ConsumerService comsumerService;GetMapping(value /listen, produces text/event-stream)public FluxString listen(RequestParam String queueName) {return comsumerService.listen(queueName);}
}// service
package com.rabbitmq.consumer.service;import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import jakarta.annotation.PostConstruct;
import reactor.core.publisher.Flux;Service
public class ConsumerService {Autowiredprivate RabbitTemplate rabbitTemplate;private ConnectionFactory connectionFactory;private final ReentrantLock lock new ReentrantLock();private MapString, SimpleMessageListenerContainer listeners new java.util.HashMap();PostConstructpublic void init() {connectionFactory rabbitTemplate.getConnectionFactory();}public FluxString listen(String queueName) {return Flux.create(emitter - {SimpleMessageListenerContainer container getListener(queueName, new MessageListener() {Overridepublic void onMessage(Message message) {String msg new String(message.getBody());System.out.println(listen function Received message: msg);emitter.next(msg);}});container.start();});}private SimpleMessageListenerContainer getListener(String queueName, MessageListener messageListener) {lock.lock();try {SimpleMessageListenerContainer listener listeners.get(queueName);if (listener null messageListener ! null) {listener new SimpleMessageListenerContainer();listener.setConnectionFactory(connectionFactory);listener.setQueueNames(queueName);listener.setMessageListener(messageListener);listeners.put(queueName, listener);}return listener;} finally {lock.unlock();}}
}工程代码
https://github.com/f304646673/RabbitMQDemo/tree/main/consumer