网站设计制作电影,杭州网站建设维护,网上购物网站建设方案,php网站建设思路RabbitMQ是一个广泛使用的开源消息代理软件#xff0c;它实现了高级消息队列协议#xff08;AMQP#xff09;。RabbitMQ支持多种消息传递模式#xff0c;其中最基本的是点对点#xff08;Point-to-Point#xff09;通讯方式。在这种模式下#xff0c;消息生产者将消息发…RabbitMQ是一个广泛使用的开源消息代理软件它实现了高级消息队列协议AMQP。RabbitMQ支持多种消息传递模式其中最基本的是点对点Point-to-Point通讯方式。在这种模式下消息生产者将消息发送到一个队列而消息消费者从该队列中接收消息。每个消息只会被一个消费者消费一次。
下面将通过一个简单的Java代码案例详细介绍如何在RabbitMQ中实现点对点通讯。
1. 原理分析
一个生产者一个默认的交换机一个队列一个消费者看起来是生产者直接发送到队列实际上是发送到了默认交换机 结构图 2. 环境准备
在开始之前请确保你已经安装了以下环境
Java Development Kit (JDK) 8 或更高版本Apache MavenRabbitMQ 服务器
你可以通过以下命令检查Java和Maven的安装情况
java -version
mvn -version3. 添加RabbitMQ依赖
首先我们需要在Maven项目中添加RabbitMQ的Java客户端依赖。在你的pom.xml文件中添加以下内容
dependenciesdependencygroupIdcom.rabbitmq/groupIdartifactIdamqp-client/artifactIdversion5.20.0/version/dependency
/dependencies4. 创建消息生产者
接下来我们创建一个消息生产者它将消息发送到RabbitMQ的队列中。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class MessageProducer {private final static String QUEUE_NAME hello;public static void main(String[] argv) throws Exception {// 创建连接工厂ConnectionFactory factory new ConnectionFactory();factory.setHost(192.168.200.138);factory.setPort(5672);factory.setVirtualHost(/test);factory.setUsername(test);factory.setPassword(test);// 创建连接和通道try (Connection connection factory.newConnection();Channel channel connection.createChannel()) {// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 发送消息String message Hello World!aaa;// 发布消息到exchange同时指定路由的规则// 参数1指定exchange使用。代表默认交换机// 参数2指定路由的规则使用具体的队列名称。// 参数3指定传递的消息所携带的properties使用null。// 参数4指定发布的具体消息byte[]类型channel.basicPublish(, QUEUE_NAME, null, message.getBytes());// Psexchange是不会帮你将消息持久化到本地的Queue才会帮你持久化消息。System.out.println( [x] Sent message );}}
}代码解析
ConnectionFactory: 用于创建到RabbitMQ服务器的连接。Connection: 代表与RabbitMQ服务器的物理连接。Channel: 用于发送和接收消息的通道。queueDeclare: 声明一个队列如果队列不存在则会创建它。basicPublish: 将消息发送到指定的队列。
默认交换机上可以看到生产者发送的消息 5. 创建消息消费者
接下来我们创建一个消息消费者它将从RabbitMQ的队列中接收消息。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;public class MessageConsumer {private final static String QUEUE_NAME hello;public static void main(String[] argv) throws Exception {// 创建连接工厂ConnectionFactory factory new ConnectionFactory();factory.setHost(192.168.200.138);factory.setPort(5672);factory.setVirtualHost(/test);factory.setUsername(test);factory.setPassword(test);// 创建连接和通道Connection connection factory.newConnection();Channel channel connection.createChannel();// 声明队列//参数1queue - 指定队列的名称//参数2durable - 当前队列是否需要持久化true//参数3exclusive是否排外的有两个作用// 一当连接关闭时connection.close()该队列是否会自动删除// 二该队列是否是私有的private如果不是排外的可以使用两个消费者都访问同一个队列没有任何问题// 如果是排外的会对当前队列加锁其他通道channel是不能访问的如果强制访问会报异常// com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #methodchannel.close(reply-code405, reply-textRESOURCE_LOCKED - cannot obtain exclusive access to locked queue queue_name in vhost /, class-id50, method-id20)// 一般等于true的话用于一个队列只能有一个消费者来消费的场景//参数4autoDelete - 如果这个队列没有消费者在消费并且所有消息都消费完,队列自动删除//参数5arguments - 指定当前队列的其他信息channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println( [*] Waiting for messages. To exit press CTRLC);// 设置消息回调DeliverCallback deliverCallback (consumerTag, delivery) - {String message new String(delivery.getBody(), UTF-8);System.out.println( [x] Received message );};// 开始消费消息channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag - { });}
}代码解析
DeliverCallback: 用于处理接收到的消息的回调函数。basicConsume: 开始消费队列中的消息。
6. 运行代码
首先启动RabbitMQ服务器。运行MessageProducer类发送消息。运行MessageConsumer类接收消息。
你应该会看到类似以下的输出
Producer Output: [x] Sent Hello World!Consumer Output: [*] Waiting for messages. To exit press CTRLC[x] Received Hello World!7. 总结
通过以上步骤我们成功地在RabbitMQ中实现了点对点通讯。消息生产者将消息发送到队列而消息消费者从队列中接收消息。这种模式非常适合需要确保消息只被一个消费者处理一次的场景。