泰州营销型网站建设,百度识别图片找图,有没学做早餐的网站,好的网站推荐目录
引言
约定存储方式
消息序列化
重点理解
针对 MessageFileManager 单元测试
小结 统一硬盘操作 引言 问题#xff1a; 关于 Message#xff08;消息#xff09;为啥在硬盘上存储#xff1f; 回答#xff1a; 消息操作并不涉及到复杂的增删查改消…目录
引言
约定存储方式
消息序列化
重点理解
针对 MessageFileManager 单元测试
小结 统一硬盘操作 引言 问题 关于 Message消息为啥在硬盘上存储 回答 消息操作并不涉及到复杂的增删查改消息数量可能会非常多使用数据库的访问效率是并不高因此我们不使用数据库进行存储而是直接将消息存储到文件中 约定存储方式 此处设定了消息具体如何在文件中存储 约定一 消息依附于队列因此在存储时我们可以将消息按照 队列 纬度展开之前我们因为引入 SQLite 已经设置了一个 data 目录meta.db 就在该目录中所以我们可以在现有的 data 目录中存储一些子目录每个子目录对应一个队列 即 子目录名 就是 队列名 约定目录结构如上图所示文件 queue_data.txt 保存消息的内容文件 queue_stat.txt 保存消息的统计信息 // 约定消息文件所在的目录和文件名
// 这个方法用来获取到指定队列对应的消息文件所在路径private String getQueueDir(String queueName) {return ./data/ queueName;}// 这个方法用来获取该队列的消息数据文件路径
// 二进制文件使用 txt 作为后缀不太合适txt 一般表示文本此处我们也就不改了
// .bin / .datprivate String getQueueDataPath(String queueName) {return getQueueDir(queueName) /queue_data.txt;}// 这个方法用来获取该队列的消息统计文件路径private String getQueueStatPath(String queueName) {return getQueueDir(queueName) /queue_stat.txt;}// 创建队列对应的文件和目录public void createQueueFiles(String queueName) throws IOException{
// 1、先创建队列对应的消息目录File baseDir new File(getQueueDir(queueName));if(!baseDir.exists()) {
// 不存在就创建这个目录boolean ok baseDir.mkdirs();if(!ok) {throw new IOException(创建目录失败baseDir baseDir.getAbsolutePath());}}
// 2、创建队列数据文件File queueDataFile new File(getQueueDataPath(queueName));if(!queueDataFile.exists()){boolean ok queueDataFile.createNewFile();if(!ok) {throw new IOException(创建文件失败queueDataFile queueDataFile.getAbsolutePath());}}
// 3、创建消息统计文件File queueStatFile new File(getQueueStatPath(queueName));if(!queueStatFile.exists()){boolean ok queueStatFile.createNewFile();if(!ok) {throw new IOException(创建文件失败queueStatFile queueStatFile.getAbsolutePath());}}
// 4、给消息统计文件设定初始值 0\t0Stat stat new Stat();stat.totalCont 0;stat.validCount 0;writeStat(queueName,stat);}// 删除队列的目录和文件
// 队列也是可以被删除的当队列删除之后对应的消息文件啥的自然也要随之删除public void destroyQueueFiles(String queueName) throws IOException{
// 先删除里面的文件在删除目录File queueDataFile new File(getQueueDataPath(queueName));boolean ok1 queueDataFile.delete();File queueStatFile new File(getQueueStatPath(queueName));boolean ok2 queueStatFile.delete();File baseDir new File(getQueueDir(queueName));boolean ok3 baseDir.delete();if(!ok1 || !ok2 || !ok3) {
// 有任意一个删除失败都算整体删除throw new IOException(删除队列目录和文件失败 baseDir baseDir.getAbsolutePath());}}// 检查队列的目录和文件是否存在
// 比如后续有生产者给 broker server 生产消息了这个消息就可能需要记录到文件上取决于消息是否要持久化public boolean checkFilesExits(String queueName) {
// 判定队列的数据文件 和统计文件是否都存在File queueDataFile new File(getQueueDataPath(queueName));if(!queueDataFile.exists()) {return false;}File queueStatFile new File(getQueueStatPath(queueName));if(!queueStatFile.exists()) {return false;}return true;} 约定二 queue_data 是一个二进制格式的文件该文件中包含若干个消息每个消息均以二进制的方式存储 约定每个消息的组成部分如上图所示 消息序列化 序列化就是将一个对象结构化的数据转成一个 字符串 或 字节数组 注意点一 序列化完成之后对象的信息不丢失因此在后面进行反序列化操作时才能将序列化的 字符串 或 字节数组 重新转化成对象 注意点二 将对象序列化后更方便存储和传输存储一般存储在文件中文件只能存 字符串/二进制数据不能直接存对象传输通过网络传输 JSON 格式 在 Java 中Jackson 是一个流行的 JSON 处理库它提供了 ObjectMapper 类来处理 JSON 数据的序列化和反序列化 问题 Message 中存储的 body 部分为二进制数据可以用 JSON 进行序列化吗 回答 JSON 格式通常用于标识文本数据而无法直接存储二进制数据JSON 格式中包含一些特殊符号 : { } 如果直接存储二进制数据可能会受到这些特殊符号的影响导致 JSON 解析错误 具体理解 如果存 文本数据你的键值对中不会包含上述特殊符号如果存 二进制数据且万一某一二进制的字节正好就与上述特殊符号的 ASCII 一样此时便可能会引起 JSON 解析格式错误 解决方案A 针对二进制数据进行 Base64 编码将其转化为文本数据然后再存储在 JSON 格式中 注意点一 Base64 将每 3 个字节的二进制数据转换为 4 个文本字符从而确保所有字符都是文本字符避免了特殊符号的问题相当于是把二进制数据转成文本了比如在 HTML 中嵌入一个图片图片其本身为二进制数据此时便可以将图片的二进制 数据进行 Base64 编码然后便可以把图片直接以文本的形式嵌入到 HTML 中 注意点二 Base64 这种方案效率低且伴随有额外转码开销同时还会使数据变得更大 解决方案B 放弃使用 JSON 格式直接使用二进制的序列化方式针对 Message 对象进行序列化 注意点一 针对二进制序列化有很多种解决方案 Java 标准库提供了序列化的方案 ObjectInputStream 和 ObjectOutputStreamHessian protobufferthrift 注意点二 我们将直接使用 标准库自带的序列化方案该方案最大的好处就是 不必引入额外的依赖 import java.io.*;//下列的逻辑不仅仅是 Message其他的 Java 中的对象也是可以通过这样的逻辑进行序列化和反序列化的
//如果要想让这个对象能够序列化或者反序列化需要让这个类能够实现 Serializable 接口
public class BinaryTool {
// 把一个对象序列化成一个字节数组public static byte[] toBytes(Object object) throws IOException {
// 这个流对象相当于一个边长的字节数组
// 就可以把 object 序列化的数据给逐渐写入到 byteArrayOutputStream 中再同一转成 byte[]try (ByteArrayOutputStream byteArrayOutputStream new ByteArrayOutputStream()){try (ObjectOutputStream objectOutputStream new ObjectOutputStream(byteArrayOutputStream)){
// 此处的 writerObject 就会把对象进行序列化生成的二进制字节数据就会写入到
// ObjectOutputStream 中
// 由于 ObjectOutputStream 有关联到了 ByteArrayOutputStream最终结果就写入到 ByteArrayOutputStream 里了objectOutputStream.writeObject(object);}
// 这个操作就是把 byteArrayOutputStream 中持有的二进制数据取出来转成 byte[]return byteArrayOutputStream.toByteArray();}}// 把一个数组反序列化成一个对象public static Object fromBytes(byte[] data) throws IOException,ClassNotFoundException{Object object null;try (ByteArrayInputStream byteArrayInputStream new ByteArrayInputStream(data)){try (ObjectInputStream objectInputStream new ObjectInputStream(byteArrayInputStream)){
// 此处的 readObject 就是从 data 这个 byte[] 中读取数据并进行反序列化object objectInputStream.readObject();}}return object;}
} 约定三 对于 Broker Server 来说消息既需要新增也需要删除 具体理解 生产者生产一个消息过来就得新增这个消息消费者把这个消息消费掉这个消息就得删除 注意 新增和删除对于内存来说好办~直接使用一些集合类即可但是在文件上就麻烦了新增消息可以直接将新消息追加到文件末尾删除消息不好搞 具体理解 文件可以视为是一个 顺序表 这样的结构如果直接删除中间元素就需要涉及到类似于 顺序表搬运 这样的操作效率是非常低的因此使用这种搬运的方式删除 是不合适的所以我们采取逻辑删除 约定一个 isValid 成员变量给 Message 如上图所示isValid 为 1 表示该条 Message 为有效消息isValid 为 0 表示该条 Message 为无效消息即已经被删除 // 这个方法用来把一个新的消息当到队列对应的文件中
// queue 表示要把消息写入的队列 message 则是要写的消息public void sendMessage(MSGQueue queue, Message message) throws MqException, IOException {
// 1、检查一下当前要写入的队列对应的文件是否存在if(!checkFilesExits(queue.getName())) {throw new MqException([MessageFileManager] 队列对应的文件不存在 queueName queue.getName());}
// 2、把 Message 对象进行序列化转成二进制的字节数组byte[] messageBinary BinaryTool.toBytes(message);synchronized (queue) {
// 3、先获取到当前的队列数据文件的长度用这个来计算出 Message 对象的 offsetBeg 和 offsetEnd
// 把新的 Message 数据写入到队列数据文件的末尾此时 Message 对象的 offsetBeg就是当前文件长度 4
// offsetEnd 就是当前文件长度 4 message 自身长度File queueDataFile new File(getQueueDataPath(queue.getName()));
// 通过这个方法 queueDataFile.length() 就能获取到文件到长度单位字节message.setOffsetBeg(queueDataFile.length() 4);message.setOffsetEnd(queueDataFile.length() 4 messageBinary.length);
// 4、写入消息到数据文件注意是追加写入到数据文件末尾try (OutputStream outputStream new FileOutputStream(queueDataFile,true)){
// 接下来要先写当前消息的长度占据 4 个字节的
// outputStream.write(messageBinary.length); 实际只写入 1 字节try (DataOutputStream dataOutputStream new DataOutputStream(outputStream)){dataOutputStream.writeInt(messageBinary.length);
// 写入消息本体dataOutputStream.write(messageBinary);}}
// 5、更新消息统计文件Stat stat readStat(queue.getName());stat.totalCont 1;stat.validCount 1;writeStat(queue.getName(),stat);}}// 这个是删除消息的方法
// 这里的删除是逻辑删除也就是把硬盘上存储的这个数据里面的那个 isValid 属性设置成 0
// 1、先把文件中的这一段数据读出来还原回 Message 对象
// 2、把 isValid 改成 0
// 3、把上述数据重新写回到文件
// 此处这个参数中的 message 对象必须包含有效的 offsetBeg 和 offsetEndpublic void deleteMessage(MSGQueue queue,Message message) throws IOException, ClassNotFoundException {synchronized (queue) {try (RandomAccessFile randomAccessFile new RandomAccessFile(getQueueDataPath(queue.getName()),rw)){
// 1、线程文件中读取对应的 Message 数据byte[] bufferSrc new byte[(int) (message.getOffsetEnd() - message.getOffsetBeg())];randomAccessFile.seek(message.getOffsetBeg());randomAccessFile.read(bufferSrc);
// 2、把当前读出来的二进制数据转换成 Message 对象Message diskMessage (Message) BinaryTool.fromBytes(bufferSrc);
// 3、把 isValid 设置成无效diskMessage.setIsValid((byte) 0x0);
// 此处不需要给参数的这个 message 的 isValid 设为 0因为这个参数代表的是内存中管理的 Message 对象
// 而这个对象马上也要被从内存中销毁了
// 4、重新写入文件
// 虽然上面已经 seek 过了但是上面 seek 完了之后进行了读操作这一读就导致文件光标往后移动移动到下一个消息的位置了
// 因此要想让接下来的写入能够刚好写回到之前的位置就需要重新调整文件光标byte[] bufferDest BinaryTool.toBytes(diskMessage);randomAccessFile.seek(message.getOffsetBeg());randomAccessFile.write(bufferDest);
// 通过上述这通折腾对于文件来说只是有一个字节发生改变而已了}
// 不要忘了更新统计文件把一个消息设为无效了此时有效消息个数就需要 -1Stat stat readStat(queue.getName());if(stat.validCount 0) {stat.validCount -1;}writeStat(queue.getName(),stat);}}// 使用这个方法从文件中读取出所有的消息内容加载到内存中(具体来说是放到一个链表里)
// 这个方法准备在程序启动的时候进行调用
// 这里使用一个 LinkedList主要目的是为了后续进行头删操作
// 这个方法的参数只是一个 queueName 而不是 MSGQueue 对象 因为这个方法无需加锁只使用 queueName 就够了
// 由于该方法是在程序启动时调用此时服务器还不能处理请求即不涉及多线程操作文件public LinkedListMessage loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {LinkedListMessage messages new LinkedList();try (InputStream inputStream new FileInputStream(getQueueDataPath(queueName))){try (DataInputStream dataInputStream new DataInputStream(inputStream)){
// 这个变量记录当前文件光标long currentOffset 0;
// 一个文件包含了很多消息此处势必要循环读取while (true) {
// 1、读取当前消息的长度这里的 readInt() 可能会读到文件的末尾(EOF)
// readInt() 方法读到文件末尾会抛出一个 EOFException 异常这一点和之前的很多流对象不太一样int messageSize dataInputStream.readInt();
// 2、按照这个长度读取消息内容byte[] buffer new byte[messageSize];int actualSize dataInputStream.read(buffer);if (messageSize ! actualSize){
// 如果不匹配说明文件有问题格式错误了即文件剩余的空间不够throw new MqException([MessageFileManager] 格式文件错误 queueName queueName);}
// 3、把读到的二进制数据反序列化回 Message 对象Message message (Message) BinaryTool.fromBytes(buffer);
// 4、判定一下看看这个消息对象是不是无效对象if(message.getIsValid() ! 0x1) {
// 无效数据直接跳过continue;}
// 5、有效数据则需要把这个 Message 对象加入到链表中加入之前还需要填写 offsetBeg 和 offsetEnd
// 进行计算 offset 的时候需要知道当前文件光标的位置的 由于当下使用的 DataInputStream 并不方便直接获取到文件光标
// 因此就需要手动计算下文件光标message.setOffsetBeg(currentOffset 4);message.setOffsetEnd(currentOffset 4 messageSize);currentOffset (4 messageSize);messages.add(message);}}catch (EOFException e) {
// 这个 catch 并非真是处理 异常而是处理 正常 的业务逻辑文件读到末尾会被 readInt 抛出该异常
// 这个 catch 语句中也不需要做啥特殊的事情System.out.println([MessageFileManager] 恢复 Message 数据完成);}}return messages;}约定四 使用 逻辑删除 会衍生出一个问题随着时间的推移queue_data 消息文件可能会越来越大并且其中的无效消息也会随之增加针对这种情况就需考虑对当前队列对应的 queue_data 消息数据文件进行垃圾回收 注意 此处我们使用 复制算法针对 queue_data 消息数据文件中的无效消息进行回收直接遍历原有的消息数据文件将所有的有效消息拷贝到一个新文件中再把之前整个旧文件都删除复制算法 比较适用的前提是当前空间里有效消息不多且大部分都是无效数据 问题 究竟什么时候触发一次 GC 什么时候才知道当前有效消息不多无效消息很多呢 回答 约定当总消息数目超过 2000且有效消息数目低于总消息数目的 50%就触发一次 GC 约定五 约定四中的数字 2000 是为了避免 GC 的太频繁比如一共 4 个消息其中 2 个消息无效了就触发 GC属实没必要当然2000 和 50% 这两个数字均可根据实际场景进行灵活调整 注意 约定 queue_stat 这个文件来保存消息的统计信息该文件仅存一行数据 文本格式: 这一行里有两列第一列是 queue_data.txt 中总的消息的数目totalCont第二列是 queue_data.txt 中有效消息的数目validCount两者使用 \t 分割形如 2000\t500 那么此时就需要触发 GC // 此处定义一个内部类来表示该队列的统计信息
// 有限考虑使用 static静态内部类static public class Stat {
// 此处直接定义成 public, 就不再搞 get set 方法了
// 对于这样的简单的类就直接使用成员类似于 C 的结构体了public int totalCont;public int validCount;}private Stat readStat(String queueName) {
// 由于当前的消息统计文件是文本文件可以直接使用 Scanner 来读取文件内容Stat stat new Stat();try (InputStream inputStream new FileInputStream(getQueueStatPath(queueName))){Scanner scanner new Scanner(inputStream);stat.totalCont scanner.nextInt();stat.validCount scanner.nextInt();return stat;}catch (IOException e) {e.printStackTrace();}return null;}private void writeStat(String queueName,Stat stat) {
// 使用 PrintWrite 来写文件
// OutputStream 打开文件默认情况下会直接把原文件清空 此时相当于新的数据覆盖了旧的try (OutputStream outputStream new FileOutputStream(getQueueStatPath(queueName))){PrintWriter printWriter new PrintWriter(outputStream);printWriter.write(stat.totalCont \t stat.validCount);printWriter.flush();}catch (IOException e) {e.printStackTrace();}}// 检查当前是否要针对该队列的消息数据文件进行 GCpublic boolean checkGC(String queueName) {
// 判定是否要 GC是根据总消息数和有效消息数这两个值都是在 消息统计文件 中的Stat stat readStat(queueName);if(stat.totalCont 2000 (double)stat.validCount / (double) stat.totalCont 0.5){return true;}return false;}private String getQueueDataNewPath(String queueName) {return getQueueDir(queueName) /queue_data_new.txt;}// 通过这个方法真正执行消息数据文件的垃圾回收操作
// 使用复制算法来完成
// 创建一个新的文件名字就是 queue_data_new.txt
// 把之前消息数据文件中的有效消息都读取出来写到新的文件中
// 删除旧的文件再把新的文件改名回 queue_data.txtpublic void gc(MSGQueue queue) throws MqException, IOException, ClassNotFoundException {
// 进行 GC 的时候是针对消息数据文件进行大洗牌在这个过程中其他线程不能针对该队列的消息文件做任何修改synchronized (queue) {
// 由于 GC 操作可能比较耗时此时统计一下执行消耗的时间long gcBeg System.currentTimeMillis();// 1、创建一个新的文件File queueDataNewFile new File(getQueueDataNewPath(queue.getName()));if(queueDataNewFile.exists()) {
// 正常情况下这个文件不应该存在如果存在就是意外即上次 gc 了一半程序意外崩溃了throw new MqException([MessageFileManager] gc 时发现该队列的 queue_data_new 已经存在 queueName queue.getName());}boolean ok queueDataNewFile.createNewFile();if(!ok) {throw new MqException([MessageFileManager] 创建文件失败 queueDataNewFile queueDataNewFile.getAbsolutePath());}// 2、从旧文件中读取出所有的有效消息对象了 (这个逻辑直接调用上述方法即可不必重新写了)LinkedListMessage messages loadAllMessageFromQueue(queue.getName());// 3、把有效消息写入到新的文件中try (OutputStream outputStream new FileOutputStream(queueDataNewFile)){try (DataOutputStream dataOutputStream new DataOutputStream(outputStream)){for (Message message : messages) {byte[] buffer BinaryTool.toBytes(message);
// 先写四个字节消息的长度dataOutputStream.writeInt(buffer.length);dataOutputStream.write(buffer);}}}
// 4、删除旧的数据文件并且把新的文件进行重命名File queueDataOldFile new File(getQueueDataPath(queue.getName()));ok queueDataOldFile.delete();if(!ok) {throw new MqException([MessageFileManager] 删除旧的数据文件失败 queueDataOldFile queueDataOldFile.getAbsolutePath());}
// 把 queue_data_new.txt queue_data.txtok queueDataNewFile.renameTo(queueDataOldFile);if(!ok) {throw new MqException([MessageFileManager] 文件重命名失败 queueDataNewFile queueDataNewFile.getAbsolutePath() ,queueDataOldFile queueDataOldFile.getAbsolutePath());}
// 5、更新统计文件Stat stat readStat(queue.getName());stat.totalCont messages.size();stat.validCount messages.size();writeStat(queue.getName(), stat);long gcEnd System.currentTimeMillis();System.out.println([MessageFileManager] gc 执行完毕 queueName queue.getName() ,time (gcEnd - gcBeg) ms);}} 引入问题 如果某个队列中消息特别多而且这些都是有效消息此时便会导致整个 queue_data 消息数据文件变得特别大后续针对这个文件的各种操作其成本就会上升很多假设某文件的大小为 10G此时如果触发一次 GC其整体的耗时就会非常高了 解决方案 对于 RabbitMQ 来说其解决方案是把一个大文件拆分成若干个小文件文件拆分当单个文件长度达到一定阈值后便会拆分成两个文件拆着拆着就成了很多文件文件合并每个单独的文件都会进行 GC如果 GC 之后发现文件变小了很多就可能会和相邻的其他文件合并通过上述方式便可在消息特别多时同时保证性能上的及时响应 注意 这一块的逻辑还比较复杂~ 此处我们仅考虑单个文件的情况 实现该机制的大致思路 需要专门的数据结构来存储当前队列中有多少个数据文件每个文件大小是多少消息数目是多少无效消息是多少设计策略什么时候触发文件的拆分什么时候触发文件的合并 重点理解 理解一 此处的 serialVersionUID 用于验证版本 注意 在实际开发中代码是不断修改更新的 具体理解 有一个 Message 且对该 Message 进行序列化并将序列化的结果存储到对对应的 queue_data.txt 文件中如果在这期间该 Message 里的东西更新了但还未重新序列化更新如果此时想要进行反序列化操作时那么拿到的将是一个旧版本的 Message所以我们通过设置一个 serialVersionUID 来验证代码是否与序列化的数据相互匹配如果不匹配就不允许反序列化直接报错提醒从程序员数据有问题 理解二 此处我们需要往对应的 queue_data.txt 文件中先写入 Message 消息的长度上文我们已经约定好 Message 消息的长度为 4个字节所以此处需写 4个字节的数据 注意 如上图所示虽然这个 write 方法的参数为 int 类型但是实际上只能写 1个字节在流对象中经常会涉及到使用 int 表示 byte 的情况 问题 是否可以将 int 的 4个字节分别取出来然后一个一个字节的写入文件呢 回答 通过位运算即可将每个字节按照位运算的方式取出来再按照字节写入到文件中~ 小总结 上述这种方式固然可以但是还是比较繁琐的Java 标准库已经提供了现成的类即已经帮我们封装好了上述操作DataOutputStream / DataInputStream 理解三 当两个线程同时写 同一个队列对应的 queue_data.txt 文件时可能存在线程安全问题 当两个线程同时写 同一个队列对应的 queue_stat.txt 文件时也可能存在线程安全问题与经典线程安全问题的 count 相类似 所以我们需要对上述代码操作加锁 问题 此处的锁对象是什么即需要写到 synchronized () 里的对象是什么 回答 当前以 队列对象 进行加锁即可如果两个线程是往同一个队列中写消息此时需要阻塞等待如果两个线程往不同队列中写消息此时不需要阻塞等待不同队列对应不同的文件各写各的不会产生冲突 注意 这个代码在编写时IDEA 会给一个警告当前的加锁是针对方法的参数加锁的IDEA 分析不出来这个方法的实参究竟会传啥IDEA 不确认你这个加锁是否能真的达到预期效果后续调用这个方法传入的 queue 对象是后续通过内存管理的 queue 对象总而言之上述写法必须是 两个线程针对同一个 queue 对象进行加锁才能有效 理解四 之前用过的 FileInputStream 和 FileOutputStream 都是从文件头进行读写的而此处我们想要删除 queue_data.txt 中的某条消息所以需要能够在 queue_data.txt 文件中的指定位置进行读写操作即针对文件进行随机访问 解决方案 此处我们用到的类为 RandomAccessFileread 方法用来读write 方法用来写seek 方法用来调整当前文件光标即当前要读写文件的位置 注意点一 seek 方法虽然可以使使文件光标移动 但是使用 read 和 write 方法也会引起光标移动 注意点二 内存就支持随机访问内存的随机访问访问内存的任意一个地址其开销成本都差不多典型的例子为 数组取下标操作的时间复杂度为 O(1)硬盘也能支持随机访问即上述文件光标的移动但是硬盘的随机访问其成本/开销比内存是要高很多的(尤其机器硬盘) 理解五 此处红框中的 Message 对象是在内存中管理的 消息对象刚才从硬盘上读出来的 diskMessage这是硬盘上管理的消息对象 问题 什么时候调用我们刚刚写的这个删除硬盘上的消息对象的操作方法呢 回答 显然是确实要删除这个消息即消费者已经将该消息正确处理完便可删除这个删除就是把内存的 Message 对象 和 硬盘的 Message 对象都删除而我们此处的 deleteMessage 方法仅用来逻辑删除 硬盘中的 Message 对象isValid 属性只是用来在文件中标识这个消息有效这样的作用的相较于删除内存中的 Message 对象删除内存中的 Message 对象要容易很多 针对 MessageFileManager 单元测试 编写测试用例代码是十分重要的 package com.example.demo;import com.example.demo.common.MqException;
import com.example.demo.mqserver.core.MSGQueue;
import com.example.demo.mqserver.core.Message;
import com.example.demo.mqserver.datacenter.MessageFileManager;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.util.ReflectionTestUtils;import java.io.File;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;SpringBootTest
public class MessageFileManagerTest {private MessageFileManager messageFileManager new MessageFileManager();private static final String queueName1 testQueue1;private static final String queueName2 testQueue2;// 这个方法是每个用例执行之前的准备工作BeforeEachpublic void setUp() throws IOException {
// 准备阶段创建出两个队列以后备用messageFileManager.createQueueFiles(queueName1);messageFileManager.createQueueFiles(queueName2);}// 这个方法就是每个用例执行完毕之后的收尾工作AfterEachpublic void tearDown() throws IOException {
// 收尾阶段就把刚才的队列给干掉messageFileManager.destroyQueueFiles(queueName1);messageFileManager.destroyQueueFiles(queueName2);}Testpublic void testCreateFiles() {
// 创建队列文件已经在上面 setUp 阶段执行过了此处主要是验证看看文件是否存在File queueDataFile1 new File(./data/ queueName1 /queue_data.txt);Assertions.assertEquals(true,queueDataFile1.isFile());File queueStatFile1 new File(./data/ queueName1 /queue_stat.txt);Assertions.assertEquals(true,queueStatFile1.isFile());File queueDataFile2 new File(./data/ queueName2 /queue_data.txt);Assertions.assertEquals(true,queueDataFile2.isFile());File queueStatFile2 new File(./data/ queueName2 /queue_stat.txt);Assertions.assertEquals(true,queueStatFile2.isFile());}Testpublic void testReadWriteStat() {MessageFileManager.Stat stat new MessageFileManager.Stat();stat.totalCont 100;stat.validCount 50;
// 此处就需要使用反射的方式来调用 writeStat 和 readStat 了
// Java 原生的反射 API 其实非常难用
// 此处使用 Spring 帮我们封装好的 反射 的工具类ReflectionTestUtils.invokeMethod(messageFileManager,writeStat,queueName1,stat);// 写入完毕之后再调用一下读取验证读取的结果和写入的数据是一致的MessageFileManager.Stat newStat ReflectionTestUtils.invokeMethod(messageFileManager,readStat,queueName1);Assertions.assertEquals(100,newStat.totalCont);Assertions.assertEquals(50,newStat.validCount);}private MSGQueue createTestQueue(String queueName) {MSGQueue queue new MSGQueue();queue.setName(queueName);queue.setDurable(true);queue.setAutoDelete(false);queue.setExclusive(false);return queue;}private Message createTestMessage(String content) {Message message Message.createMessageWithId(testRoutingKey,null,content.getBytes());return message;}Testpublic void testSendMessage() throws IOException, MqException, ClassNotFoundException {
// 构造出消息并且构造出队列Message message createTestMessage(testMessage);
// 此处创建的 queue 对象的 name不能随便写只能用 queueName1 和 queueName2需要保证这个队列对象对应的目录和文件啥的都存在才行MSGQueue queue createTestQueue(queueName1);// 调用发送消息方法messageFileManager.sendMessage(queue,message);// 检查 stat 文件MessageFileManager.Stat stat ReflectionTestUtils.invokeMethod(messageFileManager,readStat,queueName1);Assertions.assertEquals(1,stat.totalCont);Assertions.assertEquals(1,stat.validCount);// 检查 data 文件LinkedListMessage messages messageFileManager.loadAllMessageFromQueue(queueName1);Assertions.assertEquals(1,messages.size());Message curMessage messages.get(0);Assertions.assertEquals(message.getMessageId(),curMessage.getMessageId());Assertions.assertEquals(message.getRoutingKey(),curMessage.getRoutingKey());Assertions.assertEquals(message.getDeliverMode(),curMessage.getDeliverMode());
// 比较两个字节数组的内容是否相同不能直接使用 assertEquals 了Assertions.assertArrayEquals(message.getBody(),curMessage.getBody());System.out.println(message : curMessage);}Testpublic void testLoadAllMessageFromQueue() throws IOException, MqException, ClassNotFoundException {
// 往队列中插入 100 条消息然后验证看看这 100 条消息从文件中读取之后是否和最初是一致的MSGQueue queue createTestQueue(queueName1);ListMessage expectedMessages new LinkedList();for (int i 0; i 100; i) {Message message createTestMessage(testMessage i);messageFileManager.sendMessage(queue,message);expectedMessages.add(message);}// 读取所有消息LinkedListMessage actualMessages messageFileManager.loadAllMessageFromQueue(queueName1);Assertions.assertEquals(expectedMessages.size(),actualMessages.size());for (int i 0; i expectedMessages.size(); i) {Message expectedMessage expectedMessages.get(i);Message actualMessage actualMessages.get(i);System.out.println([ i ] actualMessage actualMessage);Assertions.assertEquals(expectedMessage.getMessageId(),actualMessage.getMessageId());Assertions.assertEquals(expectedMessage.getRoutingKey(),actualMessage.getRoutingKey());Assertions.assertEquals(expectedMessage.getDeliverMode(),actualMessage.getDeliverMode());Assertions.assertArrayEquals(expectedMessage.getBody(),actualMessage.getBody());Assertions.assertEquals(0x1,actualMessage.getIsValid());}}Testpublic void testDeleteMessage() throws IOException, MqException, ClassNotFoundException {
// 创建队列写入 10 个消息删除其中的几个消息再把所有消息读取出来判定是否符合预期MSGQueue queue createTestQueue(queueName1);ListMessage expectedMessages new LinkedList();for (int i 0; i 10; i) {Message message createTestMessage(testMessage i);messageFileManager.sendMessage(queue,message);expectedMessages.add(message);}// 删除其中的三个消息messageFileManager.deleteMessage(queue,expectedMessages.get(7));messageFileManager.deleteMessage(queue,expectedMessages.get(8));messageFileManager.deleteMessage(queue,expectedMessages.get(9));// 对比这里的内容是否正确LinkedListMessage actualMessages messageFileManager.loadAllMessageFromQueue(queueName1);Assertions.assertEquals(7,actualMessages.size());for (int i 0; i actualMessages.size(); i) {Message expectedMessage expectedMessages.get(i);Message actualMessage actualMessages.get(i);System.out.println([ i ] actualMessage actualMessage);Assertions.assertEquals(expectedMessage.getMessageId(),actualMessage.getMessageId());Assertions.assertEquals(expectedMessage.getRoutingKey(),actualMessage.getRoutingKey());Assertions.assertEquals(expectedMessage.getDeliverMode(),actualMessage.getDeliverMode());Assertions.assertArrayEquals(expectedMessage.getBody(),actualMessage.getBody());Assertions.assertEquals(0x1,actualMessage.getIsValid());}}Testpublic void testGC() throws IOException, MqException, ClassNotFoundException {
// 先往队列中写 100 个消息
// 再把 100 个消息中的一半都给删除掉(比如把下标为偶数的消息都删除)
// 再手动调用 gc 方法检测得到的新的文件的大小是否比之前缩小了MSGQueue queue createTestQueue(queueName1);LinkedListMessage expectedMessages new LinkedList();for (int i 0; i 100; i) {Message message createTestMessage(testMessage i);messageFileManager.sendMessage(queue,message);expectedMessages.add(message);}
// 获取 gc 前的文件大小File beforeGCFile new File(./data/ queueName1 ./queue_data.txt);long beforeGCLength beforeGCFile.length();// 删除偶数下标的消息for (int i 0; i 100; i2) {messageFileManager.deleteMessage(queue,expectedMessages.get(i));}// 手动调用 gcmessageFileManager.gc(queue);// 重新读取文件验证新的文件的内容是不是和之前的内容匹配LinkedListMessage actualMessages messageFileManager.loadAllMessageFromQueue(queueName1);Assertions.assertEquals(50,actualMessages.size());for (int i 0; i actualMessages.size(); i) {
// 把之前消息偶数下标的删了剩下的就是奇数下标的元素了
// actual 中的 0 对应 expected 的 1
// actual 中的 1 对应 expected 的 3
// actual 中的 2 对应 expected 的 5
// actual 中的 i 对应 expected 的 2 * i 1Message expectedMessage expectedMessages.get(2 * i 1);Message actualMessage actualMessages.get(i);Assertions.assertEquals(expectedMessage.getMessageId(),actualMessage.getMessageId());Assertions.assertEquals(expectedMessage.getRoutingKey(),actualMessage.getRoutingKey());Assertions.assertEquals(expectedMessage.getDeliverMode(),actualMessage.getDeliverMode());Assertions.assertArrayEquals(expectedMessage.getBody(),actualMessage.getBody());Assertions.assertEquals(0x1,actualMessage.getIsValid());}
// 获取新的文件的大小File afterGCFile new File(./data/ queueName1 /queue_data.txt);long afterGCLength afterGCFile.length();System.out.println(before: beforeGCLength);System.out.println(after: afterGCLength);Assertions.assertTrue(beforeGCLength afterGCLength);}
}小结 MessageFileManager 类主要是负责管理消息在文件中的存储 设计了目录结构和文件格式实现了目录创建和删除实现了统计文件的读写实现了消息的写入实现了消息的删除 随机访问文件实现了加载所有消息垃圾回收 统一硬盘操作 此处我们创建一个 DiskDataCenter 类来管理所有硬盘上的数据 数据库交换机、绑定、队列数据文件消息 我们将这两个部分通过 DiskDataCenter 类整合在一起对上层提供统一的一套接口 package com.example.demo.mqserver.datacenter;import com.example.demo.common.MqException;
import com.example.demo.mqserver.core.Binding;
import com.example.demo.mqserver.core.Exchange;
import com.example.demo.mqserver.core.MSGQueue;
import com.example.demo.mqserver.core.Message;import java.io.IOException;
import java.util.LinkedList;
import java.util.List;/*
* 使用这个类来管理所有硬盘上的数据
* 1、数据库交换机、绑定、队列
* 2、数据文件消息
* 上层逻辑如果需要操作硬盘统一都通过这个类来使用 (上层代码不关系当前数据是存储在数据库还是文件中的)
* */
public class DiskDataCenter {
// 这个实例用来管理数据库中的数据private DataBaseManager dataBaseManager new DataBaseManager();
// 这个实例用来管理数据文件中的数据private MessageFileManager messageFileManager new MessageFileManager();public void init() {
// 针对上述两个实例进行初始化dataBaseManager.init();
// 当前 messageFileManager.init 是空的方法只是先列在这里一旦后续需要扩展就在这里进行初始化即可messageFileManager.init();}// 封装交换机操作public void insertExchange(Exchange exchange) {dataBaseManager.insertExchange(exchange);}public void deleteExchange(String exchangeName) {dataBaseManager.deleteExchange(exchangeName);}public ListExchange selectAllExchanges() {return dataBaseManager.selectAllExchanges();}// 封装队列操作public void insertQueue(MSGQueue queue) throws IOException {dataBaseManager.insertQueue(queue);
// 创建队列的同时不仅仅是把队列对象写到数据库中还需要创建出对应的目录和文件messageFileManager.createQueueFiles(queue.getName());}public void deleteQueue(String queueName) throws IOException {dataBaseManager.deleteQueue(queueName);
// 删除队列的同时不仅仅是把队列从数据库删除还需要删除对应的目录和文件messageFileManager.destroyQueueFiles(queueName);}public ListMSGQueue selectAllQueues() {return dataBaseManager.selectAllQueues();}// 封装绑定操作public void insertBinding(Binding binding) {dataBaseManager.insertBinding(binding);}public void deleteBinding(Binding binding) {dataBaseManager.deleteBinding(binding);}public ListBinding selectAllBindings() {return dataBaseManager.selectAllBindings();}// 封装消息操作public void sendMessage(MSGQueue queue, Message message) throws IOException, MqException {messageFileManager.sendMessage(queue,message);}public void deleteMessage(MSGQueue queue,Message message) throws IOException, ClassNotFoundException, MqException {messageFileManager.deleteMessage(queue,message);if(messageFileManager.checkGC(queue.getName())) {messageFileManager.gc(queue);}}public LinkedListMessage loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {return messageFileManager.loadAllMessageFromQueue(queueName);}
}