天津市建设教育培训中心的网站,网站建设方案模版,WordPress文章类别ID,给静态网站加后台序 Netty的影响力以及使用场景就不用多说了#xff0c; 去年10月份后#xff0c;就着手研究Netty源码#xff0c;之前研究过Spring源码#xff0c;MyBatis源码#xff0c;java.util.concurrent源码#xff0c;tomcat源码#xff0c;发现一个特点#xff0c;之前的源码都…序 Netty的影响力以及使用场景就不用多说了 去年10月份后就着手研究Netty源码之前研究过Spring源码MyBatis源码java.util.concurrent源码tomcat源码发现一个特点之前的源码都好打断点调试而Netty的源码就没有那么容易打断点了因为Netty中使用了大量的线程很容易将任务添加到执行器中 而执行器是什么呢 NioEventLoop又是什么呢 他们在代码中出现非常困扰着我刚学习Netty的时候打断点都不知道怎样打就像无头苍蝇一样乱撞没有办法没有思绪因此去网上找书最终找到了《Netty源码剖析与应用》这本书当然中途还看过其他的书但目前没有什么印象了但是我看过的书籍中我觉得写得最好的还是《Netty源码剖析与应用》如果你也在学习Netty我建议这本书也是必看的书籍刚开始看时 确实觉得这本书写得非常好但例子少之前研究Spring源码时读过一本书 《Spring源码深度解析》我觉得这本书的写作方式是非常让人喜欢的通过例子的方式来分析Spring源码可能不同的源码去阅读和写作的方式可能也不一样吧因此没有过多柯求的地方 言归正传《Netty源码剖析与应用》这本书例子少但里面的理论知识写得非常好但对于初学者来说就觉得里面遍地是黄金但又不知从何拾起的感觉我看了两遍就觉得学习到了但又感觉什么没有学习到原因是里面的知识点是零碎的在我们的对Netty的认知体系中没有形成知识网因此没有办法这个时候又想办法学习了《netty高并发-张龙》课程 同时从网上学习了《C-1100图灵Java四期腾讯课堂2021》 Netty相关的知识从中学习到Netty使用的大量例子同时开阔了自己的视野明确了自己的学习方向因此就从一个简单的例子出发打断点研究Netty的源码当然中途发现Netty自己重写了ThreadLocal 叫FastThreadLocal因此写了一篇 《Netty源码性能分析 - ThreadLocal PK FastThreadLocal》博客 发现Netty为了提升性能并没有用jdk自带的队列因此对Netty中使用的队列又做了研究 因此写了《Netty源码性能分析MpscChunkedArrayQueue MpscUnboundedArrayQueue MpscArrayQueue MpscLinkedAtomicQueue》 这篇博客 发现Netty的消息机制很牛逼 因此写了 《Netty 之 DefaultPromise 源码解析》 这篇博客 在源码解析到 读取字节这一块时发现Netty内存管理这一块的代码也写得非常好因此写了《Netty源码解析之内存管理-PooledByteBufAllocator-PoolArena》这篇博客而Netty为了提升性能不用jdk的ByteBuffer来做为内部数据传输而是自己写了一套ByteBuf来来传输数据因此写了《Netty缓冲区ByteBuf源码解析》这篇博客来分析在分配内存时发现Netty每次都智能的创建ByteBuf容量大小因此写了《RecvByteBufAllocator内存分配计算》这篇博客来分析通过不断的遇到问题然后去分析问题经过半年的努力终于Netty的学习和研究也接近尾声有人可能觉得你研究个Netty源码需要半年 但工作中的人应该知道在工作之余再静下心来去研究源码是一件多么困难的事情这也不说了至少我觉得现在对Netty的源码也不再陌生对Netty的使用也得心应手同时如果去开发Netty的插件我觉得也不是什么难的事情了。 因此此时我自信满满如果你也在学习Netty不管你的学习方法如何学习途径如何但最终也要达到自信满满的效果这个不是为了面试用而是让自己都觉得自己对Netty的源码从心底里感觉自信。 话不多说了先来看第一个例子我们从这个例子开始深入研究Netty的源码而Netty主线源码研究分为《Netty 源码解析上》 和《Netty 源码解析下》两篇当然有兴趣可以看完即使没有从中学习到知识也希望能得到启发吧。 先来看服务端代码
public class NettyServer {public static void main(String[] args) throws Exception {// 创建两个线程组bossGroup和workerGroup, 含有的子线程NioEventLoop的个数默认为cpu核数的两倍// bossGroup只是处理连接请求 ,真正的和客户端业务处理会交给workerGroup完成EventLoopGroup bossGroup new NioEventLoopGroup(3);EventLoopGroup workerGroup new NioEventLoopGroup(8);try {// 创建服务器端的启动对象ServerBootstrap bootstrap new ServerBootstrap();// 使用链式编程来配置参数bootstrap.group(bossGroup, workerGroup) //设置两个线程组// 使用NioServerSocketChannel作为服务器的通道实现.channel(NioServerSocketChannel.class)// 初始化服务器连接队列大小服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。// 多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理// SO_BACKLOG: 此为TCP参数表示服务器端接收连接的队列长度如果队列已满客户端连接将被拒绝默认值在Windows中为200其他操作系统为128 。.option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChannelInitializerSocketChannel() {//创建通道初始化对象设置初始化参数在 SocketChannel 建立起来之前执行Overrideprotected void initChannel(SocketChannel ch) throws Exception {//对workerGroup的SocketChannel设置处理器ch.pipeline().addLast(new NettyServerHandler());}});System.out.println(netty server start。。);// 绑定一个端口并且同步, 生成了一个ChannelFuture异步对象通过isDone()等方法可以判断异步事件的执行情况// 启动服务器(并绑定端口)bind是异步操作sync方法是等待异步操作执行完毕ChannelFuture cf bootstrap.bind(9000).sync();// 给cf注册监听器监听我们关心的事件cf.addListener(new ChannelFutureListener() {Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (cf.isSuccess()) {System.out.println(监听端口9000成功);} else {System.out.println(监听端口9000失败);}}});// 等待服务端监听端口关闭closeFuture是异步操作// 通过sync方法同步等待通道关闭处理完毕这里会阻塞等待通道关闭完成内部调用的是Object的wait()方法cf.channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}NettyServerHandler
/*** 自定义Handler需要继承netty规定好的某个HandlerAdapter(规范)*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {/*** 当客户端连接服务器完成就会触发该方法** param ctx* throws Exception*/Overridepublic void channelActive(ChannelHandlerContext ctx) {System.out.println(客户端连接通道建立完成);}/*** 读取客户端发送的数据** param ctx 上下文对象, 含有通道channel管道pipeline* param msg 就是客户端发送的数据* throws Exception*/Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {//Channel channel ctx.channel();//ChannelPipeline pipeline ctx.pipeline(); //本质是一个双向链接, 出站入站//将 msg 转成一个 ByteBuf类似NIO 的 ByteBufferByteBuf buf (ByteBuf) msg;System.out.println(收到客户端的消息: buf.toString(CharsetUtil.UTF_8));}/*** 数据读取完毕处理方法** param ctx* throws Exception*/Overridepublic void channelReadComplete(ChannelHandlerContext ctx) {ByteBuf buf Unpooled.copiedBuffer(HelloClient.getBytes(CharsetUtil.UTF_8));ctx.writeAndFlush(buf);}/*** 处理异常, 一般是需要关闭通道** param ctx* param cause* throws Exception*/Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {ctx.close();}
}6.3 详解Bootstrap启动器类 Bootstrap 类是Netty 提供了一个使得的工厂类可以通过它来完成Netty 的客户端或者服务器端的Netty组装以及Netty 程序的初始化当然Netty 的官方解释是完全可以不用这个Bootstrap启动类但是一点点的手动去创建通道完成各种设置和启动并且注册到EventLoop这个过程非常麻烦通常情况下还是使用这个使得的Bootstrap工具类会效率更高。 Netty中有两个启动器类分别用在服务器和客户端如下图6-7所示 。 这两个启动器仅仅是使用地方不同它们大致的配置和使用方法都是相同的下面以ServerBootstrap 服务器启动类作为重点的介绍对象 。 在介绍ServerBootstrap 的服务器启动流程之前首先介绍一下涉及到的两个基本概念父类通道EventLoopGroup线程组事件循环线程组。
6.3.1 父子通道 在Netty 中每一个NioSocketchannel 通道所土法的是Java NIO 通道再往下就是对应的操作系统底层socket描述符理论上来说操作系统底层的socket描述符分为两类。
连接监听类型连接监听类型的socket 描述符放在服务器端它负责接收客户端的套接字连接在服务器端一个连接监听类型的socket描述符可以接受Accept 成千上万的传输类的socket描述符。传输数据类型数据传输类的socket描述符负责传输数据现一条TCP的Socket 传输链路在服务器和客户端都分别会有一个与之相应的数据传输类型socket描述符。 在Netty 中异步非阻塞的服务器端监听通道NioServerSocketChannel ,封装在Linux 底层描述符是连接监听类型socket描述符而NioSocketChannel异步非阻塞TCP Socket 传输通道封装在底层的Linux描述符是数据传输类型的socket描述符。 在Netty中将有接收关系的NioServerSocketChannel和NioSocketChannel 叫作父子通道其中NioServerSocketChannel 负责服务器连接监听和接收也叫父通道ParentChannel , 对应每一个接收到的NioSocketChannel 传输类通道也叫子通道ChildChannel 。
6.3.2 EventLoopGroup 线程组 Netty 中的Reactor 反应器模式肯定不是单线程版本的反应器模式而是多线程版本的反应器模式Netty 的多线程版本的反应器模式是如何实现的呢 在Netty 中一个EventLoop相当于一个子反应器(SubReactor) 大家已经知道一个NioEventLoop子反应器拥有一个线程同时拥有一个Java NIO选择器 Netty 是如何组织外层的反应器的呢 答案是使用了EventLoopGroup线程组多个EventLoop线程组成了一个EventLoopGroup线程组。 反过来说Netty 的EventLoopGroup 线程组就是一个多线程版本的反应器而其中单EventLoop线程对应于一个子反应器(SubReactor)。 Netty 的程序开发不会直接使用单个EventLoop线程而是使用EventLoopGroup线程组EventLoopGroup的构造函数有一个参数用于指定内部的线程数在构造器初始化时 会按照传入的线程数量在内部构造多个Thread 线程和多个EventLoop子反应器(一个线程对应一个EventLoop子反应器)进行多线程的IO 事件查询和分发。 如果使用了EventLoopGroup的无参数的构造函数没有传入线程数或者传入的线程数为0 那么EventLoopGroup 内部的线程数到底是多少呢默认的EventLoopGroup的内部线程数为最大可用CPU 处理数量的2倍 假设电脑使用的是4核的CPU, 那么在内部会启动8个EventLoop 线程相当于8个子反应器(SubReactor)实例。 从前文可知为了及时接受Accept 到新连接在服务器端一般有两个独立的反应器一个反应器负责新连接的监听和接受另一个反应器负责IO 事件处理对应到Netty 服务器程序中则是设置两个EventLoopGroup 线程组一个EventLoop负责新连接的监听和接受一个EventLoopGroup负责IO事件的处理。 那么两个反应器如何分工的呢 负责新连接的监听和接受EventLoopGroup线程组查询父通道的IO事件有点像负责招工的包工头 因此可以形象的称为 “包工头”(Boss)线程组 另一个EventLoopGroup线程组负责查询所有的子通道的IO 事件并且执行Handler处理器中的业务处理例如数据的输入和输出 有点像搬砖这个线程组可以形象的称为工人(Worker)线程组。
6.3.3 Bootstrap的启动流程 Bootstrap的启动流程也就是Netty组件的组装配置 以及Netty 服务器或者客户端的启动流程在本节中对启动流程进行了梳理大致分成8个步骤本书仅仅演示了是服务器端启动器的使用用到了启动器类为ServerBootstrap ,正式使用前首先创建一个服务器端的启动器实例。 // 创建一个服务器端的启动器 ServerBootstrap b new ServerBootstrap(); 接下来结合前面的NettyDiscardServer服务器的程序代码给大家详细的介绍一下Bootstrap启动流程中精彩的8人步骤 。
第1步创建反应器线程组并赋值给ServerBootstrap 启动器实例。 // 创建反应器线程组 // boss线程组 EventLoopGroup bossLoopGroup new NioEventLoopGroup(1); // worker线程组 EventLoopGroup workerLoopGroup new NioEventLoopGroup(); // 设置反应器线程组 b.group(bossLoopGroup, workerLoopGroup); 在设置反应器线程组之前创建了两个NioEventLoopGroup线程组一个负责处理连接监听的IO 事件名为bossLoopGroup ,另一个负责数据的IO事件和Handler 业务处理名为workerLoopGroup 。 在线程组创建完之后就可以配置给启动器实例 调用的方法是b.group(bossGroup , workerGroup) 它一次性的给启动器配置了两大线程组。 不一定非得配置两个线程组可以仅配置一个EventLoopGroup反应器线程组 具体的配置方法是调用b.group(workerGroup)在这种模式下连接监听IO 事件和数据传输IO事件可能被挤到在了同一个线程中处理 这样会带来一定的风险新连接的接受被更加耗时的数据传输或者业务处理所阻塞 。 在服务器端建议设置成两个线程组的工作模式 。
第2步设置通道的IO类型。 Netty 不止支持Java NIO ,也支持阻塞式OIO(也称为BIOBlock-IO即阻塞式IO) ,下面配置的是Java NIO 类型的通道顾炎武方法如下 // 2 设置NIO 类型的通道 b.channel(NioServerSocketChannel.class); 如果确实需要指定Bootstrap的IO 类型为BIO那么这里配置上Netty 的OioServerSocketChannel.class类即可 由于Nio的优势巨大通常不会在Netty 中使用BIO 。
第3步设置监听端口 b.localAddress(new InetSocketAddress(port));
第4步设置传输通道的配置选项 b.option(ChannelOption.SO_KEEPALIVE,true); b.option(ChannelOption.ALLOCATOR,PooledByteBufAllocator.DEFAULT); 这里用到了Bootstrap的option()选项的设置方法对于服务器的Bootstrap而言这个方法的作用是给父通道(Parent Channel) 接收连接通道设置一些选项。 如果要给子通道(Child Channel) 设置一些通道选项则需要用另外的childOption()设置方法 。 可以设置哪些通道选项呢(ChannelOption)呢在上面的代码中 设置了一个底层的TCP相关的选项ChannelOption.SO_KEEPALIVE该选项表示是否开启了TCP底层的心跳机制true为开启false为关闭。
第5步装配子通道的Pipeline流水线 上一节介绍到每一个通道的子通道都用一条ChannelPipeline流水线它的内部有一又向链表装配流水线的方式是将业务处理ChannelHandler实例加入到双向链表中。 装配子通道的Handler流水线调用childHandler()方法传递一个ChannelInitializer通道的初始化类的实例在父通道成功接收到一个连接 并创建成功一个子通道后就会初始化子通道这里配置了ChannelInitializer实例就会被调用 。 在ChannelInitalizer通道初始化类的实例中有一个initChannel()初始化方法在子通道创建后被执行到向子通道流水线增加业务处理器。 // 5 装配子通道流水线 b.childHandler(new ChannelInitializerSocketChannel(){ // 有一个连接达到会创建一个通道的子通道并初始化 protected void initChannel(SocketChannel ch) throws Exception(){ // 流水线管理子通道中的Handler业务处理器 // 向通道流水线添加一个Handler 业务处理器 ch.pipeline().addLast(new NettyDiscardHandler()); } }) 为什么仅装配子通道的流水线呢 而不需要装配父通道的流水线呢 原因是 父通道也就是NioServerSocketChannel 连接接受通道它的内部业务处理是固定的接受新连接后创建子通道然后初始化子通道所以不需要特别的配置如果需要完成特殊的业务处理可以使用ServerBootstrap的handler(ChannelHandler handler)方法为父通道设置ChannelInitializer初始化器。 说明一下ChannelInitializer处理器有一个泛型参数SocketChannel它代表需要通道类型这个类型需要和前面的启动器中设置的通道类型一一对应起来 。
第6步开始绑定服务器新连接的监听端口 // 开始绑定端通过调用sync()同步方法阻塞直到绑定成功 ChannelFuture channelFuture b.bind().sync() ; System.out.println(“服务器启动成功监听端口” channelFuture.channel().localAddress()); 这个方法很简单b.bind()方法的功能返回一个端口绑定Netty 的异步任务channelFuture在这里并没有给channelFuture异步任务增加回调监听器而是阻塞channelFuture异步任务 直到端口绑定任务执行完成 。 在Netty中所有的IO 操作都是异步执行的这就意味着任何一个IO 操作会立刻返回在返回的时候异步任务还没有真正的执行什么时候执行完成呢 Netty中的IO 操作都会返回异步任务实例ChannelFuture实例通过自我阻塞一直到ChannelFuture异步任务执行完成或者 ChannelFuture增加事件监听器两种方式以获得Netty 中的IO操作真正的结果上面使用了第一种。 到这里服务器正式启动。
第7步自我阻塞直到通道关闭
// 7 等待通道关闭 // 自我阻塞直到通道壮志凌云的异步任务结束 ChannelFuture closeFuture channelFuture.channel().closeFuture() ; closeFuture.sync(); 如果要阻塞的当前线程直到通道关闭可以使用通道的closeFuture()方法以获取通道关闭的异步任务当通道被关闭时 closeFuture实例的sync()方法会返回 。
第8步关闭EventLoopGroup Reactor 反应器线程组同时会关闭内部的subReactor子反应器线程也会关闭内部的Selector 选择器内部的轮询线程以及负责查询的所有子通道在子通道关闭后会释放掉底层的资源如TCP Socket文件描述符等。
6.3.4 ChannelOption 通道选项 无论是对于 NioServerSocketChannel父通道类型还是对于 NioSocketChannel子通道类型都可以设置一系列的ChannelOption选项在ChannelOption 类中定义了一大票通道选项下面介绍一些常见的选项。
SO_REVBUF,SO_SNDBUF 此为TCP参数每个TCP socket(套接字)在内核中都有一个发送缓冲区和一个接收缓冲区这两个选项就用来设置TCP 连接的两个缓冲区大小的TCP 的全双工作模式以及TCP滑动容器便 是依赖于这两个独立的缓冲区及其填充的状态 。
TCP_NODELAY 此为TCP参数表示立即发送数据默认值为TrueNetty的默认为True而操作系统默认为False该值用于设置Nagle算法的启用该算法将小的碎片数据连接成更大的报文(或数据包)来最小化所发送的报文数量如果需要发送一些较小的报文则需要禁用该算法Netty 默认禁用该算法从而最小化报文传输的延时。 说明一下这个参数的值与是否开启Nagle算法是相反的设置为true表示关闭设置为false表示开启通俗地讲 如果要求高实时性有数据发送时就立刻发送就设置为true如果需要减少发送次数和减少网络交互次数就设置为false。
SO_KEEPALIVE 此为TCP 参数表示底层的TCP 协议的心跳机制true为连接保持心跳默认为false,启用该功能TCP 会主动探测空闲连接的有效性可以将此功能视为TCP的心跳机制需要注意的是默认的心跳间隔是7200s 即2小时Netty 默认关闭该功能 。
SO_REFSEADDR 此为TCP参数设置为true时表示地址复用默认值为false有四种情况需要用到这个参数设置 。
当有一个相同的本地地址和端口的socket1处于TIME_WAIT状态时而我们希望启动程序的socket2要占用该地址和端口例如在重启服务且保持先前的端口时。有多块网上或用IP Alias技术的机器在同一个端口启动多个进程但每个进程绑定的本地IP地址不能相同 。单个进程绑定相同的端口到多个socket(套接字)上但每个socket绑定的IP地址不同 。完全相同的地址和端口重复绑定但这里只用UDP的多播不用于TCP 。
SO_LINGER 此为TCP参数表示关闭socket的延迟时间默认值为-1表示禁用该功能-1 表示socket.close()方法立即返回但操作系统底层会将发送到缓冲区全部发送到对端0 表示socket.close()方法立即返回操作系统放弃发送缓冲区的数据操作系统放弃发送缓冲区的数据直接向对端发送RST包对端收到复位错误非0整数值表示调用socket.close()方法的线程被阻塞直到延迟时间到来发送缓冲区中的数据发送完毕若超时则对端会收到复位错误 。
SO_BACKLOG 此为TCP参数表示服务器端接收连接的队列长度如果队列已满客户端连接将被拒绝默认值在Windows中为200其他操作系统为128 。
SO_BROADCAST 此为TCP参数表示设置广播模式 。
6.4 详解Channel 通道 先介绍一下在使用Channel 通道的过程中所涉及的主要成员方法然后为大家介绍一下Netty 所提供的一个专门的单元测试通道–EmbeddedChannel(嵌入式通道)。
6.4.1 Channel通道的主要成员和方法 在Netty中通道是其中的一个核心的概念之一代表着网络连接通道是通信的主题由它负责同对端进行网络通信可以写入数据到对端也可以从对端读取数据 。
protected AbstractChannel(Channel parent) {this.parent parent; // 父通道id newId();unsafe newUnsafe(); // 底层的NIO通道完成的实际的IO操作pipeline newChannelPipeline(); // 一条通道拥有一条流水线
}AbstractChannel内部有一个pipeline属性表示处理器的流水线Netty 在对通道进行初始化的时候将pipeline属性初始化为DefaultChannelPipeline的实例 这段代码也表明每个通道拥有一条ChannelPipeline处理器流水线 。 AbstractChannel内部有一个parent属性表示通道的父通道对于连接监听通道(如NIOServerSocketChannel实例)来说其父亲通道为null而对于每一条传输通道(如NioSocketChannel实例)其parent属性的值为接收到该连接的服务器连接监听通道 。 几乎所有的通道实现类都继承了AbstractChannel抽象类都拥有上面的parent和pipeline两个属性成员。 再来看一下在通道接口中所定义的几个重要的方法 。 方法1ChannelFuture connect(SocketAddress address); 此方法的作用为连接远程服务器方法的参数为远程服务器地址调用后会立即返回返回值为负责连接操作的异步任务ChannelFuture此方法在客户端的传输通道中使用。
方法2ChannelFuture bind(SocketAddress address) 此方法的作用为连接远程服务器方法的参数为远程服务器地址调用后会立即返回返回值为负责连接操作的异步任务ChannelFuture此方法在客户端传输通道中使用。
方法3ChannelFuture close() 此方法的作用为关闭通道连接返回连接关闭的ChannelFuture异步任务如果需要在连接正式关闭后执行其他操作则需要为异步任务设置回调方法或者调用ChannelFuture异步任务sync()方法来阻塞当前线程一直等到通道关闭的异步任务执行完毕 。
方法4channel read() 此方法的作用为读取通道数据并且启动入站处理具体来说内部的Java NIO Channel通道读取数据然后启动内部的Pipeline流水线开启数据读取的入站处理 此方法的返回通道自身用于链式调用 。
方法5 ChannelFuture write(Object o ) 此方法的作用为启程出站流水线处理 把处理后的最终数据写到底层Java NIO 通道 此方法的返回值为出站处理异步处理任务 。
方法6 Channel flush() 此方法的作用为将缓冲区中的数据立即写出到对端并不是每一次write操作都是将数据直接写出到对端write操作的作用在大部分情况下仅仅是写入到操作系统缓冲区操作系统将会根据缓冲区的情况决定什么时候把数据写到对端而执行flush()方法方始将缓冲区的数据写到对端 。 上面的6种方法 仅仅是比较常见的方法在Channel 接口中以及各种通道的实例同中还定义了大量的通道操作方法 在一般的日常开发中在如果需要用到请直接查询 Netty API 文档或Netty 源代码 。 这些理论知识在之前的博客中已经说明了但我觉得太重要了在这里又重复一遍对源码的阅读是有很大帮助的。 在之前的博客中分享过一个多路复用的例子在这篇博客中拿那个例子来分析怎样分析呢因为万变不离其中Netty的内部源码也是由IO多路复用的例子演变而来只要能从源码中找到例子中的代码也就能弄懂Netty 的大体架构了。 将例子罢出来。
public class NioSelectorServer {public static void main(String[] args) throws IOException {// 创建NIO ServerSocketChannelServerSocketChannel serverSocket ServerSocketChannel.open();serverSocket.socket().bind(new InetSocketAddress(9000));// 设置ServerSocketChannel为非阻塞serverSocket.configureBlocking(false);// 打开Selector处理Channel即创建epollSelector selector Selector.open();// 把ServerSocketChannel注册到selector上并且selector对客户端accept连接操作感兴趣SelectionKey selectionKey serverSocket.register(selector, SelectionKey.OP_ACCEPT);System.out.println(服务启动成功);while (true) {// 阻塞等待需要处理的事件发生selector.select();// 获取selector中注册的全部事件的 SelectionKey 实例SetSelectionKey selectionKeys selector.selectedKeys();IteratorSelectionKey iterator selectionKeys.iterator();// 遍历SelectionKey对事件进行处理while (iterator.hasNext()) {SelectionKey key iterator.next();// 如果是OP_ACCEPT事件则进行连接获取和事件注册if (key.isAcceptable()) {ServerSocketChannel server (ServerSocketChannel) key.channel();SocketChannel socketChannel server.accept();socketChannel.configureBlocking(false);// 这里只注册了读事件如果需要给客户端发送数据可以注册写事件SelectionKey selKey socketChannel.register(selector, SelectionKey.OP_READ);System.out.println(客户端连接成功);} else if (key.isReadable()) { // 如果是OP_READ事件则进行读取和打印SocketChannel socketChannel (SocketChannel) key.channel();ByteBuffer byteBuffer ByteBuffer.allocateDirect(128);int len socketChannel.read(byteBuffer);// 如果有数据把数据打印出来if (len 0) {System.out.println(接收到消息 new String(byteBuffer.array()));} else if (len -1) { // 如果客户端断开连接关闭SocketSystem.out.println(客户端断开连接);socketChannel.close();}}//从事件集合里删除本次处理的key防止下次select重复处理iterator.remove();}}}
}先来看NioEventLoopGroup的构造函数 。
public NioEventLoopGroup(int nThreads) {this(nThreads, (Executor) null);
}从这里可以看到NioEventLoopGroup的构造函数的 Executor参数默认值为空。
public NioEventLoopGroup(int nThreads, Executor executor) {this(nThreads, executor, SelectorProvider.provider());
}大家可能觉得SelectorProvider.provider()这个是什么东西 回头看一下NioSelectorServer的ServerSocketChannel serverSocket ServerSocketChannel.open();这一行代码 进入ServerSocketChannel的open()方法。 发现熟悉没有创建ServerSocketChannel需要用到 SelectorProvider.provider()而provider()方法的内部又是如何实现呢
public static SelectorProvider provider() {synchronized (lock) {if (provider ! null)return provider;return AccessController.doPrivileged(new PrivilegedActionSelectorProvider() {public SelectorProvider run() {if (loadProviderFromProperty())return provider;if (loadProviderAsService())return provider;provider sun.nio.ch.DefaultSelectorProvider.create();return provider;}});}
}其中 provider sun.nio.ch.DefaultSelectorProvider.create()会根据操作系统来返回不同的实现类 Windows平台返回 WindowsSelectorProvider; 而 if(provider!null)return provider保证了整个Server程序中只有一个WindowsSelectorProvider对象看WindowsSelectorProvider.openSelector() 代码 。
public AbstractSelector openSelector() throws IOException(){return new WindowsSelectorImpl(this);
}new WindowsSelectorImpl() 的代码如下
WindowsSelectorImpl(SelectorProvider sp) throws IOException {super(sp);pollWrapper new PollArrayWrapper(INIT_CAP);wakeupPipe Pipe.open();wakeupSourceFd ((SelChImpl) wakeupPipe.source()).getFDVal();// Disable the Nagle algorithm so that the wakeup is more immediate SinkChannelImpl sink (SinkChannelImpl)wakeupPipe.sink() ; (sink.sc).socket().setTcpNoDelay(true);wakeupSinkFd ((SelChImpl)sink).getFDVal();pollWrapper.addWakeupSocket(wakeupSourceDf,0);
}其中Pipe.open()是关键这个方法在调用过程如下 。
public static Pipe open() throws IOException {return SelectorProvider.provider().openPipe() ;
}在SelectorProvider中代码如下
public Pipe openPipe() throws IOException{return new PipeImpl(this);
}再看一下PipeImpl()代码
PipeImpl(SelectorProvider var1) {long var2 IOUtil.makePipe(true);int var4 (int)(var2 32);int var5 (int)var2;FileDescriptor var6 new FileDescriptor();IOUtil.setfdVal(var6, var4);this.source new SourceChannelImpl(var1, var6);FileDescriptor var7 new FileDescriptor();IOUtil.setfdVal(var7, var5);this.sink new SinkChannelImpl(var1, var7);
} 其中IOUtil.makePipe(true);是一个本地方法
static native long makePipe(boolean var0);/** Returns two file descriptors for a pipe encoded in a long The read end of the pipe is returned in the high 32 bits while the write end is returned in the low 32 bits. */ static native _org makePipe(boolean blocking);
static native long makePipe(boolean var0);具体实现代码如下
JNIEXPORT jlong JNICALL Java_sun_nio_ch_IOUtil_makePipe(JNIEnv * env , jobject this,jboolean blocking ){int fd[2];if(pipe(cf) 0 ){JNU_ThrowIOExceptionWithLastError(env , Pipe failed);return 0 ; }if(blocking JNI_FALSE){if((confiureBlocking(fd[0[ , JNI_FALSE) 0 )|| (configureBlocking(fd[1],JNI_FALSE) 0 )){JNU_ThrowIOExceptionWithLastError(env, Configure blocking failed );close(fd[0]);close(fd[1]);return 0 ; }}return ((jlong) fd[0] 32 ) | (jlong) fd[1] ;
}static int configureBlocking(int fd , jboolean blocking){int flags fcntl(fd,F_GETFL);int newflags blocking? (flags ~ O_NONBLOCK) : (flags | O_NONBBLOCK) ;return (flags newflags) ? 0 : fcntl(fd,F_SETFL,newflags);
}正如下面这段注释所描述的内容 。
/**
Returns two file descriptors for a pipe encoded in a long . the read end of the pipe is returned in the high 32 bits, while the write end is returned in the slo 32 bits。 */ pollWrapper.addWakeupSocket(wakeupSourceFd,0); 这行代码把返回的Pipe的write端的FD放在pollWrapper中后面会发现这么做的是为了实现Selector的wakeup(); ServerSocketChannel.open() 的实现代码如下。
public static ServerSocketChannel open() throws IOException {return SelectorProvider.provider().openServerSocketChannel();
}SelectProvider的实现代码如下
public ServerSocketChannel openServerSocketChannel() throws IOException {return new ServerSocketChannelImpl(this);
}可见ServerSocketChannel也有WindowsSelectorImpl 的引用 。
ServerSocketChannelImpl(SelectorProvider var1) throws IOException {super(var1);this.fd Net.serverSocket(true);this.fdVal IOUtil.fdVal(this.fd);this.state 0;
}然后通过serverSocket.register(selector, SelectionKey.OP_ACCEPT);把Selector和Channel 绑定在一起 也就是把新建ServerSocketChannel时创建的FD 与Selector绑定在一起。 到此Server 端已经启动完成主要创建以下对象 。
WindowsSelectorProvider 为单例对象实际上是调用操作系统的API 。WindowsSelectorImpl 中包含了如下内容 。
pollWrapper : 保存Selector上注册的FD, 包括pipe的write端FD 和 ServerSocketChannel 所用的FD 。wakeupPipe : 通道 其实就是两个FD一个是Read端的 一个是write端的。 下面来看看Selector的select()方法selector.select()主要调用WindowsSelectorImpl中的doSelect()方法 。
protected in doSelect( long timeout) throws IOException {if(channelArray null){throw new ClosedSelectorException();}this.timeout timeout ;// set selector timeout processDeregisterQueue() ; if(iterruptTriggered){resetWakeupSocket();return 0 ;}adjustThreadsCount();finishedLock.reset();startLock.startThreads();try{begin();try{subSelector.poll();}catch(IOException e ){finishLock.setException(e);}if(threads.size() 0 ){finishLock.waitForHelperThreads();}}finally{end ();}finishLock.checkForExceptoin();processDeregisterQueue();int updated updateSelectedKeys();resetWakeupSocket();return updated ;
}其中subSelector.poll()是核心也就是轮询pollWrapper中保存的FD , 具体实现是调用 native方法的poll0();
private int poll() throws IOException{return poll0(pollWrapper.pollArrayAddress, Math.min(totalChannels, MAX_SELLECTABLE_FDS), readFds, writeFds,exceptFds, timeout);
}private native int poll0(int pollAddress,int numfds,itn [] readFds,itn [] writeFds, int [] exceptFds ,long timeout);// the first element of each array is the number of selected sockets
// Other elements are file descriptors of selected sockets
private final int [] readFds new int [MAX_SELECTABLE_FDS 1 ] ; // 保存发生read的FD
private final int [] writeFds new int[MAX_SELECTABLE_FDS 1 ] ; // 保存发生write 的FD
private final int [] exceptFds new int [MAX_SELECTABLE_FDS 1 ] ; // 保存发生在except 的FD poll0()会监听pollWrapper中的FD 有没有数据进出 这会造成I/O 阻塞直到有数据读写事件发生比如由于 pollWrapper 中保存的也有ServerSocketChannel 的FD 所以只要ClientSocket发一份数据到ServerSocket 那么poll0()就会返回又由于 pollWrapper 中保存的也有pipe的write端PD 所以只要pipe的write端向FD 发送一份数据也会造成poll0()返回 如果这两种情况都没有发生那么poll0()就会一直阻塞也就是selector.select()会一直阻塞如果有任何一种情况发生那么selector.select()就会返回所以在OperationServer的run()里要用while(true) 这样就可以保证Selector接收数据并处理完后继续监听poll() 。 再来看WindowsSelectorImpl.Wakeup();
public Selector wakeup(){synchronized(interruptLock) {if(! interruptTriggered){setWakeupSocket();interruptTriggered true;}}return this;
}private void setWakeupSocket(){setWakeupSocket0(wakeupSinkFd);
}private native void setWakeupSocket0(itn wakeupSinkFd);JNIEXPORT void JNICALL
Java_sun_nio_ch_WindowsSelectorImpl_setWakeupSocket0(JNIENV * env ,jclass this ,jint scoutFd ){/**Write on byte into the pipe */const char byte 1 ; send(scoutFd , byte , 1 , 0 );
}可见wakeup()是通过pipe的write端send(soutFd, byte , 1 ,0 ) 发送一个字节 1 来唤醒 poll()的所以在需要的时候就可以调用selector.wakeup()来唤醒Selector 。 继续接着NioEventLoopGroup的构造函数来看。
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,final SelectStrategyFactory selectStrategyFactory) {super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,final SelectStrategyFactory selectStrategyFactory) {super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}这里会发现 当NioEventLoopGroup调用super()方法时实际上调用的是MultithreadEventLoopGroup类的构造函数那他们之间的关系是什么呢
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {super(nThreads 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}private static final int DEFAULT_EVENT_LOOP_THREADS;static {DEFAULT_EVENT_LOOP_THREADS Math.max(1, SystemPropertyUtil.getInt(io.netty.eventLoopThreads, NettyRuntime.availableProcessors() * 2));if (logger.isDebugEnabled()) {logger.debug(-Dio.netty.eventLoopThreads: {}, DEFAULT_EVENT_LOOP_THREADS);}
}在这里需要注意DEFAULT_EVENT_LOOP_THREADS这个变量当我们不传nThreads值时也没有配置io.netty.eventLoopThreads变量默认nThreads的值为NettyRuntime.availableProcessors() * 2 处理器个数的两倍,继续看MultithreadEventExecutorGroup的构造函数。
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}当然DefaultEventExecutorChooserFactory.INSTANCE 默认是DefaultEventExecutorChooserFactory后面用到再来分析 。 进入MultithreadEventExecutorGroup的构造方法 。
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args) {// 如果线程数小于等于0则抛出异常if (nThreads 0) {throw new IllegalArgumentException(String.format(nThreads: %d (expected: 0), nThreads));}if (executor null) {executor new ThreadPerTaskExecutor(newDefaultThreadFactory());}children new EventExecutor[nThreads];for (int i 0; i nThreads; i ) {boolean success false;try {children[i] newChild(executor, args);success true;} catch (Exception e) {// TODO: Think about if this is a good exception typethrow new IllegalStateException(failed to create a child event loop, e);} finally {if (!success) {for (int j 0; j i; j ) {children[j].shutdownGracefully();}for (int j 0; j i; j ) {EventExecutor e children[j];try {while (!e.isTerminated()) {e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);}} catch (InterruptedException interrupted) {// Let the caller handle the interruption.Thread.currentThread().interrupt();break;}}}}}chooser chooserFactory.newChooser(children);final FutureListenerObject terminationListener new FutureListenerObject() {Overridepublic void operationComplete(FutureObject future) throws Exception {if (terminatedChildren.incrementAndGet() children.length) {terminationFuture.setSuccess(null);}}};for (EventExecutor e: children) {e.terminationFuture().addListener(terminationListener);}SetEventExecutor childrenSet new LinkedHashSetEventExecutor(children.length);Collections.addAll(childrenSet, children);readonlyChildren Collections.unmodifiableSet(childrenSet);
}先来看executor new ThreadPerTaskExecutor(newDefaultThreadFactory()); 这一行代码当executor为空时会默认创建ThreadPerTaskExecutor作为executor而ThreadPerTaskExecutor的代码如下
public final class ThreadPerTaskExecutor implements Executor {private final ThreadFactory threadFactory;public ThreadPerTaskExecutor(ThreadFactory threadFactory) {if (threadFactory null) {throw new NullPointerException(threadFactory);}this.threadFactory threadFactory;}Overridepublic void execute(Runnable command) {threadFactory.newThread(command).start();}
}从ThreadPerTaskExecutor的源码中我们看到了什么呢传入了一个线程工厂threadFactory每一次执行execute()方法时会调用 threadFactory.newThread(command) 返回一个线程并且调用线程的start()方法启动线程这个和线程池相似但是有区分 。 先来看一个例子线程池的使用。
ThreadPoolExecutor executor new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueueRunnable());executor.submit(new Runnable() {Overridepublic void run() {System.out.println(111111);}
});接下来看线程池是如何调用的。 上述代码中有两段代码需要注意如下 一个是w.firstTask另外一个是getTask()这两个代码段有何意义呢我们需要从线程池的原理来分析当线程池中的线程未达到核心线程数时此时向线程池中添加新任务此时线程池会创建一个新的线程而这个新的任务被存储在w.firstTask中新创建的线程会调用任务的run()方法当执行完第一个任务后如果此时有新任务加到线程池中刚刚创建的线程是不会接收新加的任务的只会调用getTask()方法从等待队列中获取任务如果没有获取任务则阻塞等待如果获取到新任务则会调用任务的run()方法这就是上述代码的大概原理 。 那使用 threadFactory.newThread(command).start();有什么特点呢threadFactory又是什么呢从上面的代码得知threadFactory来源于newDefaultThreadFactory()方法接下来看newDefaultThreadFactory()方法的内部实现。
protected ThreadFactory newDefaultThreadFactory() {return new DefaultThreadFactory(getClass());
}public DefaultThreadFactory(Class? poolType) {this(poolType, false, Thread.NORM_PRIORITY);
}public DefaultThreadFactory(Class? poolType, boolean daemon, int priority) {this(toPoolName(poolType), daemon, priority);
}public DefaultThreadFactory(String poolName, boolean daemon, int priority) {this(poolName, daemon, priority, System.getSecurityManager() null ?Thread.currentThread().getThreadGroup() : System.getSecurityManager().getThreadGroup());
}public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {if (poolName null) {throw new NullPointerException(poolName);}if (priority Thread.MIN_PRIORITY || priority Thread.MAX_PRIORITY) {throw new IllegalArgumentException(priority: priority (expected: Thread.MIN_PRIORITY priority Thread.MAX_PRIORITY));}prefix poolName - poolId.incrementAndGet() -;this.daemon daemon;this.priority priority;this.threadGroup threadGroup;
}其实上面代码也很简单设置是否是daemon线程线程优先级,线程组等。接下来看其newThread()方法。
public Thread newThread(Runnable r) {Thread t newThread(FastThreadLocalRunnable.wrap(r), prefix nextId.incrementAndGet());try {if (t.isDaemon() ! daemon) {t.setDaemon(daemon);}if (t.getPriority() ! priority) {t.setPriority(priority);}} catch (Exception ignored) {}return t;
}protected Thread newThread(Runnable r, String name) {return new FastThreadLocalThread(threadGroup, r, name);
}上面需要注意的是FastThreadLocalRunnable.wrap ( r )这一行代码 。
static Runnable wrap(Runnable runnable) {return runnable instanceof FastThreadLocalRunnable ? runnable : new FastThreadLocalRunnable(runnable);
}这一行代码中如果runnable是FastThreadLocalRunnable类型的则不做包装否则创建new FastThreadLocalRunnable()包装已有的Runnable 。 那FastThreadLocalRunnable run()方法特点是什么呢请看run()方法 。
public void run() {try {runnable.run();} finally {FastThreadLocal.removeAll();}
}这里有一个FastThreadLocal.removeAll()方法这又是什么鬼在之前的 Netty源码性能分析 - ThreadLocal PK FastThreadLocal 博客中对FastThreadLocal源码做了详细分析感兴趣可以去看看这里就不再赘述。 接下来看ThreadPerTaskExecutor的结构 。
public interface Executor {void execute(Runnable command);
}static final class ThreadPerTaskExecutor implements Executor {public void execute(Runnable r) { new Thread(r).start(); }
}这里大家应该清楚了调用ThreadPerTaskExecutor的execute实际上是新启动了一个线程系统会调用r.run()方法而r又被包装成FastThreadLocalRunnable当r的run方法调用完毕会触发FastThreadLocal.removeAll()的调用这样就完美的衔接好了在FastThreadLocalRunnable使用过程中可以使用线程中的共享变量FastThreadLocal在调用完run()方法后FastThreadLocal变量及其对应的值又被清理干净了这样极大的提升了线程范围内共享变量的使用速度在调用结束后调用removeAll()方法又避免了内存泄漏 。 接下来看children new EventExecutor[nThreads];这一行代码注意EventExecutor与Executor之间的关系 。 接下来看newChild()方法 。
protected EventLoop newChild(Executor executor, Object... args) throws Exception {EventLoopTaskQueueFactory queueFactory args.length 4 ? (EventLoopTaskQueueFactory) args[3] : null;return new NioEventLoop(this, executor, (SelectorProvider) args[0],((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}因为我的是mac系统当然SelectorProvider.provider()返回的是KQueueSelectorProvider第二个参数 默认为DefaultSelectStrategyFactory.INSTANCE而第三个参数默认为RejectedExecutionHandlers.reject() 方法返回值从NioEventLoopGroup的构造函数调用中可得知。 接下来继续NioEventLoop的构造函数调用 。
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,EventLoopTaskQueueFactory queueFactory) {super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),rejectedExecutionHandler);if (selectorProvider null) {throw new NullPointerException(selectorProvider);}if (strategy null) {throw new NullPointerException(selectStrategy);}provider selectorProvider;final SelectorTuple selectorTuple openSelector();selector selectorTuple.selector;unwrappedSelector selectorTuple.unwrappedSelector;selectStrategy strategy;
}看到NioEventLoop构造函数就想知道NioEventLoop和其他类之间的结构关系那和其他类之间的结构关系是什么呢 我的天好复杂的样子。先有个大概的印象看到具体代码时再来分析 。 从newChild() 方法中得知NioEventLoop构造方法中parent即创建NioEventLoop的NioEventLoopGroup接下来看newTaskQueue()方法 。
protected static final int DEFAULT_MAX_PENDING_TASKS Math.max(16,SystemPropertyUtil.getInt(io.netty.eventLoop.maxPendingTasks, Integer.MAX_VALUE));private static QueueRunnable newTaskQueue(EventLoopTaskQueueFactory queueFactory) {if (queueFactory null) {// DEFAULT_MAX_PENDING_TASKS默认值为Integer.MAX_VALUEreturn newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);}return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
}默认情况下EventLoopTaskQueueFactory为空因此会进入newTaskQueue0()方法。
private static QueueRunnable newTaskQueue0(int maxPendingTasks) {// This event loop never calls takeTask()return maxPendingTasks Integer.MAX_VALUE ? PlatformDependent.RunnablenewMpscQueue(): PlatformDependent.RunnablenewMpscQueue(maxPendingTasks);
}默认情况下DEFAULT_MAX_PENDING_TASKS为Integer.MAX_VALUE因此在newTaskQueue0()方法中调用的是newMpscQueue()方法 。
private static final class Mpsc {private static final boolean USE_MPSC_CHUNKED_ARRAY_QUEUE;private Mpsc() {}static {Object unsafe null;if (hasUnsafe()) {// jctools goes through its own process of initializing unsafe; of// course, this requires permissions which might not be granted to calling code, so we// must mark this block as privileged toounsafe AccessController.doPrivileged(new PrivilegedActionObject() {Overridepublic Object run() {// force JCTools to initialize unsafereturn UnsafeAccess.UNSAFE;}});}if (unsafe null) {logger.debug(org.jctools-core.MpscChunkedArrayQueue: unavailable);USE_MPSC_CHUNKED_ARRAY_QUEUE false;} else {logger.debug(org.jctools-core.MpscChunkedArrayQueue: available);USE_MPSC_CHUNKED_ARRAY_QUEUE true;}}static T QueueT newMpscQueue(final int maxCapacity) {// Calculate the max capacity which can not be bigger then MAX_ALLOWED_MPSC_CAPACITY.// This is forced by the MpscChunkedArrayQueue implementation as will try to round it// up to the next power of two and so will overflow otherwise.final int capacity max(min(maxCapacity, MAX_ALLOWED_MPSC_CAPACITY), MIN_MAX_MPSC_CAPACITY);return USE_MPSC_CHUNKED_ARRAY_QUEUE ? new MpscChunkedArrayQueueT(MPSC_CHUNK_SIZE, capacity): new MpscGrowableAtomicArrayQueueT(MPSC_CHUNK_SIZE, capacity);}static T QueueT newMpscQueue() {return USE_MPSC_CHUNKED_ARRAY_QUEUE ? new MpscUnboundedArrayQueueT(MPSC_CHUNK_SIZE): new MpscUnboundedAtomicArrayQueueT(MPSC_CHUNK_SIZE);}看到这里大家可能晕了这是什么意思创建一个队列都这么难吗 像MpscChunkedArrayQueue和MpscUnboundedArrayQueue两种队列的原理在 Netty源码性能分析MpscChunkedArrayQueue MpscUnboundedArrayQueue MpscArrayQueue MpscLinkedAtomicQueue 博客中做了详细的分析知道了MpscChunkedArrayQueue队列的底层存储结构是一个可扩容的数组而MpscUnboundedArrayQueue是不可扩容的数组因此才有newTaskQueue0()方法中的判断如果maxPendingTasks Integer.MAX_VALUE则创建一个固定长度大小的数组来存储队列中的元素如果 maxPendingTasks ! Integer.MAX_VALUE则创建一个可扩容数组来存储队列中的元素他们的底层都是基于数组实现。 再来理解newMpscQueue()方法中的两行代码。
private static final int MPSC_CHUNK_SIZE 1024;
private static final int MIN_MAX_MPSC_CAPACITY MPSC_CHUNK_SIZE * 2;private static final int MAX_ALLOWED_MPSC_CAPACITY Pow2.MAX_POW2;final int capacity max(min(maxCapacity, MAX_ALLOWED_MPSC_CAPACITY), MIN_MAX_MPSC_CAPACITY);
return USE_MPSC_CHUNKED_ARRAY_QUEUE ? new MpscChunkedArrayQueueT(MPSC_CHUNK_SIZE, capacity): new MpscGrowableAtomicArrayQueueT(MPSC_CHUNK_SIZE, capacity);首先USE_MPSC_CHUNKED_ARRAY_QUEUE默认值为true后面再来分析先看capacity的值首先确定MAX_ALLOWED_MPSC_CAPACITY的值从JCtools源码中可以看出它的值为1 30 。 再确定MIN_MAX_MPSC_CAPACITY的值为2048因此capacity max(min(maxCapacity, 2^30 ), 2048)如果传入的值大于2048并且小于2^30则取传入的值如果小于2048则取2048。这一行代码 new MpscChunkedArrayQueueT(MPSC_CHUNK_SIZE, capacity)就好理解了我们从之前的博客中知道MpscChunkedArrayQueue的数组结构如下。 因此MpscChunkedArrayQueueT(1024, capacity) 会创建一个单个数组容量大小为1024且队列中最大容量为capacity的队列例如从上图中得知buffer的数组长度为1024而buffer,next … 所有数组中最大可存储元素个数为capacity也就是说在队列中元素没有被消费的情况下最多可加入队列元素个数为capacity。 我们再来分析USE_MPSC_CHUNKED_ARRAY_QUEUE的值从Mpsc的源码中可以看出如果hasUnsafe()方法返回值为false则USE_MPSC_CHUNKED_ARRAY_QUEUE为false如果为hasUnsafe()方法返回值为true则USE_MPSC_CHUNKED_ARRAY_QUEUE为true为什么这么做呢 从之前的博客中分析得出MpscGrowableAtomicArrayQueue相对于MpscChunkedArrayQueue而言新数组创建时每一次数组的长度 原数组容量2倍 -1 而MpscChunkedArrayQueue和原数组大小一样还有就是存储元素的数组不一样 MpscChunkedArrayQueue用普通数组存储而MpscGrowableAtomicArrayQueue用AtomicReferenceArray数组什么原因导致使用不特性的队列呢进入hasUnsafe()方法 。
private static final Throwable UNSAFE_UNAVAILABILITY_CAUSE unsafeUnavailabilityCause0();public static boolean hasUnsafe() {return UNSAFE_UNAVAILABILITY_CAUSE null;
}private static Throwable unsafeUnavailabilityCause0() {// 如果是dalvik虚拟机或 aliyun-vm则不支持if (isAndroid()) {logger.debug(sun.misc.Unsafe: unavailable (Android));return new UnsupportedOperationException(sun.misc.Unsafe: unavailable (Android));// IKVM 也不支持 sun.misc.Unsafe: unavailable if (isIkvmDotNet()) {logger.debug(sun.misc.Unsafe: unavailable (IKVM.NET));return new UnsupportedOperationException(sun.misc.Unsafe: unavailable (IKVM.NET));}Throwable cause PlatformDependent0.getUnsafeUnavailabilityCause();if (cause ! null) {return cause;}try {boolean hasUnsafe PlatformDependent0.hasUnsafe();logger.debug(sun.misc.Unsafe: {}, hasUnsafe ? available : unavailable);return hasUnsafe ? null : PlatformDependent0.getUnsafeUnavailabilityCause();} catch (Throwable t) {logger.trace(Could not determine if Unsafe is available, t);// Probably failed to initialize PlatformDependent0.return new UnsupportedOperationException(Could not determine if Unsafe is available, t);}
}public static Throwable getUnsafeUnavailabilityCause() {return UNSAFE_UNAVAILABILITY_CAUSE;
}static boolean hasUnsafe() {return UNSAFE ! null;
}从上述代码中如果发现UNSAFE_UNAVAILABILITY_CAUSE !null 或 Unsafe初始化为空则证明使用Unsafe时会有异常因此hasUnsafe()将返回false,接下来看什么情况下UNSAFE_UNAVAILABILITY_CAUSE不为空也就是什么情况下初始化或使用Unsafe会有异常。
final class PlatformDependent0 {private static final InternalLogger logger InternalLoggerFactory.getInstance(PlatformDependent0.class);private static final long ADDRESS_FIELD_OFFSET;private static final long BYTE_ARRAY_BASE_OFFSET;private static final Constructor? DIRECT_BUFFER_CONSTRUCTOR;private static final Throwable EXPLICIT_NO_UNSAFE_CAUSE explicitNoUnsafeCause0();private static final Method ALLOCATE_ARRAY_METHOD;private static final int JAVA_VERSION javaVersion0();private static final boolean IS_ANDROID isAndroid0();private static final Throwable UNSAFE_UNAVAILABILITY_CAUSE;private static final Object INTERNAL_UNSAFE;private static final boolean IS_EXPLICIT_TRY_REFLECTION_SET_ACCESSIBLE explicitTryReflectionSetAccessible0();static final Unsafe UNSAFE;// constants borrowed from murmur3static final int HASH_CODE_ASCII_SEED 0xc2b2ae35;static final int HASH_CODE_C1 0xcc9e2d51;static final int HASH_CODE_C2 0x1b873593;/*** Limits the number of bytes to copy per {link Unsafe#copyMemory(long, long, long)} to allow safepoint polling* during a large copy.*/private static final long UNSAFE_COPY_THRESHOLD 1024L * 1024L;private static final boolean UNALIGNED;static {final ByteBuffer direct;Field addressField null;Method allocateArrayMethod null;Throwable unsafeUnavailabilityCause null;Unsafe unsafe;Object internalUnsafe null;if ((unsafeUnavailabilityCause EXPLICIT_NO_UNSAFE_CAUSE) ! null) {direct null;addressField null;unsafe null;internalUnsafe null;} else {direct ByteBuffer.allocateDirect(1);// attempt to access field Unsafe#theUnsafefinal Object maybeUnsafe AccessController.doPrivileged(new PrivilegedActionObject() {Overridepublic Object run() {try {final Field unsafeField Unsafe.class.getDeclaredField(theUnsafe);// We always want to try using Unsafe as the access still works on java9 as well and// we need it for out native-transports and many optimizations.Throwable cause ReflectionUtil.trySetAccessible(unsafeField, false);if (cause ! null) {return cause;}// the unsafe instancereturn unsafeField.get(null);} catch (NoSuchFieldException e) {return e;} catch (SecurityException e) {return e;} catch (IllegalAccessException e) {return e;} catch (NoClassDefFoundError e) {// Also catch NoClassDefFoundError in case someone uses for example OSGI and it made// Unsafe unloadable.return e;}}});// the conditional check here can not be replaced with checking that maybeUnsafe// is an instanceof Unsafe and reversing the if and else blocks; this is because an// instanceof check against Unsafe will trigger a class load and we might not have// the runtime permission accessClassInPackage.sun.miscif (maybeUnsafe instanceof Throwable) {unsafe null;unsafeUnavailabilityCause (Throwable) maybeUnsafe;logger.debug(sun.misc.Unsafe.theUnsafe: unavailable, (Throwable) maybeUnsafe);} else {unsafe (Unsafe) maybeUnsafe;logger.debug(sun.misc.Unsafe.theUnsafe: available);}// ensure the unsafe supports all necessary methods to work around the mistake in the latest OpenJDK// https://github.com/netty/netty/issues/1061// http://www.mail-archive.com/jdk6-devopenjdk.java.net/msg00698.htmlif (unsafe ! null) {final Unsafe finalUnsafe unsafe;final Object maybeException AccessController.doPrivileged(new PrivilegedActionObject() {Overridepublic Object run() {try {finalUnsafe.getClass().getDeclaredMethod(copyMemory, Object.class, long.class, Object.class, long.class, long.class);return null;} catch (NoSuchMethodException e) {return e;} catch (SecurityException e) {return e;}}});if (maybeException null) {logger.debug(sun.misc.Unsafe.copyMemory: available);} else {// Unsafe.copyMemory(Object, long, Object, long, long) unavailable.unsafe null;unsafeUnavailabilityCause (Throwable) maybeException;logger.debug(sun.misc.Unsafe.copyMemory: unavailable, (Throwable) maybeException);}}if (unsafe ! null) {final Unsafe finalUnsafe unsafe;// attempt to access field Buffer#addressfinal Object maybeAddressField AccessController.doPrivileged(new PrivilegedActionObject() {Overridepublic Object run() {try {final Field field Buffer.class.getDeclaredField(address);// Use Unsafe to read value of the address field. This way it will not fail on JDK9 which// will forbid changing the access level via reflection.final long offset finalUnsafe.objectFieldOffset(field);final long address finalUnsafe.getLong(direct, offset);// if direct really is a direct buffer, address will be non-zeroif (address 0) {return null;}return field;} catch (NoSuchFieldException e) {return e;} catch (SecurityException e) {return e;}}});if (maybeAddressField instanceof Field) {addressField (Field) maybeAddressField;logger.debug(java.nio.Buffer.address: available);} else {unsafeUnavailabilityCause (Throwable) maybeAddressField;logger.debug(java.nio.Buffer.address: unavailable, (Throwable) maybeAddressField);// If we cannot access the address of a direct buffer, theres no point of using unsafe.// Lets just pretend unsafe is unavailable for overall simplicity.unsafe null;}}if (unsafe ! null) {// There are assumptions made where ever BYTE_ARRAY_BASE_OFFSET is used (equals, hashCodeAscii, and// primitive accessors) that arrayIndexScale 1, and results are undefined if this is not the case.long byteArrayIndexScale unsafe.arrayIndexScale(byte[].class);if (byteArrayIndexScale ! 1) {logger.debug(unsafe.arrayIndexScale is {} (expected: 1). Not using unsafe., byteArrayIndexScale);unsafeUnavailabilityCause new UnsupportedOperationException(Unexpected unsafe.arrayIndexScale);unsafe null;}}}UNSAFE_UNAVAILABILITY_CAUSE unsafeUnavailabilityCause;...
}public static Throwable trySetAccessible(AccessibleObject object, boolean checkAccessible) {if (checkAccessible !PlatformDependent0.isExplicitTryReflectionSetAccessible()) {return new UnsupportedOperationException(Reflective setAccessible(true) disabled);}try {object.setAccessible(true);return null;} catch (SecurityException e) {return e;} catch (RuntimeException e) {return handleInaccessibleObjectException(e);}
}上面4行加粗代码。
object.setAccessible(true); 调用
final Field unsafeField Unsafe.class.getDeclaredField(theUnsafe);
Throwable cause ReflectionUtil.trySetAccessible(unsafeField, false);因为获取Unsafe对象时一般常情况下都是通过反射来获取的如下
private static Unsafe reflectGetUnsafe() {try {Field field Unsafe.class.getDeclaredField(theUnsafe);field.setAccessible(true);return (Unsafe) field.get(null);} catch (Exception e) {e.printStackTrace();return null;}
}如果field.setAccessible(true)失败则证明Unsafe实例获取失败不能使用Unsafe来进行CAS操作。
复制内存方法copyMemory()调用
finalUnsafe.getClass().getDeclaredMethod(copyMemory, Object.class, long.class, Object.class, long.class, long.class);在后续中对于堆外内存肯定会用到copyMemory()方法如果Unsafe实例中没有copyMemory()方法肯定证明Unsafe不可用。
ByteBuffer的address字段获取
final Field field Buffer.class.getDeclaredField(address);
final long offset finalUnsafe.objectFieldOffset(field);
final long address finalUnsafe.getLong(direct, offset);使用Unsafe读取地址字段的值。这样它不会在JDK9 上失败JDK9 将禁止通过反射更改访问级别。
Unsafe类中有很多以BASE_OFFSET结尾的常量比如ARRAY_INT_BASE_OFFSETARRAY_BYTE_BASE_OFFSET等这些常量值是通过arrayBaseOffset方法得到的。arrayBaseOffset方法是一个本地方法可以获取数组第一个元素的偏移地址。Unsafe类中还有很多以INDEX_SCALE结尾的常量比如 ARRAY_INT_INDEX_SCALE ARRAY_BYTE_INDEX_SCALE等这些常量值是通过arrayIndexScale方法得到的。arrayIndexScale方法也是一个本地方法可以获取数组的转换因子也就是数组中元素的增量地址。将arrayBaseOffset与arrayIndexScale配合使用可以定位数组中每个元素在内存中的位置。 下面这行代码 long byteArrayIndexScale unsafe.arrayIndexScale(byte[].class);
public static void main(String[] args) {Unsafe unsafe reflectGetUnsafe();long byteArrayIndexScale unsafe.arrayIndexScale(byte[].class);System.out.println(byteArrayIndexScale);long intArrayIndexScale unsafe.arrayIndexScale(int[].class);System.out.println(intArrayIndexScale);long longArrayIndexScale unsafe.arrayIndexScale(long[].class);System.out.println(longArrayIndexScale);
}private static Unsafe reflectGetUnsafe() {try {Field field Unsafe.class.getDeclaredField(theUnsafe);field.setAccessible(true);return (Unsafe) field.get(null);} catch (Exception e) {e.printStackTrace();return null;}
}在这里主要是验证arrayIndexScale()获取byte数组的增量地址对不对如果不对则不能使用Unsafe。 看了这么多那应该就明白newMpscQueue()方法代码的含义。带 Atomic 的类是表示在 Netty 无法使用 Unsafe 的情况下使用 Atomic 原子类来做替代方案。为什么呢相对于MpscChunkedArrayQueue而言MpscGrowableAtomicArrayQueue使用了AtomicReferenceArray来存取队列中的元素而MpscChunkedArrayQueue使用的是普通数组存取队列中元素即使在不能使用Unsafe情况下也能通过AtomicReferenceArray 保证对数组元素的操作的原子性。 那又为什么AtomicReferenceArray可以使用CAS操作数组中的元素在Netty 程序中不能使用Unsafe来保证原子性呢 在JDK 5之后Java类库中才开始使用CAS操作该操作由sun.misc.Unsafe类里面的 compareAndSwapInt()和compareAndSwapLong()等几个方法包装提供。HotSpot虚拟机在内部对这些方法做了特殊处理即时编译出来的结果就是一条平台相关的处理器CAS指令没有方法调用的过程 或者可以认为是无条件内联进去了。不过由于Unsafe类在设计上就不是提供给用户程序调用的类Unsafe::getUnsafe()的代码中限制了只有启动类加载器Bootstrap ClassLoader加载的Class才能访问它因此在JDK 9之前只有Java类库可以使用CAS譬如J.U.C包里面的整数原子类其中的 compareAndSet()和getAndIncrement()等方法都使用了Unsafe类的CAS操作来实现。而如果用户程序也有用CAS操作的需求那要么就采用反射手段突破Unsafe的访问限制要么就只能通过Java类库API来间接使用它。直到JDK 9之后Java类库才在VarHandle类里开放了面向用户程序使用的CAS操作。 接下来看openSelector()方法。
private SelectorTuple openSelector() {final Selector unwrappedSelector;try {unwrappedSelector provider.openSelector();} catch (IOException e) {throw new ChannelException(failed to open a new selector, e);}if (DISABLE_KEY_SET_OPTIMIZATION) {return new SelectorTuple(unwrappedSelector);}Object maybeSelectorImplClass AccessController.doPrivileged(new PrivilegedActionObject() {Overridepublic Object run() {try {return Class.forName(sun.nio.ch.SelectorImpl,false,PlatformDependent.getSystemClassLoader());} catch (Throwable cause) {return cause;}}});// 反射创建sun.nio.ch.SelectorImpl是否抛出异常// 创建出来的maybeSelectorImplClass类是否是Selector的子类if (!(maybeSelectorImplClass instanceof Class) ||// ensure the current selector implementation is what we can instrument.!((Class?) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {if (maybeSelectorImplClass instanceof Throwable) {Throwable t (Throwable) maybeSelectorImplClass;logger.trace(failed to instrument a special java.util.Set into: {}, unwrappedSelector, t);}return new SelectorTuple(unwrappedSelector);}final Class? selectorImplClass (Class?) maybeSelectorImplClass;final SelectedSelectionKeySet selectedKeySet new SelectedSelectionKeySet();Object maybeException AccessController.doPrivileged(new PrivilegedActionObject() {Overridepublic Object run() {try {Field selectedKeysField selectorImplClass.getDeclaredField(selectedKeys);Field publicSelectedKeysField selectorImplClass.getDeclaredField(publicSelectedKeys);// 如果是JDK 9 以上并且在JDK外允许使用 Unsafe // 则使用CAS 设置SelectorImpl的selectedKeys和publicSelectedKeys属性 if (PlatformDependent.javaVersion() 9 PlatformDependent.hasUnsafe()) {// Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.// This allows us to also do this in Java9 without any extra flags.long selectedKeysFieldOffset PlatformDependent.objectFieldOffset(selectedKeysField);long publicSelectedKeysFieldOffset PlatformDependent.objectFieldOffset(publicSelectedKeysField);if (selectedKeysFieldOffset ! -1 publicSelectedKeysFieldOffset ! -1) {PlatformDependent.putObject(unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);PlatformDependent.putObject(unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);return null;}// We could not retrieve the offset, lets try reflection as last-resort.}// 如果不是JDK 9 以上或在JDK外不允许使用 Unsafe // 则使用反射设置SelectorImpl的selectedKeys和publicSelectedKeys属性 Throwable cause ReflectionUtil.trySetAccessible(selectedKeysField, true);if (cause ! null) {return cause;}cause ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);if (cause ! null) {return cause;}selectedKeysField.set(unwrappedSelector, selectedKeySet);publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);return null;} catch (NoSuchFieldException e) {return e;} catch (IllegalAccessException e) {return e;}}});if (maybeException instanceof Exception) {selectedKeys null;Exception e (Exception) maybeException;logger.trace(failed to instrument a special java.util.Set into: {}, unwrappedSelector, e);return new SelectorTuple(unwrappedSelector);}selectedKeys selectedKeySet;logger.trace(instrumented a special java.util.Set into: {}, unwrappedSelector);return new SelectorTuple(unwrappedSelector,new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}private static final class SelectorTuple {final Selector unwrappedSelector;final Selector selector;SelectorTuple(Selector unwrappedSelector, Selector selector) {this.unwrappedSelector unwrappedSelector;this.selector selector;}
}可能大家觉得上面代码平平无奇但还是有一点值得我们注意 加是加粗那行代码。 unwrappedSelector provider.openSelector(); 这行代码有什么特别的地方吗 不就是Selector selector Selector.open();这一行代码实现吗当然接着就是为SelectorImpl的publicKeys和publicSelectedKeys这两个属性值赋值JDK9 以上版本并且能够获取到Unsafe则直接CAS操作publicKeyspublicSelectedKeys两个属性否则通过反射为SelectorImpl的publicKeys和publicSelectedKeys属性赋值。 接下来看包装类SelectedSelectionKeySetSelector。先来看其类关系结构 。
final class SelectedSelectionKeySetSelector extends Selector {private final SelectedSelectionKeySet selectionKeys;private final Selector delegate;SelectedSelectionKeySetSelector(Selector delegate, SelectedSelectionKeySet selectionKeys) {this.delegate delegate;this.selectionKeys selectionKeys;}Overridepublic boolean isOpen() {return delegate.isOpen();}Overridepublic SelectorProvider provider() {return delegate.provider();}Overridepublic SetSelectionKey keys() {return delegate.keys();}Overridepublic SetSelectionKey selectedKeys() {return delegate.selectedKeys();}Overridepublic int selectNow() throws IOException {selectionKeys.reset();return delegate.selectNow();}Overridepublic int select(long timeout) throws IOException {selectionKeys.reset();return delegate.select(timeout);}Overridepublic int select() throws IOException {selectionKeys.reset();return delegate.select();}Overridepublic Selector wakeup() {return delegate.wakeup();}Overridepublic void close() throws IOException {delegate.close();}
}在SelectedSelectionKeySetSelector类中delegate就是未包装的Selector也就是unwrappedSelector provider.openSelector();的返回值。这一点需要注意 。 接下来看MultithreadEventExecutorGroup的这几行代码分析 。 关于上述几行代码 在另一篇博客 Netty 之 DefaultPromise 源码解析 做了具体的分析有兴趣可以去看看。 接下来看分析ServerBootstrap的源码分析。
ServerBootstrap 接下来分析下面红框中的代码 。 首先看ServerBootstrap的构造方法。
public ServerBootstrap() { }发现什么事情都没有做。 接下来看group代码。
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {super.group(parentGroup);ObjectUtil.checkNotNull(childGroup, childGroup);if (this.childGroup ! null) {throw new IllegalStateException(childGroup set already);}this.childGroup childGroup;return this;
}public B group(EventLoopGroup group) {ObjectUtil.checkNotNull(group, group);if (this.group ! null) {throw new IllegalStateException(group set already);}this.group group;return self();
}private B self() {return (B) this;
}group()方法调用只是将bossGroup和workerGroup分别设置到ServerBootstrap的group和childGroup属性中但这里的级联调用还是很有意思的。
public class MySelf {public MySelf group(){return this;}public MySelf option(){return this;}public MySelf childOption(){return this;}public static void main(String[] args) {MySelf self new MySelf();self.group().option().childOption();}}其实有个时候在想如果Java中所有的setXXX()方法都这样写在设置对象时是不是更加方便一些呢 当然这是题外话接着看.channel()方法的实现。
public B channel(Class? extends C channelClass) {return channelFactory(new ReflectiveChannelFactoryC(ObjectUtil.checkNotNull(channelClass, channelClass)));
}channel()方法中传入的是NioServerSocketChannel.class, 在这个方法中其实就是初始化了一个ReflectiveChannelFactory的对象 channel()方法中用到了ReflectiveChannelFactory对象看ReflectiveChannelFactory实现。
public class ReflectiveChannelFactoryT extends Channel implements ChannelFactoryT {private final Constructor? extends T constructor;public ReflectiveChannelFactory(Class? extends T clazz) {ObjectUtil.checkNotNull(clazz, clazz);try {// 获取无参的构造函数this.constructor clazz.getConstructor();} catch (NoSuchMethodException e) {throw new IllegalArgumentException(Class StringUtil.simpleClassName(clazz) does not have a public non-arg constructor, e);}}Override// 泛型T 代表不同的Channelpublic T newChannel() {try {// 使用反射技术创建channelreturn constructor.newInstance();} catch (Throwable t) {throw new ChannelException(Unable to create Channel from class constructor.getDeclaringClass(), t);}}
}根据上面的代码提示我们可以得出以下的结论
Bootstrap中ChannelFactory实现类是ReflectChannelFactory .通过channel()方法创建的Channel具体类型是NioSocketChannel。 此时我们知道constructor即为NioServerSocketChannel的构造器,而newChannel()调用则会创建NioServerSocketChannel对象 。
public B channelFactory(io.netty.channel.ChannelFactory? extends C channelFactory) {return channelFactory((ChannelFactoryC) channelFactory);
}public B channelFactory(ChannelFactory? extends C channelFactory) {ObjectUtil.checkNotNull(channelFactory, channelFactory);if (this.channelFactory ! null) {throw new IllegalStateException(channelFactory set already);}this.channelFactory channelFactory;return self();
}channel()方法调用实际上是创建了一个ReflectiveChannelFactory并赋值给ServerBootstrap的channelFactory属性中将来调用channelFactory的newChannel()方法将反射创建NioServerSocketChannel对象 。 接下来看option方法调用 。
private final MapChannelOption?, Object options new LinkedHashMapChannelOption?, Object();public T B option(ChannelOptionT option, T value) {ObjectUtil.checkNotNull(option, option);if (value null) {synchronized (options) {options.remove(option);}} else {synchronized (options) {options.put(option, value);}}return self();
}从上述代码中可以看出ServerBootstrap的options属性是一个LinkedHashMap对象如果value为空则直接将option从options中移除即可。当然ChannelOption是有很多的属性的这些属性是什么含义呢
public class ChannelOptionT extends AbstractConstantChannelOptionT {// Netty全局参数 ByteBuf 的分配器默认值为ByteBufAlloocator.DEFAULT,4.0版本为UnpooledByteBufAllocator。// 4.1 版本PooledByteBufAllocator分别对应的字符串值为unpooled 和 pooledpublic static final ChannelOptionByteBufAllocator ALLOCATOR valueOf(ALLOCATOR);// Netty 全局参数用于Channel分配接收Buffer的分配器默认值为AdaptiveRecvByteBufAllocator.DEFAULT是一个自适应的接收缓冲区分配器。// 能根据接收的数据自动调节大小可选值为FixedRecvByteBufAllocator// 固定大小的接收缓冲区分配器public static final ChannelOptionRecvByteBufAllocator RCVBUF_ALLOCATOR valueOf(RCVBUF_ALLOCATOR);// Netty全局参数消息大小估算器默认值为DefaultMessageSizeEstimator.DEFAULT。 估算ByteBuf,ByteBuffHolder和// 和FileRegion的大小其中ByteBuf 和ByteBufHolder为实际大小FileRegion// 估算值为0该值估算的字节数在计算水位时使用FileRegion为0// 可知FileRegion不影响高低水位public static final ChannelOptionMessageSizeEstimator MESSAGE_SIZE_ESTIMATOR valueOf(MESSAGE_SIZE_ESTIMATOR);// Netty全局参数连接超时毫秒数默认值为3000ms 即30spublic static final ChannelOptionInteger CONNECT_TIMEOUT_MILLIS valueOf(CONNECT_TIMEOUT_MILLIS);public static final ChannelOptionInteger MAX_MESSAGES_PER_READ valueOf(MAX_MESSAGES_PER_READ);public static final ChannelOptionInteger WRITE_SPIN_COUNT valueOf(WRITE_SPIN_COUNT);public static final ChannelOptionInteger WRITE_BUFFER_HIGH_WATER_MARK valueOf(WRITE_BUFFER_HIGH_WATER_MARK);public static final ChannelOptionInteger WRITE_BUFFER_LOW_WATER_MARK valueOf(WRITE_BUFFER_LOW_WATER_MARK);// Netty全局参数设置某个连接上可以暂存的最大最小Buffer,若连接等待发送的数据量大于设置的值则isWritable()会返回不可写// 这样客户端可以不再发送防止这个量不断的积压 最终可能让客户端挂掉public static final ChannelOptionWriteBufferWaterMark WRITE_BUFFER_WATER_MARK valueOf(WRITE_BUFFER_WATER_MARK);// Netty全局参数一个连接远端关闭时本地端是否关闭默认值为false值为false时连接自动关闭public static final ChannelOptionBoolean ALLOW_HALF_CLOSURE valueOf(ALLOW_HALF_CLOSURE);public static final ChannelOptionBoolean AUTO_READ valueOf(AUTO_READ);// Netty 全局参数自动读取默认值为true, Netty 只有在必要 的时候才设置关心相应的IO事件对于读操作需要// 调用channel.read()设置关心的I/O事件为OP_READ, 这样若有数据到达时才能读取以供用户处理public static final ChannelOptionBoolean AUTO_CLOSE valueOf(AUTO_CLOSE);public static final ChannelOptionBoolean SO_BROADCAST valueOf(SO_BROADCAST);public static final ChannelOptionBoolean SO_KEEPALIVE valueOf(SO_KEEPALIVE);// Socket 参数用于设置接收数据的等待超时时间单位为ms,默认值为0 表示无限等待public static final ChannelOptionInteger SO_SNDBUF valueOf(SO_SNDBUF);// Socket参数TCP 数据接收缓冲区的大小缓冲区即TCP 接收滑动窗口Linux 操作系统可以使用命令// cat /proc/sys/net/ipv4/tcp_rmem 查询大小一般情况下 该值可由用户 任意时刻设置但当设置值超过64KB// 时需要在连接到远端之前设置public static final ChannelOptionInteger SO_RCVBUF valueOf(SO_RCVBUF);// Socket 参数地址复用默认值为false,有4种情况可以使用1当有一个有相同的本地地址和端口的Socket1处于TIME_WAIT状态时// 你希望启动的程序Socket2 要占用该地址和端口比如重启服务且保持先前的端口有多块网卡或用IP Alias技术的机器在同一端启动多个// 进程,但每个进程 绑定的本地IP地址可能不同 单个进程绑定的相同的端口有多个Socket 上但每个Socket绑定的IP地址可能不同 。// 4 完全相同的越来越和端口重新绑定但这里只用于UDP的多皤不用于TCP.public static final ChannelOptionBoolean SO_REUSEADDR valueOf(SO_REUSEADDR);// Socket参数关闭Socket的延迟时间默认值为-1 表示禁用该功能-1 表示socket.close()方法立即返回。但操作系统底层会将发送缓冲区// 的数据全部 发送到对端0表示socket.close()方法立即返回操作系统放弃发送缓冲区的数据直接向对端发送RST包 对端收到复位// 错误非0整数值表示调用socket.close()方法的线程被阻塞直到延迟时间到或缓冲区的数据发送完毕若超时则对端会收到复位错误。public static final ChannelOptionInteger SO_LINGER valueOf(SO_LINGER);public static final ChannelOptionInteger SO_BACKLOG valueOf(SO_BACKLOG);public static final ChannelOptionInteger SO_TIMEOUT valueOf(SO_TIMEOUT);public static final ChannelOptionInteger IP_TOS valueOf(IP_TOS);// IP 参数 对应的IP参数IP_MULTICAST_IF 设置对应的地址为网卡多播模式public static final ChannelOptionInetAddress IP_MULTICAST_ADDR valueOf(IP_MULTICAST_ADDR);// IP 参数对应的IP参数IP_MULTICAST2 , 同上支持IP6public static final ChannelOptionNetworkInterface IP_MULTICAST_IF valueOf(IP_MULTICAST_IF);// IP 参数 多播数据报Time-to-Live即存活跳数public static final ChannelOptionInteger IP_MULTICAST_TTL valueOf(IP_MULTICAST_TTL);// IP 参数对应的IP参数IP_MULTICAST_LOOP设置本地回环接口的多播功能// 由于IP_MULTICAST_LOOP返回true表示关闭所以Netty 加后缀_DISABLED防止歧义public static final ChannelOptionBoolean IP_MULTICAST_LOOP_DISABLED valueOf(IP_MULTICAST_LOOP_DISABLED);// TCP 参数表示立即发送数据默认值为true Netty 默认值为true 而操作系统默认值为false该值设置Nagle算法的启动public static final ChannelOptionBoolean TCP_NODELAY valueOf(TCP_NODELAY);public static final ChannelOptionBoolean DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION valueOf(DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION);// 单线程执行ChannelPipeline中的事件默认值为true,该值控制执行ChannelPipeline中执行ChannelHandler的线程。// 如果为true, 整个pipeline由一个线程执行这样不需要进行线程切换以及线程同步是Netty 4 的推荐做法如为false// channelHandler 中处理过程会由Group中不同的线程执行public static final ChannelOptionBoolean SINGLE_EVENTEXECUTOR_PER_GROUP valueOf(SINGLE_EVENTEXECUTOR_PER_GROUP);private ChannelOption(int id, String name) {super(id, name);}protected ChannelOption(String name) {this(pool.nextId(), name);}public void validate(T value) {if (value null) {throw new NullPointerException(value);}}
}接下来看childHandler()方法调用 。 实际上也只是设置了ServerBootstrap的childHandler的属性值。
public ServerBootstrap childHandler(ChannelHandler childHandler) {this.childHandler ObjectUtil.checkNotNull(childHandler, childHandler);return this;
}这些属性什么时候使用呢 在后面的代码再来分析接下来看ChannelFuture cf bootstrap.bind(9000).sync();这一行代码 。
public ChannelFuture bind(int inetPort) {return bind(new InetSocketAddress(inetPort));
}public ChannelFuture bind(SocketAddress localAddress) {// 较验group和channelFactory是否为空如果为空则抛出异常validate();return doBind(ObjectUtil.checkNotNull(localAddress, localAddress));
}public B validate() {if (group null) {throw new IllegalStateException(group not set);}if (channelFactory null) {throw new IllegalStateException(channel or channelFactory not set);}return self();
}bind()方法实现很简单用端口构建出SocketAddress对象在真正调用doBind()方法之前较验group和channelFactory是否存在如果不存在 则抛出异常。 接下来看doBind()方法的实现。
// AbstractBoostrap 与 ServerBootstrap 初 始 化 Channel 并 注 册 到 NioEventLoop线程上以及端口绑定的核心源码解读如下:
private ChannelFuture doBind(final SocketAddress localAddress) {// 初始化Channel 并注册到NioEventLoop线程上final ChannelFuture regFuture initAndRegister();final Channel channel regFuture.channel();// 判断是否存在异常if (regFuture.cause() ! null) {return regFuture;}if (regFuture.isDone()) {// At this point we know that the registration was complete and successful.// 将注册成功后需要绑定的端口由NioEventLoop线程去异步执行此时需要创建ChannelPromise对象ChannelPromise promise channel.newPromise();// 最终调用AbstractChannel的bind()方法doBind0(regFuture, channel, localAddress, promise);return promise;} else {// Registration future is almost always fulfilled already, but just in case its not.// 由于注册操作由NioEventLoop线程去异步执行因此可能会执行不完此时需要返回 PendingRegistrationPromise 对象及时将结果交互给主线程final PendingRegistrationPromise promise new PendingRegistrationPromise(channel);// 加上注册监听器注册动作完成后触发regFuture.addListener(new ChannelFutureListener() {Overridepublic void operationComplete(ChannelFuture future) throws Exception {Throwable cause future.cause();if (cause ! null) {// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an// IllegalStateException once we try to access the EventLoop of the Channel.// 注册失败处理响应给主线程promise.setFailure(cause);} else {// Registration was successful, so set the correct executor to use.// See https://github.com/netty/netty/issues/2586promise.registered();// 只有成功后才能绑定doBind0(regFuture, channel, localAddress, promise);}}});return promise;}
}先来看initAndRegister()方法 。
final ChannelFuture initAndRegister() {Channel channel null;try {// 根据serverBootstrap.channel(NioServerSocketChannel.class)反射创建NioServerSocketChannel对象channel channelFactory.newChannel();// 初始化NioServerSocketChannel , 设置channel的参数为Worker线程管理SocketChannel准备好参数及其Handlert处理链init(channel);} catch (Throwable t) {// 初始化处理失败 创建DefaultChannelPromise实例设置异常并返回if (channel ! null) {// channel can be null if newChannel crashed (eg SocketException(too many open files))channel.unsafe().closeForcibly();// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutorreturn new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);}// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutorreturn new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);}// 调用SingleThreadEventLoop 的register()方法 最终触发AbstractUnsafe的register// 这里的group()指的就是bossgroupChannelFuture regFuture config().group().register(channel);// 注册异常处理if (regFuture.cause() ! null) {if (channel.isRegistered()) {channel.close();} else {channel.unsafe().closeForcibly();}}// If we are here and the promise is not failed, its one of the following cases:// 1) If we attempted registration from the event loop, the registration has been completed at this point.// i.e. Its safe to attempt bind() or connect() now because the channel has been registered.// 2) If we attempted registration from the other thread, the registration request has been successfully// added to the event loops task queue for later execution.// i.e. Its safe to attempt bind() or connect() now:// because bind() or connect() will be executed *after* the scheduled registration task is executed// because register(), bind(), and connect() are all bound to the same thread./*** 这段注释比较长主要讲述Channel 注册成功后的一些操作bind或connect 操作需要register完成后执行此涉及线程切换因为ServerBootStrap* 运行在主线程上而register,bind,connect需要在NioEventLoop 线程上执行注释翻译如下* 如果程序到这里则说明promise没有失败可能发生以下情况* 1. 如果尝试将Channel注册到EventLoop上则此时注册已经完成inEventLoop返回true,Channel已经注册成功可以安全调用bind()或connect()方法* 2. 如果尝试注册到另一个线程上即inEventLoop返回false, 则此时register语法已经完成添加到事件循环的任务队列中现在同样可以尝试调用* bind() 或connect() 因为register()bind() 和connect() 都被绑定在同一个I/O 线程上所以在执行完register Task后bind()* 或connect()才会被执行*/return regFuture;
}public final EventLoopGroup group() {return bootstrap.group();
}Channel的实例化过程其实就是调用ChannelFactory的newChannel()方法而实例化Channel具体类型又和初始化Bootstrap时传入的channel()方法的参数有关因此对于服务端的Bootstrap而言创建Channel实例上是创建的NioServerSocketChannel。接下来进入NioServerSocketChannel的构造函数看他做了哪些事情 。
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER SelectorProvider.provider();public NioServerSocketChannel() {this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}这里的代码比较关键我们可以看到在这个构造器中首先会调用newSocket()方法来打开一个新的Java NIO 的SocketChannel。
private static ServerSocketChannel newSocket(SelectorProvider provider) {try {/*** Use the {link SelectorProvider} to open {link SocketChannel} and so remove condition in* {link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.** See a hrefhttps://github.com/netty/netty/issues/2308#2308/a.*/return provider.openServerSocketChannel();} catch (IOException e) {throw new ChannelException(Failed to open a server socket., e);}
}public NioServerSocketChannel(ServerSocketChannel channel) {super(null, channel, SelectionKey.OP_ACCEPT);config new NioServerSocketChannelConfig(this, javaChannel().socket());
}类名解释NioSocketChannel异步非阻塞的客户端TCPSocket连接NioServerSocketChannel异步非阻塞的服务端TCP Socket连接NioDatagramChannel异步非阻塞的UDP连接NioSctpChannel异步的客户端SCTP (Stream Control Transmission Protocol ,流程控制传输协议)连接NIOSctpServerChannel异步是SCTP 服务端连接OioSocketChannel同步阻塞的客户端TCP Socket连接OioServerSocketChannel同步阻塞的服务端TCP Socket 连接OioDatagramChannel同步阻塞的UDP连接OioSctpChannel同步的SCTP服务端连接OioSctpServerChannel同步的客户端TCP Socket连接 在NioServerSocketChannel构造函数执行过程中 通过SelectorProvider.provider.openServerSocketChannel()获取到ServerSocketChannel。大家有没有发现和Selector获取方式很像看Selector的获取方式。 都是通过SelectorProvider.provider获取到的。 突然发现这不就是ServerSocketChannel的创建嘛。 实际上返回的是一个ServerSocketChannelImpl对象。进入NioServerSocketChannel的父类方法 。
public abstract class AbstractNioChannel extends AbstractChannel {// AbstractNioChannel 也是一个抽象类 不过它在AbstractChannel的基础上增加了一些属性和方法AbstractChannel没有涉及到Nio的// 任何属性和具体的方法包括AbstractUnsafe,AbstractNioChannel有以下3个重要的属性// 真正用到了NIO channel, SelectableChannel是java.nio.SocketChannel和java.nio.ServerSocketChannel公共的抽象类private final SelectableChannel ch;// 监听感兴趣的事件 readInterestOp用于区分当前Channel监听的事件类型protected final int readInterestOp;// 注册到Selector 后获取key selectionKey 它是将SelectableChannel注册到Selector后的返回值这些属性的定义可以看出// 在AbstractNioChannel中已经将Netty的Channel 和java NIO的Channel 关联起来。volatile SelectionKey selectionKey;protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent, ch, readInterestOp);}protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent);this.ch ch;this.readInterestOp readInterestOp;try {// 设置ServerSocketChannel为非阻塞ch.configureBlocking(false);} catch (IOException e) {try {ch.close();} catch (IOException e2) {if (logger.isWarnEnabled()) {logger.warn(Failed to close a partially initialized socket., e2);}}throw new ChannelException(Failed to enter non-blocking mode., e);}} 接下会调用父类的AbstractChannel的构造函数并传入实际参数readInterestOpSelectionKey.OP_READ。
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {private static final InternalLogger logger InternalLoggerFactory.getInstance(AbstractChannel.class);// AbstractChannel内部有一个parent属性 表示通道的父通道对于连接监听通道如NioServerSocketChannel 实例来说其父亲通道的null// 而对于每一条传输通道(如NioSocketChannel实例)其parent属性的值为接收到该连接的服务器连接的监听通道 。private final Channel parent; // 父通道private final ChannelId id;// 实现具体的连接与读/写数据如网络的读/写链路关闭发起连接等命名为Unsafe表示不对外提供使用并非不安全private final Unsafe unsafe;// 一个Handler容器也可以将其理解为一个Handler链Handler 主要处理数据的编/解码业务逻辑// AbstractChannel内部有一个pipeline属性表示处理器的流水线 Netty在对通道进行初始化的时候将pipeline属性初始化为DefaultChannelPipeline// 的实例 这段代码也表明每个通道拥有一条ChannelPipeline处理器流水线 。//private final DefaultChannelPipeline pipeline;private final VoidChannelPromise unsafeVoidPromise new VoidChannelPromise(this, false);private final CloseFuture closeFuture new CloseFuture(this);private volatile SocketAddress localAddress;private volatile SocketAddress remoteAddress;// 每个Channel 对应一条EventLoop线程private volatile EventLoop eventLoop;private volatile boolean registered;private boolean closeInitiated;private Throwable initialCloseCause;/** Cache for the string representation of this channel */private boolean strValActive;private String strVal;/*** Creates a new instance.* param parent* the parent of this channel. {code null} if theres no parent.*/protected AbstractChannel(Channel parent) {this.parent parent; // 父通道id newId();unsafe newUnsafe(); // 底层的NIO通道完成的实际的IO操作pipeline newChannelPipeline(); // 一条通道拥有一条流水线}...
}Channel 是Netty 抽象出来对网络I/O进行读/写的相关接口与NIO中的Channel接口相似Channel的主要功能有网络I/O的读/写客户端发起连接主动关闭链接关闭链接获取通信双方的网络地址等Channel接口下有一个重要的抽象类AbstractChannel, 一些公共的基础方法都是达这个抽象类中实现。一些特定的功能可以通过各个不同的实现类去实现 最大限度的实现功能和接口的重用。AbstractChannel融合了Netty的线程模型事件驱动模型但由于网络I/O模型及协议种类较多 除了TCP协议Netty还支持很多的其他连接协议并且每种协议都有传统阻塞I/O和NIO(非阻塞I/O)版本的区别不同的协议不同的阻塞类型的连接有不同的Channel类型与之对应因此AbstractChannel并没有与网络I/O直接相关的操作每种阻塞与非阻塞Channel在AbstractChannel上都会继续抽象一层如AbstractNioChannel既是Netty 重新封装的EpollSocketChannel实现其他非阻塞I/O Channel 的抽象层。 至此NioSocketChannel就完成了初始化我们可以稍微的总结一下NioServerSocketChannel初始化的所有工作内容。
调用NioServerSocket.newSocket(DEFAULT_SELECTOR_PROVIDER)打开一个新的java NioSocketChannel.初始化AbstractChannel(Channel parent)对象并给属性赋值具体赋值的属性如下 。
id : 每个Channel 都会被分配一个唯一的id。parent : 属性值默认为null。unsafe: 通过调用newUnsafe()方法实例化一个Unsafe对象它的类型是AbstractNioByteChannel.NioByteUnsafe 。pipeline是通过调用new DefaultChannelPipeline(this) 新创建的实例。
AbstractNIOChannel被赋值的属性如下
ch : 被赋值为Java 原生的SocketChannel ,即NioSocketChannel的newSocket()方法返回的Java NioSocketChannel。readInterestOp : 被赋值的SelectionKey.OP_READ。ch : 被配置为非阻塞 即调用ch.configureBlocking(false)方法 。
NioSocketChannel 中被赋值的属性 config new NioServerSocketChannelConfig(this, socket.socket());
interface Unsafe {RecvByteBufAllocator.Handle recvBufAllocHandle();SocketAddress localAddress();SocketAddress remoteAddress();void register(EventLoop eventLoop, ChannelPromise promise);void bind(SocketAddress localAddress, ChannelPromise promise);void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);void disconnect(ChannelPromise promise);void close(ChannelPromise promise);void closeForcibly();void deregister(ChannelPromise promise);void beginRead();void write(Object msg, ChannelPromise promise);void flush();ChannelPromise voidPromise();ChannelOutboundBuffer outboundBuffer();
}从上述代码中可以看出 这些方法其实都是与Java 底层相关的Socket 的操作相对应 。 继续回到AbstractChannel 的构造函数中 这里调用了newUnsafe()方法获取一个新的Unsafe对象而newUnsafe()方法在NioServerSocketChannel 中被重写了代码如下 。
protected AbstractNioUnsafe newUnsafe() {return new NioMessageUnsafe();
}在上面分析了NioServerSocketChannel的大体初始化过程但是漏掉了一个关键的部分即ChannelPipeline的初始化在Pipeline的注释说明中写道“Each channel has its own pipeline and it is created automatically when a new channel is created”。 我们知道在实例化一个Channel 时必须被初始化为DefaultChannelPipeline实例DefaultChannelPipeline构造器的代码如下 。
protected DefaultChannelPipeline newChannelPipeline() {return new DefaultChannelPipeline(this);
}/*** |---------Decorder1------ Decoder2-------|* HeadContext ----| |------ TailContext* | --- | |------|* |-------Encoder1 ----- Encoder2 --------|** 图中的HeadContext和TailContext之间连接了各种编码器、解码器形成了整个Handler链表图中的箭头表示编码器和解码器的查* 找方向。Handler链表的头部和尾部都是在DefaultChannelPipeline的 构造方法中定义好的具体代码如下:*/
protected DefaultChannelPipeline(Channel channel) {this.channel ObjectUtil.checkNotNull(channel, channel);succeededFuture new SucceededChannelFuture(channel, null);voidPromise new VoidChannelPromise(channel, true);tail new TailContext(this);head new HeadContext(this);head.next tail;tail.prev head;
}DefaultChannelPipeline的构造函数要传入一个Channel 而这个Channel 其实就是我们初始化的NioServerSocketChannel 对象DefaultChannelPipeline会将这个NioSocketChannel对象保存在Channel 属性中 DefaultChannelPipeline中还有两个特殊的属性Head 和Tail 这两个属性是双向链表的头和尾 这个链表是Netty 实现Pipeline的机制的关键关于DefaultChannelPipeline中的双向链表及其所起的作用。后面再来分析 。 先来看HeadContext和TailContext的层次结构 。 从head.next tail 和 tail.prev head 这两行代码可以看出它是一个双向链表。 接着看initAndRegister的init()方法 。
void init(Channel channel) throws Exception {final MapChannelOption?, Object options options0();synchronized (options) {setChannelOptions(channel, options, logger);}// 关于属性设置这一块的源码请看另外一篇博客// Netty之DefaultAttributeMap与AttributeKey的机制和原理final MapAttributeKey?, Object attrs attrs0();synchronized (attrs) {for (EntryAttributeKey?, Object e: attrs.entrySet()) {SuppressWarnings(unchecked)AttributeKeyObject key (AttributeKeyObject) e.getKey();channel.attr(key).set(e.getValue());}}ChannelPipeline p channel.pipeline();final EventLoopGroup currentChildGroup childGroup;final ChannelHandler currentChildHandler childHandler;final EntryChannelOption?, Object[] currentChildOptions;final EntryAttributeKey?, Object[] currentChildAttrs;synchronized (childOptions) {currentChildOptions childOptions.entrySet().toArray(newOptionArray(0));}synchronized (childAttrs) {currentChildAttrs childAttrs.entrySet().toArray(newAttrArray(0));}p.addLast(new ChannelInitializerChannel() {Overridepublic void initChannel(final Channel ch) throws Exception {final ChannelPipeline pipeline ch.pipeline();ChannelHandler handler config.handler();if (handler ! null) {pipeline.addLast(handler);}ch.eventLoop().execute(new Runnable() {Overridepublic void run() {pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}});
}在上面代码initChannel(0方法中首先通过handler()方法获取一个Handler ,如果获取的handler不为空则添加pipeline中然后添加一个ServerBootstrapAcceptor的实例这里handler()方法中返回的是哪个对象呢其实它返回的Handler 属性而这个属性就是我们服务端的启动代码中设置 。 b.group(bossGroup, workerGroup) ; 这个时候Pipeline中的Handler 情况如下图所示 。 根据对原来的客户端代码的分析将Channel绑定到EventLoop(这里指的是NioServerSocketChannel绑定到bossGroup)后会在Pipeline中触发fireChannelRegistered事件接着会触发对ChannelInitializer的initChannel() 方法的调用因此在绑定完成后此时的Pipeline的内容如下所示 在我们分析bossGroup和workerGroup时已经知道ServerBootstrapAcceptor的channelRead()方法中会为新建的Channel设置Handler 并注册到一个EventLoop 中。
public void channelRead(ChannelHandlerContext ctx, Object msg) {final Channel child (Channel) msg;child.pipeline().addLast(childHandler);...try {childGroup.register(child).addListener(new ChannelFutureListener() {Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {forceClose(child, future.cause());}}});} catch (Throwable t) {forceClose(child, t);}
}而这里的childHandler就是我们服务端启动代码中设置的Handler。 后续的步骤我们基本已经清楚了在客户端连接Channel注册后就会触发ChannelInitializer的initChannel()方法的调用最后我们总结一下服务端Handler与childHandler的区别与联系。
有服务端NioServerSocketChannel对象的pipeline中添加Handler对象和ServerBootstrapAcceptor对象 。当有新的客户端连接请求时 会调用ServerBootstrapAcceptor的channelRead()方法创建连接对应的NioSocketChannel对象并将childHandler添加到NioSocketChannel 对应的pipeline中而且将些Channel 绑定到wokerGroup 中的某个EventLoop。Handler 对象只有accept()阻塞阶段起作用它主要处理客户端发送过来的连接请求。childHandler在客户端连接建立以后起作用它负责客户端连接的I/O交互。 init()方法分为3步先是option设置再是attrs设置最后构建ChannelInitializer()添加到流水线中。 先来看options的设置方法 。
static void setChannelOptions(Channel channel, MapChannelOption?, Object options, InternalLogger logger) {for (Map.EntryChannelOption?, Object e: options.entrySet()) {setChannelOption(channel, e.getKey(), e.getValue(), logger);}
}private static void setChannelOption(Channel channel, ChannelOption? option, Object value, InternalLogger logger) {try {if (!channel.config().setOption((ChannelOptionObject) option, value)) {logger.warn(Unknown channel option {} for channel {}, option, channel);}} catch (Throwable t) {logger.warn(Failed to set channel option {} with value {} for channel {}, option, value, channel, t);}
}运行到这里我们首先要明白channel 即为NioServerSocketChannel, config 为 NioServerSocketChannelConfig , option就是之前NettyServer中设置的option参数。
bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LifeCycleInBoundHandler());ch.pipeline().addLast(new NettyServerHandler());}});先看一下NioServerSocketChannelConfig的类关系图。 进入NioServerSocketChannelConfig的setOption()方法 。
public T boolean setOption(ChannelOptionT option, T value) {if (PlatformDependent.javaVersion() 7 option instanceof NioChannelOption) {return NioChannelOption.setOption(jdkChannel(), (NioChannelOptionT) option, value);}return super.setOption(option, value);
}setOption()方法内部分两种情况如果Java 版本大于等于7 并且 ChannelOption是NioChannelOption时则调用NioChannelOption的setOption()方法这里不做分析将来遇到具体的情况时再来分析。如果不满足则调用父类的DefaultServerSocketChannelConfig的setOption()方法 。
public T boolean setOption(ChannelOptionT option, T value) {validate(option, value);if (option SO_RCVBUF) {setReceiveBufferSize((Integer) value);} else if (option SO_REUSEADDR) {setReuseAddress((Boolean) value);} else if (option SO_BACKLOG) {setBacklog((Integer) value);} else {return super.setOption(option, value);}return true;
}public ServerSocketChannelConfig setReceiveBufferSize(int receiveBufferSize) {try {javaSocket.setReceiveBufferSize(receiveBufferSize);} catch (SocketException e) {throw new ChannelException(e);}return this;
}关于SO_RCVBUF,SO_REUSEADDR,SO_BACKLOG的作用之前分析过这里就不再赘述大家发现一个规率没有像这种系统参数Netty直接调用rt.jar包的实现类来设置值而对于像CONNECT_TIMEOUT_MILLIS这种参数Netty 则直接保存到ChannelConfig的属性中以供今后使用。请看父类DefaultChannelConfig的setOption()方法 。
public T boolean setOption(ChannelOptionT option, T value) {validate(option, value);if (option CONNECT_TIMEOUT_MILLIS) {setConnectTimeoutMillis((Integer) value);} else if (option MAX_MESSAGES_PER_READ) {setMaxMessagesPerRead((Integer) value);} else if (option WRITE_SPIN_COUNT) {setWriteSpinCount((Integer) value);} else if (option ALLOCATOR) {setAllocator((ByteBufAllocator) value);} else if (option RCVBUF_ALLOCATOR) {setRecvByteBufAllocator((RecvByteBufAllocator) value);} else if (option AUTO_READ) {setAutoRead((Boolean) value);} else if (option AUTO_CLOSE) {setAutoClose((Boolean) value);} else if (option WRITE_BUFFER_HIGH_WATER_MARK) {setWriteBufferHighWaterMark((Integer) value);} else if (option WRITE_BUFFER_LOW_WATER_MARK) {setWriteBufferLowWaterMark((Integer) value);} else if (option WRITE_BUFFER_WATER_MARK) {setWriteBufferWaterMark((WriteBufferWaterMark) value);} else if (option MESSAGE_SIZE_ESTIMATOR) {setMessageSizeEstimator((MessageSizeEstimator) value);} else if (option SINGLE_EVENTEXECUTOR_PER_GROUP) {setPinEventExecutorPerGroup((Boolean) value);} else {return false;}return true;
}这些参数相应的设置到DefaultChannelConfig的属性中而这些参数的意义还是在使用时再来分析了吧。 先记录一下。 接下来看 ServerBootstrapAcceptor的构造函数实现。
ServerBootstrapAcceptor(final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,EntryChannelOption?, Object[] childOptions, EntryAttributeKey?, Object[] childAttrs) {this.childGroup childGroup;this.childHandler childHandler;this.childOptions childOptions;this.childAttrs childAttrs;// Task which is scheduled to re-enable auto-read.// Its important to create this Runnable before we try to submit it as otherwise the URLClassLoader may// not be able to load the class because of the file limit it already reached.//// See https://github.com/netty/netty/issues/1328enableAutoReadTask new Runnable() {Overridepublic void run() {channel.config().setAutoRead(true);}};
}接下来看注册方法实现。
// 在有了Selector NioEventLoop的成员和ServerSocketChannel之后我们需要将它们绑定起来
// 也就是把ServerSocketChannel绑定到bossGroup 中的NioEventLoop(select)上
// 下面是具体的注册过程
public ChannelFuture register(Channel channel) {return next().register(channel);
}public EventLoop next() {return (EventLoop) super.next();
}public EventExecutor next() {return chooser.next();
}好几个next()方法都弄晕了先明白chooser到底是什么 从之前的源码分析得知EventExecutorChooserFactory的实现类为DefaultEventExecutorChooserFactory而调用newChooser()方法无非是轮询 children new EventExecutor[nThreads]; 中的EventExecutor即可只是在chooserFactory的内部根据数组children的长度是否为2的幂次方而采用不同的算法而已 。 【注意】大家发现没有,这里group()方法返回的实际上是bossGroup 而next()方法只是从bossGroup中取出第1个NioEventGroup并调用其注册方法即使bossGroup中初始化NioEventLoop个数为CPU * 2 但实际上也只会调用bossGroup的第一个NioEventLoop的register()方法 当然网上也有这样的疑问 当然目前我也没有想到Netty开发者这样设计的原因当然以后想到原因再来补充吧或者有谁知道原因告诉我我再来补充这人疑问
public ChannelFuture register(Channel channel) {return register(new DefaultChannelPromise(channel, this));
}public ChannelFuture register(final ChannelPromise promise) {ObjectUtil.checkNotNull(promise, promise);promise.channel().unsafe().register(this, promise);return promise;
}下面这个register()方法中promise.channel()不就是channel嘛将channel传入到DefaultChannelPromise中然后又取出来再获取其unsafe()并调用他的register方法 。 那unsafe()是什么呢 请看下图。 unsafe()在什么时候初始化的呢 请看newUnsafe()方法 。而NioMessageUnsafe并没有实现register()方法 因此进入父类AbstractUnsafe的register()方法。
public final void register(EventLoop eventLoop, final ChannelPromise promise) {if (eventLoop null) {throw new NullPointerException(eventLoop);}if (isRegistered()) {promise.setFailure(new IllegalStateException(registered to an event loop already));return;}if (!isCompatible(eventLoop)) {promise.setFailure(new IllegalStateException(incompatible event loop type: eventLoop.getClass().getName()));return;}// 当将NioSocketChannel注册到Selector上时有部分代码需要解 读NioSocketChannel对应的NioEventLoop线程在未启动时// eventLoop.inEventLoop()会返回false。若Worker的线程数为16则 在前面16个NioSocketChannel注册时都会把注册看作一个Task并添// 加到NioEventLoop的队列中同时启动NioEventLoop队列唤醒 Selector。这部分功能在AbstractUnsafe的register()方法中具体 代码如下:AbstractChannel.this.eventLoop eventLoop;if (eventLoop.inEventLoop()) {register0(promise);} else {try {eventLoop.execute(new Runnable() {Overridepublic void run() {register0(promise);}});} catch (Throwable t) {logger.warn(Force-closing a channel whose registration task was not accepted by an event loop: {},AbstractChannel.this, t);closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}}
}在register()方法中加粗代码eventLoop.inEventLoop()需要注意其实在之前的博客 Netty 之 DefaultPromise 源码解析 做了详细的分析 。 不过关于eventLoop.inEventLoop()这行代码我相信大家肯定还是晕来看一个例子 。
public class InEventLoopTest {volatile Thread thread;private final TaskRunner taskRunner new TaskRunner();private final AtomicBoolean started new AtomicBoolean();public final BlockingQueueRunnable taskQueue new LinkedBlockingQueueRunnable();public static void main(String[] args) {InEventLoopTest eventLoop new InEventLoopTest();Thread thread1 new Thread(new Runnable() {Overridepublic void run() {System.out.println(Thread.currentThread().getName() 执行任务2 );}}, 线程1 );Thread thread2 new Thread(new Runnable() {Overridepublic void run() {System.out.println(Thread.currentThread().getName() 执行任务1 );eventLoop.execute(thread1);}}, 线程2 );eventLoop.execute(thread2);}public void execute(Runnable task) {if (task null) {throw new NullPointerException(task);}addTask(task);if (!inEventLoop()) {startThread();} else {System.out.println( 执行任务为同一线程只将任务加入到队列中并不会创建新的线程去执行 );}}public void startThread() {if (started.compareAndSet(false, true)) {final Thread t new Thread(new Runnable() {Overridepublic void run() {taskRunner.run();}}, 执行任务线程);thread t;t.start();}}public void addTask(Runnable task) {if (task null) {throw new NullPointerException(task);}taskQueue.add(task);}public boolean inEventLoop() {return inEventLoop(Thread.currentThread());}public boolean inEventLoop(Thread thread) {return thread this.thread;}final class TaskRunner implements Runnable {Overridepublic void run() {for (; ; ) {Runnable task taskQueue.poll();if (task ! null) {try {task.run();} catch (Exception e) {e.printStackTrace();}if (task ! null) {continue;}}if (taskQueue.isEmpty()) {boolean stopped started.compareAndSet(true, false);assert stopped;if (taskQueue.isEmpty()) {break;}if (!started.compareAndSet(false, true)) {break;}}}}}
}执行结果执行任务线程 执行任务1 执行任务为同一线程只将任务加入到队列中并不会创建新的线程去执行
执行任务线程 执行任务2 分析上面执行结果主线程执行 thread2 此时因为InEventLoopTest的thread 为空与主线程肯定不相等inEventLoop()方法肯定返回false因此将thread2加入到taskQueue队列的同时调用startThread()方法创建新的线程来消费taskQueue队列中的任务。 而在线程2的内部又调用了execute()方法来执行线程1的任务因为执行任务的线程运行了thread2的run()方法 。而此时Thread.currentThread()即为执行任务的线程因此inEventLoop()方法将返回true即不会偿试创建新的线程来执行任务只将任务添加到队列即可。 我相信通过这个例子你应该对inEventLoop()方法有所理解 。 上面例子只是帮我们去理解inEventLoop()方法以及execute()方法而真正execute()是如何实现的呢 但还有一点值得注意bossGroup启动时inEventLoop()方法返回false为什么呢 因为实际上执行eventLoop.inEventLoop()的线程是main线程而inEventLoop()方法的内部如下
public boolean inEventLoop() {return inEventLoop(Thread.currentThread());
}public boolean inEventLoop(Thread thread) {return thread this.thread;
}判断当前线程和EventLoop执行任务的线程是否是同一个如果是同一个返回true否则返回false主线程第一次执行没有创建执行任务的线程返回false。 接下来进入eventLoop的execute()方法。
public void execute(Runnable task) {if (task null) {throw new NullPointerException(task);}boolean inEventLoop inEventLoop();// 将任务添加到taskQueue中 addTask(task);// 可能存在并发再次判断 if (!inEventLoop) {// 启动新线程去执行任务startThread();if (isShutdown()) {boolean reject false;try {if (removeTask(task)) {reject true;}} catch (UnsupportedOperationException e) {// The task queue does not support removal so the best thing we can do is to just move on and// hope we will be able to pick-up the task before its completely terminated.// In worst case we will log on termination.}if (reject) {reject();}}}if (!addTaskWakesUp wakesUpForTask(task)) {wakeup(inEventLoop);}
}用一句话来总节当EventLoop的execute()方法的第一次被调用时会触发startThread()方法的调用进而启动EventLoop所对应的java 本地线程。 // 这段代码表示变量在wakenUp为false的情况下会触发Selector的wakeup操作再思考若在添加任务时成功触发唤醒那么为什么NioEventLoop
// 在调用select()方法后还要再次调用wakenUp呢这段代码源码注释非常长有点难以理解具体如下
// 1. wakenUp唤醒动作可能在NioEventLoop线程运行的两个阶段被触发第一阶段有可能在NioEventLoop线程运行于wakeUp.getAndSet(false)
// 与selector.select(timeoutMillis)之间此时Selector.select能立刻返回最新任务得到及时执行
// 第二个阶段可能在selector.select(timeoutMillis)与runAllTasks之间此时在runAllTasks执行完本次任务后又添加了新任务这些任务是无法被及时唤醒的
// 因此此时wakenUp为true, 其他的唤醒操作都会失败从而导致这部分任务需要等待select超时后才会被执行。
// 这对于实时性要求很敲打程序来讲是无法接受的 因此在selector.select(timeMillis)与runAllTasks中间加入了if(wakenUp.get())即
// 若是唤醒动作则预期唤醒一次附上后续唤醒操作失败
// 2.但由于本书是Netty版本在select()方法里 调用hasTask()查看任务队列是否有任务且在进入select()方法之前会把wakenUp设置为false
// ,所以wakenUp.compareAndSet(false,true) 会成功因此当添加新任务时会调用selectNow()方法不会等到超时才执行任务此时无须
// 在select()方法后再次调用wakeup()方法
// 3. wakeup()方法操作耗时性能因此建议在非复杂处理时尽量不要开额外的线程
protected void wakeup(boolean inEventLoop) {if (!inEventLoop wakenUp.compareAndSet(false, true)) {selector.wakeup();}
}
private static final int ST_NOT_STARTED 1;
private static final int ST_STARTED 2;
private static final int ST_SHUTTING_DOWN 3;
private static final int ST_SHUTDOWN 4;
private static final int ST_TERMINATED 5;private volatile int state ST_NOT_STARTED;private void startThread() { // state的默认状态为ST_NOT_STARTEDif (state ST_NOT_STARTED) {// 将state状态设置为ST_STARTED已启动状态if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {boolean success false;try {doStartThread();success true;} finally {if (!success) {STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);}}}}
}接下来进入doStartThread()方法 。
private void doStartThread() {assert thread null;executor.execute(new Runnable() {Overridepublic void run() {thread Thread.currentThread();if (interrupted) {thread.interrupt();}boolean success false;updateLastExecutionTime();try {SingleThreadEventExecutor.this.run();success true;} catch (Throwable t) {logger.warn(Unexpected exception from an event executor: , t);} finally {for (;;) {int oldState state;if (oldState ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {break;}}// Check if confirmShutdown() was called at the end of the loop.if (success gracefulShutdownStartTime 0) {if (logger.isErrorEnabled()) {logger.error(Buggy EventExecutor.class.getSimpleName() implementation; SingleThreadEventExecutor.class.getSimpleName() .confirmShutdown() must be called before run() implementation terminates.);}}try {// Run all remaining tasks and shutdown hooks.for (;;) {if (confirmShutdown()) {break;}}} finally {try {cleanup();} finally {// Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify// the future. The user may block on the future and once it unblocks the JVM may terminate// and start unloading classes.// See https://github.com/netty/netty/issues/6596.FastThreadLocal.removeAll();STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);threadLock.countDown();if (logger.isWarnEnabled() !taskQueue.isEmpty()) {logger.warn(An event executor terminated with non-empty task queue ( taskQueue.size() ));}terminationFuture.setSuccess(null);}}}}});
}上面这个方法的逻辑还是很简单的不过有一点值得注意就是executor是什么 用 executor.execute()包装一下Runnable有什么用意是什么呢 executor为ThreadExecutorMap 那executor又是何时被初始化的呢在SingleThreadEventExecutor构造方法中打断点 。 在SingleThreadEventExecutor的构造函数中 executor ThreadExecutorMap.apply(executor, this);而apply()方法又传入了两个参数第一个参数又是一个executor第二个参数为NioEventLoop那第一个参数又是哪里初始化的呢 在MultithreadEventExecutorGroup的构造函数中找到了答案 默认情况下为ThreadPerTaskExecutor
public final class ThreadPerTaskExecutor implements Executor {private final ThreadFactory threadFactory;public ThreadPerTaskExecutor(ThreadFactory threadFactory) {if (threadFactory null) {throw new NullPointerException(threadFactory);}this.threadFactory threadFactory;}Overridepublic void execute(Runnable command) {// threadFactory的默认值为DefaultThreadFactorythreadFactory.newThread(command).start();}
} 而ThreadPerTaskExecutor的ThreadFactory threadFactory由newDefaultThreadFactory()方法创建看一下newDefaultThreadFactory()方法的实现。
protected ThreadFactory newDefaultThreadFactory() {return new DefaultThreadFactory(getClass());
}而newDefaultThreadFactory()方法实际上创建了一个DefaultThreadFactory对象 。 接下来继续回到ThreadExecutorMap的apply()方法 。
public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {ObjectUtil.checkNotNull(executor, executor);ObjectUtil.checkNotNull(eventExecutor, eventExecutor);return new Executor() {Overridepublic void execute(final Runnable command) {// executor为ThreadPerTaskExecutorexecutor.execute(apply(command, eventExecutor));}};
}因此在doStartThread()的execute()方法实际上是调用了ThreadPerTaskExecutor的execute()方法但这里值得注意在上述execute()方法中对command又用apply()方法做了一层包装。
public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {ObjectUtil.checkNotNull(command, command);ObjectUtil.checkNotNull(eventExecutor, eventExecutor);return new Runnable() {Overridepublic void run() {setCurrentEventExecutor(eventExecutor);try {command.run();} finally {setCurrentEventExecutor(null);}}};
}private static final FastThreadLocalEventExecutor mappings new FastThreadLocalEventExecutor();private static void setCurrentEventExecutor(EventExecutor executor) {mappings.set(executor);
}在run方法运行之前将NioEventLoop设置到ThreadLocal中NioEventLoop 相当于整个任务在运行中的全局上下文能从中获取修改移除值 最终会调用execute()的这一行代码 threadFactory.newThread(command).start();
public Thread newThread(Runnable r) {Thread t newThread(FastThreadLocalRunnable.wrap(r), prefix nextId.incrementAndGet());try {if (t.isDaemon() ! daemon) {t.setDaemon(daemon);}if (t.getPriority() ! priority) {t.setPriority(priority);}} catch (Exception ignored) {}return t;
} 而在newThread()方法中又对任务做了包装看看FastThreadLocalRunnable.wrap ( r)方法的实现。
final class FastThreadLocalRunnable implements Runnable {private final Runnable runnable;private FastThreadLocalRunnable(Runnable runnable) {this.runnable ObjectUtil.checkNotNull(runnable, runnable);}Overridepublic void run() {try {runnable.run();} finally {FastThreadLocal.removeAll();}}static Runnable wrap(Runnable runnable) {return runnable instanceof FastThreadLocalRunnable ? runnable : new FastThreadLocalRunnable(runnable);}
}在wrap()方法中 最终任务被包装成了FastThreadLocalRunnable。 再来看threadFactory的newThread()方法看其内部newThread()方法实现。
protected Thread newThread(Runnable r, String name) {return new FastThreadLocalThread(threadGroup, r, name);
}看到没有最终Thread被替换成了FastThreadLocalThreadRunnable被替换成了FastThreadLocalRunnable而真正在执行FastThreadLocalRunnable的run()方法时在finally中会调用FastThreadLocal.removeAll()方法将线程范围内ThreadLocal给清除掉。 那Netty 这样做的目的是什么呢 其实也是为性能考虑 之前写了一篇博客 Netty源码性能分析 - ThreadLocal PK FastThreadLocal 关于ThreadLocal和FastThreadLocal的实现原理对比有兴趣可以去看看。 接下来进入doStartThread()方法的内部SingleThreadEventExecutor.this.run();这一行代码的分析 。
// 最后回到NioEventLoop的run()方法将前面的三个部分结合起来 首先调用select(boolean oldWakenup)方法轮询就绪的Channel
// 然后调用processSelectKeys()方法处理I/O事件最后运行runAllTasks()方法处理任务队列具体实现代码如下
protected void run() {for (;;) {try {try {// 根据是否有任务获取策略 默认策略 当有任务时返回selector.selectNow()// 当无任务时返回SelectStrategy.SELECTswitch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {case SelectStrategy.CONTINUE:continue;case SelectStrategy.BUSY_WAIT:// fall-through to SELECT since the busy-wait is not supported with NIOcase SelectStrategy.SELECT:// 执行select()方法select(wakenUp.getAndSet(false));// wakenUp.compareAndSet(false, true) is always evaluated// before calling selector.wakeup() to reduce the wake-up// overhead. (Selector.wakeup() is an expensive operation.)//// However, there is a race condition in this approach.// The race condition is triggered when wakenUp is set to// true too early.//// wakenUp is set to true too early if:// 1) Selector is waken up between wakenUp.set(false) and// selector.select(...). (BAD)// 2) Selector is waken up between selector.select(...) and// if (wakenUp.get()) { ... }. (OK)//// In the first case, wakenUp is set to true and the// following selector.select(...) will wake up immediately.// Until wakenUp is set to false again in the next round,// wakenUp.compareAndSet(false, true) will fail, and therefore// any attempt to wake up the Selector will fail, too, causing// the following selector.select(...) call to block// unnecessarily.//// To fix this problem, we wake up the selector again if wakenUp// is true immediately after selector.select(...).// It is inefficient in that it wakes up the selector for both// the first case (BAD - wake-up required) and the second case// (OK - no wake-up required).// 这进而的注释很长 为何还要判断wakenUp.get()并去执行唤醒操作selector.wakeup()这个唤醒操作// 底层原理是构造一个感兴趣的就绪事件让Selector 在调用select()方法时轮询到就绪事件并立刻返回// WindowsSelectImpl的wakeup源码如下// public Selector wakeup(){// synchronized(this.interruptLock){// if(!this.interruptTriggered){// this.selectWakeupSocket();// this.interruptTriggered true ;// }// return this;// }// }// 由上述代码可以发现多次同时调用wakeup()方法与调用一次没有区别因为InterruptTriggered第一次调用后就为true// ,后续再调用会立刻返回在默认情况下其他线程添加任务到taskQueue队列中后会调用NioEventLoop的wakeup()方法if (wakenUp.get()) {selector.wakeup();}// fall throughdefault:}} catch (IOException e) {// If we receive an IOException here its because the Selector is messed up. Lets rebuild// the selector and retry. https://github.com/netty/netty/issues/8566// 当出现I/O异常时需要重新构建SelectorrebuildSelector0();handleLoopException(e);continue;}cancelledKeys 0;needsToSelectAgain false;final int ioRatio this.ioRatio;if (ioRatio 100) {try {// I/O操作 根据selectedKeys进行处理processSelectedKeys();} finally {// Ensure we always run tasks.// 执行完所有的任务runAllTasks();}} else {final long ioStartTime System.nanoTime();try {// I/O操作根据selectedKeys进行处理processSelectedKeys();} finally {// Ensure we always run tasks.final long ioTime System.nanoTime() - ioStartTime;// 按一定的比例执行任务可能遗留一部分任务等待下次执行runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}}} catch (Throwable t) {handleLoopException(t);}// Always handle shutdown even if the loop processing threw an exception.try {if (isShuttingDown()) {closeAll();if (confirmShutdown()) {return;}}} catch (Throwable t) {handleLoopException(t);}}
}先来分析selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())这一行代码。 先弄明白selectStrategy是什么 同样可以追溯到NioEventLoop的构造函数中。 在其中打一个断点会发现。 DefaultSelectStrategyFactory.INSTANCE为DefaultSelectStrategyFactory对象 而newSelectStrategy()方法实际创建的是DefaultSelectStrategy对象。因此selectStrategy就是DefaultSelectStrategy对象。接下来进入他的calculateStrategy()方法。
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}当然在calculateStrategy()方法中传入了两个参数第一个参数为selectNowSupplier而selectNowSupplier是NioEventLoop的属性。写法如下。
private final IntSupplier selectNowSupplier new IntSupplier() {Overridepublic int get() throws Exception {return selectNow();}
};第二个参数来源于hasTask()方法
protected boolean hasTasks() {return super.hasTasks() || !tailTasks.isEmpty();
}protected boolean hasTasks() {assert inEventLoop();return !taskQueue.isEmpty();
}hasTasks()方法中只要队列中任务不为空则返回true而hasTasks为true,则会调用selectSupplier.get()方法而get()方法的内部会调用selectNow()接下来看selectNow()的实现。
int selectNow() throws IOException {try { return selector.selectNow();} finally {if (wakenUp.get()) {selector.wakeup();}}
}在阅读源码之前先来看看Selector相关的基础知识 。 Selector类中总共包含以下10个方法
open():创建一个Selector对象isOpen():是否是open状态如果调用了close()方法则会返回falseprovider():获取当前Selector的Providerkeys():如上文所述获取当前channel注册在Selector上所有的keyselectedKeys():获取当前channel就绪的事件列表selectNow():获取当前是否有事件就绪该方法立即返回结果不会阻塞如果返回值0则代表存在一个或多个select(long timeout):selectNow的阻塞超时方法超时时间内有事件就绪时才会返回否则超过时间也会返回select():selectNow的阻塞方法直到有事件就绪时才会返回wakeup():调用该方法会时阻塞在select()处的线程会立马返回(ps下面一句划重点)即使当前不存在线程阻塞在select()处那么下一个执行* select()方法的线程也会立即返回结果相当于执行了一次selectNow()方法close(): 用完Selector后调用其close()方法会关闭该Selector且使注册到该Selector上的所有SelectionKey实例无效。channel本身并不会关闭。 当然啦如果hasTasks返回false则calculateStrategy()方法返回的是SelectStrategy.SELECT如果返回的是SelectStrategy.SELECTNetty的处理方式又是如何呢 进入select()方法 。
private void select(boolean oldWakenUp) throws IOException {Selector selector this.selector;try {int selectCnt 0;// 获取当前系统时间(纳秒级)long currentTimeNanos System.nanoTime();// 获取定时任务的触发时间long selectDeadLineNanos currentTimeNanos delayNanos(currentTimeNanos);// 死循环for (; ; ) {// 获取距离定时任务触发时间的时长 四舍五入long timeoutMillis (selectDeadLineNanos - currentTimeNanos 500000L) / 1000000L;// 已经触发或超时if (timeoutMillis 0) {// 若之前未执行过select ,则调用非阻塞的selectNow()方法if (selectCnt 0) {selector.selectNow();selectCnt 1;}// 跳出循环去处理I/O事件和定时任务break;}// If a task was submitted when wakenUp value was true, the task didnt get a chance to call// Selector#wakeup. So we need to check task queue again before executing select operation.// If we dont, the task might be pended until select operation was timed out.// It might be pended until idle timeout if IdleStateHandler existed in pipeline./**** 当任务队列中有任务且唤醒标志为false时需要调用selectNow()方法* 否则任务得不到及时处理可能需要阻塞等待超时* 这段判断在Netty之后才加上的检测到有任务并未设置唤醒标识*/if (hasTasks() wakenUp.compareAndSet(false, true)) {selector.selectNow();selectCnt 1;break;}// 阻塞检测就绪的Channel,除非有就绪的Channel// 或者遇到空轮询的问题或者被其他线程唤醒// 否则只能等待timeoutMillis后自动醒来int selectedKeys selector.select(timeoutMillis);// 检测次数加1 此参数主要用来判断是否有空轮询的selectCnt ;// 若轮询到selectKeys不为0或 oldWakenUp 的参数为true// 或有线程设置 wakenUp 为true , 或任务队列和定时任务队列的值if (selectedKeys ! 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {// - Selected something,// - waken up by user, or// - the task queue has a pending task.// - a scheduled task is ready for processingbreak;}// 线程中断if (Thread.interrupted()) {// Thread was interrupted so reset selected keys and break so we not run into a busy loop.// As this is most likely a bug in the handler of the user or its client library we will// also log it.//// See https://github.com/netty/netty/issues/2426if (logger.isDebugEnabled()) {logger.debug(Selector.select() returned prematurely because Thread.currentThread().interrupt() was called. Use NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.);}selectCnt 1;break;}long time System.nanoTime();// 超时自动醒来说明定时任务已经从队列中移除了if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) currentTimeNanos) {// timeoutMillis elapsed without anything selected.// 将selectCnt设置为1在下次进入循环时直接跳出无须调用selectCnt 1;} else if ( //在timeoutMillis时间内连续的select次数大于或等于512次并未跳出循环SELECTOR_AUTO_REBUILD_THRESHOLD 0 selectCnt SELECTOR_AUTO_REBUILD_THRESHOLD) {// 此时进入空轮询需要重新构建Selector并跳出循环// The code exists in an extra method to ensure the method is not too big to inline as this// branch is not very likely to get hit very frequently.selector selectRebuildSelector(selectCnt);selectCnt 1;break;}// 当前系统时间更新(纳秒级)currentTimeNanos time;}if (selectCnt MIN_PREMATURE_SELECTOR_RETURNS) {if (logger.isDebugEnabled()) {logger.debug(Selector.select() returned prematurely {} times in a row for Selector {}.,selectCnt - 1, selector);}}} catch (CancelledKeyException e) {if (logger.isDebugEnabled()) {logger.debug(CancelledKeyException.class.getSimpleName() raised by a Selector {} - JDK bug?,selector, e);}// Harmless exception - log anyway}
}select()方法中有一个空轮询的bug先放一放先来分析NioEventLoop的run()方法中。Netty为什么要添加下面加红框代码。它的用意是什么呢同样将疑问留给后面再来分析 。 小插曲分析这一块代码时也花了几个小时最终我自己被弄晕了不知道Netty这样做的用意是什么问题在于没有弄明白selector.wakeup()方法的真正使用因此先通过几个例子来弄明白wakeup(), select(), selectNow() 的使用再来分析这一块的源码 。 网上对wakeup()方法原理是这样说明的 NIO中的Selector封装了底层的系统调用其中wakeup用于唤醒阻塞在select方法上的线程它的实现很简单在linux上就是创建一 个管道并加入poll的fd集合wakeup就是往管道里写一个字节那么阻塞的poll方法有数据可读就立即返回。wakeup方法的API还告诉我们如果当前Selector没有阻塞在select方法上那么本次 wakeup调用会在下一次select阻塞的时候生效这个道理很简单wakeup方法写入一个字节下次poll等待的时候立即发现可读并返回因 此不会阻塞。
例子1 主线程调用selector.select(); 创建一个子线程2秒后调用selector.wakeup()方法 。主线程被唤醒。 其实我的认知也是这样的select()方法阻塞wakeup()唤醒因为这种认知也就决定了看不懂Netty源码。
例2 第一次select()方法前面调用了两次wakeup() 当然调用一次效果也是一样 第一次select()方法不阻塞第二次select()方法阻塞 。 如果当前Selector没有阻塞在select方法上那么本次 wakeup调用会在下一次select阻塞的时候生效这个特性非常重要也是看懂Netty源码的关键。
例3 wakeup()方法对select(long timeout) 方法和对select()方法的影响一样。
例4wakeup()方法调用此时执行selectNow()方法此时并不会唤醒selectNow()之后的select()方法 。 有了这4个例子为基础再来分析select()方法应该会轻松很多 。 先来看wakenUp.getAndSet(false)这一行代码 。
public final boolean getAndSet(boolean newValue) {boolean prev;do {prev get();} while (!compareAndSet(prev, newValue));return prev;
}在getAndSet()方法中不断的进行抢锁操作只要一次抢到锁wakenUp就被设置为false 也就是说在getAndSet()方法执行之后进入select()方法此时wakenUp的值一定是false。 接下来看什么时候会进入1中的代码块又什么时候执行下面2中的代码。 在分析上面提出的疑问之前先来看一下currentTimeNanos delayNanos()方法之间的关系 。 进入delayNanos()方法 。
private static final long SCHEDULE_PURGE_INTERVAL TimeUnit.SECONDS.toNanos(1)private static final long START_TIME System.nanoTime();protected long delayNanos(long currentTimeNanos) {ScheduledFutureTask? scheduledTask peekScheduledTask();if (scheduledTask null) {return SCHEDULE_PURGE_INTERVAL;}return scheduledTask.delayNanos(currentTimeNanos);
}final ScheduledFutureTask? peekScheduledTask() {QueueScheduledFutureTask? scheduledTaskQueue this.scheduledTaskQueue;if (scheduledTaskQueue null) {return null;}return scheduledTaskQueue.peek();
}public long delayNanos(long currentTimeNanos) {return Math.max(0, deadlineNanos() - (currentTimeNanos - START_TIME));
}delayNanos()方法分两种情况来考虑如果this.scheduledTaskQueue中没有ScheduledFutureTask和this.scheduledTaskQueue中有ScheduledFutureTask两种情况如果scheduledTaskQueue没有ScheduledFutureTask则返回的是SCHEDULE_PURGE_INTERVAL 默认值为1秒 如果scheduledTaskQueue有ScheduledFutureTask 。 ScheduledFutureTask的START_TIME属性有一个初始化时间ScheduledFutureTask的构造函数调用也有一个时间而ScheduledFutureTask的构造函数调用时间大于ScheduledFutureTask的START_TIME属性初始化时间它们之间有一个时间差。 运行到select()方法的long currentTimeNanos System.nanoTime();这一行代码有一个时间currentTimeNanos而delayNanos()方法调用时间为ScheduledFutureTask的构造函数调用 1 秒假如ScheduledFutureTask的初始化时间为5毫秒 那么deadlineNanos的值为1005毫秒线程执行到select()的这一行代码long currentTimeNanos System.nanoTime();时所花的时间为1004.6秒则会进入下面加粗代码并执行。 但如果运行到long currentTimeNanos System.nanoTime(); 这一行代码的时间为1004.4秒则依然不会进入下面加粗代码因为 timeoutMillis的值精确到毫秒。
private void select(boolean oldWakenUp) throws IOException {Selector selector this.selector;try {int selectCnt 0;long currentTimeNanos System.nanoTime();long selectDeadLineNanos currentTimeNanos delayNanos(currentTimeNanos);for (; ; ) {long timeoutMillis (selectDeadLineNanos - currentTimeNanos 500000L) / 1000000L;if (timeoutMillis 0) {if (selectCnt 0) {selector.selectNow();selectCnt 1;}break;}...}}
}那么来看另外一种情况 假如 delayNanos(currentTimeNanos)返回值为1秒 select(timeoutMillis) 方法的执行时间大于等于999.5毫秒时。 当然啦在下图中是假设selector.select(timeoutMillis)的执行时间为1002毫秒再次进入循环时timeoutMillis的值就为-2秒。 如果是上面这种情况则会执行下面加粗代码 。
private void select(boolean oldWakenUp) throws IOException {Selector selector this.selector;try {int selectCnt 0;long currentTimeNanos System.nanoTime();long selectDeadLineNanos currentTimeNanos delayNanos(currentTimeNanos);for (; ; ) {long timeoutMillis (selectDeadLineNanos - currentTimeNanos 500000L) / 1000000L;if (timeoutMillis 0) {if (selectCnt 0) {selector.selectNow();selectCnt 1;}break;}...}}
}假如select(timeoutMillis) 的执行时间小于999.5毫秒时 假如为999.4毫秒select(timeoutMillis)就从阻塞中唤醒此时 long timeoutMillis (selectDeadLineNanos - currentTimeNanos 500000L) / 1000000L;的值为timeoutMillis 1000 - 999.4 0.5 0.6 四舍五入为1 毫秒。此时会继续执行循环内代码继续执行 select(1毫秒)其他情况以此类推。 接下来看另外一种情况什么时候进入下面加粗代码执行呢
private void select(boolean oldWakenUp) throws IOException {Selector selector this.selector;try {int selectCnt 0;long currentTimeNanos System.nanoTime();long selectDeadLineNanos currentTimeNanos delayNanos(currentTimeNanos);for (; ; ) {...if (hasTasks() wakenUp.compareAndSet(false, true)) {selector.selectNow();selectCnt 1;break;}...}} catch (CancelledKeyException e) {}
}聪明的小伙伴肯定一眼就看出来了当添加任务时就会执行上述加粗代码场景是怎样子的呢 当线程1向NioEventLoop的执行任务线程称线程2 添加了任务此时线程2刚好执行到if (hasTasks() wakenUp.compareAndSet(false, true)) {…} 这一行代码发现队列中有任务就执行wakenUp.compareAndSet(false, true) CAS 操作但线程1 也执行了wakeup(boolean inEventLoop) 方法同时也执行了wakenUp.compareAndSet(false, true)操作也就是上述步骤3 和步骤4 位置的代码同时执行 。但步骤4 也就是NioEventLoop执行任务的线程CAS操作成功。 此时线程2 就会执行上述加粗代码。
if (hasTasks() wakenUp.compareAndSet(false, true)) {selector.selectNow();selectCnt 1;break;
}但聪明的小伙伴肯定会想如果此时线程1的 步骤3 抢锁成功会怎样呢这里又分两种情况如果线程1的selector.wakeup();先执行此时线程2的int selectedKeys selector.select(timeoutMillis); 这一行代码后执行根据之前例子中分析得出的结论【如果当前Selector没有阻塞在select方法上那么本次 wakeup调用会在下一次select阻塞的时候生效】此时select()方法不会阻塞并且在后续的 if (selectedKeys ! 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {… } 这一行代码的hasTask()方法返回true退出死循环如果线程2的int selectedKeys selector.select(timeoutMillis); 这一行代码先执行而线程1的selector.wakeup();后执行依然会唤醒线程2 中的select()方法同样的方式退出死循环。 接下来对NioEventLoop的if (selectedKeys ! 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { … } 这一行代码的前4种情况进行分析 。
第一种情况 selectedKeys ! 0时这种情况很好理解有就绪的Channel时selector.select(timeoutMillis);就会被唤醒。第二种情况在runAllTasks()方法执行完且select(wakenUp.getAndSet(false));方法执行前此时有任务添加到队列又在select()方法中的 if (hasTasks() wakenUp.compareAndSet(false, true)) { 这一行代码执行之前又有任务添加到队列此时会出现 selectedKeys 0 oldWakenUp true的情况 。 3. 什么时候会出现selectedKeys 0 oldWakenUp false wakenUp.get() true的情况呢 这个就更简单了。 4. 什么时候会出现selectedKeys 0 oldWakenUp false wakenUp.get() false hasTasks() true呢 如果对源码比较了解还是不难的。 当然啦超时醒来在后面的循环判断中也会退出死循环但这里相当于走了捷径能更快的退出循环。从而提升程序的性能。 从这些细节中可以看出 Netty在性能这一方面可谓苦心孤诣 。 接下来再来分析之前提出的疑问下面加红框代码的用意何在。 如果selector.wakeup()被执行首先要满足条件wakenUp.get() 返回值为true,而我们知道在进入select(wakenUp.getAndSet(false))方法时肯定wakenUp.get() 肯定为false因此再次进入select()方法的分析 。
在select()方法执行开始到selector.select(timeoutMillis) 这段代码期间有新任务添加当然有分为两中情况 第一种情况if (hasTasks() wakenUp.compareAndSet(false, true)) 这一行代码执行前没有新任务添加且在这一行代码执行时 刚好有新任务添加且这一行代码CAS操作成功wakenUp.compareAndSet(false, true)返回true . 第二种情况在if (hasTasks() wakenUp.compareAndSet(false, true)) 这一行代码执行前就已经在新任务添加。 但无论哪种情况此时wakeUp的值始终为true . 会触发外层run()方法中的selector.wakeup()方法调用。 在runAllTasks()方法执行完再次进入循环执行select(wakenUp.getAndSet(false)) 这一行代码之间有任务添加到队列中。 在select()方法内 在private void select(boolean oldWakenUp) 方法执行开始到if (hasTasks() wakenUp.compareAndSet(false, true)) 这一行代码之间如果没有任务添加到队列则会执行selector.selectNow(); 代码退出循环此时对性能没有什么影响 。 但如果在select(boolean oldWakenUp) 到if (hasTasks() wakenUp.compareAndSet(false, true))这一行代码之间有新的任务 添加到队列中 此时(hasTasks() wakenUp.compareAndSet(false, true))条件为false ,会进入下面的selector.select(timeoutMillis) 这一行代码执行但select()方法是阻塞的需要等待selector.wakeup()调用后此方法才能被唤醒对于高性能的Netty来说是无法忍受的。此时提前预调用 selector.wakeup()就起到重要作用 从之前wakeup()的原理得知【 NIO中的Selector封装了底层的系统调用其中wakeup用于唤醒阻塞在select方法上的线程它的实现很简单在linux上就是创建一 个管道并加入poll的fd集合wakeup就是往管道里写一个字节那么阻塞的poll方法有数据可读就立即返回。】因为在runAllTasks()方法之前就已经调用了selector.wakeup();方法而此时管道中肯定有一个字节存在了当调用 selector.select(timeoutMillis)方法时发现管道中有一个字节存在则立即返回不会进入阻塞从最大程度上压榨了CPU的性能 。
有 3 种方式可以 select 就绪事件
1select() 阻塞方法有一个就绪事件或者其它线程调用了 wakeup() 或者当前线程被中断时返回。
2select(long timeout) 阻塞方法有一个就绪事件或者其它线程调用了 wakeup()或者当前线程被中断或者阻塞时长达到了 timeout 时返回。不抛出超时异常。
3selectNode() 不阻塞如果无就绪事件则返回 0如果有就绪事件则将就绪事件放到一个集合返回就绪事件的数量。 当然我们知道selectNow()为非阻塞的select(…)方法为阻塞的那么性能上有什么差别呢同样用例子来证明 。
例1 selectNow()方法调用10万次看执行时间
public static void main(String[] args) throws Exception {Selector selector Selector.open();long start System.currentTimeMillis();for (int i 0; i 100000; i) {selector.selectNow();}Utils.print(结束 : (System.currentTimeMillis() - start));
}
例2 同样的方式select()方法调用10万次但此时有另外一个线程不断的调用selector.wakeup()方法不断的唤醒select()方法看执行时间 。
public static boolean stop false;public static void main(String[] args) throws Exception {Selector selector Selector.open();long start System.currentTimeMillis();FutureTaskBoolean futureTask new FutureTaskBoolean(new MyCallable(selector));Thread thread new Thread(futureTask);thread.start();new Thread(new Runnable() {Overridepublic void run() {for (;;) {if (stop) {break;}selector.wakeup();}}}).start();stop futureTask.get();Utils.print(结束 : (System.currentTimeMillis() - start));
}
static class MyCallable implements CallableBoolean {public Selector selector;public MyCallable(Selector selector) {this.selector selector;}Overridepublic Boolean call() throws Exception {for (int i 0; i 100000; i) {try {selector.select(50000);} catch (IOException e) {e.printStackTrace();}}return true;}
}执行结果经过多次测试发现在10万次的比较中发现select()方法的执行时间大概是selectNow()时间的2倍 因此从测试结果上来看wakeup()与select()方法组合比selectNow()方法更加耗时因此预selector.wakeup() 方法调用一定程度上能提升任务执行比较频繁情况下的性能 。
空循环bug Netty解决JDK 空轮询Bug 大家可能早就听说过臭名昭著的Java Nio epoll 的bug , 它会导致Selector空轮询最终导致CPU使用率达到10% , 官方称JDK1.6的update 18修复了这个问题但是直到JDK 1.7 该问题仍旧存在只不过该Bug发生的概率降低了一些而已并没有根本的解决出现此Bug 是因为当Selector轮询结果为空时 没有进行wakeup()或对新消息及时进行处理导致发生空轮询CPU 使用率达到了100% 我们来看一下这个问题存在Issue中的原始描述 。 this is aan issue with poll (and epoll) on linux ,if a file descriptor for a connected socket is polled with a request event mask of , and if the connection is abruptly terminated(RST) then the poll wakes up with the POLLHUP (and maybe POLLERR) bit set in the returned event set ,the implication of this behaviour is that Selector wiill wakeup and as the interest set for the SocketChannel is 0 it means ther aren’t any selected events and the select method return 0 . 具体解释在部分Linux Kernel 2.6 中 poll 和 epoll 对于突然中断的Socket 连接会对返回的EventSet 事件集体置为POLLHUP也可能是POLLERR,EventSet 事件集合发生了变化这就可能导致Selector 被唤醒。 这是与操作系统机制有关的 JDK 虽然仅仅是一个兼容的各个操作系统平台的软件但遗憾的是JDK 5 和JDK 6 最初版本中这个问题并没有得到解决而是将帽子抛给了操作系统方。这就是这个Bug 一直到2013年才最终修复的原因 。 Netty中最终的解决办法是创建一个新的Selector,将可用的事件重新注册到新的Selector中来终止空轮询我们来回顾一下事件轮询的关键代码 。 假如selector.select(timeoutMillis)方法中timeoutMillis为1000毫秒如果在999.5毫秒后被唤醒则此时退出循环如果在999.5之前醒来假如999.4 则 1000 - 999.4 0.5 取整数为1毫秒则需要再次调用selector.select(1) 进行阻塞等待后续循环以此类推个人觉得一般情况只会出现2次循环就是第一在999.5毫秒之前醒来再一次睡眠假如睡眠一秒钟再次醒来从概率上来说一次提前醒来一次延迟醒来时间应该大于1秒因此第二次循环之后应该就退出 如果循环了3次以上 都提前0.5毫秒醒来可能就有问题了在这里Netty 打印了debug日志我相信有了这些理论知识再来理解Netty 开发者为什么会在selectCnt大于3时打印一行debug日志的原因了。 如果出现空轮询bugNetty会怎样做呢进入selectRebuildSelector()方法 。
private Selector selectRebuildSelector(int selectCnt) throws IOException {// The selector returned prematurely many times in a row.// Rebuild the selector to work around the problem.logger.warn(Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.,selectCnt, selector);rebuildSelector();Selector selector this.selector;// Select again to populate selectedKeys.selector.selectNow();return selector;
}public void rebuildSelector() {if (!inEventLoop()) {execute(new Runnable() {Overridepublic void run() {rebuildSelector0();}});return;}rebuildSelector0();
}// select函数的代码解读中发现Netty在处理空轮询次数大于或等于阈值默认是512时需要重新构建Selector重新构建Selector
// 方式比较巧妙重新打开一个新的Selector 将旧的Selector 上的key和attchment复制过去同时关闭旧的Selector具体代码如下
private void rebuildSelector0() {final Selector oldSelector selector;final SelectorTuple newSelectorTuple;if (oldSelector null) {return;}try {// 开启新的SelectornewSelectorTuple openSelector();} catch (Exception e) {logger.warn(Failed to create a new Selector., e);return;}// Register all channels to the new Selector.// 遍历旧的Selector上的keyint nChannels 0;for (SelectionKey key: oldSelector.keys()) {Object a key.attachment();try {// 判断key是否有效if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) ! null) {continue;}// 将旧的Selector 上触发的事件需要取消int interestOps key.interestOps();key.cancel();// 把channel重新注册到新的Selector 上SelectionKey newKey key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);if (a instanceof AbstractNioChannel) {// Update SelectionKey((AbstractNioChannel) a).selectionKey newKey;}nChannels ;} catch (Exception e) {logger.warn(Failed to re-register a Channel to the new Selector., e);if (a instanceof AbstractNioChannel) {AbstractNioChannel ch (AbstractNioChannel) a;ch.unsafe().close(ch.unsafe().voidPromise());} else {SuppressWarnings(unchecked)NioTaskSelectableChannel task (NioTaskSelectableChannel) a;invokeChannelUnregistered(task, key, e);}}}selector newSelectorTuple.selector;unwrappedSelector newSelectorTuple.unwrappedSelector;try {// time to close the old selector as everything else is registered to the new one// 关闭旧的SelectoroldSelector.close();} catch (Throwable t) {if (logger.isWarnEnabled()) {logger.warn(Failed to close the old Selector., t);}}if (logger.isInfoEnabled()) {logger.info(Migrated nChannels channel(s) to the new Selector.);}
}实际上在rebuildSelector() 方法中主要做了以下三件事件。
创建了一个新的Selector将原来的Selector中注册的事件全部取消。将可用的事件重新注册到新的Selector并激活就这样Netty 完美的解决了JDK的空轮询bug。 对于ioRatio这个参数的理解 。 如果ioRatio的值为100则没有时间概念先执行完processSelectedKeys()接着执行runAllTasks()执行完runAllTasks()后再执行processSelectedKeys()交替执行但配置了ioRatio比率时假如比率为60而processSelectedKeys()的执行时间为6秒则下次runAllTasks()的执行时间为 6 * (100 - 60) / 60 4 秒但也不一定准确的是只能执行4秒的时间因为在runAllTasks()方法的内部每执行完64个任务才检测一次如前63个任务执行时间为3秒第64个任务执行时间为2秒当第64个任务执行完检测时间有没有大于4秒 3 2 5 秒显然大于退出任务但此runAllTasks()总花的时间为5秒。 关于processSelectedKeys()的处理逻辑后面再来分析先来分析runAllTasks()方法的处理逻辑 。
protected boolean runAllTasks(long timeoutNanos) {// 从定时任务队列中将达到执行时间的task丢到taskQueue队列中fetchFromScheduledTaskQueue();// 从taskQueue队列获取taskRunnable task pollTask();// 若task为空if (task null) {// 执行tailTasks中的task做收尾工作afterRunningAllTasks();return false;}// 获取执行截止时间final long deadline ScheduledFutureTask.nanoTime() timeoutNanos;// 执行任务个数long runTasks 0;// 运行task的最后时间long lastExecutionTime;for (;;) {// 运行task的run()方法safeExecute(task);runTasks ;// Check timeout every 64 tasks because nanoTime() is relatively expensive.// XXX: Hard-coded value - will make it configurable if it is really a problem.// 运行64个任务就进行一次是否达到截止时间检查// 0x3F 代表63 转化为二进制为 0011 1111if ((runTasks 0x3F) 0) {lastExecutionTime ScheduledFutureTask.nanoTime();if (lastExecutionTime deadline) {break;}}// 再从taskQueue队列中获取tasktask pollTask();// 若没有task了则更新最后执行时间并跳出循环if (task null) {lastExecutionTime ScheduledFutureTask.nanoTime();break;}}// 收尾工作afterRunningAllTasks();this.lastExecutionTime lastExecutionTime;return true;
}protected static void safeExecute(Runnable task) {try {task.run();} catch (Throwable t) {logger.warn(A task raised an exception. Task: {}, task, t);}
} 第二部分runAllTasks 主要目的就是执行taskQueue队列和定时任务的中的任务 如心跳检测异步写操作等首先NioEventLoop会根据ioRatioI/O事件与taskQueue运行时间占比计算任务执行时长由一个NioEventLoop线程线程需要管理很多的Channel这些Channel的任务可能非常多 若要执行完 则I/O事件可能得不到及时处理因此每执行64个任务后就会检测执行任务的时间是否已经用完 如果执行任务的时间用完了 就不再执行后续的任务了。 其中的任务当然包括之前的register0()方法。如下图所示 。 对于register0()方法还没有分析接下来进入register0()的具体实现逻辑。
// 在AbstractUnsafe的register0()方法中有关于如何将用户自定义 的Hanlder添加到NioSocket Channel的Handler链表中的方法核心代 码解读如下:
private void register0(ChannelPromise promise) {try {// check if the channel is still open as it could be closed in the mean time when the register// call was outside of the eventLoopif (!promise.setUncancellable() || !ensureOpen(promise)) {return;}boolean firstRegistration neverRegistered;// 此方法调用AbstractNioChannel的doRegister()方法把NioServerSocketChannel和NioSocketChannel的注册抽象出来doRegister();neverRegistered false;registered true;// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the// user may already fire events through the pipeline in the ChannelFutureListener./*** 在ServerBootstrapAcceptor的channelRead()方法中把用户定义的Handler 追加到Channel 的管道中child.pipeline().addLast(childHandler)* 此方法会追加一个回调此时正好会触发这个回调* if(!registered){* newCtx.setAddPending();* callHandlerCallbackLater(newCtx,true);* return this;* }*/pipeline.invokeHandlerAddedIfNeeded();// 此方法会触发promisesafeSetSuccess(promise);pipeline.fireChannelRegistered();// Only fire a channelActive if the channel has never been registered. This prevents firing// multiple channel actives if the channel is deregistered and re-registered.if (isActive()) {if (firstRegistration) {// 此方法会触发HeadContext的channelActive()方法并最终调用AbstractNioChannel的doBeginRead()方法注册监听OP_READ事件pipeline.fireChannelActive();} else if (config().isAutoRead()) {// This channel was registered before and autoRead() is set. This means we need to begin read// again so that we process inbound data.//// See https://github.com/netty/netty/issues/4805beginRead();}}} catch (Throwable t) {// Close the channel directly to avoid FD leak.closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}
}// doRegister()方法在AbstractUnsafe的register0()方法中被调用
protected void doRegister() throws Exception {boolean selected false;for (;;) {try {/*** 通过javaChannel()方法获取具体的NioChannel* 把Channel 注册到EventLoop线程的Selector上* 对于注册后返回的SelectionKey需要为其设置Channel 感兴趣的事件*/selectionKey javaChannel().register(eventLoop().unwrappedSelector(), 0, this);return;} catch (CancelledKeyException e) {if (!selected) {// Force the Selector to select now as the canceled SelectionKey may still be// cached and not removed because no Select.select(..) operation was called yet.// 由于尚未调用select.select(...)// 因此可能仍在缓存而未删除但已经取消SelectionKey// 强制调用selector.selectNow()方法// 将已经取消的selectionKey从Selector 上删除eventLoop().selectNow();selected true;} else {// We forced a select operation on the selector before but the SelectionKey is still cached// for whatever reason. JDK bug ?// 只有第一次抛出此异常才能调用select.selectNow()进行取消// 如果调用selector.selectNow()还有取消缓存则可能是JDK的一个bugthrow e;}}}
}对于javaChannel()这个方法需要注意。 如果当前channel是NioServerSocketChannel则返回的是ServerSocketChannel如果当前是NioSocketChannel则返回的是SocketChannel但无论是无论是bossGroup还是workerGroup都调用其相应的Channel的register方法还有一点需要注意这些传入感兴趣的事件为0 并不是OP_READOP_WRITEOP_CONNECTOP_ACCEPT事件那什么时候注册成这些事件的呢 留着疑问后面来分析 。 当然register()方法的第三个参数为this也就是SocketChannel本身先记着后面会有用。 接下来分析invokeHandlerAddedIfNeeded()方法在之前提到过 pipeline为DefaultChannelPipeline因此进入它的invokeHandlerAddedIfNeeded()方法 。
final void invokeHandlerAddedIfNeeded() {assert channel.eventLoop().inEventLoop();// firstRegistration的初始化值为trueif (firstRegistration) {firstRegistration false;// We are now registered to the EventLoop. Its time to call the callbacks for the ChannelHandlers,// that were added before the registration was done.callHandlerAddedForAllHandlers();}
}上面就是普通的方法调用接下来进入callHandlerAddedForAllHandlers()方法的研究。
private void callHandlerAddedForAllHandlers() {final PendingHandlerCallback pendingHandlerCallbackHead;synchronized (this) {assert !registered;// This Channel itself was registered.registered true;pendingHandlerCallbackHead this.pendingHandlerCallbackHead;// Null out so it can be GCed.this.pendingHandlerCallbackHead null;}// This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while// holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside// the EventLoop.PendingHandlerCallback task pendingHandlerCallbackHead;while (task ! null) {task.execute();task task.next;}
}首先要明白pendingHandlerCallbackHead是什么是什么时候添加的呢 在代码中寻寻觅觅。 callHandlerCallbackLater()中打断点 。 pendingHandlerCallbackHead在callHandlerCallbackLater()初始化而什么时候被调用的呢 也就是在ServerBootstrap的init()方法中p.addLast(new ChannelInitializerChannel() {…} 这一行代码执行时初始化了pendingHandlerCallbackHead。 接下来进入PendingHandlerAddedTask的execute()方法 。
private final class PendingHandlerAddedTask extends PendingHandlerCallback {PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {super(ctx);}Overridepublic void run() {callHandlerAdded0(ctx);}Overridevoid execute() {EventExecutor executor ctx.executor();// executor 就是NioEventloop而此方法正好是// NioEventLoop runAllTasks()方法内部调用的因此executor.inEventLoop()// 值为true if (executor.inEventLoop()) {callHandlerAdded0(ctx);} else {try {executor.execute(this);} catch (RejectedExecutionException e) {if (logger.isWarnEnabled()) {logger.warn(Cant invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.,executor, ctx.name(), e);}remove0(ctx);ctx.setRemoved();}}}
}大家可能晕了上面加粗代码又是什么 进入executor()方法 。
AbstractChannelHandlerContext#
public EventExecutor executor() {if (executor null) {return channel().eventLoop();} else {return executor;}
}AbstractNioChannel#
public NioEventLoop eventLoop() {return (NioEventLoop) super.eventLoop();
}AbstractChannel#
public EventLoop eventLoop() {EventLoop eventLoop this.eventLoop;if (eventLoop null) {throw new IllegalStateException(channel not registered to an event loop);}return eventLoop;
}executor最终来源于AbstractNioChannel的eventLoop属性那eventLoop属性又是什么时候初始化的呢 在代码中寻寻觅觅发现是在ServerBootstrap的doBind()方法中内初始化。 eventLoop的值又来源于chooser选择器。 关于chooser选择器他的原理之前已经分析过。 这里就不再赘述 通过不断的回顾之前的代码大家有没有发现之前看不懂的代码逻辑开始慢慢的清晰了。 言归正传回到之前的代码 。
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {try {ctx.callHandlerAdded();} catch (Throwable t) {boolean removed false;try {remove0(ctx);ctx.callHandlerRemoved();removed true;} catch (Throwable t2) {if (logger.isWarnEnabled()) {logger.warn(Failed to remove a handler: ctx.name(), t2);}}if (removed) {fireExceptionCaught(new ChannelPipelineException(ctx.handler().getClass().getName() .handlerAdded() has thrown an exception; removed., t));} else {fireExceptionCaught(new ChannelPipelineException(ctx.handler().getClass().getName() .handlerAdded() has thrown an exception; also failed to remove., t));}}
} 当然上面主要调用了callHandlerAdded()方法接下来进入callHandlerAdded()这个方法 。
final void callHandlerAdded() throws Exception {// We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates// any pipeline events ctx.handler() will miss them because the state will not allow it.if (setAddComplete()) {handler().handlerAdded(this);}
} 接着进入handlerAdded()方法 。
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {// 在AbstractUnsafe的register0()方法中channel的registered已经被设置为trueif (ctx.channel().isRegistered()) {// This should always be true with our current DefaultChannelPipeline implementation.// The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering// surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers// will be added in the expected order.if (initChannel(ctx)) {// We are done with init the Channel, removing the initializer now.removeState(ctx);}}
}private boolean initChannel(ChannelHandlerContext ctx) throws Exception {if (initMap.add(ctx)) { // Guard against re-entrance.try {initChannel((C) ctx.channel());} catch (Throwable cause) {// Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).// We do so to prevent multiple calls to initChannel(...).exceptionCaught(ctx, cause);} finally {ChannelPipeline pipeline ctx.pipeline();if (pipeline.context(this) ! null) {// 将handler从pipeline链表中移除掉pipeline.remove(this);}}return true;}return false;
}之前说过 handler()就是ServerBootstrap 的init方法中new 的ChannelInitializer。 而传入的参数如果当前channel为NioServerSocketChannel则传入的则是NioServerSocketChannel如果是NioSocketChannel则传入的也就是NioSocketChannel。
public void initChannel(final Channel ch) throws Exception {final ChannelPipeline pipeline ch.pipeline();ChannelHandler handler config.handler();if (handler ! null) {pipeline.addLast(handler);}ch.eventLoop().execute(new Runnable() {Overridepublic void run() {pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});
}上述ch.eventLoop().execute() 方法不就是又向NioEventLoop中加任务嘛之前分析过那么多select(…)相关的方法此时终于用到了此时
new Runnable() {Overridepublic void run() {pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});
}被当作任务添加到eventLoop的任务队列中最终这个任务又在runAllTasks(…)方法中被执行。当然handler执行完毕将被移除掉。 因为在移除handler()时也会调用pipeline.context(this)方法因此将context()方法在remove()时再做分析 接下来看移除handler的代码
public final ChannelPipeline remove(ChannelHandler handler) {remove(getContextOrDie(handler));return this;
}private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {AbstractChannelHandlerContext ctx (AbstractChannelHandlerContext) context(handler);if (ctx null) {throw new NoSuchElementException(handler.getClass().getName());} else {return ctx;}
}public final ChannelHandlerContext context(ChannelHandler handler) {if (handler null) {throw new NullPointerException(handler);}AbstractChannelHandlerContext ctx head.next;for (;;) {if (ctx null) {return null;}if (ctx.handler() handler) {return ctx;}ctx ctx.next;}
} 上述过程中context()方法是查找handler在pipeline链表中的过程因为pipeline链表中存储的并不是handler 本身而是AbstractChannelHandlerContext只是handler属性是我们需要执行的任务而已因此上述context()方法需要从Head 开始向后遍历直到Tail。中途所有的AbstractChannelHandlerContext的handler是否和自己传入的handler相等如果相等则证明找到了handler对应的AbstractChannelHandlerContext取出AbstractChannelHandlerContext并调用remove()方法将其移除掉。
private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {assert ctx ! head ctx ! tail;synchronized (this) {remove0(ctx);// If the registered is false it means that the channel was not registered on an eventloop yet.// In this case we remove the context from the pipeline and add a task that will call// ChannelHandler.handlerRemoved(...) once the channel is registered.if (!registered) {callHandlerCallbackLater(ctx, false);return ctx;}EventExecutor executor ctx.executor();if (!executor.inEventLoop()) {executor.execute(new Runnable() {Overridepublic void run() {callHandlerRemoved0(ctx);}});return ctx;}}callHandlerRemoved0(ctx);return ctx;}
} 当然在remove()方法中并没有立即移除而是调用remove0()来实现真正的移除操作。
private static void remove0(AbstractChannelHandlerContext ctx) {AbstractChannelHandlerContext prev ctx.prev;AbstractChannelHandlerContext next ctx.next;prev.next next;next.prev prev;
}remove0()的实现逻辑很简单其实就是双向链表中移除元素的操作。 Pipeline add 方法的实现 之前对于Pipeline的addLast()方法有过简单的分析如下代码 。 下面就以 p.addLast(new ChannelInitializerChannel() {…} 为例子来详细的分析addLast()到底做了哪些事情 。
public final ChannelPipeline addLast(ChannelHandler... handlers) {return addLast(null, handlers);
}public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {if (handlers null) {throw new NullPointerException(handlers);}for (ChannelHandler h: handlers) {if (h null) {break;}addLast(executor, null, h);}return this;
}上述方法没有什么难的就是遍历handlers将每个Handler通过addLast()方法加入到pipeline中但需要注意的是EventExecutorGroup传入的值为null。接下来进入新的addLast()方法。
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {final AbstractChannelHandlerContext newCtx;synchronized (this) {checkMultiplicity(handler);newCtx newContext(group, filterName(name, handler), handler);addLast0(newCtx);// If the registered is false it means that the channel was not registered on an eventLoop yet.// In this case we add the context to the pipeline and add a task that will call// ChannelHandler.handlerAdded(...) once the channel is registered.if (!registered) {newCtx.setAddPending();callHandlerCallbackLater(newCtx, true);return this;}EventExecutor executor newCtx.executor();if (!executor.inEventLoop()) {callHandlerAddedInEventLoop(newCtx, executor);return this;}}// 如果已经注册了直接调用handler()的handlerAdded(this);方法callHandlerAdded0(newCtx);return this;
}上面有一个有意思的方法checkMultiplicity() 在看这个方法之前先来了解Sharable注解的基本使用。 Sharable 注解的基本用法 在使用没有标注 Sharable 的 handler 时在添加到到一个 pipeline 中时你需要每次都创建一个新的 handler 实例因为它的成员变量是不可分享的。所以正确的做法应该是 ch.pipeline.addLast(new ...)。 当自定义一个 handler 时要考虑应不应该为其加 Sharable 注解。如果该 handler 只是用来打印一些消息那么可以加上该注解因为它被共享时每个pipeline的信息不会错乱。比如Netty 自带的 LoggingHandler 就只是打印日志消息就加上了 Sharable 注解。相反如果你定义的 handler 里面需要保存一些信息供使用该 handler 的 pipeline 使用如果它是线程安全的也可以加上 Sharable 注解。如果它不是线程安全的加上 Sharable 注解那么如果有多个 pipeline 共用一个 handler 实例 就可能导致不同 pipeline 之间的信息混乱。 接下来进入checkMultiplicity()方法看它是怎样控制如果不加SharableHandler 是不能被共享的。
private static void checkMultiplicity(ChannelHandler handler) {if (handler instanceof ChannelHandlerAdapter) {ChannelHandlerAdapter h (ChannelHandlerAdapter) handler;if (!h.isSharable() h.added) {throw new ChannelPipelineException(h.getClass().getName() is not a Sharable handler, so cant be added or removed multiple times.);}h.added true;}
}public boolean isSharable() {/*** Cache the result of {link Sharable} annotation detection to workaround a condition. We use a* {link ThreadLocal} and {link WeakHashMap} to eliminate the volatile write/reads. Using different* {link WeakHashMap} instances per {link Thread} is good enough for us and the number of* {link Thread}s are quite limited anyway.** See a hrefhttps://github.com/netty/netty/issues/2289#2289/a.*/Class? clazz getClass();MapClass?, Boolean cache InternalThreadLocalMap.get().handlerSharableCache();Boolean sharable cache.get(clazz);if (sharable null) {sharable clazz.isAnnotationPresent(Sharable.class);cache.put(clazz, sharable);}return sharable;
} 判断能不能被共享有一个重要的条件就是handler有没有实现ChannelHandlerAdapter接口如果没有实现就不需要判断有没有加Sharable注解而!h.isSharable() h.added的判断条件就是如果handler已经被加入到pipeline但handler没有配置Sharable注解则抛出提示信息为 is not a Sharable handler, so can’t be added or removed multiple times.异常。 当然在isSharable()方法中还看到了什么呢就是InternalThreadLocalMap对象线程范围内的变量共享将类中是否有Sharable注解的结果存储到线程共享变量中下次再需要判断类中是否有Sharable注解从缓存中取出即可。 接下来看addLast()方法中的newContext()方法 。
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {private final ChannelHandler handler;DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {super(pipeline, executor, name, handler.getClass());this.handler handler;}Overridepublic ChannelHandler handler() {return handler;}
}
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,String name, Class? extends ChannelHandler handlerClass) {this.name ObjectUtil.checkNotNull(name, name);this.pipeline pipeline;this.executor executor;this.executionMask mask(handlerClass);// Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.ordered executor null || executor instanceof OrderedEventExecutor;
}DefaultChannelHandlerContext有一个最大的特点就是有一个handler属性就是将任务保存到这个属性中。接下来看addLast0()方法这个方法和之前的remove0()方法很像,接下来进入addLast0()方法。
// 通过addLast()方法追加进去的编码器和解码器都位于 TailContext的前面。
private void addLast0(AbstractChannelHandlerContext newCtx) {AbstractChannelHandlerContext prev tail.prev;newCtx.prev prev;newCtx.next tail;prev.next newCtx;tail.prev newCtx;
}最终pipeline的结构如下。 接下来看register0()的safeSetSuccess()方法。
protected final void safeSetSuccess(ChannelPromise promise) {if (!(promise instanceof VoidChannelPromise) !promise.trySuccess()) {logger.warn(Failed to mark a promise as success because it is done already: {}, promise);}
}那这个方法调用会有什么作用呢 之前写过一篇博客 Netty 之 DefaultPromise 源码解析 就是关于ChannelPromise的使用的看明白那篇博客再来分析safeSetSuccess()方法就容易多了。再来看register0()方法参数ChannelPromise从何而来。 来源于SingleThreadEventLoop的register(Channel channel) 方法。 同时又作为参数返回到ServerBootstrap方法的doBind()方法中。 new DefaultChannelPromise(channel, this)又添加了监听器
new ChannelFutureListener() {Overridepublic void operationComplete(ChannelFuture future) throws Exception {Throwable cause future.cause();if (cause ! null) {// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an// IllegalStateException once we try to access the EventLoop of the Channel.// 注册失败处理响应给主线程promise.setFailure(cause);} else {// Registration was successful, so set the correct executor to use.// See https://github.com/netty/netty/issues/2586promise.registered();// 只有成功后才能绑定doBind0(regFuture, channel, localAddress, promise);}}
}根据之前DefaultPromise的基本特性当调用safeSetSuccess()方法后会触发所有监听器的operationComplete()方法调用 。 当然也会触发NettyServer中我们自己声明的ChannelFutureListener的operationComplete()调用 。 当然operationComplete()方法的调用也是有先后顺序的越先声明ChannelFutureListener它的operationComplete()方法就越先被调用 。 接下来看doBind0()具体做了哪些事情 。
private static void doBind0(final ChannelFuture regFuture, final Channel channel,final SocketAddress localAddress, final ChannelPromise promise) {// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up// the pipeline in its channelRegistered() implementation.channel.eventLoop().execute(new Runnable() {Overridepublic void run() {if (regFuture.isSuccess()) {channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {promise.setFailure(regFuture.cause());}}});
}发现没有又创建了一个任务添加到EventLoop的任务队列中. 因为在AbstractChannel的register0()方法中已经被设置为true。因此if (regFuture.isSuccess()) 判断将返回true进入channel.bind(localAddress, promise)方法 。
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {return pipeline.bind(localAddress, promise);
}public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {return tail.bind(localAddress, promise);
} 调用channel的bind()方法实际上是调用了pipeline的bind()方法而调用pipeline的bind()方法实际上是从tail向前调用所有流水线上的bind()方法 。 接下来进入
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {if (localAddress null) {throw new NullPointerException(localAddress);}if (isNotValidPromise(promise, false)) {// cancelledreturn promise;}final AbstractChannelHandlerContext next findContextOutbound(MASK_BIND);EventExecutor executor next.executor();if (executor.inEventLoop()) {next.invokeBind(localAddress, promise);} else {safeExecute(executor, new Runnable() {Overridepublic void run() {next.invokeBind(localAddress, promise);}}, promise, null);}return promise;
}findContextInbound()方法是从pipeline链表中从前向后调用而findContextOutbound()方法是从链表从后向前调用 。
private AbstractChannelHandlerContext findContextOutbound(int mask) {AbstractChannelHandlerContext ctx this;do {// 从后向前查找ctx ctx.prev;//如果ctx.executionMask mask 0 // 则证明handler 的 mask对应的方法上设置了Skip注解 } while ((ctx.executionMask mask) 0);return ctx;
}上面关于ctx.executionMask mask 应该怎样理解呢 先弄明白ctx的executionMask参数是如何初始化的。 这个要回溯到DefaultChannelHandlerContext的构造方法中。 请看AbstractChannelHandlerContext其中加粗代码this.executionMask mask(handlerClass);就是初始化executionMask的值。那进入mask()方法中看executionMask是如何计算得来的。
static int mask(Class? extends ChannelHandler clazz) {// Try to obtain the mask from the cache first. If this fails calculate it and put it in the cache for fast// lookup in the future.MapClass? extends ChannelHandler, Integer cache MASKS.get();Integer mask cache.get(clazz);if (mask null) {mask mask0(clazz);cache.put(clazz, mask);}return mask;
}mask()方法的逻辑还是很简单的根据类类型从缓存中查找如果缓存中不存在则调用mask0()方法获取 。
final class ChannelHandlerMask {private static final InternalLogger logger InternalLoggerFactory.getInstance(ChannelHandlerMask.class);static final int MASK_EXCEPTION_CAUGHT 1;static final int MASK_CHANNEL_REGISTERED 1 1;static final int MASK_CHANNEL_UNREGISTERED 1 2;static final int MASK_CHANNEL_ACTIVE 1 3;static final int MASK_CHANNEL_INACTIVE 1 4;static final int MASK_CHANNEL_READ 1 5;static final int MASK_CHANNEL_READ_COMPLETE 1 6;static final int MASK_USER_EVENT_TRIGGERED 1 7;static final int MASK_CHANNEL_WRITABILITY_CHANGED 1 8;static final int MASK_BIND 1 9;static final int MASK_CONNECT 1 10;static final int MASK_DISCONNECT 1 11;static final int MASK_CLOSE 1 12;static final int MASK_DEREGISTER 1 13;static final int MASK_READ 1 14;static final int MASK_WRITE 1 15;static final int MASK_FLUSH 1 16;private static final int MASK_ALL_INBOUND MASK_EXCEPTION_CAUGHT | MASK_CHANNEL_REGISTERED |MASK_CHANNEL_UNREGISTERED | MASK_CHANNEL_ACTIVE | MASK_CHANNEL_INACTIVE | MASK_CHANNEL_READ |MASK_CHANNEL_READ_COMPLETE | MASK_USER_EVENT_TRIGGERED | MASK_CHANNEL_WRITABILITY_CHANGED;private static final int MASK_ALL_OUTBOUND MASK_EXCEPTION_CAUGHT | MASK_BIND | MASK_CONNECT | MASK_DISCONNECT |MASK_CLOSE | MASK_DEREGISTER | MASK_READ | MASK_WRITE | MASK_FLUSH;private static final FastThreadLocalMapClass? extends ChannelHandler, Integer MASKS new FastThreadLocalMapClass? extends ChannelHandler, Integer() {Overrideprotected MapClass? extends ChannelHandler, Integer initialValue() {return new WeakHashMapClass? extends ChannelHandler, Integer(32);}};private static int mask0(Class? extends ChannelHandler handlerType) {int mask MASK_EXCEPTION_CAUGHT;try {if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {mask | MASK_ALL_INBOUND;if (isSkippable(handlerType, channelRegistered, ChannelHandlerContext.class)) {mask ~MASK_CHANNEL_REGISTERED;}...}if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {mask | MASK_ALL_OUTBOUND;if (isSkippable(handlerType, bind, ChannelHandlerContext.class,SocketAddress.class, ChannelPromise.class)) {mask ~MASK_BIND;}... }if (isSkippable(handlerType, exceptionCaught, ChannelHandlerContext.class, Throwable.class)) {mask ~MASK_EXCEPTION_CAUGHT;}} catch (Exception e) {PlatformDependent.throwException(e);}return mask;}SuppressWarnings(rawtypes)private static boolean isSkippable(final Class? handlerType, final String methodName, final Class?... paramTypes) throws Exception {return AccessController.doPrivileged(new PrivilegedExceptionActionBoolean() {Overridepublic Boolean run() throws Exception {Method m;try {m handlerType.getMethod(methodName, paramTypes);} catch (NoSuchMethodException e) {logger.debug(Class {} missing method {}, assume we can not skip execution, handlerType, methodName, e);return false;}// 如果方法存在并且方法上有Skip注解则返回true return m ! null m.isAnnotationPresent(Skip.class);}});}private ChannelHandlerMask() { }Target(ElementType.METHOD)Retention(RetentionPolicy.RUNTIME)interface Skip {}
}如果ChannelHandler的bind()方法上配置了Skip注解其他方法都没有配置Skip注解则executionMask的计算方式 。 首先 MASK_ALL_OUTBOUND转化为二进制为 11111111000000001 ~MASK_BIND 11111111111111111111110111111111 mask ~MASK_BIND; 等于
1 1111 1110 0000 0001 1 1111 1101 1111 1111 1 1111 1100 0000 0001
因此得到executionMask mask ~MASK_BIND 的二进制数为 1 1111 1100 0000 0001 而 MASK_BIND的值转化为二进制为 0 0000 0010 0000 0000
因此再来理解 findContextOutbound()方法中的while条件ctx.executionMask mask 就好理解了。 假如Handler的bind()方法配置了Skip 注解其他方法都没有配置Skip注解 则它的executionMask转化为二进制为 1 1111 1100 0000 0001 而 MASK_BIND的二进制码为 0 0000 0010 0000 0000 那两者二进制相与值为0
1 1111 1100 0000 0001 0 0000 0010 0000 0000 0 0000 0000 0000 0000
在本例findContextOutbound() 方法中只要链表中的节点对应的handler的bind()方法加了Skip注解则跳过当前节点 继续向前查找 。 当然啦上面两行加粗代码的判断也需要注意 ChannelInboundHandler.class.isAssignableFrom(handlerType) 和ChannelOutboundHandler.class.isAssignableFrom(handlerType)也就是说如果类只有是ChannelInboundHandler的子类拥有MASK_ALL_INBOUND 同理类只有继承ChannelOutboundHandler才能拥有MASK_ALL_OUTBOUND 。 先来看TailContext的类关系 。 再来看ServerBootstrapAcceptor的类关系 。 TailContext和ServerBootstrapAcceptor都是ChannelInboundHandler的子类而它们的executionMask MASK_BIND 0因此真正调用的是HeadContext的bind()方法 。
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {if (invokeHandler()) {try {((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);} catch (Throwable t) {notifyOutboundHandlerException(t, promise);}} else {bind(localAddress, promise);}
}public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {unsafe.bind(localAddress, promise);
}对于NioServerSocketChannel的unsafe是 NioMessageUnsafe进入NioMessageUnsafe的bind()方法 。
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {assertEventLoop();if (!promise.setUncancellable() || !ensureOpen(promise)) {return;}// See: https://github.com/netty/netty/issues/576if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) localAddress instanceof InetSocketAddress !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() !PlatformDependent.isWindows() !PlatformDependent.maybeSuperUser()) {// Warn a user about the fact that a non-root user cant receive a// broadcast packet on *nix if the socket is bound on non-wildcard address.logger.warn(A non-root user cant receive a broadcast packet if the socket is not bound to a wildcard address; binding to a non-wildcard address ( localAddress ) anyway as requested.);}boolean wasActive isActive();try {// 模板设计模式调用子类的NioServerSocketChannel的doBind()方法doBind(localAddress);} catch (Throwable t) {// 绑定失败回调safeSetFailure(promise, t);closeIfClosed();return;}// 从非活跃状态到活跃状态触发了active事件如果之前没有激活调用bind()方法后就已经激活了激活后需要调用fireChannelActive 方法 。 if (!wasActive isActive()) {invokeLater(new Runnable() {Overridepublic void run() {// 第一次激活需要调用流水线中handler的channelActive 方法 pipeline.fireChannelActive();}});}// 绑定成功回调通知safeSetSuccess(promise);
}模板设计模式根据不同的Channel 调用它的bind()方法 。
protected void doBind(SocketAddress localAddress) throws Exception {if (PlatformDependent.javaVersion() 7) {javaChannel().bind(localAddress, config.getBacklog());} else {javaChannel().socket().bind(localAddress, config.getBacklog());}
}这里以NioServerSocketChannel为例真正调用的是ServerSocketChannelImpl的bind()方法 。 之前的例子中bind()方法是这样写的。 先来分析ServerSocketChannel serverSocket ServerSocketChannel.open();这一行代码。
ServerSocketChannel serverSocket ServerSocketChannel.open();public static ServerSocketChannel open() throws IOException {return SelectorProvider.provider().openServerSocketChannel();
}public ServerSocketChannel openServerSocketChannel() throws IOException {return new ServerSocketChannelImpl(this);
}ServerSocketChannel.open()方法实际上创建的是ServerSocketChannelImpl对象 。接下来看serverSocket.socket().bind(new InetSocketAddress(9000),1024);这一行代码 。
serverSocket.socket().bind(new InetSocketAddress(9000),1024);public ServerSocket socket() {synchronized(this.stateLock) {if (this.socket null) {this.socket ServerSocketAdaptor.create(this);}return this.socket;}
}public class ServerSocketAdaptor extends ServerSocket {private final ServerSocketChannelImpl ssc;public static ServerSocket create(ServerSocketChannelImpl var0) {try {return new ServerSocketAdaptor(var0);} catch (IOException var2) {throw new Error(var2);}}private ServerSocketAdaptor(ServerSocketChannelImpl var1) throws IOException {this.ssc var1;}
}socket()最终创建的是ServerSocketAdaptor对象而他的ssc属性就是ServerSocketChannelImpl自身因此调用bind()方法实际上又是调用ServerSocketAdaptor的bind()方法 进入ServerSocketAdaptor的bind()方法 。
public void bind(SocketAddress var1, int var2) throws IOException {if (var1 null) {var1 new InetSocketAddress(0);}try {this.ssc.bind((SocketAddress)var1, var2);} catch (Exception var4) {Net.translateException(var4);}
}最终又是调用ssc的bind()方法从之前的分析逻辑中得知ssc就是ServerSocketChannelImpl因此绕了一大圈 ServerSocketChannel serverSocket ServerSocketChannel.open(); serverSocket.socket().bind(new InetSocketAddress(9000),1024); 这两行代码实际上调用的是ServerSocketChannelImpl的bind()方法 。 现在应该对bind()方法有所理解了吧如果SocketChannel是第一次激活将调用管道中的所有handler的ChannelActive()方法 而激活方法是从流水线从前向后调用因此下面方法中首先传入的值为head。
public final ChannelPipeline fireChannelActive() {AbstractChannelHandlerContext.invokeChannelActive(head);return this;
}接下来进入fireChannelActive()方法 首先进入HeadContext的
static void invokeChannelActive(final AbstractChannelHandlerContext next) {EventExecutor executor next.executor();if (executor.inEventLoop()) {next.invokeChannelActive();} else {executor.execute(new Runnable() {Overridepublic void run() {next.invokeChannelActive();}});}
}private void invokeChannelActive() {if (invokeHandler()) {try {((ChannelInboundHandler) handler()).channelActive(this);} catch (Throwable t) {notifyHandlerException(t);}} else {fireChannelActive();}
}public void channelActive(ChannelHandlerContext ctx) {ctx.fireChannelActive();readIfIsAutoRead();
}HeadContext的channelActive内部先调用它的fireChannelActive方法 后面再来分析readIfIsAutoRead的使用。
public ChannelHandlerContext fireChannelActive() {invokeChannelActive(findContextInbound(MASK_CHANNEL_ACTIVE));return this;
}我们之前分析过一个 findContextOutbound()而findContextOutbound()方法是从后向前查找findContextInbound()方法则是从流水线中从前向后查找 。 我们之前分析过 HeadContext是实现了ChannelOutboundHandler, ChannelInboundHandler而TailContext实现了ChannelInboundHandler但是在findContextInbound()方法中do while循环中首先跳过HeadContext因为HeadContext的channelActive()方法已经调用过了而ServerBootstrapAcceptor是实现了ChannelInboundHandler接口但遗憾的是ServerBootstrapAcceptor所继承的类ChannelInboundHandlerAdapter的channelActive方法配置了Skip注解。
Skip
Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelActive();
}因此最终调用的是TailContext的fireChannelActive()方法 。
public void channelActive(ChannelHandlerContext ctx) {onUnhandledInboundChannelActive();
}protected void onUnhandledInboundChannelActive() {}在TailContext的onUnhandledInboundChannelActive()方法中什么也没有做只是一个空实现到这里 channelActive()方法已经调用完毕但是在HeadContext的channelActive()方法中readIfIsAutoRead()方法还没有分析这个方法做了哪些事情呢 进入这个方法 。
private void readIfIsAutoRead() {if (channel.config().isAutoRead()) {channel.read();}
}首先要明白channel.config()是什么 在NioServerSocketChannel创建时是不是初始化了一个config属性。 而DefaultChannelConfig的autoRead属性的默认值为1以NioServerSocketChannel为例子默认情况下channel.config().isAutoRead()的值为true。 因此会调用channel.read()方法。
public Channel read() {pipeline.read();return this;
}此方法的作用为读取通道数据并且启动入站处理具体来说从内部的Java NIO Channel 通道读取数据然后启动内部的Pipeline流水线启数据读取的入站处理此方法的返回通道自身用于链式调用。
public final ChannelPipeline read() {tail.read();return this;
}上面的方法还是很简单的直接进入TailContext的read()方法 。
public ChannelHandlerContext read() {// 沿着流水线从后向前调用read方法final AbstractChannelHandlerContext next findContextOutbound(MASK_READ);EventExecutor executor next.executor();if (executor.inEventLoop()) {next.invokeRead();} else {Tasks tasks next.invokeTasks;if (tasks null) {next.invokeTasks tasks new Tasks(next);}executor.execute(tasks.invokeReadTask);}return this;
} 沿着流水线从后向前调用read()方法最终调用HeadContext的read()方法 。
private void invokeRead() {if (invokeHandler()) {try {((ChannelOutboundHandler) handler()).read(this);} catch (Throwable t) {notifyHandlerException(t);}} else {read();}
}public void read(ChannelHandlerContext ctx) {unsafe.beginRead();
}在HeadContext的read()方法中又调用了unsafe的beginRead()方法如果是NioServerSocket的话unsafe是NioMessageChannel。接下来进入beginRead()方法 。
public final void beginRead() {assertEventLoop();if (!isActive()) {return;}try {doBeginRead();} catch (final Exception e) {invokeLater(new Runnable() {Overridepublic void run() {pipeline.fireExceptionCaught(e);}});close(voidPromise());}
}protected void doBeginRead() throws Exception {if (inputShutdown) {return;}super.doBeginRead();
}protected void doBeginRead() throws Exception {// Channel.read() or ChannelHandlerContext.read() was calledfinal SelectionKey selectionKey this.selectionKey;if (!selectionKey.isValid()) {return;}readPending true;final int interestOps selectionKey.interestOps();if ((interestOps readInterestOp) 0) {selectionKey.interestOps(interestOps | readInterestOp);}
}其实上面的 doBeginRead()方法的最终目的就是上面加粗代码修改selectionKey的监听事件 。 readInterestOp的值是什么呢如果是NioServerSocketChannel则是SelectionKey.OP_ACCEPT接收事件 。 如果是NioSocketChannelreadInterestOp的值为SelectionKey.OP_READ。 interestOps(int ops)方法可以修改事件列表对于NioServerSocketChannel等同于下面红框代码注册SelectionKey.OP_ACCEPT事件 。 接着请看
《Netty 源码解析下》