瀑布流 网站 php 源码,wordpress文字排版,西安建设市场诚信信息平台网站,为什么不建议学嵌入式目录 前言 1. 创建BrokerServer类 1.1 启动服务器 1.2 停止服务器 1.3 处理一个客户端的连接 1.3.1 解析请求得到Request对象 1.3.2 根据请求计算响应 1.3.3 将响应写回给客户端 1.3.4 遍历Session的哈希表,把断开的Socket对象的键值对进行删除 2. 处理订阅消息请求详解(补充) …目录 前言 1. 创建BrokerServer类 1.1 启动服务器 1.2 停止服务器 1.3 处理一个客户端的连接 1.3.1 解析请求得到Request对象 1.3.2 根据请求计算响应 1.3.3 将响应写回给客户端 1.3.4 遍历Session的哈希表,把断开的Socket对象的键值对进行删除 2. 处理订阅消息请求详解(补充) 3. 序列化/反序列化实现(补充) 结语 前言 上一章节,我们定义了本项目的应用层传输协议.并且创建了各种参数类.本章节的目标是对BrokerServer(实现一个TCP服务器)进行实现,对连接进行处理,根据请求计算响应返回给客户端. 1. 创建BrokerServer类 public class BrokerServer {// 当前考虑一个一个服务器中只有一个虚拟主机private VirtualHost virtualHost new VirtualHost(default);// 使用哈希表表示当前会话,也就是有哪些客户端在和服务器进行通信// key: channelId value:对应对的Socket对象private ConcurrentHashMapString, Socket sessions new ConcurrentHashMap();private ServerSocket serverSocket null;// 引入线程池来处理多个客户端private ExecutorService executorService null;// 控制服务器是否继续运行private volatile boolean runnable true;
}
1.1 启动服务器
1. 首先将线程池进行创建,用来处理多个连接.
2. 设置循环用来监听连接
3. 将处理连接交给线程池.
/*** 1. 启动服务器*/public void start() throws IOException {System.out.println([BrokerServer] 启动!);// newCachedThreadPool自动申请新的线程executorService Executors.newCachedThreadPool();try {while (runnable){Socket clientSocket serverSocket.accept();// 把处理连接的逻辑发送给线程池executorService.submit(()-{processConnection(clientSocket);});}}catch (SocketException e){System.out.println([BrokerServer] 服务器停止运行!);
// e.printStackTrace();}}
1.2 停止服务器
1. 将标志位runnable设置为false
2. 停止线程池的服务
3. 关闭服务器套接字
/*** 2. 停止服务器*/public void stop() throws IOException {runnable false;// 停止线程池executorService.shutdownNow();serverSocket.close();}1.3 处理一个客户端的连接
1. 我们是从请求中获取的信息是二进制文件,我们不能直接使用InputStream和OutputStream,我们借助DataInputStream和DataOutputStream进行操作字节流.
2. 使用DataInputStream进行读取请求的时候,读到末尾的时候会抛出一个异常,我们将这个异常视作为处理正确的业务逻辑.我么catch掉这个异常就可以.
3. 解析得到请求对象
4. 更具请求计算响应
5. 当处理完响应之后,要进行关闭连接,并且将一个连接中其他Channel进行关闭.
/*** 3. 处理一个客户端的连接* 在一个连接中会出现多个请求和多个响应.在一个连接中要循环的处理*/private void processConnection(Socket clientSocket) {try(InputStream inputStream clientSocket.getInputStream();OutputStream outputStream clientSocket.getOutputStream()){try(DataInputStream dataInputStream new DataInputStream(inputStream);DataOutputStream dataOutputStream new DataOutputStream(outputStream)){while (true){// 1. 读取并解析请求Request request readRequest(dataInputStream);// 2. 根据请求计算响应Response response process(request,clientSocket);// 3. 将响应写回客户端writeResponse(dataOutputStream, response);}}}catch (EOFException | SocketException e){// 处理正确的业务逻辑// 上述进行读取数据的时候,如果数据读到末尾(EOF) ,就会抛出一个异常// 借助这个异常结束上述循环System.out.println([BrokerServer] 连接关闭! 客户端地址: clientSocket.getInetAddress().toString() ,端口号: clientSocket.getPort());}catch (ClassNotFoundException | MqException e) {e.printStackTrace();} catch (IOException e) {// 处理真正的异常System.out.println([BrokerServer] connection 出现异常);e.printStackTrace();}finally {try {// 当连接处理完成之后,进行关闭连接clientSocket.close();// 一个连接中可能会包含多个channel,需要把当前这个Socket对应的所有channel进行关闭clearClosedSession(clientSocket);} catch (IOException e) {e.printStackTrace();}}}
1.3.1 解析请求得到Request对象
1. 根据我们自定义的格式,先读前4个字节是请求的类型,在读4个字节是payload的长度,在读就是payload. 2. 读取payload的时候,我们先根据长度创建字符数组,然后按照字符数组进行获取payload,比较读取完的长度是否与原来请求的长度一致,不一致说明有消息的丢失.进行抛出异常.
3. 最后得到完整的请求对象,交给下面的方法进行处理. /*** 3.1 解析请求得到Request对象*/private Request readRequest(DataInputStream dataInputStream) throws IOException {Request request new Request();// 1. 首先读取四个字节,为请求的typerequest.setType(dataInputStream.readInt());// 2. 在读四个字节就是payload的长度request.setLength(dataInputStream.readInt());// 3. 创建字符数组,并进行读取到数组中byte[] payload new byte[request.getLength()];int n dataInputStream.read(payload);if (n ! request.getLength()){throw new IOException([BrokerServer] 请求格式出错);}// 4. 将读取的数组内容写入到实体的Request对象中request.setPayload(payload);return request;}
1.3.2 根据请求计算响应
1. 我们根据请求对象的payload进行解析,此处需要注意的是,我们读取到的payload是字节数组,我们需要进行反序列化成字符数组.
2. 根据请求对象的Type值进行区分,到底客户端要调用服务器那些功能.
3. 处理完请求之后就要进行构造响应了.
4. 返回响应对象
/*** 3.2 根据请求计算响应* param request* param clientSocket* return*/private Response process(Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException {// 1. 根据request中的payload进行解析// payload 是根据 request 中 type 进行变化的BasicArguments basicArguments (BasicArguments) BinaryTool.fromBytes(request.getPayload());// 打印请求的信息System.out.println([Request] rid basicArguments.getRid() , channelId basicArguments.getChannelId() type request.getType() ,length request.getLength());// 2. 根据type的值,区分调用哪种功能boolean ok true;if (request.getType() 0X1){// 1. 创建一个channelsessions.put(basicArguments.getChannelId(),clientSocket);System.out.println([BrokerServer] 创建channel完成 getChannelId basicArguments.getChannelId());}else if (request.getType() 0x2){// 2. 销毁一个channelsessions.remove(basicArguments.getChannelId());System.out.println([BrokerServer] 销毁channel完成 getChannelId basicArguments.getChannelId());}else if (request.getType() 0x3){// 3. 创建交换机ExchangeDeclareArguments arguments (ExchangeDeclareArguments) basicArguments;// 调用虚拟主机的功能方法ok virtualHost.exchangeDeclare(arguments.getExchangeName(),arguments.getExchangeType(),arguments.isDurable(),arguments.isAutoDelete(),arguments.getArguments());System.out.println([BrokerServer] 创建交换机完成 ExchangeName arguments.getExchangeName());}else if (request.getType() 0x4){// 4. 销毁交换机ExchangeDeleteArguments arguments (ExchangeDeleteArguments) basicArguments;ok virtualHost.exchangeDelete(arguments.getExchangeName());System.out.println([BrokerServer] 删除交换机完成 ExchangeName arguments.getExchangeName());}else if (request.getType() 0x5) {// 5. 创建队列QueueDeclareArguments arguments (QueueDeclareArguments) basicArguments;ok virtualHost.queueDeclare(arguments.getQueueName(), arguments.isDurable(),arguments.isExclusive(), arguments.isAutoDelete(), arguments.getArguments());System.out.println([BrokerServer] 创建队列完成 QueueName arguments.getQueueName());} else if (request.getType() 0x6) {// 6. 删除队列QueueDeleteArguments arguments (QueueDeleteArguments) basicArguments;ok virtualHost.queueDelete((arguments.getQueueName()));System.out.println([BrokerServer] 删除队列完成 QueueName arguments.getQueueName());} else if (request.getType() 0x7) {// 7. 创建绑定QueueBindArguments arguments (QueueBindArguments) basicArguments;ok virtualHost.queueBind(arguments.getQueueName(),arguments.getExchangeName(), arguments.getBindingKey());System.out.println([BrokerServer] 创建绑定完成 QueueName arguments.getQueueName() ,ExchangeName arguments.getExchangeName());} else if (request.getType() 0x8) {// 8. 删除绑定QueueUnbindArguments arguments (QueueUnbindArguments) basicArguments;ok virtualHost.queueUnbind(arguments.getQueueName(), arguments.getExchangeName());System.out.println([BrokerServer] 删除绑定完成 QueueName arguments.getQueueName() ,ExchangeName arguments.getExchangeName());} else if (request.getType() 0x9) {// 9. 发布消息BasicPublishArguments arguments (BasicPublishArguments) basicArguments;ok virtualHost.basicPublish(arguments.getExchangeName(), arguments.getRoutingKey(),arguments.getBasicProperties(), arguments.getBody());System.out.println([BrokerServer] 发布消息完成 ExchangeName arguments.getExchangeName());}else if (request.getType() 0xa) {// 10. 订阅消息BasicConsumeArguments arguments (BasicConsumeArguments) basicArguments;ok virtualHost.basicConsume(arguments.getConsumeTag(), arguments.getQueueName(), arguments.isAutoAck(), new Consumer() {Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws IOException, MqException {// 将服务器收到的消息进行推送给客户端// 先知道当前这个收到的消息, 要发给哪个客户端.// 此处 consumerTag 其实是 channelId. 根据 channelId 去 sessions 中查询, 就可以得到对应的// socket 对象了, 从而可以往里面发送数据了// 1. 根据 channelId 找到 socket 对象Socket clientSocket sessions.get(consumerTag);if (clientSocket null || clientSocket.isClosed()) {throw new MqException([BrokerServer] 订阅消息的客户端已经关闭!);}// 2. 构造响应数据SubScribeReturns subScribeReturns new SubScribeReturns();subScribeReturns.setChannelId(consumerTag);subScribeReturns.setRid(); // 由于这里只有响应, 没有请求, 不需要去对应. rid 暂时不需要.subScribeReturns.setOk(true);subScribeReturns.setConsumerTag(consumerTag);subScribeReturns.setBasicProperties(basicProperties);subScribeReturns.setBody(body);byte[] payload BinaryTool.toBytes(subScribeReturns);Response response new Response();// 0xc 表示服务器给消费者客户端推送的消息数据.response.setType(0xc);// response 的 payload 就是一个 SubScribeReturnsresponse.setLength(payload.length);response.setPayload(payload);// 3. 把数据写回给客户端.// 注意! 此处的 dataOutputStream 这个对象不能 close !!!// 如果 把 dataOutputStream 关闭, 就会直接把 clientSocket 里的 outputStream 也关了.// 此时就无法继续往 socket 中写入后续数据了.DataOutputStream dataOutputStream new DataOutputStream(clientSocket.getOutputStream());writeResponse(dataOutputStream, response);}});}else if (request.getType() 0xb) {// 10. 调用 basicAck 确认消息.BasicAckArguments arguments (BasicAckArguments) basicArguments;ok virtualHost.basicAck(arguments.getQueueName(), arguments.getMessageId());System.out.println([BrokerServer] 消费者确认消息完成 QueueName arguments.getQueueName() , MessageId arguments.getMessageId());} else {// 当前的 type 是非法的.throw new MqException([BrokerServer] 未知的 type! type request.getType());}// 3. 构造响应BasicReturns basicReturns new BasicReturns();basicReturns.setChannelId(basicArguments.getChannelId());basicReturns.setRid(basicArguments.getRid());basicReturns.setOk(ok);byte[] payload BinaryTool.toBytes(basicReturns);Response response new Response();response.setType(request.getType());response.setLength(payload.length);response.setPayload(payload);System.out.println([Response] rid basicReturns.getRid() , channelId basicReturns.getChannelId() , type response.getType() , length response.getLength());return response;}
1.3.3 将响应写回给客户端
1. 注意写入类型和长度是写入固定的4个字节,那么我们就使用dataOutputStream.writeInt()
2. 写完响应之后,记得要刷新缓冲区 dataOutputStream.flush();
/*** 3.3 将响应写回给客户端* param dataOutputStream* param response*/private void writeResponse(DataOutputStream dataOutputStream, Response response) throws IOException {// 将响应的属性从计算好的响应中进行设置dataOutputStream.writeInt(response.getType());dataOutputStream.writeInt(response.getLength());dataOutputStream.write(response.getPayload());// 刷新缓冲区dataOutputStream.flush();}
1.3.4 遍历Session的哈希表,把断开的Socket对象的键值对进行删除
由于Socket都已经断开连接了,那么存储在内存中的Session也就没有存在的必要了.这个集合中存放的是一个连接中的change对应的Session,当连接断开之后,Channel也就不会再进行工作了,新的连接会创建新的Channel.
注意:我们在使用Map.entrySet进行遍历Map的时候,不要一遍遍历一遍进行删除,这样是不稳定的,我们遍历Map将需要进行移除的Session进行添加到待删除的链表中,最后遍历待删除的数据结构进行删除. 上述就是整个封装好的BrokerServer服务器. 下面呢,我对有关根据请求计算响应中订阅消息这一功能,再进行详细的阐述,这块比较难以理解,因为涉及到回调函数,大家可能不知道这个回调函数掉用的时机是哪里.
2. 处理订阅消息请求详解(补充) 第二个红框部分是回调函数. 只有消费者订阅的队列中有消息了,并且轮询的方式选中了这个消费者,才会获得消息的本体,此时线程池才会执行到这个回调方法,此时才拿到消息的本体,可以将消息的属性和本体写入到SubscribeReturn中,进而推送给消费者进行消费消息.如果没有消息给这个消费者,那么也不会进行断开连接,只要服务器不断开连接客户端一直在等待分配的消息进行消费.这一点希望,读者能够进一步的理解.等总结完客户端,那么我就会带着大家,再来理一遍这个订阅消息的这个思路.
3. 序列化/反序列化实现(补充)
要想能进行序列化和反序列化就必须对目标对象进行实现serializable接口.
1. 我们使用ByteArrayOutputStream和ObjectOutputStream进行将一个对象序列化为字节数组(输出的是字节用output)
2. 我们ByteArrayInputStream和ObjectInputStream将一个字节数组反序列化成一个对象(输入的是字节用Input) 结语 至此,我们就彻底的完成了mqserver 的搭建,只剩下mqclient的搭建,我们在下一系列完成客户端的搭建,请持续关注,谢谢!!! 完整的项目代码已上传Gitee,欢迎大家访问.
模拟实现消息队列https://gitee.com/yao-fa/advanced-java-ee/tree/master/My-mq