标准网站建设推荐,公司注册的流程与步骤,深圳龙岗做网站的公司,制作网页网站的软件是文章目录 1 功能简介线程池的初始化线程池执行流程 2 线程池类的设计线程类XThreadXThread.hXThread.cpp 线程池类XThreadPoolXThreadPool.hXThreadPool.cpp 任务基类taskXTask.h 3 自定义任务的例子自定义任务类ServerCMDServerCMD.hServerCMD.cpp 测试程序运行效果 1 功能简介… 文章目录 1 功能简介线程池的初始化线程池执行流程 2 线程池类的设计线程类XThreadXThread.hXThread.cpp 线程池类XThreadPoolXThreadPool.hXThreadPool.cpp 任务基类taskXTask.h 3 自定义任务的例子自定义任务类ServerCMDServerCMD.hServerCMD.cpp 测试程序运行效果 1 功能简介
本文利用libevent实现一个C线程池可自定义用户任务类继承于任务task基类重写任务基类的纯虚函数实现多态。比如将定义定义处理客户端的请求任务类实现对客户端请求的并发处理。 工作队列可以理解为线程的队列一个线程同时可以处理一个任务空闲的线程回从任务队列取出任务执行。当工作队列空时线程会睡眠。 任务队列用户将任务加入任务队列然后通知工作队列取出一个任务到线程中执行。
线程池的初始化 线程池执行流程 2 线程池类的设计
线程类XThread
线程类的接口功能
Start() - 管道可读就激活线程设置管道属性进入事件循环等待管道可读激活线程执行任务
Setup() - 设置管道属性将管道读事件绑定到event_base中等待触发调用回调
Main() - 此函数只进入事件循环等待事件循环退出Notify() - 读取管道数据从当前线程对象的任务队列中取出任务执行任务
AddTask() - 将任务对象加入线程对象的任务队列将线程的事件处理器base保存到任务对象中
Activate() - 通过管道发送启动标志来激活线程发送一个字符c激活相当于加入一个任务对象到当前线程的任务队列通过Notify()处理。调用多次Activate表示加入多个任务任务顺序被执行。XThread.h
#pragma once
#include vector/*线程类声明*/
class XThread;/*任务类声明*/
class XTask;/*线程池类*/
class XThreadPool
{
public://单例模式创建返回唯一对象static XThreadPool* GetInstance();//初始化所有线程并启动线程void Init(int threadCount);//分发线程void Dispatch(XTask* task);private://将构造函数的访问属性设置为 private//将构造函数构造声明成私有不使用//声明成私有不使用XThreadPool(){} //无参构造XThreadPool(const XThreadPool); //拷贝构造XThreadPool operator (const XThreadPool); //赋值运算符重载//线程数量int threadCount 0;//用来标记下一个使用的线程号int lastThread -1;//线程对象数组std::vectorXThread * threads;//线程池对象static XThreadPool* pInstance;
};XThread.cpp
#include XThread.h
#include XTask.h
#include thread
#include iostream
#include event2/event.h
#include unistd.husing namespace std;XThread::XThread()
{}XThread::~XThread()
{}//sock 文件描述符which 事件类型 arg传递的参数
/*
* 函数名: NotifyCB
* 作用: 管道可读事件触发回调函数
*/
static void NotifyCB(evutil_socket_t fd, short which, void *arg)
{XThread *th (XThread*)arg;th-Notify(fd, which);
}/*
* 函数名: XThread::Start
* 作用: 启动线程
* 解释: 管道可读就激活线程设置管道属性进入事件循环等待管道可读激活线程执行任务。
*/
void XThread::Start()
{//安装线程初始化event_base和管道监听事件用于激活Setup();//启动线程thread th(XThread::Main, this);//线程分离th.detach();
}/*
* 函数名: XThread::Main
* 作用: 线程入口函数
* 解释: 此函数只进入事件循环等待事件循环退出
*/
void XThread::Main()
{cout id XThread::Main() begin endl;event_base_dispatch(base); //进入事件循环event_base_free(base);cout id XThread::Main() end endl;
}/*
* 函数名: XThread::Setup
* 作用: 安装线程
* 解释: 设置管道属性将管道读事件绑定到event_base中等待触发调用回调
*/
bool XThread::Setup()
{//windows用配对socket linux用管道//创建的管道int fds[2];if(pipe(fds)){cerr pipe failed! endl;return false; }//读取绑定到event事件中写入要保存//保存管道的写fdnotify_send_fd fds[1];//创建一个新的事件处理器对象this-base event_base_new();//创建一个新的事件对象//添加管道监听事件读fd用于激活线程执行任务event *ev event_new(base, fds[0], EV_READ|EV_PERSIST, NotifyCB, this);//将事件对象struct event添加到指定的事件处理器event_base中event_add(ev, 0);return true;
}/*
* 函数名: XThread::Notify
* 作用: 线程激活执行任务
* 解释: 读取管道数据从当前线程对象的任务队列中取出任务执行任务
*/
void XThread::Notify(evutil_socket_t fd, short which)
{//水平触发 只要没有接受完成会再次进来char buf[2] {0};int len read(fd, buf, 1);if (len 0)return;cout id thread buf endl;//获取任务并初始化任务XTask* task NULL;tasks_mutex.lock();if(tasks.empty()){ //队列为空tasks_mutex.unlock();return;}task tasks.front(); //先进先出tasks.pop_front();tasks_mutex.unlock();task-Init();
}/*
* 函数名: XThread::Activate
* 作用: 激活线程
* 解释: 通过管道发送启动标志来激活线程发送一个字符c激活相当于加入一个任务对象到当前线程的任务队列通过Notify()处理。
* 调用多次Activate表示加入多个任务任务顺序被执行。
*/
void XThread::Activate()
{char act[10] {0};int len write(this-notify_send_fd, c, 1);if (len 0){cerr XThread::Activate() failed! endl;}cout currect thread: id , notify_send_fd: this-notify_send_fd endl;
}/*
* 函数名: XThread::AddTask
* 作用: 将任务对象加入线程对象的任务队列将线程的事件处理器base保存到任务对象中
*/
void XThread::AddTask(XTask* task)
{if(!task)return;task-base this-base;tasks_mutex.lock();tasks.push_back(task);tasks_mutex.unlock();
}线程池类XThreadPool
线程类的接口功能
GetInstance() - 单例模式创建返回唯一对象
Init() - 创建指定数量线程对象启动线程并把线程对象加入到线程池的线程对象数组
Dispatch() - 从线程对象数组取出线程对象并把任务加入线程对象的任务队列中激活该线程执行任务XThreadPool.h
#pragma once
#include vector/*线程类声明*/
class XThread;/*任务类声明*/
class XTask;/*线程池类*/
class XThreadPool
{
public://单例模式创建返回唯一对象static XThreadPool* GetInstance();//初始化所有线程并启动线程void Init(int threadCount);//分发线程void Dispatch(XTask* task);private://将构造函数的访问属性设置为 private//将构造函数构造声明成私有不使用//声明成私有不使用XThreadPool(){} //无参构造XThreadPool(const XThreadPool); //拷贝构造XThreadPool operator (const XThreadPool); //赋值运算符重载//线程数量int threadCount 0;//用来标记下一个使用的线程号int lastThread -1;//线程对象数组std::vectorXThread * threads;//线程池对象static XThreadPool* pInstance;
};XThreadPool.cpp
#include XThreadPool.h
#include XThread.h
#include thread
#include iostream
//#include chronousing namespace std;//静态成员变量类外初始化
XThreadPool* XThreadPool::pInstance NULL;/*
* 函数名: XThreadPool::GetInstance
* 作用: 单例模式创建返回唯一对象
*/
XThreadPool* XThreadPool::GetInstance()
{//当需要使用对象时访问instance 的值//空值:创建对象并用instance 标记//非空值: 返回instance 标记的对象if( pInstance NULL ){pInstance new XThreadPool();}return pInstance;
}/*
* 函数名: XThreadPool::Init
* 作用: 初始化所有线程并启动线程
* 解释: 创建指定数量线程对象启动线程并把线程对象加入到线程池的线程对象数组
*/
void XThreadPool::Init(int threadCount)
{this-threadCount threadCount;this-lastThread -1;for (int i 0; i threadCount; i){XThread *t new XThread();t-id i 1;cout Create thread i endl;//启动线程t-Start();threads.push_back(t);this_thread::sleep_for(std::chrono::microseconds(10));}
}/*
* 函数名: XThreadPool::Dispatch
* 作用: 分发线程
* 解释: 从线程对象数组取出线程对象并把任务加入线程对象的任务队列中激活该线程执行任务。
*/
void XThreadPool::Dispatch(XTask* task)
{//轮询if(!task)return;int tid (lastThread 1) % threadCount;lastThread tid;cout lastThread: lastThread endl;XThread *XTh threads[tid];//添加任务XTh-AddTask(task);//线程激活XTh-Activate();
}任务基类task
XTask.h
#pragma once
#include iostreamclass XTask
{
public://事件处理器对象struct event_base* base NULL;//客户端连接的socketint sock 0;//初始化任务 纯虚函数virtual bool Init() 0;
};3 自定义任务的例子
自定义任务类ServerCMD
线程类的接口功能
Init() - 初始化任务注册当前socket的读事件和超时事件绑定回调函数
ReadCB() - 读事件回调函数
EventCB() - 客户端超时未发请求断开连接退出任务ServerCMD.h
#pragma once#include XTask.hclass XFtpServerCMD : public XTask
{
public://初始化任务virtual bool Init();XFtpServerCMD();~XFtpServerCMD();
};ServerCMD.cpp
#include XFtpServerCMD.h
#include event2/event.h
#include event2/bufferevent.h
#include iostream
#include string.husing namespace std;/*
* 函数名: EventCB
* 作用: 超时事件回调函数
* 解释: 客户端超时未发请求断开连接退出任务
*/
void EventCB(struct bufferevent *bev, short what, void *arg)
{XFtpServerCMD* cmd (XFtpServerCMD*)arg;//如果对方网络断掉或者机器死机有可能收不到BEV_EVENT_EOF数据if(what (BEV_EVENT_EOF | BEV_EVENT_ERROR | BEV_EVENT_TIMEOUT)){cout BEV_EVENT_EOF | BEV_EVENT_ERROR |BEV_EVENT_TIMEOUT endl;bufferevent_free(bev);delete cmd;}
}/*
* 函数名: ReadCB
* 作用: 读事件回调函数
*/
void ReadCB(struct bufferevent *bev, void *arg)
{XFtpServerCMD* cmd (XFtpServerCMD*)arg;char data[1024] {0};for (;;){int len bufferevent_read(bev, data, sizeof(data)-1);if(len 0)break;data[len] \0;cout data endl flush;//测试代码要清理掉if(strstr(data, quit)){bufferevent_free(bev);delete cmd;break;}}
}/*
* 函数名: XFtpServerCMD::Init
* 作用: 初始化任务
* 解释: 初始化任务注册当前socket的读事件和超时事件绑定回调函数。
*/
bool XFtpServerCMD::Init()
{cout XFtpServerCMD::Init() sock: sock endl;//监听socket bufferevent// base socketbufferevent* bev bufferevent_socket_new(base, sock, BEV_OPT_CLOSE_ON_FREE);bufferevent_setcb(bev, ReadCB, 0 ,EventCB, this);bufferevent_enable(bev, EV_READ | EV_WRITE);//添加超时timeval rt {10, 0}; //10秒bufferevent_set_timeouts(bev, rt, 0); //设置读超时回调函数return true;
}XFtpServerCMD::XFtpServerCMD()
{}XFtpServerCMD::~XFtpServerCMD()
{}测试程序
#include event2/event.h
#include event2/listener.h
#include string.h
#include XThreadPool.h
#include signal.h
#include iostream#include XFtpServerCMD.husing namespace std;
#define SPORT 5001/*
* 函数名: listen_cb
* 作用: 接收到连接的回调函数
* 解释: 通过多态来创建任务对象将当前socket保存到任务对象中分发任务执行
*/
void listen_cb(struct evconnlistener *e, evutil_socket_t s, struct sockaddr *a, int socklen, void *arg)
{cout listen_cb endl;XTask* task new XFtpServerCMD();task-sock s;XThreadPool::GetInstance()-Dispatch(task);
}int main()
{//忽略管道信号发送数据给已关闭的socketif (signal(SIGPIPE, SIG_IGN) SIG_ERR)return 1;//1 初始化线程池XThreadPool::GetInstance()-Init(5);std::cout test thread pool!\n; //创建libevent的上下文event_base* base event_base_new();if (base){cout event_base_new success! endl;}//监听端口//socket bindlisten 绑定事件sockaddr_in sin;memset(sin, 0, sizeof(sin));sin.sin_family AF_INET;sin.sin_port htons(SPORT);evconnlistener* ev evconnlistener_new_bind(base, // libevent的上下文listen_cb, //接收到连接的回调函数base, //回调函数获取的参数 argLEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE, //地址重用evconnlistener关闭同时关闭socket10, //连接队列大小对应listen函数(sockaddr*)sin, //绑定的地址和端口sizeof(sin));//事件分发处理if(base)event_base_dispatch(base);if(ev)evconnlistener_free(ev);if(base)event_base_free(base);return 0;
}运行效果
初始化线程池创建5个线程通过telnet和网络调试软件模拟客户端的接入客户端发送信息服务器打印出来当客户端超时未发请求断开连接退出任务。