江苏城乡建设网站,如何处理并发量大的购物网站,微信seo什么意思,我要买房网系列文章目录
Springboot项目集成Netty组件 Netty新增解析数据包指定长度功能 文章目录系列文章目录前言一、Netty是什么#xff1f;二、使用步骤1. 项目引入依赖1.1 项目基础版本信息#xff1a;1.2 Netty依赖2. 项目配置2.1 在 yml 配置文件中配置以下#xff1a;2.2 创建…系列文章目录
Springboot项目集成Netty组件 Netty新增解析数据包指定长度功能 文章目录系列文章目录前言一、Netty是什么二、使用步骤1. 项目引入依赖1.1 项目基础版本信息1.2 Netty依赖2. 项目配置2.1 在 yml 配置文件中配置以下2.2 创建Netty配置类3. Netty 服务端3.1 Netty 服务端3.2 Netty channle注册类3.3 Netty 协议初始化解码器3.4 Netty 全局通道池3.5 WebSocket 协议通道数据处理3.5.1 WebSocket 处理URI参数3.5.2 WebSocket 数据处理类3.6 Socket 协议通道数据处理3.7 Netty 公共工具类3.8 Netty 启动类总结前言
项目中有时候会需与其他客户端或者系统建立Sokcet连接 或者WebSocket连接,满足业务需求。Netty 是一个利用 Java 的高级网络的能力隐藏其背后的复杂性而提供一个易于使用的 API 的客户端/服务器框架。我们可以便捷的利用此组件搭建自己的客户端/服务器框架进行二次开发满足自己的业务需求。
本文介绍了Springboot集成Netty的配置以及使用方式项目是基于ruoyi分离版作为基础项目进行二次开发如果需要完整项目可以私聊我如果有时间会整理出来。 一、Netty是什么
Netty是由JBOSS提供的一个java开源框架现为 Github上的独立项目。Netty提供异步的、事件驱动的网络应用程序框架和工具用以快速开发高性能、高可靠性的网络服务器和客户端程序。 也就是说Netty 是一个基于NIO的客户、服务器端编程框架使用Netty 可以确保你快速和简单的开发出一个网络应用例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程例如基于TCP和UDP的socket服务开发。 “快速”和“简单”并不用产生维护性或性能上的问题。Netty 是一个吸收了多种协议包括FTP、SMTP、HTTP等各种二进制文本协议的实现经验并经过相当精心设计的项目。最终Netty 成功的找到了一种方式在保证易于开发的同时还保证了其应用的性能稳定性和伸缩性。
简单说 Netty是一个高性能、高可靠性的基于NIO封装的网络应用框架
二、使用步骤
1. 项目引入依赖
1.1 项目基础版本信息
Java 版本1.8 SpringBoot 版本 2.2.13.RELEASE
1.2 Netty依赖 dependencygroupIdio.netty/groupIdartifactIdnetty-all/artifactIdversion4.1.68.Final/version/dependency2. 项目配置
2.1 在 yml 配置文件中配置以下
# netty-server 配置
netty:# 监听端口port: 8180# 队列池 - 最大队列数 200queue-max: 200# 线程队列容量 10pool-queue-init: 10# netty 请求路径path: /ws2.2 创建Netty配置类
Data
Component
//application.yml此文件在ruoyi-admin模块中
PropertySource(nameNettyProperties,value {classpath:application.yml},ignoreResourceNotFoundfalse, encodingUTF-8)
ConfigurationProperties(prefix netty)
public class NettyProperties {private Integer port;private Integer queueMax;private String path;
}3. Netty 服务端
3.1 Netty 服务端
import com.ruoyi.entrust.constant.NettyProperties;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** program: ruoyi* Author LiuZedi* Date 2021/12/26 18:18*/
Slf4j
Component
public class NettySocketServer {Autowiredprivate NettyProperties properties;public void start() throws Exception {// Boss线程由这个线程池提供的线程是boss种类的用于创建、连接、绑定socket 有点像门卫然后把这些socket传给worker线程池。// 在服务器端每个监听的socket都有一个boss线程来处理。在客户端只有一个boss线程来处理所有的socket。EventLoopGroup bossGroup new NioEventLoopGroup();// Worker线程Worker线程执行所有的异步I/O即处理操作EventLoopGroup workgroup new NioEventLoopGroup();try {// ServerBootstrap 启动NIO服务的辅助启动类,负责初始话netty服务器并且开始监听端口的socket请求ServerBootstrap serverBootstrap new ServerBootstrap();serverBootstrap.group(bossGroup, workgroup)// 设置非阻塞,用它来建立新accept的连接,用于构造serversocketchannel的工厂类.channel(NioServerSocketChannel.class)// ServerChannelInitializer 对出入的数据进行的业务操作,其继承ChannelInitializer.childHandler(new ServerChannelInitializer())//设置队列大小.option(ChannelOption.SO_BACKLOG, properties.getQueueMax())// 两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文.childOption(ChannelOption.SO_KEEPALIVE, true);ChannelFuture channelFuture serverBootstrap.bind(properties.getPort()).sync();log.info(Netty服务器开启等待客户端连接, 监听端口:{}, properties.getPort());channelFuture.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();} finally {// 关闭主线程组bossGroup.shutdownGracefully();// 关闭工作线程workgroup.shutdownGracefully();}}
}
3.2 Netty channle注册类
对出入的数据进行的业务操作进行初始化
目前通道支持连接协议 WebSocket和 Socket
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;/*** program: ruoyi* Author LiuZedi* Date 2021/12/26 18:32** channle注册类*/
public class ServerChannelInitializer extends ChannelInitializerSocketChannel {Overrideprotected void initChannel(SocketChannel ch) {ChannelPipeline pipeline ch.pipeline();//添加日志pipeline.addLast(logging,new LoggingHandler(LogLevel.WARN));// 判断 连接类型 此处 用来支持多种连接 目前支持 WebSocket 和 Socketpipeline.addLast(socketChoose,new SocketChooseHandler());}
}3.3 Netty 协议初始化解码器
此类用于 解析多种协议调用对应协议的处理器
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketFrameAggregator;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;import java.nio.ByteOrder;
import java.util.List;/*** program: ruoyi* Author LiuZedi* Date 2022/1/18 10:34*** 协议初始化解码器.* 用来判定实际使用什么协议.*/
Slf4j
public class SocketChooseHandler extends ByteToMessageDecoder {/** 默认暗号长度为23 */private static final int MAX_LENGTH 23;/** WebSocket握手的协议前缀 */private static final String WEBSOCKET_PREFIX GET /;Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, ListObject out) throws Exception {String protocol getBufStart(in);in.resetReaderIndex();// 输出数据log.debug(获取的数据为: {},protocol);if (protocol.startsWith(WEBSOCKET_PREFIX)) {log.info(解析 webSocket);// websocket连接时执行以下处理// HttpServerCodec将请求和应答消息解码为HTTP消息ctx.pipeline().addLast(http-codec, new HttpServerCodec());// HttpObjectAggregator将HTTP消息的多个部分合成一条完整的HTTP消息ctx.pipeline().addLast(aggregator, new HttpObjectAggregator(65535));// ChunkedWriteHandler向客户端发送HTML5文件,文件过大会将内存撑爆ctx.pipeline().addLast(http-chunked, new ChunkedWriteHandler());ctx.pipeline().addLast(WebSocketAggregator, new WebSocketFrameAggregator(65535));// 若超过6000秒未收到约定心跳则主动断开channel释放资源ctx.pipeline().addLast(new IdleStateHandler(6000,0,0));// 用于处理websocket URI参数ctx.pipeline().addLast(new WebSocketURIHandler());//用于处理websocket, /ws为访问websocket时的urictx.pipeline().addLast(ProtocolHandler, new WebSocketServerProtocolHandler(/ws));ctx.pipeline().addLast(new WebSocketServerHandler());} else {log.info(解析 socket);// 常规TCP连接时执行以下处理ctx.pipeline().addLast(new IdleStateHandler(60,0,0));// 数据读取使用小端模式ctx.pipeline().addLast(length-decoder,new LengthFieldBasedFrameDecoder(ByteOrder.LITTLE_ENDIAN,Integer.MAX_VALUE, 0, 4, 0, 4,true));ctx.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));ctx.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));ctx.pipeline().addLast(new SocketServerHandler());}ctx.pipeline().remove(this.getClass());}private String getBufStart(ByteBuf in){int length in.readableBytes();if (length MAX_LENGTH) {length MAX_LENGTH;}// 标记读位置in.markReaderIndex();byte[] content new byte[length];in.readBytes(content);return new String(content);}
}在处理 Socket协议的处理中代码中使用了小端处理方案具体原因可见我的另外一篇博客Netty新增解析数据包指定长度功能
3.4 Netty 全局通道池
由于作为Netty 服务器端实际业务中会创建很多通道有可能查找指定已有通道并进行数据交互。因此开发此全局通道池业务中可以进行保存通道复用以及查询通道进行交互等功能。
以下注释是 本人实际项目中的注释理解后根据实际项目进行对应修改但是此类是通用的
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.springframework.util.StringUtils;
import java.util.List;
import java.util.stream.Collectors;/*** program: ruoyi* Author LiuZedi* Date 2022/1/15 16:07*/
public class GlobalChannelPool {// 临时存放连接客户端public static ChannelGroup temporaryChannels new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);// 存放已绑定用户信息连接客户端public static ChannelGroup channels new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);/*** 在集合中获取对应的通道** param module 模块名 exam 和 entrust 可根据实际业务进行修改* param protocol 连接类型 webSocket 和 socket 可根据实际业务进行修改* param keyName 数据类型 exam模块key是mac | entrust模块key是uid 可根据实际业务进行修改* param key 数据类型值 可根据实际业务进行修改* return*/public static ListChannel getChannelByKey(String module, String protocol ,String keyName, String key){if (StringUtils.isEmpty(module) StringUtils.isEmpty(protocol) StringUtils.isEmpty(keyName) StringUtils.isEmpty(key)){return null;}ListChannel channelList null;String moduleName module;String protocolName protocol;try{// 先过滤 模块 examination | detection | entrustAttributeKeyString moduleKey AttributeKey.valueOf(moduleName);channelList channels.stream().filter(channel - channel.attr(moduleKey).get().equals(module)).collect(Collectors.toList());// 过滤 socket类型 webSocket 和 socketAttributeKeyString protocolKey AttributeKey.valueOf(protocolName);channelList channelList.stream().filter(channel - channel.attr(protocolKey).get().equals(protocol)).collect(Collectors.toList());// 过滤 数据类型 examination模块 身份证读卡器 key是mac | detection模块 身份证读卡器 key是mac | entrust模块 签名功能 key是signatureAttributeKeyString dataKey AttributeKey.valueOf(keyName);channelList channelList.stream().filter(channel - channel.attr(dataKey).get().equals(key)).collect(Collectors.toList());}catch (Exception e){return null;}return channelList;}/*** 通道添加属性 区分通道** param channel 通道* param module 模块名 exam 和 entrust* param protocol 连接协议 webSocket 和 socket* param keyName 数据类型 exam模块key是mac | entrust模块key是uid* param key 数据类型值*/public static void addChannelAttrKey(Channel channel, String module, String protocol ,String keyName, String key){if (channel ! null){// 存储 模块信息 examination | detection | entrustAttributeKeyString moduleKey AttributeKey.valueOf(module);channel.attr(moduleKey).set(module);// 存储 连接协议 webSocket 和 socketAttributeKeyString protocolKey AttributeKey.valueOf(protocol);channel.attr(protocolKey).set(protocol);// 存储 数据类型 examination模块 身份证读卡器 key是mac | detection模块 身份证读卡器 key是mac | entrust模块 签名功能 key是signatureAttributeKeyString dataKey AttributeKey.valueOf(keyName);channel.attr(dataKey).set(key);}}
}3.5 WebSocket 协议通道数据处理
由于项目中WebSocket请求链接可以携带参数因此对于此参数需要提前处理才可以进一步进行处理WebSocket链接中交互的数据。
3.5.1 WebSocket 处理URI参数
此函数用于处理 WebSocket中参数
import com.ruoyi.entrust.constant.NettyProperties;
import com.ruoyi.examination.utils.SpringContextUtil;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;/*** program: ruoyi* Author LiuZedi* Date 2022/1/19 14:53*/
Slf4j
public class WebSocketURIHandler extends SimpleChannelInboundHandlerWebSocketFrame {private NettyProperties nettyProperties;public WebSocketURIHandler(){nettyProperties (NettyProperties) SpringContextUtil.getBean(nettyProperties);}/*** 建立连接** channel 通道 action 活跃的 当客户端主动链接服务端的链接后这个通道就是活跃的了。* 也就是客户端与服务端建立了通信通道并且可以传输数据*/Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {// 把新连接保存到临时连接表中GlobalChannelPool.temporaryChannels.add(ctx.channel());log.info(客户端与服务端连接开启{},客户端名字{}, ctx.channel().remoteAddress().toString(),ctx.name());}/*** 断开连接** channel 通道 Inactive 不活跃的 当客户端主动断开服务端的链接后这个通道就是不活跃的。* 也就是说客户端与服务端关闭了通信通道并且不可以传输数据*/Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {// 从连接列表中移除该连接GlobalChannelPool.channels.remove(ctx.channel());log.info(客户端与服务端连接关闭{},客户端名字{}, ctx.channel().remoteAddress().toString(),ctx.name());}Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof FullHttpRequest) {FullHttpRequest request (FullHttpRequest) msg;String uri request.uri();String origin request.headers().get(Origin);log.info(uri:{},Origin:{},uri,origin);MapString, String params NettyUtil.getParams(uri);if (nettyProperties.getPath().equals(NettyUtil.getBasePath(uri))) {// 因为有可能携带了参数导致客户端一直无法返回握手包因此在校验通过后重置请求路径Channel channel ctx.channel();GlobalChannelPool.channels.add(channel);// 添加 AttributeKeyGlobalChannelPool.addChannelAttrKey(channel,params.get(module),webSocket,params.get(keyName),params.get(key));// 连接已绑定完成,把该链接移除临时存储区,避免积累过多,检索慢GlobalChannelPool.temporaryChannels.remove(channel);request.setUri(nettyProperties.getPath());log.info(webSocket处理完成重新生成路径!!!);}else {log.warn(webSocket连接不正确。请求链接URI是{},uri);ctx.close();}}super.channelRead(ctx, msg);}Overrideprotected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame msg) throws Exception {ctx.channel().writeAndFlush(msg.retain());}
}
3.5.2 WebSocket 数据处理类
package com.ruoyi.entrust.utils.netty;import io.netty.channel.*;
import io.netty.handler.codec.http.websocketx.*;
import lombok.extern.slf4j.Slf4j;/*** program: ruoyi* Author LiuZedi* Date 2022/1/18 14:41*/Slf4j
ChannelHandler.Sharable
public class WebSocketServerHandler extends SimpleChannelInboundHandlerTextWebSocketFrame {/*** 建立连接** channel 通道 action 活跃的 当客户端主动链接服务端的链接后这个通道就是活跃的了。* 也就是客户端与服务端建立了通信通道并且可以传输数据*/Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {// 把新连接保存到临时连接表中GlobalChannelPool.temporaryChannels.add(ctx.channel());log.info(客户端与服务端连接开启{},客户端名字{}, ctx.channel().remoteAddress().toString(),ctx.name());}/*** 断开连接** channel 通道 Inactive 不活跃的 当客户端主动断开服务端的链接后这个通道就是不活跃的。* 也就是说客户端与服务端关闭了通信通道并且不可以传输数据*/Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {// 从连接列表中移除该连接GlobalChannelPool.channels.remove(ctx.channel());log.info(客户端与服务端连接关闭{},客户端名字{}, ctx.channel().remoteAddress().toString(),ctx.name());}/*** 接收客户端发送的消息 channel 通道 Read 读 简而言之就是从通道中读取数据* 也就是服务端接收客户端发来的数据。但是这个数据在不进行解码时它是ByteBuf类型的*/Overrideprotected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {log.info(接收到来自客户端的信息{}, ctx.channel().remoteAddress().toString());if (msg ! null) {// WebSocket消息处理String webSocketInfo ((TextWebSocketFrame) msg).text().trim();log.info(收到webSocket消息 webSocketInfo);}else { //数据处理为空log.warn(接收到来自客户端的信息{} warn消息为空, ctx.channel().remoteAddress().toString());}}/**** channel 通道 Read 读取 Complete 完成 在通道读取完成后会在这个方法里通知* 对应可以做刷新操作 ctx.flush()*/Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.flush();}/*** exception 异常 Caught 抓住 抓住异常当发生异常的时候可以做一些相应的处理比如打印日志、关闭链接*/Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.error(发生异常的连接是 ctx.channel().id());log.error(netty服务端发送异常 cause);cause.printStackTrace();ctx.close();}
}
3.6 Socket 协议通道数据处理
package com.ruoyi.entrust.utils.netty;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ruoyi.common.constant.HttpStatus;
import com.ruoyi.common.core.domain.AjaxResult;
import com.ruoyi.common.utils.StringUtils;
import com.ruoyi.entrust.constant.NettyDataParam;
import com.ruoyi.entrust.utils.JsonUtil;
import com.ruoyi.examination.domain.DetectionOrder;
import com.ruoyi.examination.domain.Order;
import com.ruoyi.examination.service.impl.DetectionOrderServiceImpl;
import com.ruoyi.examination.service.impl.ExaminationOrderServiceImpl;
import com.ruoyi.examination.utils.SpringContextUtil;
import io.netty.channel.*;
import com.ruoyi.examination.domain.Message;
import io.netty.handler.codec.http.websocketx.*;
import lombok.extern.slf4j.Slf4j;/*** program: ruoyi* Author LiuZedi* Date 2022/1/17 21:09*/Slf4j
ChannelHandler.Sharable
public class SocketServerHandler extends SimpleChannelInboundHandlerObject {/*** 建立连接** channel 通道 action 活跃的 当客户端主动链接服务端的链接后这个通道就是活跃的了。* 也就是客户端与服务端建立了通信通道并且可以传输数据*/Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {// 把新连接保存到临时连接表中GlobalChannelPool.temporaryChannels.add(ctx.channel());log.info(客户端与服务端连接开启{},客户端名字{}, ctx.channel().remoteAddress().toString(),ctx.name());}/*** 断开连接** channel 通道 Inactive 不活跃的 当客户端主动断开服务端的链接后这个通道就是不活跃的。* 也就是说客户端与服务端关闭了通信通道并且不可以传输数据*/Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {// 从连接列表中移除该连接GlobalChannelPool.channels.remove(ctx.channel());log.info(客户端与服务端连接关闭{},客户端名字{}, ctx.channel().remoteAddress().toString(),ctx.name());}/*** 接收客户端发送的消息 channel 通道 Read 读 简而言之就是从通道中读取数据* 也就是服务端接收客户端发来的数据。但是这个数据在不进行解码时它是ByteBuf类型的*/Overrideprotected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {log.info(接收到来自客户端的信息{}, ctx.channel().remoteAddress().toString());// WebSocket消息处理if (msg instanceof WebSocketFrame) {log.warn(Socket Handler 接收到WebSocket信息 请检查);}// Socket消息处理else{String requestData (String)msg;log.info(收到socket消息 requestData);}}/**** channel 通道 Read 读取 Complete 完成 在通道读取完成后会在这个方法里通知* 对应可以做刷新操作 ctx.flush()*/Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.flush();}/*** exception 异常 Caught 抓住 抓住异常当发生异常的时候可以做一些相应的处理比如打印日志、关闭链接*/Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.error(发生异常的连接是 ctx.channel().id());log.error(netty服务端发送异常 cause);cause.printStackTrace();ctx.close();}
}3.7 Netty 公共工具类
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ruoyi.common.constant.HttpStatus;
import com.ruoyi.common.core.domain.AjaxResult;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;import java.util.HashMap;
import java.util.List;
import java.util.Map;/*** program: ruoyi* Author LiuZedi* Date 2022/1/18 16:59*/
Slf4j
public class NettyUtil {/*** 将路径参数转换成Map对象如果路径参数出现重复参数名将以最后的参数值为准* param uri 传入的携带参数的路径* return*/public static MapString, String getParams(String uri) {MapString, String params new HashMap(10);int idx uri.indexOf(?);if (idx ! -1) {String[] paramsArr uri.substring(idx 1).split();for (String param : paramsArr) {idx param.indexOf();params.put(param.substring(0, idx), param.substring(idx 1));}}return params;}/*** 获取URI中参数以外部分路径* param uri* return*/public static String getBasePath(String uri) {if (uri null || uri.isEmpty())return null;int idx uri.indexOf(?);if (idx -1)return uri;return uri.substring(0, idx);}/*** 发布消息到对应 通道** param channel 当前连接通道* param message 所需发布消息* param requestJsonObject 查询通道的参数* param protocol 通道连接协议* return 是否有对应通道*/public static boolean pushInfoToChannels(Channel channel, String message, JSONObject requestJsonObject, String protocol){boolean haveChannels false;// 查询对应 连接通道ListChannel channels GlobalChannelPool.getChannelByKey(requestJsonObject.getString(module),protocol,requestJsonObject.getString(keyName),requestJsonObject.getString(key));if (null channels || channels.size() 0){log.warn(没有发现对应{}客户端模块为{}请求连接模式为{}KeyName为{}Key为{}, protocol,requestJsonObject.getString(module),protocol,requestJsonObject.getString(keyName),requestJsonObject.getString(key));channel.writeAndFlush(JSON.toJSONString(AjaxResult.error(HttpStatus.NO_WEBSOCKET,没有发现对应webSocket客户端)));}else {log.info(发现多个对应{}客户端数量为{}模块为{}请求连接模式为{}KeyName为{}Key为{}, protocol,channels.size(), requestJsonObject.getString(module), protocol, requestJsonObject.getString(keyName),requestJsonObject.getString(key));haveChannels true;}// 发送数据到对应的通道if (haveChannels){for (Channel ch: channels){ch.writeAndFlush(new TextWebSocketFrame(message));}}return haveChannels;}/*** 发布消息到对应 通道** param message 所需发布消息* param requestJsonObject 查询通道的参数* param protocol 通道连接协议* return 是否有对应通道*/public static boolean pushInfoToChannels(String message, JSONObject requestJsonObject, String protocol){boolean haveChannels false;// 查询对应 连接通道ListChannel channels GlobalChannelPool.getChannelByKey(requestJsonObject.getString(module),protocol,requestJsonObject.getString(keyName),requestJsonObject.getString(key));if (null channels || channels.size() 0){log.warn(没有发现对应{}客户端模块为{}请求连接模式为{}KeyName为{}Key为{}, protocol,requestJsonObject.getString(module),protocol,requestJsonObject.getString(keyName),requestJsonObject.getString(key));}else {log.info(发现多个对应{}客户端数量为{}模块为{}请求连接模式为{}KeyName为{}Key为{}, protocol,channels.size(), requestJsonObject.getString(module), protocol, requestJsonObject.getString(keyName),requestJsonObject.getString(key));haveChannels true;}// 发送数据到对应的通道if (haveChannels){for (Channel ch: channels){ch.writeAndFlush(new TextWebSocketFrame(message));}}return haveChannels;}
}3.8 Netty 启动类
此处 Netty作为单独服务器启动与SpringBoot使用端口并不一致
通过实现CommandLineRunner接口和添加Component注解使启动Netty服务器在项目启动之后执行
Spring boot的CommandLineRunner接口主要用于实现在应用初始化后去执行一段代码块逻辑这段初始化代码在整个应用生命周期内只会执行一次。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;/*** program: ruoyi* Author LiuZedi* Date 2022/1/15 14:55*/
Order(1)
Component
public class NettyStart implements CommandLineRunner {Autowiredprivate NettySocketServer nettySocketServer;Overridepublic void run(String... args) throws Exception {nettySocketServer.start();}
}总结
以上就是今天要讲的内容本文介绍了Netty的使用Netty提供异步的、事件驱动的网络应用程序框架和工具用以快速开发高性能、高可靠性的网络服务器和客户端程序。我们可以快速使用此类方法进行使用开发自己的网络服务器。
之后我将写一篇文章介绍如何使用Netty 实现 通过微信小程序 在网页上进行电子签字的功能。