网站的形成,网站开发注意问题,旧房装修 翻新的公司,怎么免费下wordpress因工作中需要给第三方屏幕厂家下发广告#xff0c;音频#xff0c;图片等内容#xff0c;对方提供TCP接口于是我使用Netty长链接进行数据传输
1.添加依赖 !-- netty依赖--dependencygroupIdio.netty/groupIdartifactIdnetty-all音频图片等内容对方提供TCP接口于是我使用Netty长链接进行数据传输
1.添加依赖 !-- netty依赖--dependencygroupIdio.netty/groupIdartifactIdnetty-all/artifactId/dependency 2.创建Netty服务
Slf4j
Component
public class NettyServer {public void start(InetSocketAddress address) {//配置服务端的NIO线程组EventLoopGroup bossGroup new NioEventLoopGroup();EventLoopGroup workerGroup new NioEventLoopGroup();try {// 绑定线程池,编码解码//服务端接受连接的队列长度如果队列已满客户端连接将被拒绝ServerBootstrap bootstrap new ServerBootstrap().group(bossGroup, workerGroup)// 指定Channel.channel(NioServerSocketChannel.class)//使用指定的端口设置套接字地址.localAddress(address)//使用自定义处理类.childHandler(new NettyServerChannelInitializer())//服务端可连接队列数,对应TCP/IP协议listen函数中backlog参数.option(ChannelOption.SO_BACKLOG, 128)//保持长连接2小时无数据激活心跳机制.childOption(ChannelOption.SO_KEEPALIVE, true)//将小的数据包包装成更大的帧进行传送提高网络的负载.childOption(ChannelOption.TCP_NODELAY, true);// 绑定端口开始接收进来的连接ChannelFuture future bootstrap.bind(address).sync();if (future.isSuccess()) {log.info(netty服务器开始监听端口{},address.getPort());}//关闭channel和块直到它被关闭future.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}3.创建Socket配置类也可以直接在步骤2中写死
1.在配置文件中
socket:# 监听端口 8090port: 8090#ip地址host: 0.0.0.0
# host: 192.168.31.2Setter
Getter
ToString
Component
Configuration
PropertySource(classpath:application.yml)
ConfigurationProperties(prefix socket)
public class SocketProperties {private Integer port;private String host;}4.在springboot 启动类中启用Netty服务
SpringBootApplication
public class Application implements CommandLineRunner {public static void main(String[] args) {SpringApplication application new SpringApplication(Application.class);application.setApplicationStartup(new BufferingApplicationStartup(2048));application.run(args);}Resourceprivate NettyServer nettyServer;Resourceprivate SocketProperties socketProperties;Overridepublic void run(String... args) {InetSocketAddress address new InetSocketAddress(socketProperties.getHost(),socketProperties.getPort());nettyServer.start(address);}}5.创建字符解析器解析收到的消息
/*** 功能描述: 服务端初始化客户端与服务器端连接一旦创建这个类中方法就会被回调设置出站编码器和入站解码器**/
public class NettyServerChannelInitializer extends ChannelInitializerSocketChannel {Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline socketChannel.pipeline();//接收消息格式,使用自定义解析数据格式
// pipeline.addLast(decoder,new MyDecoder());//发送消息格式使用自定义解析数据格式
// pipeline.addLast(encoder,new MyEncoder());pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));//针对客户端如果在1分钟时没有想服务端发送写心跳(ALL)则主动断开//如果是读空闲或者写空闲不处理,这里根据自己业务考虑使用pipeline.addLast(new IdleStateHandler(0,0,90, TimeUnit.SECONDS));//自定义的空闲检测pipeline.addLast(new NettyServerHandler());}
}6.创建Handler 类处理消息
/*** 功能描述: netty服务端处理类*/Slf4j
Component
public class NettyServerHandler extends ChannelInboundHandlerAdapter {/*** 功能描述: 有客户端连接服务器会触发此函数** param ctx 通道* return void*/Overridepublic void channelActive(ChannelHandlerContext ctx) {InetSocketAddress insocket (InetSocketAddress) ctx.channel().remoteAddress();String clientIp insocket.getAddress().getHostAddress();int clientPort insocket.getPort();//获取连接通道唯一标识ChannelId channelId ctx.channel().id();//如果map中不包含此连接就保存连接if (ChannelMap.getChannelMap().containsKey(channelId)) {log.info(客户端:{},是连接状态连接通道数量:{} , channelId, ChannelMap.getChannelMap().size());} else {//保存连接ChannelMap.addChannel(channelId, ctx.channel());log.info(客户端:{},连接netty服务器[IP:{}--PORT:{}], channelId, clientIp, clientPort);log.info(连接通道数量: {}, ChannelMap.getChannelMap().size());}}/*** 功能描述: 有客户端终止连接服务器会触发此函数* param ctx 通道处理程序上下文* return void*/Overridepublic void channelInactive(ChannelHandlerContext ctx) {InetSocketAddress inSocket (InetSocketAddress) ctx.channel().remoteAddress();String clientIp inSocket.getAddress().getHostAddress();ChannelId channelId ctx.channel().id();//包含此客户端才去删除if (ChannelMap.getChannelMap().containsKey(channelId)) {//删除连接ChannelMap.getChannelMap().remove(channelId);log.info(客户端:{},断开netty服务器[IP:{}--PORT:{}], channelId, clientIp, inSocket.getPort());log.info(连接通道数量: ChannelMap.getChannelMap().size());}}TransactionalOverridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf (ByteBuf) msg;ByteBuf rebuf Unpooled.buffer();RedisUtils.setChannelId(ctx.channel().id().toString(), ctx.channel().id());// 读取帧头标识byte frameHeader buf.readByte();if (frameHeader ! 0x7E) {byte[] data ByteBufUtil.getBytes(buf);String hex bytesToHex(data);buf.release();String content ((ByteBuf) msg).toString(Charset.defaultCharset());} // 读取消息帧类型else {byte messageType buf.readByte();// 读取帧尾标识if (buf.isReadable()) {// 读取校验值byte checksum buf.readByte();byte frameTail buf.readByte();}}buf.release();}/*** 功能描述: 服务端给客户端发送消息** param channelId 连接通道唯一id* param msg 需要发送的消息内容* return void*/public void channelWrite(ChannelId channelId, Object msg) throws Exception {Channel channel ChannelMap.getChannelMap().get(channelId);if (channel null) {log.info(通道:{},不存在, channelId);return;}if (msg null || msg ) {log.info(服务端响应空的消息);return;}//将客户端的信息直接返回写入ctxchannel.write(msg);//刷新缓存区channel.flush();}Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {String socketString ctx.channel().remoteAddress().toString();if (evt instanceof IdleStateEvent) {IdleStateEvent event (IdleStateEvent) evt;if (event.state() IdleState.READER_IDLE) {log.info(Client:{},READER_IDLE 读超时, socketString);Channel channel ctx.channel();ChannelId id channel.id();// 超时未收到心跳包更新设备状态为离线// todo 更新设备状态ctx.disconnect();ChannelMap.removeChannelByName(id);} else if (event.state() IdleState.WRITER_IDLE) {log.info(Client:{}, WRITER_IDLE 写超时, socketString);ctx.disconnect();Channel channel ctx.channel();ChannelId id channel.id();ChannelMap.removeChannelByName(id);} else if (event.state() IdleState.ALL_IDLE) {log.info(Client:{},ALL_IDLE 总超时, socketString);Channel channel ctx.channel();ChannelId id channel.id();// 超时未收到心跳包更新设备状态为离线// todo 更新设备状态ctx.disconnect();ChannelMap.removeChannelByName(id);}}}/*** 功能描述: 发生异常会触发此函数** param ctx 通道处理程序上下文* param cause 异常* return void*/Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();log.info({}:发生了错误,此连接被关闭。此时连通数量:{}, ctx.channel().id(), ChannelMap.getChannelMap().size());}}ChannelMap类
/*** 功能描述: 管理通道Map类**/
public class ChannelMap {/*** 管理一个全局map保存连接进服务端的通道数量*/private static final ConcurrentHashMapChannelId, Channel CHANNEL_MAP new ConcurrentHashMap(128);public static ConcurrentHashMapChannelId, Channel getChannelMap() {return CHANNEL_MAP;}/*** 获取指定name的channel*/public static Channel getChannelByName(ChannelId channelId){if(CollectionUtils.isEmpty(CHANNEL_MAP)){return null;}return CHANNEL_MAP.get(channelId);}/*** 将通道中的消息推送到每一个客户端*/public static boolean pushNewsToAllClient(String obj){if(CollectionUtils.isEmpty(CHANNEL_MAP)){return false;}for(ChannelId channelId: CHANNEL_MAP.keySet()) {Channel channel CHANNEL_MAP.get(channelId);channel.writeAndFlush(new TextWebSocketFrame(obj));}return true;}/*** 将channel和对应的name添加到ConcurrentHashMap*/public static void addChannel(ChannelId channelId,Channel channel){CHANNEL_MAP.put(channelId,channel);}/*** 移除掉name对应的channel*/public static boolean removeChannelByName(ChannelId channelId){if(CHANNEL_MAP.containsKey(channelId)){CHANNEL_MAP.remove(channelId);return true;}return false;}}