国内外高校门户网站建设,免费建立小程序网站,wordpress 杂志,wordpress图片文件目录WorkFlow源码剖析——Communicator之TCPServer#xff08;中#xff09;
前言
上节博客已经详细介绍了workflow的poller的实现#xff0c;这节我们来看看Communicator是如何利用poller的#xff0c;对连接对象生命周期的管理。#xff08;PS#xff1a;与其说Communica…WorkFlow源码剖析——Communicator之TCPServer中
前言
上节博客已经详细介绍了workflow的poller的实现这节我们来看看Communicator是如何利用poller的对连接对象生命周期的管理。PS与其说Communicator利用的是poller其实Communicator使用的是mpoller上节在介绍poller时也提到过mpoller现场帮读者回忆一下mpoller是poller的manager类管理多个poller事件池对外提供的接口负责将各种poller_node负载均衡的分散给不同的poller。
上节在介绍poller时出现了各种回调比如poller-callback()、node-data.accept()、node-data.partial_written()、node-data.create_message()等当时我们总是一笔带过没有去深入分析这些回调会做什么并且每次在IO事件结束都会回调poller-callback()为什么要这样做在poller当中只看到了针对poller_node的malloc函数而没有看到对应的free函数哪里调用了free函数去释放poller_node
别着急本节的源码分析会逐步揭晓这些疑问。
同样的注意里还是放在TCPServer上对于SSL和UDP相关的内容直接忽略。先把TCP给模清楚。
管理连接对象的实现
连接上下文数据结构的分析
既然谈到对连接对象的管理那Communicator必然有一个数据结构来表示一个连接上下文对象它就是CommConnEntry代码如下
struct CommConnEntry
{struct list_head list; // 链表节点CommConnection *conn; // 下一章介绍TCPServer的时候会用到本文可忽略。long long seq; // seq的作用其实可以理解为一条连接 请求-回复 的轮次。一条连接的服务端和客户端理论上讲seq值是同时递增的并且一定是保持相同的。int sockfd; // 连接的句柄
#define CONN_STATE_CONNECTING 0
#define CONN_STATE_CONNECTED 1
#define CONN_STATE_RECEIVING 2
#define CONN_STATE_SUCCESS 3
#define CONN_STATE_IDLE 4
#define CONN_STATE_KEEPALIVE 5
#define CONN_STATE_CLOSING 6
#define CONN_STATE_ERROR 7int state; // 连接状态int error;int ref; // 对对象的引用计数struct iovec *write_iov; // 异步写缓存CommSession *session; // 含义同go-task每次读写完毕或则出错了都会调用该对象的hanle函数CommTarget *target; // 连接目的地。对于客户端该成员是服务器的地址对于服务端该成员是客户端的地址。CommService *service; // 该成员仅服务端有意义mpoller_t *mpoller;/* Connection entrys mutex is for client session only. */pthread_mutex_t mutex;
};workflow将客户端、服务端、tcp、udp、ssl的实现都混杂在一个文件当中。在第一次阅读它的源码时有点双眼摸瞎的感觉。如果你有足够丰富的网络编程的经验可能还好。
需要注意一点的是该连接上下文在客户端和服务端所使用成员可能是不同的客户端不会使用service成员服务端不会使用seq成员。
对于CommConnEntry::list成员其实有两种用途 一是被挂在服务端的CommService::alive_list上。可以理解为服务端的http保活池。 二是被挂在客户端的CommTarget::idle_list上。可以理解为客户端的对同一个ipaddr:port的http连接池。
广义上讲服务端的CommServiceTarget::idle_list也是http连接池。只是服务端的idle_list上只可能会有一个连接。
然后可以预见的是 随着tcp连接状态的变化state成员所记录的状态也会随之更新。 当ref成员减为零CommConnEntry对象将会被free掉。
根据以往的经验能大胆猜测到的就是这些信息。
状态迁移池
状态迁移池——没错类似于事件池状态迁移池也有一个循环它的任务是不断根据IO的结果转换连接上下文的状态并且根据IO的结果去回调必要的处理函数最为代表的是session-handlesession的概念在go-task源码剖析一节中也是存在。它存在的意义在下一章讲解workflow对TCPServer的时候才适合透露。我们重点集中在communicator如何管理连接上下文的状态的。
这里其实就引入了一个问题连接上下文为什么存在状态的迁移别急让我一步步道来。 首先是状态池的启动————Communicator::init
代码如下
int Communicator::init(size_t poller_threads, size_t handler_threads) {/* ... */if (this-create_poller(poller_threads) 0){if (this-create_handler_threads(handler_threads) 0){this-stop_flag 0;return 0;}mpoller_stop(this-mpoller);mpoller_destroy(this-mpoller);msgqueue_destroy(this-msgqueue);}return -1;
}涉及的代码过多这里仅挑重点。 Communicator::init首先会启动mpoller也就是上章我们所讲的事件池。上节的poller-callback函数以及它的参数poller-context在poller初始化时就是由struct poller_params提供。而该结构体的够着在Communicator::create_poller当中是这样被赋值的 void Communicator::callback(struct poller_result *res, void *context) {msgqueue_t *msgqueue (msgqueue_t *)context;msgqueue_put(res, msgqueue);
}int Communicator::create_poller(size_t poller_threads) {struct poller_params params {.max_open_files (size_t)sysconf(_SC_OPEN_MAX),.callback Communicator::callback,};this-msgqueue msgqueue_create(16 * 1024, sizeof (struct poller_result));if (this-msgqueue) {params.context this-msgqueue;/* ... */}return -1;
}所以可以看到上章的poller-callback回调会将传进来的poller_result追加到Communicator的状态迁移池的队列当中。 Communicator::init然后会启动状态迁移池。状态迁移池使用的就是workflow自己造了链式线程池轮子。特别的是在线程池的每个线程都运行一个routineCommunicator::handler_thread_routine该函数是一个死循环。在每个线程都分配到一个Communicator::handler_thread_routine后线程池的队列其实就失去了它的意义。每个Communicator::handler_thread_routine会使用1当中分配的队列。
转到Communicator::handler_thread_routine它的实现如下
void Communicator::handler_thread_routine(void *context) {Communicator *comm (Communicator *)context;struct poller_result *res;while (1) {res (struct poller_result *)msgqueue_get(comm-msgqueue);if (!res)break;switch (res-data.operation) {case PD_OP_TIMER:comm-handle_sleep_result(res);break;case PD_OP_READ:comm-handle_read_result(res);break;case PD_OP_WRITE:comm-handle_write_result(res);break;case PD_OP_LISTEN:comm-handle_listen_result(res);break;/* ... */}free(res);}if (!comm-thrdpool) {mpoller_destroy(comm-mpoller);msgqueue_destroy(comm-msgqueue);}
}阅读过上面的代码后我们应该惊喜因为我们看到了free这里我可以自信的回答这个问题上节poller当中只看到了针对poller_node的malloc函数而没有看到对应的free函数哪里调用了free函数去释放poller_node
没错poller_node就是在这里释放的。
poller_node生命周期是这样的链路
__poller_new_node { malloc } - write(addr) - pipe - __poller_handle_pipe { addr read() } - poller-callback(addr) - handler_thread_routine { free }最终在状态迁移池启动完毕后结合poller的事件池Communicator最终的系统架构如下图 IO结果的处理
还是贴出上章所讲解的基本tcp服务框架示例 -----------| socket |-----------|V-----------| bind | 这三步就是由本节的Communicator执行-----------|V-----------| listen |-----------| ______________________________________________V-----------| accept | 从这里开始涉及到的所有函数是poller负责。-----------| | | |V V V Vfd fd fd .../\read write当然Communicator会使用mpoller暴露的api对sockfd设置所关心的IO事件。间接调用了IO系统调用接口。下面从listen fd入手逐步揭开communicator的真面目。 创建绑定监听三部曲————Communicator::bind
函数如下
int Communicator::bind(CommService *service) {struct poller_data data;int errno_bak errno;int sockfd;sockfd this-nonblock_listen(service);if (sockfd 0) {data.fd sockfd;data.context service;/* ... */data.operation PD_OP_LISTEN;data.accept Communicator::accept;// 开始接收客户端连接if (mpoller_add(data, service-listen_timeout, this-mpoller) 0) {errno errno_bak;return 0;}close(sockfd);}return -1;
}注意到listen套接字被分配了一个Communicator::accept回调上一章介绍poller时每当listen套接字接收到一个客户端的连接都会将IO socket作为参数回调一下accept函数此处代表Communicator::accept它实际上会为IO socket创建一个CommServiceTarget对象。
三部曲核心在nonblock_listen如下
int Communicator::nonblock_listen(CommService *service) {int sockfd service-create_listen_fd(); // scoket()int ret;if (sockfd 0) {if (__set_fd_nonblock(sockfd) 0) { // 设置为非阻塞if (__bind_sockaddr(sockfd, service-bind_addr,service-addrlen) 0) { // 监听socket和addr绑定ret listen(sockfd, SOMAXCONN); // 开始监听if (ret 0 || errno EOPNOTSUPP) {service-reliable (ret 0);return sockfd;}}}close(sockfd);}return -1;
}所以Communicator::bind绑定并启动一个tcpserver的流程是 初始化一个监听套接字。 将监听套接字添加到mpoller事件池当中。开始接受来自客户端的连接。 接收客户端连接————Communicator::handle_listen_result
了解了前面状态迁移池和bind的流程结合上一章poller的源码分析一旦poller的accept接收到一条连接会回调一下Communicator::accept然后再调用poller-callback并将IOsocket填到res然后res被传回Communicator队列当中。状态迁移线程会从队列当中取res然后根据res-data.operation会去回调Communicator::handle_listen_result它的实现如下
void Communicator::handle_listen_result(struct poller_result *res) {CommService *service (CommService *)res-data.context;struct CommConnEntry *entry;CommServiceTarget *target;int timeout;switch (res-state) {case PR_ST_SUCCESS:target (CommServiceTarget *)res-data.result; // Communicator::accept的返回值entry Communicator::accept_conn(target, service);if (entry) {entry-mpoller this-mpoller;res-data.operation PD_OP_READ;res-data.fd entry-sockfd;res-data.create_message Communicator::create_request;res-data.context entry;res-data.message NULL;timeout target-response_timeout;if (mpoller_add(res-data, timeout, this-mpoller) 0) {if (this-stop_flag)mpoller_del(res-data.fd, this-mpoller);break;}__release_conn(entry);}elseclose(target-sockfd);target-decref();break;}
}Communicator::accept_conn函数根据res创建出IOsocket的连接上下文CommConnEntry并且该连接上下文初始的state为CONN_STATE_CONNECTED。然后将该对象加入到mpoller事件池当中开启对IOsocket的读事件进行监听。 边接收边解析————Communicator::create_request Communicator::append_message
客户端请求报文解析完毕状态转移————Communicator::handle_read_result
因为这两部分涉及的代码过于庞大详细讲解的话避免不了要贴大量的代码作者表达水平有限。还是觉得使用图解的方式去呈现比较省事。所以为避免文章代码比例过高下一小节将会以图画的形式向读者剖析这部分的源码。
状态迁移总结
服务端监听套接字的绑定就不说了下面使用一张图来讲解从服务端接收客户端连接 到 读客户端请求报文边读边解析最后向客户端发送回复的一个流程。 接收客户端连接并读取解析客户端发来的报文流程————异步状态机之美 第一步是poller当中的__poller_handle_listen回调函数 服务端在接收到一个客户端连接后首先会为其创建一个CommServiceTarget对象。 然后将IO socketfd通过回调poller-callback放到队列当中传回给Communicator。
第二步是Communicator的Communicator::handle_listen_result函数 状态变迁池拿到res对像后得知operation为PD_OP_LISTEN的res所以调用Communicator::handle_listen_result函数来处理res。 在Communicator::handle_listen_result函数当中首先会构造一个连接上下文entry它的状态被初始化为CONN_STATE_CONNECTED。 构造一个operation为READ的poller_node。并且data成员的create_message回调填为Communicator::create_request。 将poller_node加入到mpoller开始对其度事件进行监听。
第三步是poller当中的读事件处理回调__poller_handle_read 读数据。 __poller_append_message它里面会创建一个poller_message_t对象如果不存在的话一般在一轮请求的最开始会构造一个msg对象。利用poller_message_t对象对读到的数据进行解析。这是一个边读边解析的过程中间可能会调用数次。当msg-append返回值大于0时说明请求报文读并且解析完了。此时将msg封装在res当中并回调poller-callback。create_message和append两个回调分别对应Communicator::create_request和Communicator::append_message。这两个函数核心代码已经在上图③号虚框当中显示读者可以仔细阅读一下。这里其实涉及到连接上下文entry的两次状态变换。在create_message时entry-state会变更为CONN_STATE_RECEIVING而在数据解析完毕Communicator::append_message当中的in-append返回大于0进入到下面的if分支又会将entry-state变更为CONN_STATE_SUCCESS。
第四步也是读流程的最后一步Communicator::handle_read_result当中的Communicator::handle_incoming_request函数 状态变迁池拿到res对像后得知operation为PD_OP_READ的res所以调用Communicator::handle_read_result函数来处理res。因为是服务端所以Communicator::handle_read_result函数会调用Communicator::handle_incoming_request函数。 这里会将session的state设置成CS_STATE_TOREPLY。 如果entry-state CONN_STATE_SUCCESS则将entry挂到target的idle链表上、entry-ref同时entry-state修改成CONN_STATE_IDLE。session-passive 必须赋值为 2。 回调session-handle然后entry-ref–当entry-ref为0时调用__release_conn将连接关闭并free掉entry连接上下文。
所以entry状态变化顺序为
[CONN_STATE_CONNECTED] - [CONN_STATE_RECEIVING] - [CONN_STATE_SUCCESS] - [CONN_STATE_IDLE]向客户端发送回复报文————先尽力而为的写然后再异步写。
当服务端需要发送一个回复报文时会调用Communicator::reply接口它的代码如下
int Communicator::reply(CommSession *session) {struct CommConnEntry *entry;CommServiceTarget *target;int errno_bak;int ret;if (session-passive ! 2) { // 处在读完毕的状态errno EINVAL;return -1;}errno_bak errno;session-passive 3; // 写状态target (CommServiceTarget *)session-target;ret this-reply_reliable(session, target);if (ret 0) { // 这里是同步写已经将所有数据发完了。无需异步写entry session-in-entry;session-handle(CS_STATE_SUCCESS, 0); // 再次回调session的handleif (__sync_sub_and_fetch(entry-ref, 1) 0) {__release_conn(entry);target-decref();}} else if (ret 0)return -1;errno errno_bak;return 0;
}int Communicator::reply_reliable(CommSession *session, CommTarget *target) {struct CommConnEntry *entry;struct list_head *pos;int ret -1;pthread_mutex_lock(target-mutex);if (!list_empty(target-idle_list)) { // 处于CONN_STATE_IDLE状态pos target-idle_list.next;entry list_entry(pos, struct CommConnEntry, list);list_del(pos);session-out session-message_out();if (session-out)ret this-send_message(entry);} elseerrno ENOENT;pthread_mutex_unlock(target-mutex);return ret;
}int Communicator::send_message(struct CommConnEntry *entry) {/* ... */end vectors cnt;cnt this-send_message_sync(vectors, cnt, entry); // 先尽力而为的同步写if (cnt 0)return cnt;return this-send_message_async(end - cnt, cnt, entry); // 写缓存满了需要异步写
}写的设计思路和Muduo的很像muduo源码阅读笔记10、TcpConnection。这里不过多赘述只讲一下差别。还是以全面的情况为例子假设现在需要发送一批回复数据并且同步写无法将所有的数据发送完。那么在同步写一部分我们的数据之后肯定会触发异步写。
而异步写呢就得靠poller层的__poller_handle_write函数。只要tcp的发送缓存区非满poller_node就会收到通知然后尽力向发送缓存区写一些数据这可能也需要花几轮的功夫去写数据。在这期间每写一部分数据__poller_handle_write函数就会回调node-data.partial_written从Communicator::send_message_async函数在构造WRITE类型的poller_node时我们可以得知partial_written就是Communicator::partial_written而它的实现如下
int Communicator::partial_written(size_t n, void *context) {struct CommConnEntry *entry (struct CommConnEntry *)context;CommSession *session entry-session;int timeout;timeout Communicator::next_timeout(session);mpoller_set_timeout(entry-sockfd, timeout, entry-mpoller);return 0;
}在写完部分数据后为什么需要回调一下partial_written呢这里其实就得到了合理的解释既然在规定的写超时时间内我能向发送缓存写一些数据那就说明网没断只是网络状况可能不好。所以按理来说在规定的时间内发送了部分数据就应该更新一下发送的超时时间避免没有必要的超时。
一旦异步写完成了和__poller_handle_read不同__poller_handle_write会自动将poller_node从epoll上移除然后回调poller-callback。PS如果你忘了poller的实现建议回顾一下WorkFlow源码剖析——Communicator之TCPServer上
然后同读完成类似在Communicator当中写完成会被Communicator::handle_write_result处理因为是服务断所以会调用Communicator::handle_reply_result。该函数逻辑如下
void Communicator::handle_reply_result(struct poller_result *res) {struct CommConnEntry *entry (struct CommConnEntry *)res-data.context;CommService *service entry-service;CommSession *session entry-session;CommTarget *target entry-target;int timeout;int state;switch (res-state) {case PR_ST_FINISHED:timeout session-keep_alive_timeout();if (timeout ! 0) {__sync_add_and_fetch(entry-ref, 1); // 避免被释放res-data.operation PD_OP_READ;res-data.create_message Communicator::create_request;res-data.message NULL;pthread_mutex_lock(target-mutex);if (mpoller_add(res-data, timeout, this-mpoller) 0) { // 以读的方式添加到mpoller当中pthread_mutex_lock(service-mutex);if (!this-stop_flag service-listen_fd 0) {entry-state CONN_STATE_KEEPALIVE; // entry-state修改成CONN_STATE_KEEPALIVElist_add_tail(entry-list, service-alive_list); // 追加到保活链表} else {mpoller_del(res-data.fd, this-mpoller);entry-state CONN_STATE_CLOSING;}pthread_mutex_unlock(service-mutex);}else // 出错该释放了__sync_sub_and_fetch(entry-ref, 1); pthread_mutex_unlock(target-mutex);}if (1)state CS_STATE_SUCCESS;session-handle(state, res-error); // 第二次回调session-handleif (__sync_sub_and_fetch(entry-ref, 1) 0) {__release_conn(entry);((CommServiceTarget *)target)-decref();}break;}
}逻辑分成三部分 将entry-ref自增一 以读的方式将poller_node加回到mpoller当中继续监听客户端的读请求 将entry-state修改成CONN_STATE_KEEPALIVE并且加到CommService的保活链表当中。 将session的state设置成CS_STATE_SUCCESS再回调session-handle。 entry-ref自减一为零就释放连接以及上下文。
所以在经过Communicator::handle_reply_result函数后entry-state状态被修改为CONN_STATE_KEEPALIVE。然后再处理下一轮客户端请求。
最后备忘一下 对于session-passive的变化在create_request创建msg请求报文解析器session-passive被置为1在读取并解析完毕请求报文后Communicator::handle_incoming_request函数回将session-passive置为2在调用Communicator::reply向网络发送回复时session-passive会被置为3。 Communicator::send_message_async当中在吧poller_node以WRITE方式加入到mpoller时会走到mpoller_mod分支。
到了这里其实就能回答这个问题连接上下文为什么存在状态的迁移
首先一条tcp连接在服务端必定纯在两种状态接收、发送。而因为追求性能我们不得不采用异步的方式将socketfd设置成非阻塞的。并且网络传输不管是否阻塞会引来一些问题读不可能一次性读完、写不可能一次性写完所以读的过程当中和写的过程当中都存在一种中间的状态。所以状态迁移是必然的。
最后在了解了workflow的底层架构之后其实就能感觉到异步编程就是在实现一个状体机的过程。 本章完结