泰兴网站设计,新网店怎么免费推广,用php做网站后台,网络营销与传统营销有哪些区别系列文章目录
【zookeeper核心源码解析】第一课#xff1a;zk启动类核心流程序列图 【zookeeper核心源码解析】第二课#xff1a;俯瞰QuorumPeer启动核心流程#xff0c;实现选举关键流程 【zookeeper核心源码解析】第三课#xff1a;leader与follower何时开始同步#…系列文章目录
【zookeeper核心源码解析】第一课zk启动类核心流程序列图 【zookeeper核心源码解析】第二课俯瞰QuorumPeer启动核心流程实现选举关键流程 【zookeeper核心源码解析】第三课leader与follower何时开始同步如何同步数据 【zookeeper核心源码解析】第四课客户端与服务端读写的io核心流程 【zookeeper核心源码解析】第四课客户端与服务端读写的io核心流程 系列文章目录1. 先看服务端初始化与连接构建的准备2. 客户端代码 1. 先看服务端初始化与连接构建的准备
在第一节中介绍到NIOServerCnxnFactory的初始化该类其实就是专门为客户端读写数据准备的服务端。主要构建连接与数据读写。
c class NIOServerCnxnFactory extends ServerCnxnFactory implements Runnable
在run方法中构建连接与io读写具体代码如下
public void run() {while (!ss.socket().isClosed()) {try {selector.select(1000);SetSelectionKey selected;synchronized (this) {selected selector.selectedKeys();}ArrayListSelectionKey selectedList new ArrayListSelectionKey(selected);Collections.shuffle(selectedList);for (SelectionKey k : selectedList) {if ((k.readyOps() SelectionKey.OP_ACCEPT) ! 0) {SocketChannel sc ((ServerSocketChannel) k.channel()).accept();InetAddress ia sc.socket().getInetAddress();int cnxncount getClientCnxnCount(ia);if (maxClientCnxns 0 cnxncount maxClientCnxns){LOG.warn(Too many connections from ia - max is maxClientCnxns );sc.close();} else {LOG.info(Accepted socket connection from sc.socket().getRemoteSocketAddress());sc.configureBlocking(false);SelectionKey sk sc.register(selector,SelectionKey.OP_READ);NIOServerCnxn cnxn createConnection(sc, sk);sk.attach(cnxn);addCnxn(cnxn);}} else if ((k.readyOps() (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) ! 0) {NIOServerCnxn c (NIOServerCnxn) k.attachment();c.doIO(k);} else {if (LOG.isDebugEnabled()) {LOG.debug(Unexpected ops in select k.readyOps());}}}selected.clear();} catch (RuntimeException e) {LOG.warn(Ignoring unexpected runtime exception, e);} catch (Exception e) {LOG.warn(Ignoring exception, e);}}closeAll();LOG.info(NIOServerCnxn factory exited run method);}2. 客户端代码
ClientCnxn 类是客户端的入口代码。
/*** This class manages the socket i/o for the client. ClientCnxn maintains a list* of available servers to connect to and transparently switches servers it is* connected to as needed.**/里面的EventThread专本对数据进行异步读写。感兴趣可以从run()方法进去看 Overridepublic void run() {try {isRunning true;while (true) {Object event waitingEvents.take();if (event eventOfDeath) {wasKilled true;} else {processEvent(event);}if (wasKilled)synchronized (waitingEvents) {if (waitingEvents.isEmpty()) {isRunning false;break;}}}} catch (InterruptedException e) {LOG.error(Event thread exiting due to interruption, e);}LOG.info(EventThread shut down);}