对网站开发实训的建议,做个网站需要多少钱.,徐州市网站开发,网站播放器源码RabbitMQ的概念
RabbitMQ是一个消息中间件#xff1a;他可以接受并转发消息。例如你可以把它当做一个快递站点#xff0c;当你要发送一个包 裹时#xff0c;你把你的包裹放到快递站#xff0c;快递员最终会把你的快递送到收件人那里#xff0c;按照这种逻辑 RabbitMQ 是 …RabbitMQ的概念
RabbitMQ是一个消息中间件他可以接受并转发消息。例如你可以把它当做一个快递站点当你要发送一个包 裹时你把你的包裹放到快递站快递员最终会把你的快递送到收件人那里按照这种逻辑 RabbitMQ 是 一个快递站一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于它不处理快件而是接收 存储和转发消息数据
四大核心概念
生产者产生数据发送消息的程序
交换机交换机是RabbitMQ中的一个重要的部件一方面它接受来自生产者的消息另一方面它将消息推送到队列中。交换机必须确切的知道如何处理它接受到的消息是将这些消息推送到特定队列还是推送到多个队列或者是把消息丢弃这个得有交换机类型决定
队列队列是RabbitMQ内部使用的一种数据结构尽管消息流经RabbitMQ和应用程序但是它们只能存储在的队列中。队列仅受主机内存和磁盘限制的约束本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式
消费者消费者大多数是一个等待接收消息的程序。请注意生产者消费者和消息中间件很多时候并不在一个机器上。同一个程序既可以是生产者也可以是消费者
RabbitMQ核心部分 各种名词介绍 Broker接收和分发消息的应用RabbitMQ Server 就是 Message Broker
Virtual host出于多租户和安全因素设计的把 AMQP 的基本组件划分到一个虚拟的分组中类似 于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时可以划分出 多个 vhost每个用户在自己的 vhost 创建 exchangequeue 等
Connectionpublisherconsumer 和 broker 之间的 TCP 连接
Channel如果每一次访问RabbitMQ都建立一个Connection在消息量大的时候建立TCP Connection的开销是非常大的效率是很低效的。Channel是在connect内部建立的逻辑连接如果应用成型鼓支持多线程通常每个 thread 创建单独的 channel 进行通讯AMQP method 包含了 channel id 帮助客 户端和 message broker 识别 channel所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP Connection 的开销
Exchangemessage 到达 broker 的第一站根据分发规则匹配查询表中的 routing key分发 消息到 queue 中去。常用的类型有direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
Queue消息最终被送到这里等待 consumer 取走
Bindingexchange 和 queue 之间的虚拟连接binding 中可以包含 routing keyBinding 信息被保 存到 exchange 中的查询表中用于 message 的分发依据
RabbitMQ的安装
官网地址rabbitmq.com/download.html
百度网盘大家自取
链接https://pan.baidu.com/s/1U7rdXf2yk9PRGxOJxhcY8A?pwdd0jd 提取码d0jd 文件上传
以CentOS系统举例将以上的软件安装使用命令scp
scp D:\Java学习课件\MQ\软件\rabbitmq-server-3.8.8-1.el7.noarch.rpm root118.31.6.100:/root
安装文件
rpm -ivh erlang-21.3-1.el7.x86_64.rpm
yum install socat -y
rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm
常用命令按照以下顺序
添加开机启动 RabbitMQ 服务
chkconfig rabbitmq-server on
启动服务
/sbin/service rabbitmq-server start
查看服务状态
/sbin/service rabbitmq-server status 停止服务(选择执行)
/sbin/service rabbitmq-server stop
开启 web 管理插件
rabbitmq-plugins enable rabbitmq_management
用默认账号密码(guest)访问地址 http://真实IP:15672/出现权限问题使用云服务器的一定要把云服务器的防火墙的打开 添加一个新的用户
创建账号
rabbitmqctl add_user admin 123
设置用户角色
rabbitmqctl set_user_tags admin administrator
设置用户权限
set permissions-p vhostpathuserconf writereadrabbitmqctl set_permissions -p / admin .* .* .*
用户 user_admin 具有/vhost1 这个 virtual host 中所有资源的配置、写、读权限
当前用户和角色
rabbitmqctl list_users
利用 admin 用户登录 重置命令
关闭应用的命令
rabbitmqctl stop_app
清除的命令
rabbitmqctl reset
重新启动命令
rabbitmqctl start_app
Hello World
引入依赖
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdcom.atguigu.rabbitmq/groupIdartifactIdrabbitmq-hello/artifactIdversion1.0-SNAPSHOT/version!--指定 jdk 编译版本--buildpluginsplugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-compiler-plugin/artifactIdconfigurationsource8/sourcetarget8/target/configuration/plugin/plugins/builddependencies!--rabbitmq 依赖客户端--dependencygroupIdcom.rabbitmq/groupIdartifactIdamqp-client/artifactIdversion5.8.0/version/dependency!--操作文件流的一个依赖--dependencygroupIdcommons-io/groupIdartifactIdcommons-io/artifactIdversion2.6/version/dependencydependencygroupIdch.qos.logback/groupIdartifactIdlogback-classic/artifactIdversion1.2.6/version !-- 使用最新版本 --/dependency/dependencies/project
生产者代码
package com.rabbitmq.one;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;/*** 生产者发消息*/public class Produce {public static final String QUEUE_NAMEhello;public static void main(String[] args) throws IOException, TimeoutException {//创建工厂ConnectionFactory factory new ConnectionFactory();//工厂IP连接rabbitmqfactory.setHost(118.31.6.132);//用户名factory.setUsername(admin);//密码factory.setPassword(123);//创建连接Connection connection factory.newConnection();//获取信道Channel channel connection.createChannel();/**生成一个队列* 1.队列名称* 2.队列里面的信息是否持久化磁盘默认情况时在内存* 3.该队列是否只供一个消费者进行消费 是否消费共享 true是允许* 4.是否自动删除 最后一个消费者断开连接之后 该队列是否自动删除 true自动删除 false不自动删除* 5.其他参数 延迟消息等*/channel.queueDeclare(QUEUE_NAME,false,false,false,null);//发消息String message hello world;/*** 发送一个消息* 1.发送到那个交换机* 2.路由的KEY值是哪个 本次是队列的名称* 3.其他参数信息* 4.发送消息的消息体*/channel.basicPublish(,QUEUE_NAME,null,message.getBytes());System.out.println(消息发送完毕);}
}消费者代码
package com.rabbitmq.one;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.TimeoutException;/*** 消费者*/
public class Consume {//队列名称public static final String QUEUE_NAME hello;//接受消息public static void main(String[] args) throws IOException, TimeoutException {//创建连接工厂ConnectionFactory factory new ConnectionFactory();factory.setHost(118.31.6.132);factory.setUsername(admin);factory.setPassword(123);Connection connection factory.newConnection();Channel channel connection.createChannel();//声明 接受消息DeliverCallback deliverCallback (consumerTag,message)-{System.out.println(new String(message.getBody()));};//取消消息的回调CancelCallback cancelCallback consumerTag-{System.out.println(消息消费被中断);};/*** 消费者信息* 1.消费哪个队列* 2.消费成功之后是否要自动应答 true自动应答 false手动应答* 3.消费者微车才能更改消费的回调* 4.消费者取消消费回调*/channel.basicConsume(QUEUE_NAME,true, deliverCallback,cancelCallback);}
}运行结果