做任务反佣金的网站,网站首页建设,北京网站优化服务有限公司,wordpress验证Muduo网络库的EventLoop模块是网络编程框架中的核心组件#xff0c;负责事件循环的驱动和管理。以下是对EventLoop模块的详细介绍#xff1a;
作用与功能#xff1a;
EventLoop是网络服务器中负责循环的重要模块#xff0c;它持续地监听、获取和处理各种事件#xff0c;…Muduo网络库的EventLoop模块是网络编程框架中的核心组件负责事件循环的驱动和管理。以下是对EventLoop模块的详细介绍
作用与功能
EventLoop是网络服务器中负责循环的重要模块它持续地监听、获取和处理各种事件如IO事件、定时器事件等。它通过轮询访问Poller如EPollPoller获取激活的Channel列表然后使Channel根据自身情况调用相应的回调函数来处理事件。EventLoop确保了每个Loop都是相互独立的拥有自己的事件循环、Poller监听者和Channel监听通道列表。
与Poller的关系
Poller负责从事件监听器上获取监听结果即哪些文件描述符fd上发生了哪些事件。EventLoop会轮询访问Poller以获取这些发生事件的fd及其相关事件。
与Channel的关系
Channel类是对文件描述符(fd)以及其相关事件的封装。它保存了fd的感兴趣事件、实际发生的事件以及每种事件对应的处理函数。当Poller检测到某个fd上有事件发生时EventLoop会找到对应的Channel并调用其上的回调函数来处理该事件。
线程模型
EventLoop遵循“one loop one thread”的原则即每个EventLoop都在一个独立的线程上运行。这种设计使得事件处理更加高效和清晰避免了多线程环境下的竞态条件和同步问题。
mainLoop和subLoop
在Muduo网络库中mainLoop和subLoop都是EventLoop的实例它们分别代表主事件循环和子事件循环。
mainLoop主事件循环
mainLoop是整个Muduo网络库的核心事件循环。它负责监听服务器套接字通常是listenfd并接受来自客户端的连接请求。mainLoop运行一个Accrptor包含一个Poller用于监听一个特定的非阻塞的服务器sockfd上的读事件。当Poller检测到有读事件发生时一般是新用户连接mainLoop会在线程池中通过轮询算法选择一个subLoop来处理这个连接的读写和关闭事件。Acceptor将在后续阐述。mainLoop遵循 “one loop one thread” 的原则即每个mainLoop都在一个独立的线程上运行。这确保了事件处理的高效性和清晰性避免了多线程环境下的竞态条件和同步问题。
subLoop子事件循环
subLoop是mainLoop的子事件循环用于处理已建立的连接的读写和关闭事件。每个subLoop都在一个独立的线程上运行有一个用于唤醒自身的fd和Channel运行一个Poller并保存自己管理的多个Channel以实现并发处理多个连接的目的。当mainLoop接受到一个新的连接请求时它会根据EventLoopThreadPool中的线程来选择一个subLoop将新创建的TcpConnection的Channel放入这个subLoop中。这个subLoop会接管该连接的fd并监听其上的读写和关闭事件。subLoop中的事件处理逻辑与mainLoop类似也是通过Poller来监听fd上的事件并调用相应的回调函数来处理这些事件。由于subLoop是独立的线程因此它们可以并行处理多个连接从而提高了服务器的并发处理能力。
总的来说mainLoop和subLoop共同构成了Muduo网络库的事件驱动编程框架。mainLoop负责监听服务器套接字并接受连接请求而subLoop则负责处理已建立的连接的读写和关闭事件。通过合理的线程调度和事件处理机制Muduo网络库能够高效、稳定地处理大量的并发连接请求。
EventLoop.h
#pragma once
#include noncopyable.h
#include Timestamp.h
#include CurrentThread.h
#include LogStream.h#include functional
#include vector
#include atomic
#include memory
#include mutex
#include sys/types.hclass Channel;
class Poller;/*** 事件循环类 两大模型Channel Poller* mainLoop只负责处理IO并返回client的fd* subLoop负责监听poll并处理相应的回调* 两者之间通过weakupfd进行通信
*/
class EventLoop : noncopyable
{
public:using Functor std::functionvoid();EventLoop();~EventLoop();// 开启loopvoid loop();// 退出loopvoid quit();Timestamp pollReturnTime() const { return pollReturnTime_; }// 在当前loop执行cbvoid runInLoop(Functor cb);// 把cb放入队列唤醒subloop所在的线程执行cbvoid queueInLoop(Functor cb);size_t queueSize() const;// 唤醒loop所在的线程EventLoop::queueInLoop中调用void wakeup();// EventLoop方法 》 Poller的方法void updateChannel(Channel *channel);void removeChannel(Channel *channel);bool hasChannel(Channel *channel);// 判断EventLoop对象是否在自己的线程中bool isInLoopThread() const {return threadId_ CurrentThread::tid();}private:// waked up后的一个操作 void handleRead(); // 执行回调void doPendingFunctors(); using ChannelList std::vectorChannel *;std::atomic_bool looping_; // 原子操作通过CAS实现std::atomic_bool quit_; // 标识退出loop循环const pid_t threadId_; // 记录当前loop所属的线程idTimestamp pollReturnTime_; // poller返回发生事件的channels的时间点std::unique_ptrPoller poller_;int wakeupFd_; // 当mainLoop获取一个新用户的channel通过轮询算法选择一个subloop通过该成员唤醒subloop处理channel。使用eventfd// unlike in TimerQueue, which is an internal class,// we dont expose Channel to client.std::unique_ptrChannel wakeupChannel_;// scratch variablesChannelList activeChannels_;std::atomic_bool callingPendingFunctors_; // 标识当前loop是否有需要执行的回调操作正在执行则为truestd::vectorFunctor pendingFunctors_; // 存储loop需要执行的所有回调操作std::mutex mutex_; // 保护pendingFunctors_线程安全
};
EventLoop.cc
#include EventLoop.h
#include LogStream.h
#include Poller.h
#include Channel.h#include sys/eventfd.h
#include unistd.h
#include fcntl.h
#include errno.h
#include iostream// 防止一个线程创建多个EventLoop threadLocal
__thread EventLoop *t_loopInThisThread nullptr;// 定义Poller超时时间
const int kPollTimeMs 10000;// 创建weakupfd用来notify唤醒subReactor处理新来的channel
int createEventfd()
{int evtfd ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);if (evtfd 0){LOG_FATAL Failed in eventfd errno;}return evtfd;
}EventLoop::EventLoop(): looping_(false),quit_(false),callingPendingFunctors_(false),threadId_(CurrentThread::tid()),poller_(Poller::newDefaultPoller(this)),wakeupFd_(createEventfd()),wakeupChannel_(new Channel(this, wakeupFd_))
{LOG_DEBUG EventLoop created this in thread threadId_;if (t_loopInThisThread){LOG_FATAL Another EventLoop t_loopInThisThread exists in this thread threadId_;}else{t_loopInThisThread this;}// 设置weakupfd的事件类型以及发生事件后的回调操作wakeupChannel_-setReadCallback(std::bind(EventLoop::handleRead, this));// we are always reading the wakeupfd// 每一个EventLoop都将监听weakupChannel的EPOLLIN读事件了// 作用是subloop在阻塞时能够被mainLoop通过weakupfd唤醒wakeupChannel_-enableReading();
}EventLoop::~EventLoop()
{LOG_DEBUG EventLoop this of thread threadId_ destructs in thread CurrentThread::tid();wakeupChannel_-disableAll();wakeupChannel_-remove();::close(wakeupFd_);t_loopInThisThread NULL;
}void EventLoop::handleRead()
{uint64_t one 1;ssize_t n read(wakeupFd_, one, sizeof one);if (n ! sizeof one){LOG_ERROR EventLoop::handleRead() reads n bytes instead of 8;}
}void EventLoop::loop()
{looping_ true;quit_ false;LOG_INFO EventLoop this start looping;while (!quit_){activeChannels_.clear();// 当前EventLoop的Poll监听两类fdclient的fd正常通信的在baseloop中和 weakupfdmainLoop 和 subLoop 通信用来唤醒sub的pollReturnTime_ poller_-poll(kPollTimeMs, activeChannels_);for (Channel *channel : activeChannels_){// Poller监听哪些channel发生事件了然后上报给EventLoop通知channel处理相应的事件channel-handleEvent(pollReturnTime_);}// 执行当前EventLoop事件循环需要处理的回调操作/*** IO线程 mainLoop 只 accept 然后返回client通信用的fd 用channel打包 并分发给 subloop* mainLoop事先注册一个回调cb需要subLoop来执行weakup subloop后* 执行下面的方法执行之前mainLoop注册的cb操作一个或多个*/doPendingFunctors();}LOG_INFO EventLoop this stop looping;looping_ false;
}/*** 退出事件循环* 1、loop在自己的线程中 调用quit此时肯定没有阻塞在poll中* 2、在其他线程中调用quit如在subloopwoker中调用mainLoopIO的qiut** mainLoop* * Muduo库没有 生产者-消费者线程安全的队列 存储Channel* 直接使用wakeupfd进行线程间的唤醒 ** subLoop1 subLoop2 subLoop3*/
void EventLoop::quit()
{quit_ true;// 2中此时若当前woker线程不等于mainLoop线程将本线程在poll中唤醒if (!isInLoopThread()){wakeup();}
}void EventLoop::runInLoop(Functor cb)
{// LOG_DEBUGEventLoop::runInLoop cb: (cb ! 0);if (isInLoopThread()) // 产生段错误{ // 在当前loop线程中 执行cbLOG_DEBUG 在当前loop线程中 执行cb;cb();}else{ // 在其他loop线程执行cb需要唤醒其loop所在线程执行cbLOG_DEBUG 在其他loop线程执行cb需要唤醒其loop所在线程执行cb;queueInLoop(cb);}
}void EventLoop::queueInLoop(Functor cb)
{{std::unique_lockstd::mutex ulock(mutex_);pendingFunctors_.emplace_back(cb);}// 唤醒相应的需要执行上面回调操作的loop线程// 若当前线程正在执行回调doPendingFunctors但是又有了新的回调cb// 防止执行完回调后又阻塞在poll上无法执行新cb所以预先wakeup写入一个数据if (!isInLoopThread() || callingPendingFunctors_) {wakeup(); // 唤醒loop所在线程}
}// 用来唤醒loop所在的线程向wakeupfd写一个数据wakeupChannel就发生读事件当前loop线程就会被唤醒
void EventLoop::wakeup()
{uint64_t one 1;ssize_t n ::write(wakeupFd_, one, sizeof one);if (n ! sizeof one){LOG_ERROR EventLoop::wakeup() writes n bytes instead of 8;}
}void EventLoop::updateChannel(Channel *channel)
{// channel是发起方通过loop调用pollpoller_-updateChannel(channel);
}void EventLoop::removeChannel(Channel *channel)
{// channel是发起方通过loop调用pollpoller_-removeChannel(channel);
}bool EventLoop::hasChannel(Channel *channel)
{return poller_-hasChannel(channel);
}// 执行回调由TcpServer提供的回调函数
void EventLoop::doPendingFunctors()
{std::vectorFunctor functors;callingPendingFunctors_ true; // 正在执行回调操作{ // 使用swap将原pendingFunctors_置空并且释放其他线程不会因为pendingFunctors_阻塞std::unique_lockstd::mutex lock(mutex_);functors.swap(pendingFunctors_);}for (const Functor functor : functors){functor(); // 执行当前loop需要的回调操作}callingPendingFunctors_ false;
}