建设网站的标语,wordpress主题开发班,做网站i3够用吗,企业网站模板建设一、消息存储机制
不管是持久化的消息还是非持久化的消息都可以被写入到磁盘。持久化的消息在到达队列时就被写入到磁盘#xff0c;非持久化的消息一般只保存在内存中#xff0c;在内存吃紧的时候会被换入到磁盘中#xff0c;以节省内存空间。这两种类型的消息的落盘处理都…一、消息存储机制
不管是持久化的消息还是非持久化的消息都可以被写入到磁盘。持久化的消息在到达队列时就被写入到磁盘非持久化的消息一般只保存在内存中在内存吃紧的时候会被换入到磁盘中以节省内存空间。这两种类型的消息的落盘处理都在 RabbitMQ 的“持久层”中完成。
持久层是一个逻辑上的概念实际包含两个部分
1队列索引rabbit_queue_index
rabbit_queue_index 负责维护队列中落盘消息的信息包括消息的存储地点、是否已被交付给消费者、是否已被消费者 ack 等。每个队列都有与之对应的一个 rabbit_queue_index。
2消息存储rabbit_msg_store
rabbit_msg_store 以键值对的形式存储消息它被所有队列共享在每个节点中有且只有一个。 从技术层面上来说rabbit_msg_store 具体还可以分为
1 msg_store_persistent负责持久化消息的持久化重启后消息不会丢失2 msg_store_transient负责非持久化消息的持久化重启后消息会丢失。
1. 消息存储
消息包括消息体、属性和 headers可以直接存储在 rabbit_queue_index 中也可以被保存在 rabbit_msg_store 中。默认在 $RABBITMQ_HOME/var/lib/mnesia/rabbit$HOSTNAME/ 路径下包含 queues、msg_store_persistent、msg_store_transient 这 3 个文件夹其分别存储对应的信息不同版本目录位置有所不同 最佳的配备是较小的消息存储在 rabbit_queue_index 中而较大的消息存储在 rabbit_msg_store 中。这个消息大小的界定可以通过 queue_index_embed_msgs_below 来配置默认大小为 4096单位为 B。注意这里的消息大小是指消息体、属性及 headers 整体的大小。当一个消息小于设定的大小阈值时就可以存储在 rabbit_queue_index 中这样可以得到性能上的优化。
rabbit_queue_index 中以顺序文件名从 0 开始累加的段文件来进行存储后缀为“.idx”每个段文件中包含固定的 SEGMENT_ENTRY_COUNT 条记录SEGMENT_ENTRY_COUNT 默认值为16384。每个 rabbit_queue_index 从磁盘中读取消息的时候至少要在内存中维护一个段文件所以设置 queue_index_embed_msgs_below 值的时候要格外谨慎一点点增大也可能会引起内存爆炸式的增长。
经过 rabbit_msg_store 处理的所有消息都会以追加的方式写入到文件中当一个文件的大小超过指定的限制file_size_limit后关闭这个文件再创建一个新的文件以供新的消息写入。文件名文件后缀是“.rdq”从 0 开始进行累加因此文件名最小的文件也是最老的文件。在进行消息的存储时RabbitMQ 会在 ETSErlang Term Storage表中记录消息在文件中的位置映射Index和文件的相关信息FileSummary。
2. 消息读取
在读取消息的时候先根据消息的IDmsg_id找到对应存储的文件如果文件存在并且未被锁住则直接打开文件从指定位置读取消息的内容。如果文件不存在或者被锁住了则发送请求由 rabbit_msg_store 进行处理。
3. 消息删除
消息的删除只是从 ETS 表删除指定消息的相关信息同时更新消息对应的存储文件的相关信息。执行消息删除操作时并不立即对在文件中的消息进行删除也就是说消息依然在文件中仅仅是标记为垃圾数据而已。当一个文件中都是垃圾数据时可以将这个文件删除。当检测到前后两个文件中的有效数据可以合并在一个文件中并且所有的垃圾数据的大小和所有文件至少有 3 个文件存在的情况下的数据大小的比值超过设置的阈值 GARBAGE_FRACTION默认值为 0.5时才会触发垃圾回收将两个文件合并。
执行合并的两个文件一定是逻辑上相邻的两个文件。执行合并时首先锁定这两个文件并先对前面文件中的有效数据进行整理再将后面文件的有效数据写入到前面的文件同时更新消息在 ETS 表中的记录最后删除后面的文件。
二、队列结构实现
1. 队列结构
通常队列由两部分组成 1rabbit_amqqueue_processrabbit_amqqueue_process 负责协议相关的消息处理即接收生产者发布的消息、向消费者交付消息、处理消息的确认包括生产端的 confirm 和消费端的 ack等。 2backing_queue
backing_queue 是消息存储的具体形式和引擎并向 rabbit_amqqueue_process 提供相关的接口以供调用。
如果消息投递的目的队列是空的并且有消费者订阅了这个队列那么该消息会直接发送给消费者不会经过队列这一步。而当消息无法直接投递给消费者时需要暂时将消息存入队列以便重新投递。
消息存入队列后不是固定不变的它会随着系统的负载在队列中不断地流动消息的状态会不断发生变化。RabbitMQ 中的队列消息可能会处于以下4种状态 1alpha消息内容包括消息体、属性和 headers和消息索引都存储在内存中。 2beta消息内容保存在磁盘中消息索引保存在内存中。 3gamma消息内容保存在磁盘中消息索引在磁盘和内存中都有。 4delta消息内容和索引都在磁盘中。
对于持久化的消息消息内容和消息索引都必须先保存在磁盘上才会处于上述状态中的一种。而 gamma 状态的消息是只有持久化的消息才会有的状态。
RabbitMQ 在运行时会根据统计的消息传送速度定期计算一个当前内存中能够保存的最大消息数量target_ram_count如果 alpha 状态的消息数量大于此值时就会引起消息的状态转换多余的消息可能会转换到 beta 状态、gamma 状态或者 delta 状态。区分这 4 种状态的主要作用是满足不同的内存和 CPU 需求。alpha 状态最耗内存但很少消耗 CPU。delta 状态基本不消耗内存但是需要消耗更多的 CPU 和磁盘 I/O 操作。delta 状态需要执行两次 I/O 操作才能读取到消息一次是读消息索引从 rabbit_queue_index 中一次是读消息内容从 rabbit_msg_store 中beta和 gamma 状态都只需要一次 I/O 操作就可以读取到消息从 rabbit_msg_store 中。
对于普通的没有设置优先级和镜像的队列来说backing_queue 的默认实现是 rabbit_variable_queue其内部通过 5 个子队列 Q1、Q2、Delta、Q3 和 Q4 来体现消息的各个状态。整个队列包括 rabbit_amqqueue_process 和 backing_queue 的各个子队列
其中 Q1、Q4 只包含 alpha 状态的消息Q2 和 Q3 包含 beta 和 gamma 状态的消息Delta 只包含 delta 状态的消息。一般情况下消息按照 Q1→Q2→Delta→Q3→Q4 这样的顺序步骤进行流动但并不是每一条消息都一定会经历所有的状态这取决于当前系统的负载状况。
2. 消息状态流转
从 Q1 至 Q4 基本经历内存到磁盘再由磁盘到内存这样的一个过程如此可以在队列负载很高的情况下能够通过将一部分消息由磁盘保存来节省内存空间而在负载降低的时候这部分消息又渐渐回到内存被消费者获取使得整个队列具有很好的弹性。消费者获取消息也会引起消息的状态转换。当消费者获取消息时首先会从 Q4 中获取消息如果获取成功则返回。如果 Q4 为空则尝试从 Q3 中获取消息系统首先会判断 Q3 是否为空如果为空则返回队列为空即此时队列中无消息。如果 Q3 不为空则取出 Q3 中的消息进而再判断此时 Q3 和 Delta 中的长度如果都为空则可以认为 Q2、Delta、Q3、Q4 全部为空此时将 Q1 中的消息直接转移至 Q4下次直接从 Q4 中获取消息。如果 Q3 为空Delta 不为空则将 Delta 的消息转移至 Q3 中下次可以直接从 Q3 中获取消息。在将消息从 Delta 转移到 Q3 的过程中是按照索引分段读取的首先读取某一段然后判断读取的消息的个数与 Delta 中消息的个数是否相等如果相等则可以判定此时 Delta 中已无消息则直接将 Q2 和刚读取到的消息一并放入到 Q3 中如果不相等仅将此次读取到的消息转移到 Q3。消息数据大致流向如下 即 Q1 消息最终会流向 Q4Q2、Delta 消息最终会流向 Q3消费者优先从 Q4 中读取消息若 Q4 为空再从 Q3 中读取若 Q3、Q4 都为空则可认为当前消息队列为空。
通常在负载正常时如果消息被消费的速度不小于接收新消息的速度对于不需要保证可靠不丢失的消息来说极有可能只会处于 alpha 状态。对于 durable 属性设置为 true 的消息它一定会进入 gamma 状态并且在开启 publisher confirm 机制时只有到了 gamma 状态时才会确认该消息已被接收若消息消费速度足够快、内存也充足这些消息也不会继续走到下一个状态。
当系统负载较高消息堆积较多处理每个消息的平均开销增大时可以有以下 3 种措施进行应对 1增加 prefetch_count 的值即一次发送多条消息给消费者加快消息被消费的速度 2采用 multiple ack降低处理 ack 带来的开销 3流量控制。
三、惰性队列
RabbitMQ 从 3.6.0 版本开始引入了惰性队列Lazy Queue的概念。惰性队列会尽可能地将消息存入磁盘中而在消费者消费到相应的消息时才会被加载到内存中它的一个重要的设计目标是能够支持更长的队列即支持更多的消息存储。当消费者由于各种各样的原因比如消费者下线、宕机或者由于维护而关闭等致使长时间内不能消费消息而造成堆积时惰性队列就很有必要了。
默认情况下当生产者将消息发送到 RabbitMQ 的时候队列中的消息会尽可能地存储在内存之中这样可以更加快速地将消息发送给消费者。即使是持久化的消息在被写入磁盘的同时也会在内存中驻留一份备份。当 RabbitMQ 需要释放内存的时候会将内存中的消息换页至磁盘中这个操作会耗费较长的时间也会阻塞队列的操作进而无法接收新的消息。
惰性队列会将接收到的消息直接存入文件系统中而不管是持久化的或者是非持久化的这样可以减少了内存的消耗但是会增加 I/O 的使用如果消息是持久化的那么这样的 I/O 操作不可避免惰性队列和持久化的消息可谓是“最佳拍档”。注意如果惰性队列中存储的是非持久化的消息内存的使用率会一直很稳定但是重启之后消息一样会丢失。惰性队列和普通队列相比只有很小的内存开销。
队列具备两种模式default 和 lazy。lazy 模式即为惰性队列的模式可以通过调用 channel.queueDeclare 方法的时候在参数 x-queue-mode 中设置也可以通过 Policy 的方式设置如果一个队列同时使用这两种方式设置那么 Policy 的方式具备更高的优先级。 发送消息时惰性队列性能比普通队列好出现性能偏差的原因是普通队列会由于内存不足而不得不将消息换页至磁盘。如果有消费者消费时惰性队列会耗费将近 40MB 的空间来发送消息。
如果要将普通队列转变为惰性队列那么我们需要忍受同样的性能损耗首先需要将缓存中的消息换页至磁盘中然后才能接收新的消息。反之当将一个惰性队列转变为普通队列的时候和恢复一个队列执行同样的操作会将磁盘中的消息批量地导入到内存中。