纯html静态网站,wordpress所见即所得编辑器,做网站大约要多少钱,wordpress与ftp消息队列是一种应用程序之间通过异步通信进行数据交换的通信模式
消息队列的类型#xff1a;
点对点#xff0c;一对一的消息传递模型#xff0c;其中每个消息只能被一个接收者消费。发送者将消息发送到队列中#xff0c;而接收者从队列中获取消息并进行处理#xff0c;…消息队列是一种应用程序之间通过异步通信进行数据交换的通信模式
消息队列的类型
点对点一对一的消息传递模型其中每个消息只能被一个接收者消费。发送者将消息发送到队列中而接收者从队列中获取消息并进行处理一旦消息被接收者消费它将从队列中删除。这种模型适用于需要可靠传递的消息以及需要确保消息只被一个接收者处理的场景。发布订阅一对多的消息传递模型其中消息被发送到一个主题Topic而订阅该主题的所有接收者都会接收到该消息。 总结ZeroMQ小而美RabbitMQ大而稳Kakfa和RocketMQ快而强劲
RabbitMQ
一、安装
官网https://www.rabbitmq.com/download.html文件上传/usr/local/software目录erland提前的语言环境 rabbitmq-server 3. 常用命令进行设置 4. 查看服务状态 停止服务 /sbin/service rabitmq-server stop 开启web管理 查看防火墙状态需要关闭防火墙关闭防火墙并指定下次开机不需要开启防火墙 /sbin/service rebbitmq-server start 重新启动然后通过浏览器进行访问
创建用户以及权限
重新使用admin的方式进行登录
通过用户界面添加新用户
docker方式安装
二、使用Rabbit
/**消息生产者
*/
public class Producer{//队列名称public static final String Queue_NAME hello;//发消息public static void main(){//创建连接工厂ConnectionFactory factory new ConnectionFactory();factory.setHost(192.168.200.129);factory.setUsername(admin);factory.setPassword(123);//创建连接Connection conneciton factory.newConnection();//获取信道Channel channel connection.createChannel();/**生成一个队列参数队列名是否持久化默认是存储内存是否提供一个消费者进行消费true多个消费者共享是否自动删除true是不自动删除其他参数*/channel.queueDeclare(QUEUE_NAME,false,false,false,null);String message hello world;//发送消息//参数发送到哪个交换机路由key是哪个其他参数发送消息的消息体channel.basicPublish(,QUEUE_NAME,null,message.getBytes())System.out.println(消息发送完毕);}
}/**消息消费者
*/
public class Consumer{//队列名称(必须和生产者的队列名称一致)public static final String QUEUE_NAME hello;//接收消息public static void main(String[] args){ConnectionFactory factory new ConnectionFactory();factory.setHost(192.168.200.129);factory.setUsername(admin);factory.setPassword(123);//创建连接Connection conneciton factory.newConnection();//获取信道Channel channel connection.createChannel();/**消费者消费消息参数消费哪个队列消费成功后是否应答true标识自动应答false手动应答消费者未成功消费的回调*/DeliverCallback deliverCallback (consumerTag,messaage) - {System.out.println(new String(message.getBody()));}//取消消息时回调CancelCallack cancelCallback consumerTag - {System.out.println(消息消费被中断);}channel.basicConsume(QUEUE_NAME,trye,deliverCallback,cancelCallback);}
}三、工作模式的场景 work queues一个消息只能被处理一次不可以处理多次消费者工作线程采用轮询的方式依次接收消息。 默认是轮询机制如果两个消费者的处理时间差距大必须采用处理消息快的能够多次消费到消息
//抽取工具类将连接工厂、创建信道抽取为一个工具
public class RabbitMqUtils{public static Channel getChannel() throws Exception{ConnectionFactory factory new ConnectionFactory();factory.setHost(182.92.234.71);factory.setUsername(admin);factory.setPassword(123);Connection connection factory.newConnection();Channel channel connection.createChannel();return channel;}
}//消费者工作线程
publci class Worker01{//队列名称public static final String QUEUE_NAME hello;public static void main(String[] args){Channel channel RabbitMqUtils.getChannel();//消息接收DeliverCallback deliverCallback (sonsumerTag,message) - {System.out.println(接收到的消息new String(message.getBody()));};//消息被取消执行CancelCallback cancelCallback (consumerTag) - {System.out.println(consumerTag消费者取消消费接口回调逻辑);};/**参数消费哪个队列消费成功之后是否要自动应答true手动应答false消费者未成功消费的回调消费者取消消费的回调*/channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
}采用idea工具模拟多线程消费
//生产者发送大量消息
public class Task01{//队列名称public static final String QUEUE_NAME hello;//发送大量消息public static void main(String[] args){Channel channel RabbitMqUtils.getChannel();/**参数队列名称队列里的消息是否持久化默认是内存中队列消息是否只供一个消费者进行消费true多个消费者,false一个消费者是否自动删除最后一个消费者断开连接true自动删除false不自动删除其他参数*/channel.queueDeclare(QUEUE_NAME,false,false,false,null);//从控制台接收信息Scanner scanner new Scanner(System.in);while(scanner.hasNext()){String message scanner.next();channel.basicPublish(,QUEUE_NAME,null,message.getBytes());System.out.println(发送消息完成message);}}
}不公平分发消费者 接收消息设置参数channelasicQos(1)每个消费者都需要设置 预取值提前指定消费者 设置方式和不公平分发一样
四、集群
镜像模式搭建RabbitMQ集群
3台节点每台节点都执行如下启动RabbitMQ 通过浏览器ip依次访问3台节点 对3个节点依次操作 浏览器进行访问验证 对普通集群进行镜像集群的转化 通过浏览器进行对任意一个队列的创建队列发送消息。看其他节点是否一致
五、RabbitMQ 如何保证全链路数据100%不丢失
消息从生产端到消费端消费要经过3个步骤
生产端发送消息到RabbitMQRabbitMQ发送消息到消费端消费端消费这条消息
生产端可靠性投递
丢失原因消息在网络传输的过程中发生网络故障消息丢失消息投递到RabbitMQ时RabbitMQ挂了那消息也可能丢失 针对以上情况RabbitMQ提供的机制 ①、事务消息机制由于会严重降低性能一般不采用而采用另一种轻量级的解决方案confirm消息确认机制 ②、confirm消息确认机制生产端投递的消息一旦投递到RabbitMQ后RabbitMQ就会发送一个确认消息给生产端让生产端知道我已经收到消息了否则这条消息就可能已经丢失了需要生产端重新发送消息了。 通过以下代码来开启确认模式
channel.confirmSelect();//开启发送方确认模式然后异步监听确认和未确认的消息
channel.addConfirmListener(new ConfirmListener(){//消息正确到达brokerOverridepublic void handleAck(long deliveryTag,boolean multiple) throws IOException{System.out.println(已收到消息);//做一些其他处理}//RabbitMQ因为自身内部错误导致消息丢失就会发送一条nack消息Overridepublic void handlerNack(long deliveryTag,boolean multiple) throws IOException{System.out.println(未确认消息标识 deliveryTag);//做一些其他处理比如消息重发等}
});消息持久化
RabbitMQ收到消息后将这个消息暂时存在了内存中那这就会有个问题如果RabbitMQ挂了那重启后数据就丢失了所以相关的数据应该持久化到硬盘中这样就算RabbitMQ重启后也可以到硬盘中取数据恢复。那如何持久化呢 message消息到达RabbitMQ后先是到exchange交换机中然后路由给queue队列最后发送给消费端。 需要给exchange、queue和message都进行持久化
①、exchange持久化
//第三个参数true标识这个exchange持久化
channel.exchangeDeclare(EXCHANGE_NAME,direct,true);②、queue持久化
//第二个参数true表示这个queue持久化
channel.queueDeclare(QUEUE_NAME,true,false,false,null);队列持久化声明队列的时候把durable参数设置为持久化当rabbitmq重启后队列不会被删除掉 如果之前声明的队列不是持久化需要把原先队列删除或者重新创建一个队列否职责会报错
③、message持久化
//第三个参数MessageProperties.PERSISTENT_TEXT_PLAIN表示这条消息持久化
channel.basicPublish(EXCHENGE_NAME,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));消息持久化消息生产者MessageProperties.PERSISTENT_TEXT_PLAIN
这样如果RabbitMQ收到消息后挂了重启后会自行恢复消息。
RabbitMQ收到消息还没来得及将消息持久化到硬盘时RabbitMQ挂了这样消息还是丢失了或者RabbitMQ在发送确认消息给生产端的过程中由于网络故障而导致生产端没有收到确认消息这样生产端就不知道RabbitMQ到底有没有收到消息就不好做接下来的处理。 所以除了RabbitMQ提供的一些机制外我们自己也要做一些消息补偿机制以应对一些极端情况。其中的一种解决方案——消息入库。 将要发送的消息保存到数据库中。 首先发送消息前先将消息保存到数据库中有一个状态字段status0表示生产端将消息发送给了RabbitMQ但还没收到确认。在生产端收到确认后将status设为1表示RabbitMQ已收到消息。 这里有可能会出现上面说的两种情况所以生产端这边开一个定时器定时检索消息表将status0并且超过固定时间后可能消息刚发出去还没来得及确认这边定时器刚好检索到这条status0的消息所以给个时间还没收到确认的消息取出重发第二种情况下这里会造成消息重复消费者端要做幂等性可能重发还会失败所以可以做一个最大重发次数超过就做另外的处理。
这样消息就可以可靠性投递到RabbitMQ中了而生产端也可以感知到了。
消费端可靠性接收
一、在RabbitMQ将消息发出后消费掉还没接收之前发送网络故障消费端与RabbitMQ断开连接此时消息会丢失。 二、在RabbitMQ将消息发出后消费端还没接收到消息消费端挂了消息会丢失 三、消费端接收到消息但在处理消息的过程中发生异常宕机。消息也会丢失。 上述3中情况导致消息丢失归根结底是因为RabbitMQ的自动ack机制即默认RabbitMQ在消息发出后就立即将这条消息删除而不管消费端是否接收到是否处理完导致消费端消息丢失时RabbitMQ自己又没有这条消息了。
所以就需要将自动ack机制改为手动ack机制 当autoAck参数置为false对于RabbitMQ服务端而言队列中的消息分成了两个部分
一部分是等待投递给消费端的消息一部分是已经投递给消费端但是还没有收到消费端确认信号的消息
应答机制
自动应答手动应答手动应答三个方法
DeliverCallback deliverCallback (consumerTag,delivery) - {try{//接收到消息做处理//手动确认channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);}catch(Exception e){//出错处理这可以让消息重回队列重新发送或直接丢弃消息}
};//第二个参数autoAck设为false表示自动关闭确认机制需要手动确认
channel.basicConsume(QUEUE_NAME,false,deliverCallback,consumerTag - {});当消息被消费者完全处理后队列才能删除消息。否则会一旦消费者出现故障消息未被完全处理就被队列删除造成消息丢失。
消息自动重新入队 当接收消息的C1突然宕机队列会重新安排消息进入队列让其他消费者进行处理
//消息生产者
public class Task02{//队列名称public static final String task_queue_name ack_queue;publix static void main(String[] args){Channel channel RabbitMqUtils.getChannel();channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);Scanner scanner new Scanner(System.in);while(scanner.hasNext()){String message scanner.next();channel.basicPublish(,TASK_QUEUE_NAME,null,message.getBytes(UTF-8));System.out.println(生产者发出消息:message)}}
}
//消息消费者一消息在手动应答时不丢失放回队列中重新消费
public class Work01{public static final String TASK_QUEUE_NAME ack_queue;public static void main(String[] args){Channel channel RabbitMqUtils.getChannel();System.out.println(C1等待接收消息处理时间较短);DeliverCallback deliverCallback (consumerTag,message) - {//模拟处理业务Thread.sleep(1000);System.out.println(接收到的消息 new String(message.getBody(),UTF-8));//手动应答(消息的标记,不批量处理应答)channel.basicAck(message.getEnvelope(),getDeliveryTag(),false);};//采用手动应答boolean autoAck false;channel.basicConsumer(TASK_QUEUE_NAME,autoAck,(consumerTag - {System.out.println(consumerTag消费者取消消费接口回调逻辑);}));}
}//消息消费者二消息在手动应答时不丢失放回队列中重新消费
public class Work02{public static final String TASK_QUEUE_NAME ack_queue;public static void main(String[] args){Channel channel RabbitMqUtils.getChannel();System.out.println(C1等待接收消息处理时间较短);DeliverCallback deliverCallback (consumerTag,message) - {//模拟处理业务较长Thread.sleep(1000000);System.out.println(接收到的消息 new String(message.getBody(),UTF-8));//手动应答(消息的标记,不批量处理应答)channel.basicAck(message.getEnvelope(),getDeliveryTag(),false);};//采用手动应答boolean autoAck false;channel.basicConsumer(TASK_QUEUE_NAME,autoAck,(consumerTag - {System.out.println(consumerTag消费者取消消费接口回调逻辑);}));}
}消息生产者发送消息后C2处理时间较长还未处理完就宕机此时会看到C1接收到了 说明消息队列被重新入队了。
如果RabbitMQ一直没有收到消费端的确认信号并且消费此消息的消费端已经断开连接或宕机RabbitMQ会自己感知到则RabbitMQ会安排该消息重新进入队列放在队列头部等待投递给下一个消费者当然也有能还是原来的那个消费端当然消费端也需要确保幂等性。
RocketMQ
RocketMQ主要由NameServer、Broker、Producer以及Consumer四部分构成。
NameServer主要负责对于源数据的管理包括了对于Topic和路由信息的管理。每个NameServer节点互相之间是独立的没有任何信息交互Broker消息中转角色负责存储消息转发消息。单个Broker节点与所有的NameServer节点保持长连接及心跳并会定时将Topic信息注册到NameServer顺带一提底层的通信和连接都是基于Netty实现的。Producer负责产生消息一般由业务系统负责产生消息。由用户进行分布式部署消息由Producer通过多种负载均衡模式发送到Broker集群发送低延时支持快速失败。Consumer负责消费消息一般是后台系统负责异步消费。由用户部署支持PUSH和PULL两种消费模式支持集群消费和广播消息提供实时的消息订阅机制
流程
Broker在启动的时候会去向NameServer注册并且定时发送心跳Producer在启动的时候会到NameServer上去拉取Topic所属的Broker具体地址然后向具体的Broker发送消息。
消息领域模型分为Message、Topic、Queue、Offset以及Group这几部分。
Topic标识消息的第一级类型比如一个电商系统的消息可以分为交易消息、物流消息等。一条消息必须有一个Topic。Tag表示消息的第二级类型比如交易消息又可以分为交易创建消息交易完成消息等。RocketMQ提供2级消息分类Group组一个组可以订阅多个Topic。Message Queue消息的物理管理单位。一个Topic下可以有多个QueueQueue的引入使得消息的存储可以分布式集群化具有了水平扩展能力 RocketMQ 中所有消息队列都是持久化长度无限的数据结构所谓长度无限是指队列中的每个存储单元都是定长访问其中的存储单元使用Offset来访问Offset来访问Offset为Java long类型64位。 Message Queue是一个长度无限的数组Offset就是下标。
RocketMQ的关键特性 ①消息的顺序值得是消息消费时能按照发送的顺序来消费。 例如一个订单产生了 3 条消息分别是订单创建、订单付款、订单完成。消费时要按照这个顺序消费才有意义。但同时订单之间又是可以并行消费的。 RocketMQ是通过将“相同的ID的消息发送到同一个队列而一个队列的消息只由一个消费者来处理”来实现顺序消息。
②消息的重复消息领域有一个对消息投递的Qos服务质量定义分为
最多一次至少一次仅一次
③消息去重 原则使用业务端逻辑保持幂等性 就是用户对于同一操作发起的一次请求或者多次请求的结果是一致的不会因为多次点击而产生了副作用数据库的结果都是唯一的不可变的。
去重策略保证每条消息都有唯一编号唯一流水号且保证消息处理成功与去重的日志同时出现。 建立一个消息表拿到这个消息作数据库的insert操作给这个消息做一个唯一主键或者唯一约束那么就算出现重复消费的情况就会导致主键冲突那么就不再处理这条消息。
Kafka
体系结构若干个Producer若干个Broker若干个Consumer一个Zookeeper集群。 Zookeeper用来负责集群元数据的管理、控制器的选举。 Producer将消息发送到Broker Broker将受到的消息存储到磁盘中 Consumer负责从Broker订阅并消费消息
JDKZookeeperkafka 的安装与配置
# jdk安装包的下载并解压
ll jdk-8u181-linux- x64.tar.gz
tar zxvf jdk-8u181-linux- x64.tar.gz
#解压之后当前/opt目录下生成一个名为jdk1.8.0_181的文件夹
cd jdk1.8.0_181/
pwd
/opt/jdk1.8.0_181配置JDK环境变量修改/etc/prifile文件并向其添加如下配置
export JAVA_HOME/opt/jdk1.8.0_181
export JRE_HOME$JAVA_HOME/jre
export PATH$PATH:$JAVA_HOME/bin
export CLASSPATH./://$JAVA_HOME/LIB:$JRE_HOME/lib生产和消费