当前位置: 首页 > news >正文

网站名称及域名网站运营服务商

网站名称及域名,网站运营服务商,福州网站建设公司哪家比较好,静态网站公用头部 调用标题Netty场景及其原理 Netty简化Java NIO的类库的使用#xff0c;包括Selector、 ServerSocketChannel、 SocketChannel、ByteBuffer#xff0c;解决了断线重连、 网络闪断、心跳处理、半包读写、 网络拥塞和异常流的处理等。Netty拥有高性能、 吞吐量更高#xff0c;延迟更低…Netty场景及其原理 Netty简化Java NIO的类库的使用包括Selector、 ServerSocketChannel、 SocketChannel、ByteBuffer解决了断线重连、 网络闪断、心跳处理、半包读写、 网络拥塞和异常流的处理等。Netty拥有高性能、 吞吐量更高延迟更低减少资源消耗最小化不必要的内存复制等优点。Netty通过都作为基础的TCP/UDP的基础通信组件如Dubbo、RocketMQ、Lettuce、ServiceComb等。 Netty Ractor线程模型 Reactor可以理解为Thread通过死循环的方式处理IO复用返回的事件列表(Socket的Read、Write)。 Netty内部会使用多个Ractor也就是意味着会使用多Epoll同时运行。 while (true) { eventKeys epoll.pool(timeOut);process(eventKeys); }NioEventLoop Ractor的实现继承SingleThreadEventLoop内部Hold Thread和一个BlockQueue会死循环执行io.netty.channel.nio.NioEventLoop#run处理通过io.netty.channel.nio.NioEventLoop#register注册的事件。 private final QueueRunnable taskQueue;// taskQueueprivate final Thread thread; // 用于执行任务的单线程protected SingleThreadEventExecutor(EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp, int maxPendingTasks,RejectedExecutionHandler rejectedHandler) {// ........// 线程工厂生成一个线程并添加Runnable任务最终内部执行SingleThreadEventExecutor的run方法// run将在子类中覆写thread threadFactory.newThread(new Runnable() {Overridepublic void run() {boolean success false;updateLastExecutionTime();try {SingleThreadEventExecutor.this.run();// 执行父类中的run多态多态啊success true;} catch (Throwable t) {logger.warn(Unexpected exception from an event executor: , t);} finally {// ......}});//....................} SingleThreadEventExecutor.this.run() for (;;) {try {switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {case SelectStrategy.CONTINUE:continue;case SelectStrategy.SELECT:select(wakenUp.getAndSet(false));// ....if (wakenUp.get()) {selector.wakeup();}default:// fallthrough}cancelledKeys 0;needsToSelectAgain false;final int ioRatio this.ioRatio;if (ioRatio 100) {try {processSelectedKeys(); // 处理Selctor就绪的任务} finally {// Ensure we always run tasks.runAllTasks(); // 事件循环主要每次EventLoop完成后都执行一次可以从外部添加Task}} else {final long ioStartTime System.nanoTime();try {processSelectedKeys();} finally {// Ensure we always run tasks.final long ioTime System.nanoTime() - ioStartTime;runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}}} catch (Throwable t) {handleLoopException(t);}// ..........}每新建一个Channel只会选择只选择一个 NioEventLoop 与其绑定。所以说Channel生命周期的所有事件处理都是线程独立的不同的 NioEventLoop线程之间不会发生任何交集。 select(wakenUp.getAndSet(false)),不断地轮询是否有IO事件发生并且在轮询的过程中不断检查是否有定时任务和普通任务保证了netty的任务队列中的任务得到有效执行轮询过程顺带用一个计数器避开了了jdk空轮询的bug过程清晰明了 Netty的任务分为三种 普通任务通过 NioEventLoop 的 execute() 方法向任务队列 taskQueue 中添加任务。例如 Netty 在写数据时会封装 WriteAndFlushTask 提交给 taskQueue。taskQueue 的实现类是多生产者单消费者队列 MpscChunkedArrayQueue在多线程并发添加任务时可以保证线程安全。 // NioEventLoop 继承 SingleThreadEventLoop 实现了Execute接口 public void NioEventLoop#execute(Runnable task) {boolean inEventLoop inEventLoop();addTask(task); // 添加任务到阻塞队列中在run方法中执行完IO的Select任务就会执行task其中schame提交的定时任务也会在这里执行 }定时任务通过调用 NioEventLoop 的 schedule() 方法向定时任务队列 scheduledTaskQueue 添加一个定时任务用于周期性执行该任务。例如心跳消息发送等。定时任务队列 scheduledTaskQueue 采用优先队列 PriorityQueue 实现。 尾部队列tailTasks 相比于普通任务队列优先级较低在每次执行完 taskQueue 中任务后会去获取尾部队列中任务执行。尾部任务并不常用主要用于做一些收尾工作例如统计事件循环的执行时间、监控信息上报等。 EventLoopGroup 可持有多个NioEventLoop和一个Exectors来全异步处理请求EventLoopGroup bossGroup new NioEventLoopGroup(1); NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,final SelectorProvider selectorProvider,final SelectStrategyFactory selectStrategyFactory,final RejectedExecutionHandler rejectedExecutionHandler) nThread: 确定使用多少个NioEnventLoop每个EventLoop会占用一个Threadchildren new EventExecutor[nThreads];for (int i 0; i nThreads; i ) {boolean success false;try {children[i] newChild(executor, args);success true;} catch (Exception e) {// TODO: Think about if this is a good exception typethrow new IllegalStateException(failed to create a child event loop, e);} finally {}} /* 1、executor:异步执行的线程池不设置则默认使用new ThreadPerTaskExecutor(newDefaultThreadFactory())线程池的名字为 DefaultThreadFactory-#子增值2、selectorProvider生成IO复用类的工厂默认使用SelectorProvider.provider()3、selectStrategyFactory默认是DefaultSelectStrategy.INSTANCE控制Select几个方法执行的策略如果有就绪事件则直接处理否则 执行select等待4、rejectedExecutionHandler默认是io.netty.util.concurrent.RejectedExecutionHandlers直接抛出异常EventLoop是单个Thread除了执行Epoll.pool外还需要执行传入的Task如果阻塞队列满了或者Task执行失败则会调用此方法。Object... args传参调用方和被调用方前后定义好契约在使用的时候可以使用Index访问减少形参的编写。protected EventLoop newChild(Executor executor, Object... args) throws Exception {return new NioEventLoop(this, executor, (SelectorProvider) args[0],((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);} */https://netty.io/ The Netty project is an effort to provide an asynchronous event-driven network application framework and tooling for the rapid development of maintainable high-performance and high-scalability protocol servers and clients. In other words, Netty is an NIO client server framework that enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP and UDP socket server development. https://netty.io/3.8/guide/#architecture ServerBootstrap、Bootstrap 阅读源代码应该先从接口抽象类开始搞起来都是基于接口的编程模式执行的。 NioEventLoop-Reactor实现 EventLoop是reactor(定义selector-监听事件并注册回调方法-触发则调用对应的回调方法)模型的实现接口具体的实现类有NioEventLoop、EpollEventLoop等其中NioEventLoop使用最广泛因此重点讲解此处的原理。以下是NioEventLoop类继承关系图看似比较复杂但是从关键的几个方法入手就比较简单。图中可以看出NioEventLoop最终需要实现Executor#excute方法而excute方法会被外部类通过submit调用进而执行Runnable任务(外部可以调用多次执行多个Runnable任务但是最基本的事件循环任务是默认的任务始终会执行)。不用多想Runnable任务肯定通过new Thread(Runnable).start的形式被调用在某个线程中执行。因此如果要分析此类我们应该重点关注excute方法Thread如何创建、最终的Runable任务是如何被包装起来的。 NioEventLoop继承SingleThreadEventExectExecutor从名字中就可以看出此任务是在单线程中执行的其他所做的包装都是为了可以更加安全高效的执行任务下面我们一一分析首先看execute的具体实现。 execute具体实现 SingleThreadEventExecutor#execute对应具体的实现。 private final QueueRunnable taskQueue;// taskQueueprivate final Thread thread; // 用于执行任务的单线程Overridepublic void execute(Runnable task) {if (task null) {throw new NullPointerException(task);}boolean inEventLoop inEventLoop();// 当前线程正是执行EventLoop的Threadif (inEventLoop) {addTask(task); // 添加task} else {startThread(); // 这里会执行runnable方法addTask(task);if (isShutdown() removeTask(task)) {reject();}}if (!addTaskWakesUp wakesUpForTask(task)) {wakeup(inEventLoop);}} public boolean inEventLoop() {return inEventLoop(Thread.currentThread());}private void startThread() {if (STATE_UPDATER.get(this) ST_NOT_STARTED) {if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {thread.start();}}}inEventLoop用来判断是否在运行的thread中添加新的task如果是的则直接将其添加到taskQueue中否则startThread并将task添加到taskQueue中。 startThread会通过cas操作判断thread是否已经start如果没有则启动。thread启动后会执行selector任务和用户自定义任务。如果在用户自定义任务中再创建任务则inEventLoop返回true。 thread是SingleThreadEventExecutor最重要的filed这里仅仅包含一个thread因此全部的任务都需要在此thread内部执行当SingleThreadEventExecutor被构造的时候会初始化threadthread中的Runnable包装了SingleThreadEventExecutor.this.run()方法主要实现逻辑在这个方法中而SingleThreadEventExecutor中protected abstract void run()是抽象方法具体的实现在NioEventLoop中继续分析具体实现。 protected SingleThreadEventExecutor(EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp, int maxPendingTasks,RejectedExecutionHandler rejectedHandler) {// ........// 线程工厂生成一个线程并添加Runnable任务最终内部执行SingleThreadEventExecutor的run方法// run将在子类中覆写thread threadFactory.newThread(new Runnable() {Overridepublic void run() {boolean success false;updateLastExecutionTime();try {SingleThreadEventExecutor.this.run();// 执行父类中的run多态多态啊success true;} catch (Throwable t) {logger.warn(Unexpected exception from an event executor: , t);} finally {// ......}});//....................}NioEventLoop#run才是最终的任务其过程如下select(wakenUp.getAndSet(false))是IO复用器其会返回就绪的事件并根据返回的结果处理processSelectedKeys()。runAllTasks()将执行其他的任务这个和前面的taskQueue域息息相关。也就是说这个thread中不仅仅可以处理selector的IO复用任务还可以中执行一些位于taskQueue中的Other Tasks。因此多出了一个变量ioRatio来控制IO复用任务和其他任务分别占用thread的比例。当ioRatio100的时候则执行processSelectedKeys后并执行全部的Other Tasks如果Other Tasks中的某个task比较耗时那么会影响selector的效率进而影响Netty的响应速度所以ioRatio默认为50这样处理完processSelectedKeys后可以控制执行Other Tasks的时间。 Overrideprotected void run() {for (;;) {try {switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {case SelectStrategy.CONTINUE:continue;case SelectStrategy.SELECT:select(wakenUp.getAndSet(false));// wakenUp.compareAndSet(false, true) is always evaluated// before calling selector.wakeup() to reduce the wake-up// overhead. (Selector.wakeup() is an expensive operation.)//// However, there is a race condition in this approach.// The race condition is triggered when wakenUp is set to// true too early.//// wakenUp is set to true too early if:// 1) Selector is waken up between wakenUp.set(false) and// selector.select(...). (BAD)// 2) Selector is waken up between selector.select(...) and// if (wakenUp.get()) { ... }. (OK)//// In the first case, wakenUp is set to true and the// following selector.select(...) will wake up immediately.// Until wakenUp is set to false again in the next round,// wakenUp.compareAndSet(false, true) will fail, and therefore// any attempt to wake up the Selector will fail, too, causing// the following selector.select(...) call to block// unnecessarily.//// To fix this problem, we wake up the selector again if wakenUp// is true immediately after selector.select(...).// It is inefficient in that it wakes up the selector for both// the first case (BAD - wake-up required) and the second case// (OK - no wake-up required).if (wakenUp.get()) {selector.wakeup();}default:// fallthrough}cancelledKeys 0;needsToSelectAgain false;final int ioRatio this.ioRatio;if (ioRatio 100) {try {processSelectedKeys(); // 处理Selctor就绪的任务} finally {// Ensure we always run tasks.runAllTasks(); // 事件循环主要每次EventLoop完成后都执行一次可以从外部添加Task}} else {final long ioStartTime System.nanoTime();try {processSelectedKeys();} finally {// Ensure we always run tasks.final long ioTime System.nanoTime() - ioStartTime;runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}}} catch (Throwable t) {handleLoopException(t);}// ..........}} select(wakenUp.getAndSet(false));首先通过openSelector()方法创建一个新的selector然后执行一个死循环只要执行过程中出现过一次并发修改selectionKeys异常就重新开始转移 具体的转移步骤为 1. 拿到有效的key 2. 取消该key在旧的selector上的事件注册 3. 将该key对应的channel注册到新的selector上 4. 重新绑定channel和新的key的关系转移完成之后就可以将原有的selector废弃后面所有的轮询都是在新的selector进行 最后我们总结reactor线程select步骤做的事情不断地轮询是否有IO事件发生并且在轮询的过程中不断检查是否有定时任务和普通任务保证了netty的任务队列中的任务得到有效执行轮询过程顺带用一个计数器避开了了jdk空轮询的bug过程清晰明了 由于篇幅原因下面两个过程将分别放到一篇文章中去讲述尽请期待 processSelectedKeys()中是处理网络事件的全部操作这是最重要的方法从这里可以看出Netty是如何封装select的。那就看看到底select是如何处理的。 private void processSelectedKeys() {if (selectedKeys ! null) {processSelectedKeysOptimized(selectedKeys.flip());} else {processSelectedKeysPlain(selector.selectedKeys());}}private SelectedSelectionKeySet selectedKeys; // 到底就绪的keys是如何被调用的哦 private Selector openSelector() {final Selector selector;try {selector provider.openSelector();} catch (IOException e) {throw new ChannelException(failed to open a new selector, e);}if (DISABLE_KEYSET_OPTIMIZATION) {return selector;}final SelectedSelectionKeySet selectedKeySet new SelectedSelectionKeySet();Object maybeSelectorImplClass AccessController.doPrivileged(new PrivilegedActionObject() {Overridepublic Object run() {try {return Class.forName(sun.nio.ch.SelectorImpl,false,PlatformDependent.getSystemClassLoader());} catch (ClassNotFoundException e) {return e;} catch (SecurityException e) {return e;}}});if (!(maybeSelectorImplClass instanceof Class) ||// ensure the current selector implementation is what we can instrument.!((Class?) maybeSelectorImplClass).isAssignableFrom(selector.getClass())) {if (maybeSelectorImplClass instanceof Exception) {Exception e (Exception) maybeSelectorImplClass;logger.trace(failed to instrument a special java.util.Set into: {}, selector, e);}return selector;}final Class? selectorImplClass (Class?) maybeSelectorImplClass;Object maybeException AccessController.doPrivileged(new PrivilegedActionObject() {Overridepublic Object run() {try {Field selectedKeysField selectorImplClass.getDeclaredField(selectedKeys);Field publicSelectedKeysField selectorImplClass.getDeclaredField(publicSelectedKeys);selectedKeysField.setAccessible(true);publicSelectedKeysField.setAccessible(true);selectedKeysField.set(selector, selectedKeySet);publicSelectedKeysField.set(selector, selectedKeySet);return null;} catch (NoSuchFieldException e) {return e;} catch (IllegalAccessException e) {return e;} catch (RuntimeException e) {// JDK 9 can throw an inaccessible object exception here; since Netty compiles// against JDK 7 and this exception was only added in JDK 9, we have to weakly// check the typeif (java.lang.reflect.InaccessibleObjectException.equals(e.getClass().getName())) {return e;} else {throw e;}}}});if (maybeException instanceof Exception) {selectedKeys null;Exception e (Exception) maybeException;logger.trace(failed to instrument a special java.util.Set into: {}, selector, e);} else {selectedKeys selectedKeySet;logger.trace(instrumented a special java.util.Set into: {}, selector);}return selector;}private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {for (int i 0;; i ) {final SelectionKey k selectedKeys[i];if (k null) {break;}// null out entry in the array to allow to have it GCed once the Channel close// See https://github.com/netty/netty/issues/2363selectedKeys[i] null;final Object a k.attachment();if (a instanceof AbstractNioChannel) {processSelectedKey(k, (AbstractNioChannel) a);} else {SuppressWarnings(unchecked)NioTaskSelectableChannel task (NioTaskSelectableChannel) a;processSelectedKey(k, task);}if (needsToSelectAgain) {// null out entries in the array to allow to have it GCed once the Channel close// See https://github.com/netty/netty/issues/2363for (;;) {i;if (selectedKeys[i] null) {break;}selectedKeys[i] null;}selectAgain();// Need to flip the optimized selectedKeys to get the right reference to the array// and reset the index to -1 which will then set to 0 on the for loop// to start over again.//// See https://github.com/netty/netty/issues/1523selectedKeys this.selectedKeys.flip();i -1;}}}// 处理SelectionKey的最终方法private static void processSelectedKey(SelectionKey k, NioTaskSelectableChannel task) {int state 0;try {task.channelReady(k.channel(), k);state 1;} catch (Exception e) {k.cancel();invokeChannelUnregistered(task, k, e);state 2;} finally {switch (state) {case 0:k.cancel();invokeChannelUnregistered(task, k, null);break;case 1:if (!k.isValid()) { // Cancelled by channelReady()invokeChannelUnregistered(task, k, null);}break;}}}SingleThreadEventExecutor#runAllTasks()task全部存储在taskQueue中这里通过for循环执行全部的Task。runAllTasks(long timeoutNanos)则会记录任务运行的时候如果超时则退出防止Task执行时间过长。到此execute内部大概的实现逻辑讲清楚了明白任务都是在execute处理先处理selector事件然后处理用户添加的任务。 protected boolean runAllTasks() {boolean fetchedAll;do {fetchedAll fetchFromScheduledTaskQueue();Runnable task pollTask();// 从taskQueue头部获取任务if (task null) {return false;}for (;;) {try {task.run();// 执行task} catch (Throwable t) {logger.warn(A task raised an exception., t);}task pollTask();if (task null) {break;}}} while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.lastExecutionTime ScheduledFutureTask.nanoTime();return true;}private boolean fetchFromScheduledTaskQueue() {long nanoTime nanoTime();Runnable scheduledTask pollScheduledTask(nanoTime);while (scheduledTask ! null) {if (!taskQueue.offer(scheduledTask)) {// No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.scheduledTaskQueue().add((ScheduledFutureTask?) scheduledTask);return false;}scheduledTask pollScheduledTask(nanoTime);}return true;} protected boolean runAllTasks(long timeoutNanos) {fetchFromScheduledTaskQueue();Runnable task pollTask();if (task null) {return false;}final long deadline ScheduledFutureTask.nanoTime() timeoutNanos;long runTasks 0;long lastExecutionTime;for (;;) {try {task.run();} catch (Throwable t) {logger.warn(A task raised an exception., t);}runTasks ;// Check timeout every 64 tasks because nanoTime() is relatively expensive.// XXX: Hard-coded value - will make it configurable if it is really a problem.if ((runTasks 0x3F) 0) {lastExecutionTime ScheduledFutureTask.nanoTime();if (lastExecutionTime deadline) {break;}}task pollTask();if (task null) {lastExecutionTime ScheduledFutureTask.nanoTime();break;}}this.lastExecutionTime lastExecutionTime;return true;}NioEventLoopGroup 从命名可以看出是用来管理NioEventLoop集合的在多个线程里面跑多个EventLoop。分析这个也很关键。 Channel Channel的具体实现内部包含了很多抽象类Channel对应一条具体的连接 NioServerSocketChannel NioServerSocketChannel是Channel的具体实现内部包含了很多抽象类Channel对应一条具体的连接。包含ChannelHandler ChannelHandler ChannelHandler Netty各种编码的处理最终肯定都实现此类。 ChannelHandler 定义了Handler最基本接口。 void handlerAdded(ChannelHandlerContext ctx) throws Exception; void handlerRemoved(ChannelHandlerContext ctx) throws Exception; void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;InheritedDocumentedTarget(ElementType.TYPE)Retention(RetentionPolicy.RUNTIME)interface Sharable {// no value}ChannelHandler接口含有三个方法分别对应ChannelHandler在成功添加、成功移除、发生异常的时候调用。Sharable注解后续好好学习 ChannelInboundHandler 这就是面向接口的抽象编程高度抽象使得类与类直接可以做到松耦合。继承ChannelHandler接口并增加了可读事件需要实现的几个方法。分别对应Handler被成功Register、成功Unregister、Read、ReadComplete等。 ChannelOutboundHandler 这就是面向接口的抽象编程高度抽象使得类与类直接可以做到松耦合。继承ChannelHandler接口并增加了可写事件需要实现的几个方法。当bind成功、connect成功或者close的时候对应的回调方法会被执行。 ChannelHandlerContext 每个ChannelHandler最终会存放在ChannelHandlerContext中其中DefaultChannelHandlerContext是一种接口的一种实现为了让Pipe和ChannelHandler可以交互通过Context类将二者通过组合模式弄到一起。最重要的是AbstractChannelHandlerContext中包含了下面两个Filed通过者两个域最终将Pipe中的Handler通过双向链表连接在一起。 volatile AbstractChannelHandlerContext next;//next和prev构成双向链表存储handlervolatile AbstractChannelHandlerContext prev;private final boolean inbound; // 当前Handler属于inbound还是outbound判断的方式也很简单private final boolean outbound;private final DefaultChannelPipeline pipeline;// 存储当前pipe的引用 Nprivate final String name; // Hander的名字必须唯一private final boolean ordered;private static boolean isInbound(ChannelHandler handler) {return handler instanceof ChannelInboundHandler;}private static boolean isOutbound(ChannelHandler handler) {return handler instanceof ChannelOutboundHandler;} 判断类型也很简单直接通过instanceof判断继承的类型即可。 DefaultChannelHandlerContext中存放了handler的private final ChannelHandler handler句柄客户端实现对应的handler方法将存储在这里最后被调用。 每个有效连接会是一个Channel Channel存储了和连接相关的全部信息具体的实现包括NioSocket和NioServerSocketChannelChannelInitializer用于辅助初始化Channel。这里面的实现都包括大量的成员域。 逻辑相当的复杂。 包括如下重要 EventLoop eventLoop(); // Channel位于哪个事件循环 ChannelPipeline pipeline();// Channel对应的pipelinePipe管道顾名思义这个类就用来存储对应的处理方法的当某个事件就绪后会依次调用这个里面的方法处理。 在Netty中一条有效连接客户端和服务器某个端口的成功连接叫做Channel当Channel中事件就绪后调用的处理逻辑叫做ChannelPipeline里面存放的都是ChannelHandler使用双向链表将ChannelHandler连接起来。其中双向链表的节点叫做ChannelHandlerContext 构造方法是如何传递进去的 Pipeline中会存储AbstractChannelHandlerContext根据传入的Handler来辨别是inbound还是outbound。 ChannelPipeline 每个Channel就绪事件可分为inbound event对应可读读入之后调用的Handler对应ChannelInboundHandler和outbound event可写写出之前调用的Handler对应ChannelOutboundHandler。ChannelPipeline称为管道通过双向链表存储了这些ChannelHandler包装在ChannelHandlerContext中类。最后事件就绪将遍历Pipe上相应的Handler处理。 ChannelPipeline 定义了Pipe抽象的方法有如下重要方法 // 链表结尾添加hander ChannelPipeline addLast(String name, ChannelHandler handler); // 链表头部添加hander ChannelPipeline addFirst(String name, ChannelHandler handler);DefaultChannelPipeline ChannelPipeline的具体实现内部定义了双向链表的头节点和尾部节点。后续每次将Handler添加到DefaultChannelPipeline上都会将Handler包装成ChannelHandlerContext并插入到双向链表中下面将详细分析头节点、尾部节点以及增加和删除Handler的过程。 在这里插入图片描述 final AbstractChannelHandlerContext head;// 尾节点final AbstractChannelHandlerContext tail;// 头节点private final Channel channel; // 关联Pipe对应的channel protected DefaultChannelPipeline(Channel channel) {this.channel ObjectUtil.checkNotNull(channel, channel);tail new TailContext(this);// head new HeadContext(this);head.next tail;tail.prev head;}//内部类标记头结点final class HeadContext extends AbstractChannelHandlerContextimplements ChannelOutboundHandler, ChannelInboundHandler {}//内部类标记尾结点final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {TailContext(DefaultChannelPipeline pipeline) {super(pipeline, null, TAIL_NAME, true, false);setAddComplete();}} 当DefaultChannelPipeline构造的时候会自动创建tail和head节点后续的Handler都加入这个双向链表。具体细节地方先不深究这里先大概了解原理先。 public final ChannelPipeline addLast(String name, ChannelHandler handler) {return addLast(null, name, handler);} Overridepublic final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {final AbstractChannelHandlerContext newCtx;synchronized (this) {checkMultiplicity(handler);newCtx newContext(group, filterName(name, handler), handler);addLast0(newCtx);// If the registered is false it means that the channel was not registered on an eventloop yet.// In this case we add the context to the pipeline and add a task that will call// ChannelHandler.handlerAdded(...) once the channel is registered.if (!registered) {newCtx.setAddPending();callHandlerCallbackLater(newCtx, true);return this;}EventExecutor executor newCtx.executor();if (!executor.inEventLoop()) {newCtx.setAddPending();executor.execute(new Runnable() {Overridepublic void run() {callHandlerAdded0(newCtx);}});return this;}}callHandlerAdded0(newCtx);return this;}private void addLast0(AbstractChannelHandlerContext newCtx) {AbstractChannelHandlerContext prev tail.prev;newCtx.prev prev;newCtx.next tail;prev.next newCtx;tail.prev newCtx;} 从上面可以看出addLast添加一个Handler都会经过以下几步骤 checkMultiplicity首先判断Handler是否之前已经加入过链表如果不为Sharable且之前已经加入则抛出异常。isSharable()的逻辑也比较简单通过反射的方式或者Handler是否加了Sharable注解。为了性能的极致isSharable()中竟然还使用了获取Map缓存状态减少反射的开支。 学习什么是WeakHashMap? 学习ThreadLocal private static void checkMultiplicity(ChannelHandler handler) {if (handler instanceof ChannelHandlerAdapter) {ChannelHandlerAdapter h (ChannelHandlerAdapter) handler;if (!h.isSharable() h.added) {throw new ChannelPipelineException(h.getClass().getName() is not a Sharable handler, so cant be added or removed multiple times.);}h.added true;}}public boolean isSharable() {/*** Cache the result of {link Sharable} annotation detection to workaround a condition. We use a* {link ThreadLocal} and {link WeakHashMap} to eliminate the volatile write/reads. Using different* {link WeakHashMap} instances per {link Thread} is good enough for us and the number of* {link Thread}s are quite limited anyway.** See a hrefhttps://github.com/netty/netty/issues/2289#2289/a.*/Class? clazz getClass();//获取the runtime class of this ObjectMapClass?, Boolean cache InternalThreadLocalMap.get().handlerSharableCache(); // 获取存储在当前线程ThreadLocal中的WeakHashMap防止竞争关系因为Handler通常会在一个线程中加。这里需要Boolean sharable cache.get(clazz);//从WeakHashMap中获取状态if (sharable null) {//没有缓存则获取状态并缓存。sharable clazz.isAnnotationPresent(Sharable.class);//通过反射获取注解cache.put(clazz, sharable);}return sharable;}newContext则会通过Handler并创建DefaultChannelHandlerContext。filterName用于确保Handler的名字是唯一的。 newCtx newContext(group, filterName(name, handler), handler); private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);}DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {super(pipeline, executor, name, isInbound(handler), isOutbound(handler));if (handler null) {throw new NullPointerException(handler);}this.handler handler;}通过handler会构造DefaultChannelHandlerContextisInbound(handler), isOutbound(handler)则用于判断当前Handler对应in event还是out event。通过Handler继承哪个类直接判断属于什么类型。 private static boolean isInbound(ChannelHandler handler) {return handler instanceof ChannelInboundHandler;}private static boolean isOutbound(ChannelHandler handler) {return handler instanceof ChannelOutboundHandler;} addLast0(newCtx)将DefaultChannelHandlerContext加入双向链表保存。 private void addLast0(AbstractChannelHandlerContext newCtx) {AbstractChannelHandlerContext prev tail.prev;newCtx.prev prev;newCtx.next tail;prev.next newCtx;tail.prev newCtx;}后续的逻辑则用于执行Handler加入成功之后的回调方法这个回调方法在客户端实现Handler类的时候通过Override接口方法则在这里就会被成功调用可以用于记录日志信息。这就是通过接口编程的好处这里是面向接口的编程。回调方法的调用形式有两种如果在EventLoop线程中添加的Handler则会将添加成功的回调方法封装成Task任务的模式。 ChannelInitializer 前面已经很清楚的讲解了Channel、ChannelHandler、ChannelPipe而这里的ChannelInitializer属于工具类用客户更加方便的初始化Channel。 new ChannelInitializerSocketChannel() {Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler).addLast(defaultEventExecutorGroup,encoder,new NettyDecoder(),new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),connectionManageHandler,serverHandler);}}重点在于这里的ChannelInitializer构造复写的initChannel方法将会被父类的channelRegistered调用。 ByteBuf 解决粘包问题 LineBasedFrameDecoder
http://www.hkea.cn/news/14394136/

相关文章:

  • 网站建设费用什么意思济南建设企业网站
  • 手游托在什么网站申请中国空间站图片高清
  • 自己做的网页怎么上传到网站吗高明网站开发公司
  • 旅游网站模板psd中型网站开发周期
  • 做科研交流常用的网站微信app免费下载安装
  • 网站和系统哪个好做网门app下载
  • python php 网站开发原创wordpress改成英文
  • 免费软件站湖北省建设工程质量安全协会网站
  • 宁波集团网站建设阿里云服务器安装wordpress
  • 学历教育网站建设哪里有免费的个人简历模板
  • 湖南网站设计案例wordpress动漫网站
  • 专业的深圳网站建设公司哪家好wordpress用旧的编辑器
  • 建立一个网站的步骤完全免费空间网站
  • 企业网站功能包括为什么要建设学校网站
  • 恢复原来的网站网站改版要注意什么
  • 网站建设的收获体会赵朴初网站建设
  • 松江做网站嘉祥县建设局官方网站
  • 贵州网站推广公司seo网站计划书
  • 网站有二维码吗区块链系统软件开发
  • 劳务派遣技术支持 东莞网站建设广州网站建设公司乐云seo
  • 网站建设公司合伙人手表回收网网站
  • 受雇去建设网站类网站自己开发app所需的各种费用
  • 深圳网站建设网页设计企业所得税怎么计算公式
  • 重庆快速网站推广晋中建设局网站
  • 中山企业网站优化做网站公司凡科
  • 网站视频上传怎么做大淘客wordpress
  • 网站建设的团队分工wordpress筛选主题
  • 简单的网站制作代码做体育设施工程公司的网站
  • 做封面的地图网站网络安装
  • 新时代文明实践站网址网站代码优化的方法