海口网站建设过程,注册公司要花多少费用,wordpress最新列表,网站如何做美工总言 主要内容#xff1a;多路转接#xff1a;epoll学习。 文章目录 总言5、多路转接#xff1a;epoll5.1、相关概念与接口5.1.1、基本函数认识5.1.1.1、epoll_create5.1.1.2、epoll_ctl5.1.1.3、epoll_wait 5.1.2、epoll的工作原理5.1.2.1、准备工作#xff08;…总言 主要内容多路转接epoll学习。 文章目录 总言5、多路转接epoll5.1、相关概念与接口5.1.1、基本函数认识5.1.1.1、epoll_create5.1.1.2、epoll_ctl5.1.1.3、epoll_wait 5.1.2、epoll的工作原理5.1.2.1、准备工作一些背景知识补充5.1.2.2、epoll 的核心组件5.1.2.3、一些细节 5.2、epoll快速编写读事件5.2.1、log.hpp、sock.hpp5.2.1.1、log.hpp5.2.1.2、sock.hpp 5.2.2、epoll.hpp5.2.3、epollServer.hpp、main.cc5.2.3.1、epollServer.hpp5.2.3.2、main.cc 5.3、如何基于epoll设计一个相对完整的服务器5.3.1、epoll的工作模式LT 与 ET5.3.1.1、概念介绍5.3.1.2、细节理解 5.3.2、前情回顾5.3.2.1、问题分析5.3.2.2、reactor 设计模式 5.3.3、log.hpp、sock.hpp、Protocol.hpp5.3.3.1、log.hpp5.3.3.2、Protocol.hpp5.3.3.3、sock.hpp 5.3.4、epoll.hpp5.3.6、TcpServer.hpp5.3.6.1、Connection类5.2.5.2、Tcpserver类5.2.5.3、演示结果 5.3.7、TcpServer.cc Fin、共勉。 前情回顾
高级IO一 5、多路转接epoll
5.1、相关概念与接口
5.1.1、基本函数认识 说明 epoll是Linux内核为处理大批量文件描述符而作的改进的poll是Linux下多路复用IO接口select/poll的增强版本。它是在2.5.44内核中被引进的。epoll(4) is a new API introduced in Linux kernel 2.5.44 其涉及的相关函数如下。
5.1.1.1、epoll_create epoll_create用于创建 epoll 实例epoll 模型。 #include sys/epoll.hint epoll_create(int size);参数说明 size这是一个历史遗留参数自从linux2.6.8之后已不使用可填入256或512或其它非负值。该参数用于提示内核需要监听的文件描述符的大致数量。但请注意这个参数并不是限制 epoll 实例可以监听的文件描述符的最大数量它只是一个建议值用于内核内部可能的内存分配优化。 返回值 ①如果成功epoll_create 返回一个非负的文件描述符该描述符用于后续调用 epoll_ctl 和 epoll_wait 函数。②如果失败返回 -1 并设置 errno 以指示错误原因。 特别说明 当使用 epoll_create 函数创建了一个 epoll 文件描述符或称为 epoll 实例后应当在使用完毕后使用close()将其关闭它以释放系统资源。在 Linux 中所有的文件描述符包括 socket、pipe、FIFO、终端、文件以及 epoll 文件描述符等在不再需要时都应该被关闭。 5.1.1.2、epoll_ctl 1、基本介绍 epoll_ctl用于操作 epoll 实例epoll 模型的函数。它允许我们将文件描述符如套接字、管道等添加到 epoll 实例中或者从 epoll 实例中删除文件描述符或者修改已经添加到 epoll 实例中的文件描述符的事件。 #include sys/epoll.hint epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);返回值 ①成功返回0。②失败返回 -1 并设置 errno 以指示错误原因。 参数说明 1、epfd这是由 epoll_create 或 epoll_create1 函数返回的文件描述符代表一个 epoll 实例。 2、op这是一个操作码用于指定要对 fd 进行的操作。可能的值包括后续还会详细介绍 EPOLL_CTL_ADD向 epoll 实例中添加一个新的文件描述符 fd 和相关的事件。 EPOLL_CTL_DEL从 epoll 实例中删除一个已存在的文件描述符 fd。 EPOLL_CTL_MOD修改已添加到 epoll 实例中的文件描述符 fd 的事件。 3、fd需要进行添加、删除或修改其事件的文件描述符。 4、event一个指向struct epoll_event结构体的指针用于指定与 fd 相关联的事件。当 op 为 EPOLL_CTL_ADD 或 EPOLL_CTL_MOD 时这个参数是必需的当 op 为 EPOLL_CTL_DEL 时这个参数是未使用的可以设置为 NULL。 2、epoll_event 结构体 epoll_event 结构体是 Linux 下 epoll 接口中用于存储事件信息的结构体定义通常如下所示注意具体定义可能因 Linux 内核版本而异但基本结构相似
struct epoll_event
{uint32_t events; /* Epoll events事件类型组合如 EPOLLIN, EPOLLOUT, EPOLLERR 等 */ epoll_data_t data; /* User data variable关联的数据可以是文件描述符、指针等 */
};这里events 和我们之前在poll函数中见到的一样是一个位掩码在epoll_ctl中表示用户程序告知OS需要关注哪些文件描述符上发生的哪些事件。比如POLLIN表示需要关注相应的文件描述符的读事件。 这里events可以是以下几个宏的集合
EPOLLIN : 表示对应的文件描述符可以读 (包括对端SOCKET正常关闭);
EPOLLOUT : 表示对应的文件描述符可以写;
EPOLLPRI : 表示对应的文件描述符有紧急的数据可读 (这里应该表示有带外数据到来);
EPOLLERR : 表示对应的文件描述符发生错误;
EPOLLHUP : 表示对应的文件描述符被挂断;
EPOLLET : 将EPOLL设为边缘触发(Edge Triggered)模式, 这是相对于水平触发(Level Triggered)来说的.
EPOLLONESHOT只监听一次事件, 当监听完这次事件之后, 如果还需要继续监听这个socket的话, 需要再次把这个socket加入到EPOLL队列里. data是一个联合体用于存储与事件相关的数据。我们在使用时根据需求选择使用其中一种字段即可。
typedef union epoll_data
{void *ptr;// 一个指向用户定义的任何数据的指针。int fd;// 直接存储文件描述符本身这在一些情况下比使用 ptr 更为直接uint32_t u32;//u32 或 u64这两个成员允许存储 32 位或 64 位无符号整数这在特定场景下可能有用但不如 ptr 和 fd 常用。uint64_t u64;
} epoll_data_t;fd如果只需要存储文件描述符本身那么fd字段是最直接的选择。这在仅仅需要知道哪个文件描述符上发生了事件时非常有用。但是如果events字段中包含了EPOLLET边缘触发模式并且我们需要处理多个数据包或事件时仅仅依赖fd可能不足以满足需求因为边缘触发模式下epoll不会为同一文件描述符上的连续事件重复通知。 ptr如果需要存储更复杂的数据结构或上下文信息那么ptr字段可以用来指向一个包含这些信息的结构体或对象。这样在事件发生时我们可以通过ptr字段找到与事件相关联的完整上下文信息。 u32和u64这两个字段允许我们存储无符号的32位或64位整数。尽管在某些特定场景下它们可能有用但在大多数情况下你可能更倾向于使用fd或ptr因为这两个字段提供了更多的灵活性和功能性。
5.1.1.3、epoll_wait epoll_wait用于等待 epoll 实例上注册的文件描述符上的事件发生。 #include sys/epoll.hint epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);参数说明 epfd和之前一样是由 epoll_create 或 epoll_create1 函数返回的文件描述符代表一个 epoll 实例epoll 模型。 events这是一个指向 struct epoll_event 结构体的数组的指针用于存储从 epoll 实例中返回的事件。当 epoll_wait 返回时这个数组将被填充有实际发生的事件。 maxevents这个参数告诉内核这个 events 数组的大小即内核最多能返回多少个事件。这个值必须大于 0。 timeout这个参数指定了 epoll_wait 函数的超时时间以毫秒为单位。如果 timeout 是正数那么 epoll_wait 会阻塞调用进程直到有事件发生或者超时。如果 timeout 是 0那么 epoll_wait 会立即返回不论是否有事件发生。如果 timeout 是 -1那么 epoll_wait 会无限期地等待直到有事件发生。 返回值 如果成功epoll_wait 返回实际发生的事件数量可能小于 maxevents。 如果在超时时间内没有事件发生epoll_wait 返回 0。 如果发生错误epoll_wait 返回 -1 并设置 errno 以指示错误原因。
5.1.2、epoll的工作原理
5.1.2.1、准备工作一些背景知识补充 1、预备工作select、poll工作原理简单回顾 1、回顾之前我们写的select和poll无论是select还是poll二者都要求用户在应用程序层面维护一个数据结构如数组用于存储文件描述符fd及其相关的事件类型。这种维护的责任完全落在上层用户程序上成本。 2、此外我们也说过select 和 poll充斥着大量的遍历操作。例如我们要确认哪些事件就绪时就需要通过遍历数组找到对应的fd同理OS内部在监视fd时也是需要进行遍历的。 3、select or poll工作模式 a、通过select or poll的参数用户告诉内核需要关注哪些fd上的哪些event。 b、通过select or poll的返回内核告诉用户哪些fd上的哪些events已经就绪了。 2、一个背景知识(了解) 问题我们知道网卡、键盘等这些属是外设。那么OS是如何知道网卡里有数据的同理OS是如何知道键盘有用户输入的 即上层的操作系统是如何知道底层硬件有数据需要处理的 OS如何知道网卡里有数据 当网卡接收到数据包时它并不会直接通知操作系统OS。相反网卡会触发一个硬件中断。这个中断信号是硬件向CPU发送的一种通知告诉CPU有某个特定的事件在这种情况下是数据到达已经发生。 CPU在接收到中断信号后会暂停当前正在执行的程序并查找一个称为中断向量表Interrupt Vector Table 的数据结构。这个表是一个映射表它将不同的中断信号与相应的中断处理程序Interrupt Handler关联起来。中断处理程序是操作系统中负责处理特定中断事件的代码段。 在中断向量表中CPU会找到与网卡中断信号相对应的中断处理程序并执行它。这个中断处理程序会负责从网卡中读取数据并将其传递给操作系统的网络栈进行处理。 综上通过硬件中断和中断向量表操作系统能够知道网卡里有数据到达并采取相应的处理措施。 OS如何知道键盘有用户输入 道理相同 当用户按下键盘上的某个键时键盘控制器会检测到这个动作并生成一个硬件中断信号。这个中断信号会被发送给CPU通知它键盘上有用户输入事件发生。 同样地CPU在接收到这个中断信号后会查找中断向量表找到与键盘中断信号相对应的中断处理程序。这个中断处理程序会负责从键盘控制器中读取按键信息并将其传递给操作系统的输入子系统进行处理。 操作系统的输入子系统会解析按键信息将其转换为操作系统能够理解的格式如ASCII码并将其传递给正在运行的应用程序。这样应用程序就能够知道用户按下了哪个键并做出相应的响应。 综上通过硬件中断和中断向量表操作系统能够知道键盘有用户输入并采取相应的处理措施。
5.1.2.2、epoll 的核心组件 1、epoll_create创建epoll内核对象 当进程调用epoll_create方法时内核会创建一个struct eventpoll对象其中的字段可见上图后续epoll的操作大部分都是对这个数据结构的操作。在该结构体中有几个成员与epoll的使用方式密切相关
struct eventpoll{ .... /*等待队列双向链表结构作用是在软中断就绪时通过wq找到阻塞在epoll对象上的进程。*/wait_queue_head_t wq;/*epoll用于索引的结构是一颗红黑树这里是红黑树的根节点(Red Black Root)这颗树中存储着所有添加到epoll中的需要监控的文件描述符*/ struct rb_root rbr; /*就绪队列双向链表结构里面存放着将要通过epoll_wait返回给用户的满足条件的文件描述符*/ struct list_head rdlist; ....
}; 在创建eventpoll对象后内核会将其加入到当前进程的文件描述符表中这意味着eventpoll对象也是文件系统中的一员。这也是为什么epoll_create 调用成功会返回一个非负的文件描述符。我们可以通过该文件描述符使用epoll_cerate、epoll_wait等操作访问eventpoll对象进而管理监控所需的文件描述符及其事件。 2、epoll_ctl对fd进行增删查改操作 根据上一节内容每个epoll实例都对应着一个独立的eventpoll结构体它内部维护了一棵重要的红黑树作为索引结构用于高效地管理被监视的文件描述符fds。这颗红黑树的每个节点都是一个epitem结构体代表了一个特定的监视事件。
struct epitem{ struct rb_node rbn;//红黑树节点 struct list_head rdllink;//双向链表节点 struct epoll_filefd ffd; //事件句柄信息 struct eventpoll *ep; //指向其所属的eventpoll对象 struct epoll_event event; //期待发生的事件类型
} /*
解释一下使用红黑树这种结构可能的原因
因为红黑树作为一种自平衡的二叉搜索树保证了即使面对大量的文件描述符也能够快速地进行查找、插入和删除操作。最坏情况这些操作的时间复杂度为O(log n)。
比如通过epoll_ctl方法向epoll对象中添加事件这些事件都会挂载在红黑树中如此重复添加的事件就可以通过红黑树而高效的识别出来。
*/在上层用户程序调用epoll_ctl告诉OS需要关心哪些fd及其event等等一些列增删查改操作实际正是对这颗红黑树进行操作。 比如通过epoll_ctl添加一个新的监视事件时内核会在红黑树中插入一个新的epitem节点该节点包含了文件描述符、感兴趣的事件类型如可读、可写等以及可能的用户数据。 此外为了解决文件描述符及其事件就绪后内核能够做出响应epoll机制还建立了一套回调机制所有添加到epoll中的事件都会与设备(网卡)驱动程序建立回调关系。也就是说当对应的文件描述符上有事件发生就会调用这个回调方法比如socket缓冲区有数据了内核就会回调这个函数。该回调方法在内核中叫ep_poll_callback它会将发生的事件添加到rdlist双链表中。 相关扩展博文链接。 3、epoll_wait检查就绪队列rdllist中是否有数据 当调用epoll_wait检查是否有事件发生时只需要检查eventpoll对象中的rdlist双链表中是否有epitem元素即可。如果rdlist不为空则把发生的事件复制到用户态同时将事件数量返回给用户。这个操作的时间复杂度是 O ( 1 ) O(1) O(1)。 详细解释 当应用程序调用 epoll_wait 时它指定了一个等待时间可以是无限等待和一个 events 数组用于接收就绪的事件。 epoll_wait 首先检查就绪链表rdllist中是否已经有就绪的文件描述符。 如果有它会立即将这些事件复制到用户态的 events 数组中并返回事件的数量。如果没有就绪的文件描述符epoll_wait 会创建一个等待队列项wait queue entry其中包含了当前进程的信息和一个默认的唤醒函数如 default_wake_function。这个等待队列项会被添加到 eventpoll 结构体的等待队列中。然后当前进程会被置于睡眠状态直到某个事件触发或等待超时。 当设备驱动程序检测到有事件发生时如数据到达 socket它会调用注册的回调函数如 ep_poll_callback。ep_poll_callback 函数会检查事件是否满足 epoll 实例中设置的条件并将相应的文件描述符节点从红黑树中移动到就绪链表中。 接下来ep_poll_callback或通过其他机制会遍历 eventpoll 结构体的等待队列调用其中的唤醒函数如default_wake_function以唤醒所有在该等待队列中睡眠的进程。被唤醒的进程会重新执行 epoll_wait 的剩余部分此时它会发现就绪链表中有事件于是将这些事件复制到用户态的 events 数组中并返回。 应用程序接收到 epoll_wait 返回的事件后会遍历 events 数组并根据事件类型调用相应的处理函数。处理函数可能会读取数据、发送响应或执行其他 I/O 操作。 5.1.2.3、一些细节 1、要知道红黑树这种结构需要key值。在epoll中红黑树是以文件描述符FD作为唯一键Key进行索引的。 2、epoll的这种设计方式使得用户只需要关注设置自己感兴趣的事件并获取事件处理结果不用再关心任何对fd与event的管理细节这些都有操作系统来做。 3、epoll的高效体现 ①文件描述符管理的高效性 通过红黑树这一自平衡二叉搜索树替代了传统的线性数组结构实现了对大量文件描述符的快速增删查操作显著降低了管理成本。 ②事件驱动的被动响应 与先前的select和poll机制不同epoll采用事件驱动模式即只有当文件描述符上的事件真正发生时才会触发回调通知操作系统。先前两种模式中都需要OS主动遍历查看资源是否就绪这种模式下是资源就绪后主动联系OS这种资源就绪处上位的方式避免了无意义的轮询检查降低了CPU的监测成本。 ③就绪资源的直接获取 在epoll模型中所有就绪的文件描述符都会被放置在就绪链表中用户进程通过epoll_wait调用可以直接访问这个链表无需像之前那样遍历所有文件描述符来检查状态从而极大地提高了事件处理的效率。前两种模式需要主动遍历判断查找就绪资源 4、生产者消费者模型 epoll的设计体现了这一理念。在底层一旦有文件描述符生产者上的事件就绪操作系统会自动为该文件描述符构建相应的节点并将其添加到就绪队列中生产。而在上层用户进程消费者只需不断地从就绪队列中取出数据即事件即可完成对就绪事件的获取和处理任务。由于是共享资源epoll接口已经设计为线程安全的。
5.2、epoll快速编写读事件 此处仍旧和先前一样先只演示读事件熟悉一下epoll的使用。
5.2.1、log.hpp、sock.hpp 这部分的代码和之前使用的一样。
5.2.1.1、log.hpp
#pragma once#includeiostream
#includestring
#includecstdio
#includectime
#includecstdarg// 日志分类等级
#define DEBUG 0
#define NORMAL 1
#define WARNING 2
#define ERROR 3
#define FATAL 4const char* gLevelMap[]{DEBUG,NORMAL,WARNING,ERROR,FATAL
};#define LOGFILE ./epollServer.logvoid logMessage(int level, const char* format, ...)
{// 标准部分固定输出的内容char stdBuffer[1024];time_t timestamp time(nullptr);snprintf(stdBuffer, sizeof(stdBuffer), [%s][%ld] , gLevelMap[level], timestamp);// 自定义部分允许用户根据自己的需求设置char logBuffer[1024];va_list args;va_start(args,format);vsnprintf(logBuffer, sizeof(logBuffer), format, args);va_end(args);printf(%s%s\n,stdBuffer,logBuffer);
}5.2.1.2、sock.hpp
#pragma once#include iostream
#include string
#include assert.h
#include unistd.h
#include string.h
#include sys/types.h
#include sys/socket.h
#include arpa/inet.h
#include netinet/in.hclass Sock
{const static int gbacklog 10;public:static int Socket(){// 创建套接字// int socket(int domain, int type, int protocol);int listensock socket(AF_INET, SOCK_STREAM, 0);if (listensock 0)exit(2);// 为了防止服务端断开后无法立即重启// int getsockopt(int sockfd, int level, int optname,void *optval, socklen_t *optlen);int opt 1;setsockopt(listensock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, opt, sizeof(opt));return listensock;}static void Bind(int sock, uint16_t port, std::string ip 0.0.0.0){// 绑定套接字// int bind(int sockfd, const struct sockaddr *addr,socklen_t addrlen);struct sockaddr_in local;bzero(local, sizeof local); // 清零local.sin_family AF_INET;local.sin_port htons(port); // 主机字节序--网络字节序inet_aton(ip.c_str(), local.sin_addr); // 主机字节序点分十进制---网络字节序四字节序if (bind(sock, (const sockaddr *)local, sizeof(local)) 0)exit(3);}static void Listen(int sock){// 监听: int listen(int sockfd, int backlog);if (listen(sock, gbacklog) 0)exit(4);}static int Accept(int sock, uint16_t *port, std::string *ip){// 获取连接// int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);struct sockaddr_in client;bzero(client, sizeof(client));socklen_t len sizeof(client);int servicesock accept(sock, (struct sockaddr *)client, len);if (servicesock 0)exit(5);// 将获取到的客户端端口号和ip返回给服务器这里是通过输出型参数的方式if (port)*port ntohs(client.sin_port); // 网络字节序--主机字节序if (ip)*ip inet_ntoa(client.sin_addr);return servicesock;}static bool Connect(int sock, const uint16_t port, const std::string ip){// int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);struct sockaddr_in server;bzero(server, sizeof(server));server.sin_family AF_INET;server.sin_port htons(port);server.sin_addr.s_addr inet_addr(ip.c_str());if (connect(sock, (struct sockaddr *)server, sizeof server) 0)return false;elsereturn true;}
};5.2.2、epoll.hpp 我们对epoll做一个简单的封装和sock.hpp的封装类似
#pragma once
#include iostream
#include sys/epoll.h
#include unistd.hclass Epoll
{static const int gsize 256;public:// 创建epoll对象int epoll_create(int size);static int EpollCreate(){int epfd epoll_create(gsize);if (epfd 0) // 如果成功epoll_create 返回一个非负的文件描述符exit(6);return epfd;}// 对epoll对象增删查改// int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);static int EpollCtl(int epfd, int op, int fd, uint32_t events){struct epoll_event ev;ev.events events;ev.data.fd fd;return epoll_ctl(epfd, op, fd, ev); // 这里我们把返回值的处理放在外面让它保持和原函数参数、返回值一致的效果。直接在此处处理也行看个人写法。}// 从就绪队列中捞取事件// int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);static int EpollWait(int epfd, struct epoll_event revs[], int num, int timeout){return epoll_wait(epfd, revs, num, timeout);}
}; 5.2.3、epollServer.hpp、main.cc
5.2.3.1、epollServer.hpp epollServer.hpp相关代码如下
#pragma once
#include iostream
#include string
#include assert.h
#include functional
#include log.hpp
#include Sock.hpp
#include Epoll.hppnamespace ns_epoll
{const static int default_port 8080; // 默认端口号const static int gnum 64; // epoll中就绪事件最大值class epollServer{using func_t std::functionvoid(std::string);public:// 构造函数epollServer(func_t Handlerrequest, const int port default_port): _HandlerRequest(Handlerrequest), _port(port), _revs_maxnums(gnum){// 1、这是网络套接字部分// a、创建套接字_listensock Sock::Socket();// b、绑定套接字Sock::Bind(_listensock, _port);// c、监听套接字Sock::Listen(_listensock);logMessage(DEBUG, init socket, the listensock is %d, _listensock);// 2、这里我们要使用epoll模型// a、创建epoll实例_epfd Epoll::EpollCreate();// b、将上述_listensock添加到epoll实例中让其帮我们监视管理if (Epoll::EpollCtl(_epfd, EPOLL_CTL_ADD, _listensock, EPOLLIN) 0) // 注意这里的参数使用对于listensock我们主要关系其读事件。详细可以见先前小节的博文函数介绍exit(7);// c、申请就绪事件存储空间用于存储从epoll实例中返回的事件_revs new struct epoll_event[_revs_maxnums];logMessage(DEBUG, add listensock to epoll success. the epfd is %d. , _epfd);}// 析构函数~epollServer(){if (_listensock 0)close(_listensock);if (_epfd 0) // 我们使用epoll_create函数创建了一个epoll文件描述符应当在使用完毕后使用close()将其关闭它以释放系统资源close(_epfd);if (_revs)delete[] _revs; // 同理这是我们new出来的空间析构时也要进行释放}// 启动服务器void Start(){int timeout -1; // 单位是毫秒。这里我们设置为阻塞式等待正数为超时等待0表示立即返回-1表示无限期阻塞等待直到有事件发生while (true) // 轮循{LoopOnce(timeout); // 单轮这里和之前使用select、poll时一样只不过我们又封装了一层将一回合循环单独拎出构成函数}}private:// 启动服务器单回合void LoopOnce(int timeout){int n Epoll::EpollWait(_epfd, _revs, _revs_maxnums, timeout);switch (n){case 0: // 在指定的时间内没有任何文件描述符就绪logMessage(DEBUG, %s, time out, please try again. );break;case -1: // epoll_wait调用失败logMessage(WARNING, epoll wait error: %d, %s. , errno, strerror(errno));break;default: // 等待成功返回已经就绪的文件描述符个数logMessage(DEBUG, epoll success, get new events. );HandlerEvents(n); // 这里传入的n实则就是epoll_wait从就绪队列中捞取到的事件个数最大上限值为_revs_maxnum,但有可能就绪的事件比之要小break;}}// 等待成功, 执行相应事件void HandlerEvents(int n){assert(n 0); // 主要是用于判断是否真的有至少一个事件就绪for (int i 0; i n; i){// 先从_revs结构体数组中取对应成员uint32_t revents _revs[i].events;int sock _revs[i].data.fd; // 这里的联合体我们用的是fd文件描述符字段// 判断是什么事件if (revents EPOLLIN) // 读事件就绪{if (sock _listensock)Accepter(sock); // 连接事件到来elseRecever(sock); // INPUT事件到来}if (revents EPOLLOUT) // 写事件到来{// TODO多类型事件相关写法我们在后面介绍此小节先演示读事件}}}void Accepter(int listensock)//这里其实可以不用传参写是让Accepter(sock)、Recever(sock)统一接口。{// 准备工作用于存储获取到的客户端端口号和IP地址uint16_t clientport;std::string clientip;// 获取客户端连接此时使用accept将不会再被阻塞int servicesock Sock::Accept(listensock, clientport, clientip); // 后两个是输出型参数if (servicesock 0){logMessage(WARNING, accept error: %d,$s. , errno, strerror(errno));return;}logMessage(DEBUG, get a new link success. [%s: %d], sock is %d. , clientip.c_str(), clientport, servicesock);// 接下来该客户端可以和服务器进行网络通信涉及到数据读写这里仍旧需要将其放入epoll中监管if (Epoll::EpollCtl(_epfd, EPOLL_CTL_ADD, servicesock, EPOLLIN) 0){logMessage(WARNING, epoll_ctl servicesock error: %d,%s, close: %d , errno, strerror(errno), servicesock);close(servicesock);return;}logMessage(DEBUG, add a new servicesock %d to epoll success., servicesock);}void Recever(int sock){// 目前此处写法仍旧不完善这里我们假设读取到的就是完整报文char buffer[10240];ssize_t n recv(sock, buffer, sizeof(buffer) - 1, 0); // 读取数据if (n 0){buffer[n] 0;_HandlerRequest(buffer); // 根据需求处理数据}else if (n 0) // 读取到文件尾{// 1、在epoll中去掉对该sock的关心这里的顺序不能颠倒。epoll_ctl只针对合法的fd有效。int ret Epoll::EpollCtl(_epfd, EPOLL_CTL_DEL, sock, 0);assert(ret ! -1);(void)ret;// 2、关闭该sock文件close(sock);logMessage(NORMAL, client[%d] has quit, server will close its sock. , sock);}else{// 1、在epoll中去掉对该sock的关心bool ret Epoll::EpollCtl(_epfd, EPOLL_CTL_DEL, sock, 0);assert(ret);(void)ret;// 2、关闭该sock文件close(sock);logMessage(WARNING, %d sock recv error. the errno is %d, %s. , sock, errno, strerror(errno));}}private:int _listensock; // 监听套接字uint16_t _port; // 端口号int _epfd; // 用于记录epoll实例的fdstruct epoll_event *_revs; // 用于接收返回的就绪事件int _revs_maxnums; // 就绪事件的最大值func_t _HandlerRequest; // 函数调用用于完成服务器业务逻辑};
}这里需要注意几个细节 细节一 若底层存在大量已就绪的套接字(sock)而我们动态分配的_revs结构体数组在接收这些就绪事件时单次可能无法容纳所有就绪的套接字。此时该怎么办 回答不影响。当我们调用epoll_wait从就绪队列中捞取事件时一次取不完就下一次再取。此外鉴于我们使用的是动态空间管理也可以在检测到当前空间不足以容纳所有就绪事件时对_revs数组进行扩容 // 启动服务器单回合void LoopOnce(int timeout){int n Epoll::EpollWait(_epfd, _revs, _revs_maxnums, timeout);if(n _revs_maxnums){// 扩容……}}细节二 关于epoll_wait的返回值问题。相比于select和poll需要遍历判断是否是有效资源。该函数直接返回就绪的文件描述符fd的数量省去了对每一个fd进行有效性检查的繁琐过程。此外epoll_wait在返回时会按照一定顺序通常是它们被添加到epoll实例中的顺序 将所有就绪的事件填充到_revs数组中且这些事件的索引从0开始连续排列。 这方便了处理就绪事件因为_revs接收到的都是有效事件且我们还通过返回值获得了它们的数量这就不必一个个从头到尾遍历完实际数组大小再做判断。也就是上述epoll_wait等待成功后HandlerEvents的处理逻辑
// 等待成功, 执行相应事件
void HandlerEvents(int n)
{assert(n 0); // 主要是用于判断是否真的有至少一个事件就绪for (int i 0; i n; i)//这里的参数n就是根据epoll_wait的返回值得来的而不用判断到_revs_maxnums{// 先从_revs结构体数组中取对应成员uint32_t revents _revs[i].events;int sock _revs[i].data.fd; //……}
}细节三和之前讲述select、poll一样上述接收客户端数据时仍旧存在一个bug即如何保障读取到的是完整报文 关于此问题后续会介绍。
5.2.3.2、main.cc 在main函数中调用如下代码启动服务器进行演示
#includememory
#includeepollServer.hppvoid Handlerrequest(std::string str)
{//用于完成服务器业务逻辑std::cout str std::endl;
}int main()
{std::unique_ptrns_epoll::epollServer server(new ns_epoll::epollServer(Handlerrequest));server-Start();return 0;
}演示结果如下
5.3、如何基于epoll设计一个相对完整的服务器
5.3.1、epoll的工作模式LT 与 ET epoll是Linux下用于I/O事件多路复用的机制之一它支持两种工作模式水平触发Level-TriggeredLT和边缘触发Edge-TriggeredET。
5.3.1.1、概念介绍 1、场景引入 想象一下当你有包裹放在A、B两个驿站时。 对A驿站每当你有包裹到达时A驿站的工作人员会非常贴心地给你发送一条短信告诉你“您有包裹到了请来取件。”如果你因为包裹太多一次只取走了部分A驿站并不会就此罢休。它会再次检查剩余的包裹并继续给你发送短信提醒你“还有包裹未取出请继续来取。”这个过程会一直持续直到你取走了所有的包裹A驿站才会停止发送提醒。 这种场景就像epoll的水平触发模式。只要文件描述符上有可读的数据或可写的空间就像驿站里有你的包裹epoll_wait就会返回事件通知你的应用程序去处理。如果你没有一次性处理完所有数据没有取走所有包裹epoll_wait在下次调用时仍然会返回相同的事件直到你处理完所有数据为止。 对B驿站驿站的服务方式则与A驿站截然不同。它只会在首次收到你的新包裹时给你发送一条短信通知“您有新包裹到了请来取件。”如果你因为忙碌几天都没有去取包裹B驿站也不会再次发送短信提醒。同样如果你首次去取件时拿不下或其它原因只取走了部分包裹B驿站同样不会再次发短信通知告诉你还有剩余包裹未取。它只会在你又有新包裹到达时才会再次发送通知。 这就好比epoll的边缘触发模式。文件描述符的状态发生变化时如新的数据到达或发送缓冲区有空闲空间epoll_wait会返回事件。但如果你没有在一次通知中处理完所有数据没有取走所有包裹epoll_wait不会再次返回相同的事件直到有新的状态变化发生。这就要求你的应用程序必须一次处理完当前批次的数据确保不会有任何遗漏。否则就像驿站B里的包裹一样如果长时间无人问津这些数据可能会被丢弃或被视为丢失。 2、概念说明 水平触发 指在I/O事件发生时只要文件描述符如socket的状态满足触发条件如可读、可写或有错误发生系统就会一直通知应用程序直到该状态不再满足为止。 具体 1、当文件描述符的状态变为满足触发条件时例如接收缓冲区中有数据可读系统会将该事件放入事件队列中。 2、应用程序通过调用如epoll_wait等函数来检查事件队列获取并处理这些事件。 3、如果在处理事件后文件描述符的状态仍然满足触发条件例如接收缓冲区中仍有未读数据则下一次调用epoll_wait等函数时系统仍然会返回该事件。 4、只有在文件描述符的状态不再满足触发条件时例如接收缓冲区为空系统才不会再返回该事件。 边缘触发 指在I/O事件的状态发生变化时即从一种状态变为另一种状态。从无到有从有到多系统只通知应用程序一次。 具体 1、当文件描述符的状态从不满足触发条件变为满足触发条件时例如从接收缓冲区为空变为有数据可读系统会将该事件放入事件队列中。 2、应用程序通过调用如epoll_wait等函数来检查事件队列获取并处理这些事件。 3、与水平触发不同的是即使处理事件后文件描述符的状态仍然满足触发条件例如接收缓冲区中仍有未读数据系统也不会再次返回该事件除非状态再次发生变化例如接收缓冲区中的数据被完全读取再次变为空。 4、为了确保不会遗漏数据应用程序通常需要采用循环读取的方式直到读取操作返回特定的错误码如EAGAIN或EWOULDBLOCK表示当前已经没有更多的数据可读。 一个感性的理解图 3、再次举例理解 具体到代码场景中我们已经将一个TCP socket添加到epoll描述符中。此时socket的另一端写入了2KB的数据。当我们调用epoll_wait时它会返回表明该socket已经准备好进行读取操作。随后我们调用read函数但只读取了1KB的数据。如果我们再次调用epoll_wait 对水平触发LT epoll在默认状态下即为LT工作模式。当epoll检测到socket上有事件就绪时我们可以不立即处理它或者只处理其中的一部分。 以上述场景为例由于我们只读取了1KB的数据缓冲区中仍留有1KB的数据未处理。因此在第二次调用epoll_wait时它会再次立即返回并通知我们该socket的读事件已经就绪。这个过程会一直持续直到缓冲区中的所有数据都被完全处理。 LT模式支持阻塞读写和非阻塞读写两种方式。 边缘触发ET 如果我们在将socket添加到epoll描述符时使用了EPOLLET标志epoll就会进入ET工作模式。在ET模式下当epoll检测到socket上有事件就绪时我们必须立即处理它。 以上述场景为例即使我们只读取了1KB的数据而缓冲区中仍留有1KB的数据未处理但在第二次调用epoll_wait时它不会再返回该socket的读事件。也就是说在ET模式下文件描述符上的事件就绪后我们只有一次处理机会。因此我们必须确保在一次事件通知中尽可能多地处理数据以避免数据的遗漏。 ET模式只支持非阻塞的读写方式。
5.3.1.2、细节理解 1、细节说明一 回过头我们再来盘点一下根据上述内容可知 1、在ET模式下若上层应用未能及时取走已就绪的数据底层系统将不会再次发送通知。其结果为上层应用后续尝试读取数据时会发现无法获取到 fd 就绪事件变相等于数据丢失。因此ET模式实际上是在倒逼程序员一旦检测到有数据就必须一次将本轮就绪数据全部取走。 2、相比之下在LT模式下上层应用即使暂时不处理被通知的事件或者只处理其中的一部分数据也不会导致数据的丢失。因为底层系统会持续保持fd的就绪状态给予上层应用多次读取的机会直到所有数据都被处理完毕。 2、细节说明二 问题LT模式、 ET模式,原则上谁更高效? 为什么? 回答ET。 1、更少的返回次数 ET模式相较于LT模式能够显著减少数据的返回次数同一批次数据epoll_wait不必循环多次。这是因为ET模式的设计初衷就是促使应用程序尽快地从缓冲区中读取数据。 2、优化数据在网络通信过程中的传输效率 之前我们说过ET模式等同于倒逼应用程序尽快将缓冲区中的数据全部取走。而当应用层尽快地取走了接受缓冲区中的数据时单位时间内该模式下的接收方在返回ACK应答时就可以在报头中填入一个更大的接收窗口16位窗口大小所以发送方下一次发生报文时就可以拥有一个更大的滑动窗口一次向接收方发送更多的数据。这提高了IO的吞吐。 3、注意事项 实际上若在LT模式下上层应用也一次将所有就绪的数据全部读取完那么在这种情况下LT模式和ET模式在效率上其实也没有差别。所以对于ET的高效性要辩证的看待。类似于被动式学习和主动式学习ET模式是被迫不得已必须一次将数据取完LT模式如果它愿意它也可以主动地一次取完数据。 3、为什么epoll模式只支持非阻塞的读写 ET模式下sock必须是非阻塞工作模式。原因说明 ①根据上述为了保证一次就将本轮数据全部读取完成应用程序就需要一直循环读取直到确认没有更多的数据可读。 ②然而上层应用程序无法确认当前是否读取完成。因此在最后一次正常读取完毕有效数据后势必还会进行下一次读取即循环读取直到读取出错EAGAIN。 ③因此当所有数据都已被读取完毕后“下一次读取”是没有数据的。如果此时 sock 是阻塞模式由于要等待新的数据到来读取操作会将进程挂起等待这不符合我们使用多路转接的需求。 ④因此需要将sock设置为非阻塞模式。在非阻塞模式下当读取操作没有数据可读时它会立即返回一个特定的错误码如EAGAIN而不是挂起等待。这样应用程序就可以根据这个错误码决定是否继续读取或进行其他操作。
5.3.2、前情回顾
5.3.2.1、问题分析 说明 回顾我们之前5.2中写的内容当时我们曾提到Recever的写法是有问题。 void Recever(int sock){// 目前此处写法仍旧不完善这里我们假设读取到的就是完整报文char buffer[10240];ssize_t n recv(sock, buffer, sizeof(buffer) - 1, 0); // 读取数据if (n 0){buffer[n] 0;_HandlerRequest(buffer); // 根据需求处理数据}else if (n 0) // 读取到文件尾{// 1、在epoll中去掉对该sock的关心这里的顺序不能颠倒。epoll_ctl只针对合法的fd有效。int ret Epoll::EpollCtl(_epfd, EPOLL_CTL_DEL, sock, 0);assert(ret ! -1);(void)ret;// 2、关闭该sock文件close(sock);logMessage(NORMAL, client[%d] has quit, server will close its sock. , sock);}else{// 1、在epoll中去掉对该sock的关心bool ret Epoll::EpollCtl(_epfd, EPOLL_CTL_DEL, sock, 0);assert(ret);(void)ret;// 2、关闭该sock文件close(sock);logMessage(WARNING, %d sock recv error. the errno is %d, %s. , sock, errno, strerror(errno));}}观察上述代码有一个问题我们一直没有思考过即如何保证读取到的是完整的报文上述代码能保证吗 回答不能保证。 1、因此为了以防我们读取到的数据不完整势必需要对每一次读取到的数据暂存在buffer缓冲区中直到后续读取到完整报文时把数据拼接在一起之后再向上交付。这就要求应用层中我们的buffer缓冲区它不能只是一个局部变量。因为局部变量在函数执行完毕后会被销毁导致数据丢失。 2、此外要知道服务器面对的可不仅仅只是一个客户端这就带来一个问题如何能区分清楚缓冲区中的数据各自对应的客户端所以这里的“完整性”不仅体现在数据流上还体现在数据来源上。我们需要为每个客户端维护一个独立的缓冲区以便正确区分和拼接数据。 3、因此为了保证未来能够正确读取到完整的报文对于每一个sock都应该有属于它自己的缓冲区接收缓冲区发送缓冲区。每个客户端的缓冲区都应该是独立的这样才能确保数据的完整性和正确性。我们之前写的这个缓冲区buffer它只是一个临时变量且为所有sock共享这是不符合要求的。
5.3.2.2、reactor 设计模式 Reactor模型是一种在事件驱动架构中用于处理非阻塞I/O操作的设计模式。 相关扩展博文Reactor模型详解 Reactor模型将客户端请求提交到一个或多个服务处理程序其核心思想是通过一个或多个“反应器”Reactor来统一处理多个非阻塞I/O操作。Reactor负责监听I/O事件如连接请求、数据读写等并将这些事件分发给相应的事件处理器Handlers进行处理。这样Reactor模型实现了事件的接收、处理和分发的解耦提高了程序的灵活性和可维护性。 主要包含以下几个组件 Reactor 负责监听和分发事件。Reactor通常是一个单线程的事件监听器它使用I/O多路复用机制如select、poll、epoll等来监听多个I/O事件源。 事件处理器Handlers 具体处理I/O事件的逻辑。每个事件处理器都关联一个或多个事件类型当Reactor监听到相应的事件时会调用对应的事件处理器进行处理。 事件源 产生I/O事件的实体如套接字、文件描述符等。事件源在发生I/O操作时会生成相应的事件并注册到Reactor中。 工作流程如下
1、应用程序将需要处理的事件及其对应的处理器注册到Reactor中。
2、Reactor开始监听所有注册的事件源。
3、当某个事件源发生事件时如连接请求、数据到达等Reactor会捕获这个事件。
4、Reactor根据事件类型将事件分发给对应的事件处理器进行处理。
5、事件处理器执行具体的业务逻辑处理完事件后将结果返回给应用程序或进行下一步操作。这里我们主要使用epoll演示单reactor单线程模型。 5.3.3、log.hpp、sock.hpp、Protocol.hpp
5.3.3.1、log.hpp 这里日志信息不变和之前一样主要用于方便我们监测信息有需要可自行修改。由于上文展示过这里不再重复。相关跳转。
5.3.3.2、Protocol.hpp 这里使用了之前的网络版计算器自定义协议版。使用telnet测试简化了一下报文粘包问题。
#pragma once
#include iostream
#include cstring
#include string
#include unistd.h
#include vector
#include sys/types.h
#include sys/socket.h// 这里的协议是用于服务网络版计算器。// 解决单个报文数据读取问题使用空格作为分隔符定义成宏方便根据需求修改
#define SPACE
#define SPACE_LINE strlen(SPACE)
// 解决粘包问题使用特殊字符#区分各报文
#define SEP #
#define SEP_LINE strlen(SEP)/// 请求结构体对象 ///
class Request
{
public:// 构造Request() {};Request(int x, int y, char op): x_(x), y_(y), op_(op){}// 对请求进行序列化结构化数据→字节流数据std::string Serialize() // 将x_、y_、op_{// version1 x_[空格] op_[空格] y_std::string str;str std::to_string(x_); // 先将对应的运算数转换为字符类型例如32--32。这里注意与ASCII中值为32的字符区别str SPACE; // 中间以我们设置的间隔符分割为了反序列化时能够提取每部分str op_; // op_本身就是char类型str SPACE;str std::to_string(y_);return str;}// 对请求进行反序列化字节流数据→结构化数据bool Deserialized(const std::string str) // 获取x_、y_、op_{//----------------------------------// version1 x_[空格] op_[空格] y_ 根据分隔符提取有效数放入结构化对象中// 例如1234[空格][空格]5678// a、找左运算数std::size_t left_oper str.find(SPACE);if (left_oper std::string::npos) // 没找到return false;// b、找右运算数std::size_t right_oper str.rfind(SPACE);if (right_oper std::string::npos) // 没找到return false;// c、提取运算数赋值给结构化对象成员x_ atoi((str.substr(0, left_oper)).c_str()); // string substr (size_t pos 0, size_t len npos) const;y_ atoi((str.substr(right_oper SPACE_LINE).c_str())); // 注意这里右运算符需要将[空格]跳过if (left_oper SPACE_LINE str.size())return false;elseop_ str[left_oper SPACE_LINE]; // 提取运算符时也要注意跳过分隔符[空格]return true;//----------------------------------}public:int x_; // 左运算数int y_; // 右运算数char op_; // 运算符
};/// 响应结构体对象 ///
class Response
{
public:// 构造函数Response(int result, int code): result_(result), code_(code){}Response() {}// 析构函数~Response() {}// 对响应序列化结构化数据→字节流数据std::string Serialize(){// version1code_ [空格] result_// 例如0[空格]6912std::string str;str std::to_string(code_);str SPACE;str std::to_string(result_);return str;}// 对响应反序列化字节流数据→结构化数据bool Deserialized(const std::string str){//----------------------------------// version1code_ [空格] result_// 例如0[空格]6912// a、找分隔符std::size_t pos str.find(SPACE);if (pos std::string::npos) // 没找到return false;// b、获取状态码code_ atoi((str.substr(0, pos)).c_str());// c、获取计算结果result_ atoi((str.substr(pos SPACE_LINE)).c_str());return true;//----------------------------------}public:int result_; // 计算结果int code_; // 状态码用于判断结果是否正常
};// 我们要把传入进来的缓冲区进行切分
// 1. 从buffer中被切走的部分也同时要从buffer中移除
// 2. 可能会存在多个报文多个报文要依次放入out
// buffer: 输入输出型参数
// out: 输出型参数
void Decode(std::string buffer, std::vectorstd::string *out)
{// 100 // 100 123#110// 100 123#110 / 2while (true){std::size_t pos buffer.find(SEP);// 分包找SEPif (pos std::string::npos) // 没找到说明本次报文不完整需要继续读取/接收break;// 执行到此说明确实有#但不一定代表数据完整。std::string message buffer.substr(0, pos);//找单个子串buffer.erase(0, pos SEP_LINE);//移除buffer中相关子串out-push_back(message);// std::cout debug: message is: message , then the buffer is: buffer std::endl;// sleep(1); // 用于测试}
}// 构建应答报文
std::string Encode(std::string str)
{// 1、加上SEP分隔符str SEP;return str;
} 5.3.3.3、sock.hpp 1、相关说明 基本介绍 大体不变主要是对sock套接字编程的相关接口进行封装。与之区别的是此处我们要使用 epoll 的边缘触发模式ET模式根据之前介绍ET模式只支持非阻塞的读写方式因此我们 需要将套接字设置为非阻塞模式。 原因解释 这是因为边缘触发模式只会在状态变化时通知一次如果套接字是阻塞的那么一次读取操作可能无法完全读取所有可用数据导致后续读取操作被阻塞从而错过其他事件。 如何操作 关于如何设置非阻塞IO可以使用fcntl()函数其具体用法之前演示过这里不再说明。
NAMEfcntl - manipulate file descriptorSYNOPSIS#include unistd.h#include fcntl.hint fcntl(int fd, int cmd, ... /* arg */ );2、相关代码 在socket TCP套接字编程中accept函数在阻塞socket和非阻塞socket上的底层行为各有不同 阻塞socket上的accept行为 当服务器端调用accept函数时如果监听队列中没有已完成的连接请求accept函数将会阻塞即暂停执行直到有一个连接请求被接受或发生错误。一旦有客户端成功连接到服务器监听队列中会有一个已完成的连接请求。此时accept函数会从监听队列中取出这个连接请求并创建一个新的套接字也称为已连接套接字。 非阻塞socket上的accept行为 在非阻塞模式下当服务器调用accept函数时它不会阻塞等待连接请求。如果监听队列中没有已完成的连接请求accept函数会立即返回一个错误码通常是EWOULDBLOCK或EAGAIN表示当前没有可接受的连接。由于非阻塞模式下accept函数不会阻塞因此服务器需要轮询调用accept函数来检查是否有新的连接请求。这通常是通过事件驱动机制如select、poll、epoll等来实现的。
#pragma once#include iostream
#include string
#include assert.h
#include unistd.h
#include string.h
#include sys/types.h
#include sys/socket.h
#include fcntl.h
#include arpa/inet.h
#include netinet/in.hclass Sock
{// listen的第二个参数意义底层全连接队列的长度 listen的第二个参数1const static int gbacklog 10;public:static int Socket(){// 创建套接字// int socket(int domain, int type, int protocol);int listensock socket(AF_INET, SOCK_STREAM, 0);if (listensock 0)exit(2);// 为了防止服务端断开后无法立即重启// int getsockopt(int sockfd, int level, int optname,void *optval, socklen_t *optlen);int opt 1;setsockopt(listensock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, opt, sizeof(opt));return listensock;}static void Bind(int sock, uint16_t port, std::string ip 0.0.0.0){// 绑定套接字// int bind(int sockfd, const struct sockaddr *addr,socklen_t addrlen);struct sockaddr_in local;bzero(local, sizeof local); // 清零local.sin_family AF_INET;local.sin_port htons(port); // 主机字节序--网络字节序inet_aton(ip.c_str(), local.sin_addr); // 主机字节序点分十进制---网络字节序四字节序if (bind(sock, (const sockaddr *)local, sizeof(local)) 0)exit(3);}static void Listen(int sock){// 监听: int listen(int sockfd, int backlog);if (listen(sock, gbacklog) 0)exit(4);}static int Accept(int sock, uint16_t *port, std::string *ip, int* accept_errno){// 获取连接// int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);struct sockaddr_in client;bzero(client, sizeof(client));socklen_t len sizeof(client);int servicesock accept(sock, (struct sockaddr *)client, len);if (servicesock 0){*accept_errno errno;// 获取错误码return -1;}// 将获取到的客户端端口号和ip返回给服务器这里是通过输出型参数的方式if (port)*port ntohs(client.sin_port); // 网络字节序--主机字节序if (ip)*ip inet_ntoa(client.sin_addr);return servicesock;}static bool Connect(int sock, const uint16_t port, const std::string ip){// int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);struct sockaddr_in server;bzero(server, sizeof(server));server.sin_family AF_INET;server.sin_port htons(port);server.sin_addr.s_addr inet_addr(ip.c_str());if (connect(sock, (struct sockaddr *)server, sizeof server) 0)return false;elsereturn true;}static bool SetNonBlock(int sock){int fl fcntl(sock, F_GETFL);if(fl 0)//On error, -1 is returned, and errno is set appropriately.return false;if(fcntl( sock, F_SETFL, fl | O_NONBLOCK) 0) return false;return true;}
};5.3.4、epoll.hpp 1、相关说明 在实现 reactor 模型时实际开发中通常会使用虚基类来抽象不同的多路复用 I/O 机制如 select、poll、epoll 等。这种设计允许在不修改 reactor 核心逻辑的情况下轻松地切换底层的多路复用机制。 举例如下
// 虚基类定义接口
class Poll
{
public:virtual ~Poll() default;// 添加文件描述符到监听列表举例virtual void add(int fd) 0;// 从监听列表中移除文件描述符举例virtual void remove(int fd) 0;// 等待并处理事件举例virtual void wait(std::functionvoid(int) callback) 0;
};// select 实现
class SelectPoll : public Poll
{// ... 省略具体实现细节 ...
};// poll 实现
class PollPoll : public Poll
{// ... 省略具体实现细节 ...
};// epoll 实现
class Epoll : public Poll
{// ... 省略具体实现细节 ...
}; 这里我们不作继承处理直接使用epoll来进行reactor底层的多路转接。 2、相关代码
#pragma once
#include iostream
#include unistd.h
#include sys/epoll.hclass Epoll
{const static int gnum 256; // epoll_create的参数已废弃const static int gtimeout 5000; // 默认timeout的时间public:Epoll(int timeout gtimeout): _epfd(-1),_timeout(timeout) // 这里的设置方式是在实例化Epoll类时传入另外一种写法可以在调用epoll_wait的类成员函数接口中作为参数传入。可根据自己的需求任意调整{}~Epoll(){if (_epfd 0)close(_epfd); // 析构需要释放掉epoll的fd}// 创建一个epoll实例int epoll_create(int size);void CreateEpoll(){_epfd epoll_create(gnum);if (_epfd 0)exit(6); // On error, -1 is returned}// epoll对象的从就绪队列中捞取就绪事件int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);int WaitEpoll(struct epoll_event revs[], int revs_num){return epoll_wait(_epfd, revs, revs_num, _timeout);}// 在下述分别实现对epoll对象增删查改// int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);// EPOLL_CTL_ADD向 epoll 实例中添加一个新的文件描述符 fd 和相关的事件。bool AddFromEpoll(int sock, uint32_t events){struct epoll_event ev;ev.events events;ev.data.fd sock; // 这里传入的sock是文件描述符return epoll_ctl(_epfd, EPOLL_CTL_ADD, sock, ev) 0; // When successful, returns zero.}// EPOLL_CTL_DEL从 epoll 实例中删除一个已存在的文件描述符 fd。bool DelFromEpoll(int sock){return epoll_ctl(_epfd, EPOLL_CTL_DEL, sock, nullptr) 0;}// EPOLL_CTL_MOD修改已添加到 epoll 实例中的文件描述符 fd 的事件。bool ModFromEpoll(int sock, uint32_t events){events | EPOLLET; // 默认为LT模式这里将epoll修改为ET模式边缘触发struct epoll_event ev;ev.events events;ev.data.fd sock;return epoll_ctl(_epfd, EPOLL_CTL_MOD, sock, ev) 0;}private:int _epfd;int _timeout; // 指定多读转接的超时时间这里我们将其暴露给上层可根据需要设置。
};5.3.6、TcpServer.hpp
5.3.6.1、Connection类 1、相关说明 根据之前5.3.2中的分析可知在采用Reactor设计模式构建TCP服务端的过程中为了确保未来能够正确接收到完整的报文对于每一个socketsock都应配备独立的接收发送缓冲区与发送缓冲区。 基于这一需求这里我们设计了一个专门用于管理socket的类。这就意味着在 TcpServer 内部将维护一个由众多Connection对象组成的集合每当有新的连接建立时不再是仅仅操作一个socket而是为该连接创建一个新的Connection实例并填充必要的信息。 2、相关代码 可以把下属类单独封装成一个Connection.hpp文件也可以直接写在TcpServer.hpp中形式不一主要学习理解设计思想。
class Connection;
using func_t std::functionvoid(Connection *);// 解释这个类TCP Server里会维护大量的Connection每获取一个连接不再是简简单单的使用sock而是对其new一个Connection对象并填入相关信息。
class Connection
{
public:Connection(int sock -1): _sock(sock), _ptsv(nullptr){}// 一个客户端连接通常面临三类事件读、写、异常。因此我们为每个socket连接设置了三个回调函数分别用于响应这三种事件。// 这些回调函数的具体实现由上层逻辑决定Connection类无需维护这些实现的细节只需在相应事件发生时调用对应的回调函数即可。void SetCallBack(func_t recv_cb, func_t send_cb, func_t except_cb){_recv_cb recv_cb;_send_cb send_cb;_except_cb except_cb;}public:// 用于进行IO的文件描述符int _sock;// 三个回调方法表征的就是对_sock进行特定读写对应的方法func_t _recv_cb; // 读回调func_t _send_cb; // 写回调func_t _except_cb; // 异常回调// 每个sock都需要有属于自己的接收/发送缓冲区使用string的这种写法目前无法处理二进制流只是针对文本std::string _inbuffer; // 接收缓冲区/输入缓冲区std::string _outbuffer; // 发送缓冲区/输出缓冲区// 设置对Tcp服务器的回值指针后续有用TcpServer *_ptsv;
}; 5.2.5.2、Tcpserver类 // 即这个TCP服务器是基于reactor模式设计的。
class TcpServer
{// const修饰的类的静态成员变量可以在类内初始化const static int default_port 9090; // 默认端口号const static int default_revs_num 128; // 默认的就绪事件集最大数量
public:TcpServer(int port default_port): _port(port), _revs_num(default_revs_num){// 1、这是网络套接字部分_listensock Sock::Socket(); // a、创建套接字Sock::Bind(_listensock, _port); // b、绑定Sock::Listen(_listensock); // c、监听// 2、这是epoll多路转接部分_poll.CreateEpoll(); // a、创建多路转接对象AddConnection(_listensock, std::bind(TcpServer::Accepter, this, std::placeholders::_1), nullptr, nullptr); // b、添加listensock到服务器中监听sock只用于接受客户端连接只负责读取并不关心写入和异常// 语法解释构造函数的函数体内能使用该类的成员函数因为初始化列表结束该对象就生成了// 语法解释C11这里的写法涉及function包装器对类内非静态成员函数的使用以及如何使用bind解决该问题。_revs new struct epoll_event[_revs_num]; // c、构建存放就绪事件的事件集}~TcpServer(){if(_listensock 0)close(_listensock);if(_revs)delete[] _revs;}// 解释该函数的作用为什么需要单独拎出// 要知道除了最初的_listensock未来服务器会连接大量的客户端socket而每一个sock都必须被封装成为一个Connection且每一个sock都需要交付给epoll监测// 当服务器中存在大量的Connection的时候TcpServer就需要将所有的Connection要进行管理而管理的方式是“先描述在组织”描述我们有Connection对象如何组织// 自然是需要建立一个sock与其对应Connection的集合体由此才有了TCPServer中的映射表std::unordered_mapint, Connection * _connections这里不使用这个结构也行的关键在于要理解为什么需求这个表。// 上述是一系列前因为什么要有这个函数就是因为这些大量的serversock和listensock一样对每一个到来的sock都需要完成如下相同的步骤操作因此我们将其封装成一个函数专门针对任意sock进行添加TcpServer。void AddConnection(int sock, func_t recv_cb, func_t send_cb, func_t except_cb){// 0、ET模式soke需要设置为非阻塞状态Sock::SetNonBlock(sock);// 1、构建conn对象封装sockConnection *conet new Connection(sock);conet-SetCallBack(recv_cb, send_cb, except_cb);conet-_ptsv this;// 2、将sock添加到epoll中进行监管_poll.AddFromEpoll(sock, EPOLLIN | EPOLLET); // 解释这里需要监管事件多路转接的服务器一般默认只打开对读取事件的关心而写入事件则按需打开// 3、不要忘记Tcpserver中用于维护映射关系的Connection映射表_connections.insert(std::make_pair(sock, conet));}// 上层调取根据就绪的事件进行特定事件的派发void Dispather(callback_t call){_call call; // 设置上层的业务处理函数while (true){loopOnce(); // 单次事务处理}}void loopOnce(){int n _poll.WaitEpoll(_revs, _revs_num); // 从epoll中捞取就绪事件集for (int i 0; i n; i) // 挨个处理细节如果底层没事件就绪/等待超时那么此处n0是不会进入循环的{int sock _revs[i].data.fd;uint32_t revents _revs[i].events;// 统一将所有异常交给read或write处理。if (revents EPOLLERR)// EPOLLERR文件描述符上发生了错误revents | (EPOLLIN | EPOLLOUT); if (revents EPOLLHUP)// EPOLLHUP: 表示文件描述符上的连接已经挂起例如TCP连接被对方关闭。revents | (EPOLLIN | EPOLLOUT); // 这里不用判断就绪的sock是监听sock还是普通sock因为我们建立了connection体系其中就包含有这些sock对应需要的方法即listensock会去调用Accepterserversock会调用receiver、Sender、Excepterif (revents EPOLLIN) // 读事件就绪{// 为什么需要判断sock是否存在// 1、验证其合法性。比如我们这种写法中有可能读写事件都就绪了但某次读事件读取失败导致跳转调用了异常事件将sock关闭了。那么后续写事件处就会因为sock不合法而不执行。// 2、从代码角度后续_connections[sock]处要调用就需要保证-合法。if (IsConnectionExist(sock) _connections[sock]-_recv_cb)_connections[sock]-_recv_cb(_connections[sock]); // 注意这里unordered_map::operator[]的返回值含义。}if (revents EPOLLOUT) // 写事件就绪{if (IsConnectionExist(sock) _connections[sock]-_send_cb)_connections[sock]-_send_cb(_connections[sock]);}}}bool IsConnectionExist(int sock){auto iter _connections.find(sock);if (iter ! _connections.end())return true;return false;}void Accepter(Connection *conet){// 来到此处我们可以保证的是此时底层一定有连接事件就绪。本次读取accepte不会被阻塞。logMessage(DEBUG, Accepter be called. sock is %d, conet-_sock);// 为什么要循环监听你怎么保证底层只有一个连接就绪呢// epoll从底层捞取就绪事件有可能在一次调用中捞取到多个就绪的连接事件。ET模式下如何保证本轮捞取完全需要不断循环一直捞取直到accept返回失败为止。// 我们设置循环有可能底层就只有一个连接就绪那后续循环中accept难道不会阻塞吗不会,因为我们已经将监听socket设置为NONBLOCK非阻塞模式,如果没有连接请求accept函数会立即返回一个错误码.while (true){std::string clientip;uint16_t clientport;int accept_errno 0; // 获取accept的错误码以便后续判断处理int sock Sock::Accept(conet-_sock, clientport, clientip, accept_errno);if (sock 0) // accept失败判断情况{if (accept_errno EAGAIN || accept_errno EWOULDBLOCK) // 非阻塞I/O操作无法立即完成:底层没有新的连接到来break;else if (accept_errno EINTR) // IO过程被信号中断continue;else{ // 来到这里才是真正的accept失败logMessage(WARNING, accept error, %d, %s, accept_errno, strerror(accept_errno));break; // 为什么使用break:这里的失败并不影响我们最终通信处理,连接失败了客户端再连接一次即可.}}// 连接事件就绪不代表读写就绪客户端不一定立马会发送数据因此我们需要将sock托管给TcpServerepoll connectionif (sock 0) // 上述已经判断过这个条件判断不加也行这里是为了逻辑完善{// 此时获取到的是常规IO sock因此其需要关心的事件就是读、写、异常AddConnection(sock, std::bind(TcpServer::Receiver, this, std::placeholders::_1),std::bind(TcpServer::Sender, this, std::placeholders::_1),std::bind(TcpServer::Expecter, this, std::placeholders::_1));logMessage(DEBUG, accept client, %s:%d, add its to epoll and connection, sock is %d, clientip.c_str(), clientport, sock);}}}void Receiver(Connection *conet){// 来到这里我们可以保证的是此时底层一定有读事件到来。logMessage(DEBUG, Receiver be called. sock is %d, conet-_sock);const int buffer_num 1024;bool read_error false; // 用于判断recv读取数据时的错误情况while (true){char buffer[buffer_num]; // 临时缓冲区因为我们始终会循环读取直到取完本次读事件中的数据将其拼接到sock的输入缓冲区中因此这里的大小设置实则影响不大。ssize_t n recv(conet-_sock, buffer, sizeof(buffer) - 1, 0); // 这里flage虽然设置为0但实际读取时一定会是非阻塞读取。if (n 0) // 读取失败,判断情况{if (errno EAGAIN || errno EWOULDBLOCK)break;else if (errno EINTR)continue;else{ // 来到这里才是真正的读取出错。交由异常函数处理即本轮读取作废不处理。logMessage(ERROR, recv error, %d : %s, errno, strerror(errno));conet-_except_cb(conet);read_error true; // 设置break;}}else if (n 0){// 读取到文件尾在网络通信中意味着客户端关闭连接这里我们也统一交由异常回调处理.logMessage(DEBUG, client[%d] quit, server will close its sock: %d, conet-_sock, conet-_sock);conet-_except_cb(conet);read_error true;break;}else{ // 读取成功buffer[n] 0;conet-_inbuffer buffer; // 将读取到的数据存入自己的输入缓冲区中}}// 来到此处上述recv循环退出。我们读取客户端事件是为了进行业务处理因此需要对拿到的完整数据进行后续的业务处理。if (!read_error) // 只要读取不出错{logMessage(DEBUG, conet-_inbuffer[sock:%d]:\n%s, conet-_sock, conet-_inbuffer.c_str());// 首先要解决粘包问题虽然上述解决了单批次读取报文的完整性但这并不代表这一批次获取到的报文就是独立的。std::vectorstd::string messages;Decode(conet-_inbuffer, messages);// 来到此处就能保证读取到的是一个一个的独立、完整的报文可以进行后续的业务处理for (auto msg : messages) // 如果message为空 本次循环不会被调用{_call(conet, msg); // 网络服务器一般不和上层业务强耦合。// TcpServer只需要完成事件派发、处理事件、根据协议获取到数据即可致于这些数据是用来做什么业务的不关心。这是上层的事。// 扩展这里还可以将message封装成Task然后Push到任务队列中让线程池处理。}}}void Sender(Connection *conet){// 来到这里我们可以保证的是此时底层一定有写事件到来。while (true){ssize_t n send(conet-_sock, conet-_outbuffer.c_str(), conet-_outbuffer.size(), 0);if (n 0) // On success, these calls return the number of characters sent.{ // 将_outbuffer中的数据发送给对方不一定保证对方接受端就能存储这么多的数据因此这里send的返回值用于判断实际发送的数据大小conet-_outbuffer.erase(0, n); // 需要将已发送的数据从输出缓冲区中清除PS是否需要考虑丢包问题这是TCP底层缓冲区要做的事不是我们应用层负责的事if (conet-_outbuffer.empty()) // 缓冲区中全部数据发送完毕break;}else // On error, -1 is returned, and errno is set appropriately.{if (errno EAGAIN || errno EWOULDBLOCK) // 非阻塞IO如果系统内核中的发送缓冲区已满send函数会立即返回一个错误码EWOULDBLOCK或EAGAIN表示当前无法发送数据。// 实则问题不大因为我们上层发送缓冲区中保留着需要发送的数据不过就是再次触发写事件break;else if (errno EINTR) // 被信号中断continue;else{ // 来到这里才是真正的读取出错。交由异常函数处理即本轮读取作废不处理。logMessage(ERROR, send error, %d : %s, errno, strerror(errno));conet-_except_cb(conet);break;}}}// 来到这里能保证数据发完了吗// 回答根据上述break的情况可知不能确定。能保证的是来到此处要么是发送完成要么是发送条件不满足需要下此发送。因此这里需要判断输出缓冲区的实际情况。if (conet-_outbuffer.empty())EnableReadWirte(conet, true, false); // 如果输出缓冲区中无数据说明发送完成此时应该关闭写事件(当TCP套接字的发送缓冲区未满时会根据ET模式、LT模式触发EPOLLOUT事件elseEnableReadWirte(conet, true, true); // 如果输出缓冲区中还有数据继续触发写事件}// 汇集了服务器里上述种种情况中的异常。void Expecter(Connection *conet){if (!IsConnectionExist(conet-_sock))// 说明之前被处理过return;logMessage(DEBUG, Excepter: 出现异常事件回收异常sock, sock is %d, conet-_sock);// 1、从epoll中移除bool ret _poll.DelFromEpoll(conet-_sock);assert(ret);// 2、从_connections映射表中移除_connections.erase(conet-_sock);// 3、关闭异常套接字close(conet-_sock);// 4、释放为其申请的Connection对象delete conet;logMessage(DEBUG, Excepter: 资源回收完毕);}// 使能读写用于修改epoll对一个sock的读写事件的监控void EnableReadWirte(Connection *conet, bool readable, bool writeable){// 后两个参数是否关心该sock的读事件/是否关心该sock的写事件uint32_t events ((readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0));if (IsConnectionExist(conet-_sock)){int ret _poll.ModFromEpoll(conet-_sock, events); // 由于我们在该函数内部 或等(|)了 EPOLLET此处可以不添加ET模式。assert(ret);}}private:// 网络编程这一套int _listensock;int _port;// 存储sock的Connection 类映射表std::unordered_mapint, Connection * _connections; // 建立了一个映射关系由sock作为key值就能获取到该sock对应的缓冲区、处理事件的响应函数。// 附注这里我们是直接使用了Connection*的指针没有使用RAII智能指针进行管理相应的这就需要我们在编写时设计、考虑得全面一些有申请就要有释放。// 多路转接这一套底层负责检测sock及其事件Epoll _poll; // epoll实例struct epoll_event *_revs; // 存储就绪事件集结构体数组int _revs_num; // 就绪事件最大接收值callback_t _call; // 函数调用用于完成服务器业务逻辑
}; 5.2.5.3、演示结果 5.3.7、TcpServer.cc
#include TcpServer.hpp
#include Protocol.hpp
#include memoryResponse Calculator(const Request req)
{// 根据op选项进行计算Response resp(0, 0);switch (req.op_){case :resp.result_ req.x_ req.y_;break;case -:resp.result_ req.x_ - req.y_;break;case *:resp.result_ req.x_ * req.y_;break;case /:if (req.y_ 0) // 除零错误需要设置状态码resp.code_ 1;elseresp.result_ req.x_ / req.y_;break;case %:if (req.y_ 0) // 模零错误需要设置状态码resp.code_ 2;elseresp.result_ req.x_ % req.y_;break;default: // 输入错误需要设置状态码resp.code_ -1;break;}return resp; // 返回结果响应结构体对象
}void Call(Connection *conet, std::string str)
{logMessage(DEBUG, call service, sock is %d, str is# %s, conet-_sock, str.c_str());// 1、反序列化Request reque;reque.Deserialized(str);// 2、业务处理获得结果Response respon Calculator(reque);// 3、将结果序列化编码构建应答std::string result_str respon.Serialize();result_str Encode(result_str);// 4、将结果返回给客服端这不是我上层需要关系的事交给Tcp服务器处理// a、将待发送数据交给当前sock的输出缓冲区conet-_outbuffer result_str;// b、服务器中写入事件默认关闭按需打开。这里要发送数据就需要想办法让底层的写事件就绪。// c、这就是使能读写函数EnableReadWirte的诞生与connection中Tcp服务器的回值指针的用处用于触发发送的动作conet-_ptsv-EnableReadWirte(conet, true, true); // d、需要注意一旦我们开启EPOLLOUTepoll会自动立马触发一次发送事件就绪。// e、由于我们写的Sender的逻辑只要触发了这里的首次调用后续如果需要保持服务端数据发送epoll自动监测。
}void Usage(std::string proc)
{std::cout \n Usage: proc port\n std::endl;
}int main(int argc, char *argv[])
{if (argc ! 2){Usage(argv[0]);exit(1);}uint16_t port atoi(argv[1]);std::unique_ptrTcpServer server(new TcpServer(port)); // 构造服务器server-Dispather(Call); // 进行任务派发return 0;
}Fin、共勉。