艺术学院网站模板,家里做网站买什么服务器好,怎么设置网站字体,wordpress小工具功能线程详解第四篇 前言正式开始信号量引例信号量的本质信号量相关的四个核心接口生产消费者模型用环形队列实现生产者消费者模型基于环形队列的生产消费模型的原理代码演示单生产者单消费者多生产者多消费者 计数器的意义 线程池基本概念代码 单例模式STL,智能指针和线程安全STL中… 线程详解第四篇 前言正式开始信号量引例信号量的本质信号量相关的四个核心接口生产消费者模型用环形队列实现生产者消费者模型基于环形队列的生产消费模型的原理代码演示单生产者单消费者多生产者多消费者 计数器的意义 线程池基本概念代码 单例模式STL,智能指针和线程安全STL中的容器是否是线程安全的?智能指针是否是线程安全的? 其他常见的各种锁读者写者问题读写锁接口介绍 前言
线程详解前三篇
【Linux】详解线程第一篇——由单线程到多线程的转变【Linux】详解线程第二篇——用黄牛抢陈奕迅演唱会门票的例子来讲解【 线程互斥与锁 】【Linux】详解线程第三篇——线程同步和生产消费者模型
本篇主要讲解信号量的概念以及对这些概念进行代码演示对于线程池提供代码和详细注释单例模式结合前面C中的博客来讲解以及一些线程部分的收尾知识。
正式开始
信号量
说说共享资源我前面的博客讲临界资源的时候是按照任意时刻临界资源都只有一个执行流在进行访问的但讲到信号量这里要变一变。
如果单纯的把某一块临界资源看作是一个整体的话且线程之间都是互斥的看图
但是如果不把一块共享资源当做一个整体而是让不同的执行流去访问临界资源中的不同区域的话就可以继续让多线程并发执行了而当只有访问其中的小块临界资源的时候才会进行线程同步和互斥看图
这样多个线程的执行效率就会比上一个高不少。当某一小块被多个线程需要时此时再加对需要这一小块的线程进行加锁互斥就行。
但是有四个问题
如何确定一整个资源被分成了多少小块的资源如何确定当申请了m个小块资源后还剩多少个资源如何保证某块小资源是给哪个线程的是否能确定某个线程一定会有一块小资源
可能屏幕前的你看到这四个问题已经懵了不要懵我来给你讲。
引例
来用一个生活中的例子讲讲信号量。 我们看电影前都要买票而买票的就是对座位资源的预定机制。把某一个放映厅中的所有座位看作是整块资源其中的每一个座位当做是一小块资源。其中座位可以分为有效座位和无效座位有效座位是指某一座位还未被预定无效座位就是指这个座位被其他人预定了。 当买一张票票上会有你的座位信息那么该座位就你被提前预定了此时这个座位就无法被其他人预定也就从有效座位变为无效座位但有效座位数是定的也就是说总票数是定的那么买一张票就会让总票数减一也就是有效座位数减一。 当电影看完后走一个人其对应的座位就不属于这个人了但座位还是在的也就是说该座位由无效座位变回了有效座位人不断走那么有效的座位数就会不断加一直到恢复到总的座位数。同样下一场电影的票数还是总座位数。又是一个循环。
上面有两点比较重要就是票数和座位数。这两个其实可以看作是一个东西但是座位数可以分为有效作为和无效座位假如总的座位数是100那么两种座位的范围就是 [0 ~ 100] 和 [100 ~ 0]一个增另一个就会减。
例子就讲到这其实已经把信号量的核心思想讲出来了就两个字预定。
信号量的本质
信号量英文semaphore下面一些关于信号量的东西我就以sem来代替了。
信号量的本质其实就是一个计数器sem当某一线程想要访问临界资源时必须先申请信号量资源此时就是对sem减一也就是预定信号量资源在信号量这里有专门的术语即P操作当使用完毕信号量资源后就要让sem加一也就是释放信号量资源专业术语即V操作。这里的PV操作是具有原子性的是库中保证的不像自增 / 自减操作没有原子性。
对应到上面的买电影票可以用semValidSet来表示有效座位数那么semValidSet的起始值就是100还可以用semInvalidSet来表示无效的座位数那么semInvalidSet的起始值就是0。当有人买票了之后就会使得semValidSet减一也就变成了99对应的semInvalidSet加一也就变成了1以此类推直到semValidSet变为0semInvalidSet变为100。然后再返回来下一个循环。
再专业点来说当申请了一个信号量后当前的执行流就一定具有了某一小块资源此时如何使用这块资源就要看执行流本身了。具体会分配到哪块资源需要结合场景来看。
信号量相关的四个核心接口
最核心的接口就四个也是等会要用的头文件都是semaphore.h。
信号量类型为sem_t定义一个信号量就和定义一个int变量一样。
sem_init 这个函数就是给信号量做初始化工作。 第一个参数sem就是你要初始化的信号量的地址。 第二个参数pshared是用来说明你当前是的信号量是让进程用的还是让线程用的如果给零就是让线程用的如果给非零值就是让进程用的。 man手册中是这么说的 The pshared argument indicates whether this semaphore is to be shared between the threads of a process, or between processes.If pshared has the value 0, then the semaphore is shared between the threads of a process, and should be located at some address that is visible to all threads (e.g., a global variable, or a variable allocated dynamically on the heap).If pshared is nonzero, then the semaphore is shared between processes, and should be located in a region of shared memory (see shm_open(3), mmap(2), and shmget(2)). (Since a child created by fork(2) inherits its parent’s memory mappings, it can also access the semaphore.) Any process that can access the shared memory region can operate on the semaphore using sem_post(3), sem_wait(3), etc. 这里讲的是线程就直接给零了。 刚刚讲的信号量是一个计数器是有初始值的这里第三个参数value就是计数器的初始值由我们自己来设定。 函数的返回值成功返回0失败返回-1并设置错误码。 sem_destroy 这个函数就是销毁其参数指向的信号量。没啥讲的。
sem_wait
这个函数就是前面说的P操作就是申请信号量让sem指向的信号量减一。
不过下面还有两个函数我直接把man手册中的解释给出来吧 sem_trywait() is the same as sem_wait(), except that if the decrement cannot be immediately performed, then call returns an error (errno set to EAGAIN) instead of blocking. sem_timedwait() is the same as sem_wait(), except that abs_timeout specifies a limit on the amount of time that the call should block if the decrement cannot be immediately performed. The abs_timeout argument points to a structure that specifies an absolute timeout in seconds and nanoseconds since the Epoch, 1970-01-01 00:00:00 0000 (UTC). This structure is defined as follows: struct timespec {time_t tv_sec; /* Seconds */long tv_nsec; /* Nanoseconds [0 .. 999999999] */};If the timeout has already expired by the time of the call, and the semaphore could not be locked immediately, then sem_timedwait() fails with a timeout error (errno set to ETIMEDOUT). If the operation can be performed immediately, then sem_timedwait() never fails with a timeout error, regardless of the value of abs_timeout. Furthermore, the validity of abs_timeout is not checked in this case. sem_post 这个函数就是V操作也就是对信号量资源的释放让sem指向的信号量加一。
核心的就是这四个init、destroy、wait、post。
生产消费者模型
我上一篇已经细讲过生产消费者模型了如果屏幕前的你不懂可以看看我上一篇博客也就是线程详解第三篇【Linux】详解线程第三篇——线程同步和生产消费者模型。
我等会就要用到其中的一些术语来讲某些东西了。
用环形队列实现生产者消费者模型
看到这的同学我就默认你懂生消模型了。如果屏幕前的你不懂那我建议你还是回头看一下那篇博客。因为等会的讲解中要用到生消模型中的很多概念。
先来说说环形队列逻辑结构长这样
其实我前面讲数据结构中有一篇博客对于模拟实现了环形队列我这里就不再讲的那么细了如果你想了深入了解一下可以看看这篇【C】循环队列力扣622
上面那张图给出的是逻辑结构而真实的物理结构可以用顺序表也可用链表还可用别的就不说那么多了这里我就直接用顺序表了上面给的链接中用的是定长的顺序表这里也就用顺序表来表示了只不过我直接用STL中的vector了。就不自己模拟实现一个了。
物理结构
上面的逻辑结构和物理结构都是空出来一个位置方便判断队列是否已满(tail 1) % N 如果等于front就是满了不等于就是没满。
但是今天我们用的信号量可以不需要空出来这一个位置。
基于环形队列的生产消费模型的原理
队列还是那个队列不过这次是一个生产者和一个消费者在用这个队列。
先来个生产者不断生产的大致流程 各位知道为什么最后可以让生产者和消费者指向同一块空间吗 我先不说为啥先来讲点概念。
生产者最关心的是空间资源因为没有空间就无法生产数据当生产数据的同时空间也在减小。 消费者最关心的是数据资源因为没有数据就无法消费数据当消费数据的同时空间也在增加。
而上面图中每个空格子就是空间其中的数字就是数据。
我们可以用一个计数器spaceSem来表示空间个数用一个计数器dataSem来表示数据个数。 这里队列的总大小是8那么spaceSem的初始值就是8dataSem的初始值就是0。
生产者生产时先申请空间资源申请不到了就会阻塞这是sem_wait本身所能保证的下面我就用P来表示sem_wait用V来表示sem_post了当生产者线程申请到了信号量时spaceSem就会减一而减的这个一就是一个空格子这个空格子就会被这个生产者线程所拥有在这个空格子生产完后这个空格子就不为空了也就是说这个空格子被数据占有了对应到上面买票的那个例子就可以说空格子被数据占有了后就变成了无效空格对于生产者而言的无效空格。
P操作后要紧跟着V操作但是这里不能对spaceSem进行V操作因为V操作会使得spaceSem加一而空格子数是减少了的显然是错误的。但是数据个数增加了所以说要对dataSem进行V操作。
看图
这就能对上了空格子少一个数据就多一个。不过也不能完全说空格子因为会出现消费者消费的情况有数据的格子被消费过后就不能再次被消费了也就成了“空格子”不过说成有效格子更好。来一个消费者消费的图 再来看看队列数据装满后是啥样子的
此时spaceSem为0这样生产者在申请信号量的时候就会直接阻塞因为spaceSem为0无法继续减一操作。所以此时一定会让消费者线程执行消费。
再来看看空
此时dataSem为0这样消费者在申请信号量的时候就会直接阻塞因为dataSem为0无法继续减一操作。所以此时一定会让生产者线程执行生产。
故生产者消费者初始情况下的执行先后顺序不必关心在逻辑上一定能让生产者先执行。因为若消费者先执行那么就是上面这张图的情况。生产者先执行那就直接生产。
代码演示
单生产者单消费者
先来说单生产者和单消费者的大致思路 定义一个类类中成员变量有一个vector一个capacity表示环形队列的大小一个消费者位置的下标c_index一个生产者位置的下标p_index一个spaceSem信号量一个dataSem信号量。大概就这么些写出来大概就是这样
先对信号量的几个操作封装一下
// 对信号量的四个核心操作进行封装
class Sem
{
public:// 构造的时候就初始化信号量Sem(int val){sem_init(_sem, 0, val);}// P操作void P(){sem_wait(_sem);}// V操作void V(){sem_post(_sem);}// 析构的时候就销毁掉信号量~Sem(){sem_destroy(_sem);}private:sem_t _sem;
};然后定义循环队列
// 默认情况下的队列大小
const int DEFAULT_CAPACITY 5;templateclass T
class RingQueue
{
public:RingQueue(int capacity DEFAULT_CAPACITY): _rq(capacity) // 给顺序表开空间, _capacity(capacity) // 设置容量大小, _proIndex(0) // 生产者下标, _conIndex(0) // 消费者下标, _spaceSem(capacity) // 空间信号量初始值为就是队列大小, _dataSem(0) // 数据信号量初始值为0{}private:// 物理结构的循环队列std::vectorT _rq;// 队列大小int _capacity;// 生产者下标int _proIndex;// 消费者下标int _conIndex;// 生产者所需信号量Sem _spaceSem;// 消费者所需信号量Sem _dataSem;
};在往队列中Push的时候先让生产者申请spaceSem信号量也就是让spaceSem进行P操作然后往p_index处生产数据然后再让p_index % capacity循环队列若越界了%一下就回到开头了最后让dataSem进行V操作。
队列Pop和Push同理就不说了。
代码如下放在RingQueue中
// 生产者生产
void Push(const T data)
{// 生产前先申请信号量spaceSem减少_spaceSem.P();// 申请完信号量后就进行生产_rq[_proIndex] data;_proIndex % _capacity;// spaceSem减少dataSem增加 _dataSem.V();
}// 消费者消费
void Pop(T data)
{// 消费前先申请信号量dataSem减少_dataSem.P();// 申请完信号量后就进行消费data _rq[_conIndex];_conIndex % _capacity;// dataSem减少spaceSem增加 _spaceSem.V();
}然后就是创建生产者线程和消费者线程
void* Consumer(void* args)
{ /*这里用到了reinterpret_cast强转不懂的同学可以点我下面给的博客*/RingQueueint* rq reinterpret_castRingQueueint*(args);// 消费者消费while(1){int data;rq-Pop(data);printf([ %lu ] consumer get data ::%d\n\n, pthread_self(), data);}
}void* Productor(void* args)
{RingQueueint* rq reinterpret_castRingQueueint*(args);int data 0;// 生产者生产while(1){rq-Push(data);printf([ %lu ] productor send data ::%d\n, pthread_self(), data);data;}
}int main()
{RingQueueint* rq new RingQueueint(10);// 创建生产者消费者线程pthread_t c, p; /*这里用到了reinterpret_cast强转不懂的同学可以点我下面给的博客*/pthread_create(c, nullptr, Consumer, reinterpret_castvoid*(rq));pthread_create(p, nullptr, Productor, reinterpret_castvoid*(rq));// 等待两个线程pthread_join(c, nullptr);pthread_join(p, nullptr);// delete掉rq防止内存泄漏delete rq;return 0;
}不太懂reinterpret_cast的同学可以看我这篇博客【C】类型转换。
不过上面没有进行控制打印会特别快可以给生产者或消费者加sleep来控制一下。
给生产者加sleep(1)
这里消费者更快所以生产者生产一个就被消费者拿走一个。 运行 给消费者加sleep(1)
这里生产者更快所以会先将队列生产满然后消费者一秒消费一个生产者跟着消费者的节奏就会一秒生产一个。 运行 多生产者多消费者
上方的代码演示是单生产者和单消费者的生生、消消、生消三种关系中只有生消的互斥和同步关系如果有听不懂的同学详看线程详解第三篇而且信号量本身的性质就可以满足生消的同步和互斥的关系。 互斥体现在生产者和消费者同时指向同一块空间时要么为空要么为满。当为空时一定是生产者先当为满时一定是消费者先。 同步体现在生产者生产后会是dataSem加一消费者消费后会使spaceSem加一只要一方执行其动作另一方就能有新的资源。
但是此时多生产和多消费就会多出两种关系一种是生生间的互斥一种是生消间的互斥。信号量无法保证者两种关系那么就得要用互斥锁来保证这两种互斥关系。 用一把锁可以吗 答案是不可以因为一把锁会使得将整个队列看作一个整体也就是说一次只能让生产者 / 消费者中的一个角色来对循环队列进行操作但我们这里想让生产者和消费者同时都能执行故可以用两把锁一把锁锁住所有的生产者一把锁锁住所有的消费者把生产者和消费者分来锁。 一把锁会造成这样的情况
但是两把锁可以让生产和消费同时进行
这样在进行Push操作的时候就会使得所有的生产者之间互斥也就是说一次只能有一个生产者push而其他消费者还能进行Pop。Pop操作的时候就会使得所有的消费者之间互斥也就是说一次只能有一个消费者Pop而其他生产者者还能进行Push。
不会出现当生产者和消费者指向不同位置的时候二者只能有一种角色进行操作。即二者指向不同位置的时候就不会造成生消互斥生产者可以生产消费者还可消费同时进行互不影响。
故可在RingQueue中加两把锁一把锁锁生产者一把锁锁消费者在Push的时候用生产者的锁在Pop的时候用消费者的锁。
下面的代码基于上面单生产和单消费来修改 循环队列代码信号量封装的Sem没变 const int DEFAULT_CAPACITY 5;templateclass T
class RingQueue
{
public:RingQueue(int capacity DEFAULT_CAPACITY): _rq(capacity) // 给顺序表开空间, _capacity(capacity) // 设置容量大小, _proIndex(0) // 生产者下标, _conIndex(0) // 消费者下标, _spaceSem(capacity) // 空间信号量初始值为就是队列大小, _dataSem(0) // 数据信号量初始值为0{// 构造的时候对两把锁进行初始化pthread_mutex_init(_proMtx, nullptr);pthread_mutex_init(_conMtx, nullptr);}// 生产者生产void Push(const T data){// 生产前先申请信号量spaceSem减少_spaceSem.P();// 多个生产者一同Push的时候要上锁让一个生产者生产pthread_mutex_lock(_proMtx);// 申请完信号量后就进行生产_rq[_proIndex] data;_proIndex % _capacity;// 单个生产者生产完毕后解锁pthread_mutex_unlock(_proMtx); // spaceSem减少dataSem增加 _dataSem.V();}// 消费者消费void Pop(T data){// 消费前先申请信号量dataSem减少_dataSem.P();// 多个消费者一同Pop的时候要上锁让一个消费者消费pthread_mutex_lock(_conMtx);// 申请完信号量后就进行消费data _rq[_conIndex];_conIndex % _capacity;// 单个消费者消费完毕解锁pthread_mutex_unlock(_conMtx);// dataSem减少spaceSem增加 _spaceSem.V();}~RingQueue(){// 析构的时候记得释放锁pthread_mutex_destroy(_proMtx);pthread_mutex_destroy(_conMtx);}private:// 物理结构的循环队列std::vectorT _rq;// 队列大小int _capacity;// 生产者下标int _proIndex;// 消费者下标int _conIndex;// 生产者所需信号量Sem _spaceSem;// 消费者所需信号量Sem _dataSem;// 所有生产者的锁pthread_mutex_t _proMtx;// 所有消费者的锁pthread_mutex_t _conMtx;
};创建多线程 const int CONSUMER_NUM 4;
const int PRODUCTOR_NUM 2;void* Consumer(void* args)
{ /*这里用到了reinterpret_cast强转不懂的同学可以点我下面给的博客*/RingQueueint* rq reinterpret_castRingQueueint*(args);// 消费者消费while(1){sleep(1);int data;rq-Pop(data);printf([ %lu ] consumer get data ::%d\n, pthread_self(), data);}
}void* Productor(void* args)
{RingQueueint* rq reinterpret_castRingQueueint*(args);int data 0;// 生产者生产while(1){rq-Push(data);printf([ %lu ] productor send data ::%d\n, pthread_self(), data);data;}
}int main()
{RingQueueint* rq new RingQueueint();// 创建生产者消费者线程pthread_t c[CONSUMER_NUM];pthread_t p[PRODUCTOR_NUM]; for(int i 0; i CONSUMER_NUM; i){pthread_create(c i, nullptr, Consumer, reinterpret_castvoid*(rq));}for(int i 0; i PRODUCTOR_NUM; i){pthread_create(p i, nullptr, Productor, reinterpret_castvoid*(rq));}// 等待两个线程for(int i 0; i CONSUMER_NUM; i){pthread_join(c[i], nullptr);}for(int i 0; i PRODUCTOR_NUM; i){pthread_join(p[i], nullptr);}// delete掉rq防止内存泄漏delete rq;return 0;
}运行
注意我代码中的Push和Pos将申请信号量放在了加锁之前这样能够使所有在串行前的线程都预先的到其所需要的信号量资源效率更高。如电影院里不会在电影放映前10~15分钟售票 检票售票和检票放到一块排队效率太低了。我们正常情况下是先在app上买票然后电影放映前10~15进行检票此时无需再花买票的时间买票和检票分开耦合度降低就会更节省时间。上面代码中申请信号量资源就买票加锁即检票故先申请信号量后加锁。
计数器的意义
前面说了信号量本质是一把计数器这把计数器的意义是什么 前面三篇线程的博客加上这篇的一共有三种线程数据传递对临界资源的判断方式 第一次进行线程间的数据传递先进行加锁但加锁后要用if判断临界资源是否准备就绪若资源长时间没有准备就绪会导致一个线程不断重复申请锁和释放锁造成大量的时间浪费。 第二次进行线程间的数据传递时用条件变量替换了if判断这样会减少锁的申请和释放比第一种方法稍微好一点但还是需要检测临界资源是否准备就绪没有就绪就会阻塞的等待直到资源就绪了被其他线程唤醒。 第三次就是这里的信号量了本篇的代码中并没有进行直接进行临界资源就绪的判断不过也做了只是在锁外做的前面两种方式都是要先申请锁然后再判断资源是否就绪最后再释放锁根本原因是因为线程不知道临界资源的分配情况所以必须要检测临界资源但是信号量要提前预设资源的情况而在PV操作的变化过程中我们在任何地方都可以知晓临界资源的分配情况。
所以计数器的意义就是可以不用进入临界区就可得知资源的分配情况甚至可以减少临界区内部的判断未来在执行申请信号量时即使阻塞了也是在锁外阻塞的只要申请信号量成功了那么拿到锁后就可以直接对临界资源进行操作而不是等拿到锁后还要判断资源是否就绪不就绪还要等待的情况。这样就能使得加锁区间尽量短效率更高。
线程池
基本概念
池化技术是一种资源预分配的技术。在我前面博客中我写过一个简单的进程池本篇要写一个简单的线程池虽然应该把线程池放到我后面的网络的博客中再讲的不过既然学了线程了稍微写写也没什么坏处。
池化技术本质上是以空间换时间的计数像STL库中的内存池了解过的同学应该知道就是以空间换时间先开一大块内存然后一点一点的分配而不是需要一小块内存了再一小块一小块的开后者效率比前要低因为开空间这个动作也是有时间消耗的。
线程池就是先创建一批线程当任务来的时候直接将任务分配给某一个线程而不是等任务来的时候再创建线程。
说一下大致思路就写代码 首先主线程负责派发任务当然你也可以设置多个线程来派发任务多个从线程负责接受任务并执行任务。这是不是就很像生产消费者模型不是很像是就是。
线程池中存放的就是从线程主线程在线程池之外。
这样的话就是单生产者多消费者那么我们需要维护的关系就是消消的互斥和生消的同步与互斥。
代码
代码量有点大我先给出运行结果
代码有点多我就不细讲了这也花了我不少时间写的讲起来太麻烦了有不懂的同学可以评论区问好几个文件 Makefile就不给了其他的给出来 ThreadPool.hpp #pragma once#include Thread.hpp
#include LogMessage.hpp
#include lockGuard.hpp
#include caculator.hpp#include vector
#include queueconst int DEFAULT_SIZE 5;templateclass T
class ThreadPool
{
private: // Routine专用接口// 获取锁地址pthread_mutex_t* _GetMTX(){return _mtx;}// 获取生消信号量地址pthread_cond_t* _GetCond(){return _cpCond;}// 判断任务队列中是否为空bool _IsEmpty(){return _taskQueue.empty();}T _GetTask(){T task _taskQueue.front();_taskQueue.pop();return task;}// 非static函数会有this指针这样在创建线程的时候函数指针pfunc会// 和非static函数不匹配报错所以要改为staticstatic void* Routine(void* args){// 获取到当前线程池的地址因为Routine没有this指针就无法拿到任务Thread_name_and_Args* tNA reinterpret_castThread_name_and_Args*(args);ThreadPoolT* pt reinterpret_castThreadPoolT*(tNA-_args);while(1){T task;{// 多个消费者获取任务先上锁LockGuard lg(pt-_GetMTX());// 上完锁判断是否有任务没有任务就等while(pt-_IsEmpty()) pthread_cond_wait(pt-_GetCond(), pt-_GetMTX());// 此处一定可以获取任务task pt-_GetTask();}task(tNA-_name);}}public:ThreadPool(int size DEFAULT_SIZE): _size(size){// 锁和条件变量初始化pthread_cond_init(_cpCond, nullptr);pthread_mutex_init(_mtx, nullptr);// 线程池中创建线程for(int i 0; i _size; i){// 线程名字std::string name(Thread[);name (std::to_string(i 1) ]);// 往线程池中加入线程 /*给ThreadData传this指针不然Routine中线程拿不到任务*/_threadPool.push_back(new Thread(name, Routine, this));}}// 添加任务void PushTask(const T task){// 生消互斥先上锁LockGuard lg(_mtx);_taskQueue.push(task);// 添加好任务就发送条件信号让消费者消费pthread_cond_signal(_cpCond);}// 启动所有线程void RunAllThread(){for(int i 0; i _size; i){_threadPool[i]-CreateThread();LogMessage(0, _F, _L, %s启动成功, _threadPool[i]-getName().c_str());}}// 析构附加等待线程~ThreadPool(){for(int i 0; i _size; i){_threadPool[i]-JoinThread();delete _threadPool[i];}pthread_mutex_destroy(_mtx);pthread_cond_destroy(_cpCond);}
private:// 线程池std::vectorThread* _threadPool;// 线程池大小int _size;// 任务队列std::queueT _taskQueue;// 消消锁和生消锁pthread_mutex_t _mtx;// 生消条件变量pthread_cond_t _cpCond;
};calculator.hpp #pragma once#include functional
#include vector
#include string
#include LogMessage.hpptypedef std::functionint(int, int) func;std::vectorstd::pairchar, func kv;class Calculator
{
public:Calculator(){}Calculator(int x, int y, func fun, int index): _x(x), _y(y), _fun(fun), _index(index){}void operator()(const std::string name){// 执行任务LogMessage(NORMAL, _F, _L, %s 执行任务 :: %d %c %d %d, name.c_str(), _x, kv[_index].first, _y, _fun(_x, _y));}public:int _x;int _y;func _fun;int _index;
};void loadCal()
{func MyAdd [](int x, int y){ return x y; };func MySub [](int x, int y){ return x - y; };func MyMul [](int x, int y){ return x * y; };func MyDiv [](int x, int y){ return x / y; };kv.push_back(std::pairchar, func(, MyAdd));kv.push_back(std::pairchar, func(-, MySub));kv.push_back(std::pairchar, func(*, MyMul));kv.push_back(std::pairchar, func(/, MyDiv));
}lockGuard.hpp #pragma once#include pthread.hclass LockGuard
{
public:LockGuard(pthread_mutex_t* pmtx):_pmtx(pmtx){pthread_mutex_lock(_pmtx);}~LockGuard(){pthread_mutex_unlock(_pmtx);}public:pthread_mutex_t* _pmtx;
};LogMessage.hpp #pragma once
#include cstdio
#include vector
#include ctime
#include cstdarg
#include unistd.h
#include LogMessage.hpp// 文件名
#define _F __FILE__
// 所在行
#define _L __LINE__enum level
{DEBUG, // 0NORMAL, // 1WARING, // 2ERROR, // 3FATAL // 4
};std::vectorconst char* gLevelMap {DEB,NOR,WAR,ERR,FAT
};#define FILE_NAME ./log.txt// 格式化打印日志信息
void LogMessage(int level/*日志等级*/, const char* file/*文件名*/, int line/*所在行*/, const char* format, .../*自定义格式*/)
{// 选择性打印等级为DEBUG的信息编译的时候加上命令行定义NO_DEBUG就不会打印DEBUG信息
#ifdef NO_DEBUGif(level DEBUG) return;
#endif// 固定格式char FixBuffer[128];time_t tm time(nullptr); // 获取时间戳// 日志级别 时间 哪一个文件 哪一行snprintf(FixBuffer, sizeof(FixBuffer), ---------------------------------------%s |%s|-%d\n%s, \gLevelMap[level], // 等级file, // 文件名line, // 所在行ctime(tm) // 时间戳转正常时间);// 用户自定义格式char DefBuffer[128];va_list args; // 定义一个可变参数va_start(args, format); // 用format初始化可变参数vsnprintf(DefBuffer, sizeof DefBuffer, format, args); // 将可变参数格式化打印到DefBuffer中va_end(args); // 销毁可变参数// 往显示器打printf(%s%s\n_______________________________________\n, FixBuffer, DefBuffer);// 往文件中打// FILE* pf fopen(FILE_NAME, a);// fprintf(pf, %s%s\n\n, FixBuffer, DefBuffer);// fclose(pf);
}Thread.hpp #ifndef __THREAD_HPP__
#define __THREAD_HPP__#include iostream
#include stringtypedef void*(*pfunc)(void*);#include pthread.h// 封装线程名称和线程回调函数的参数
class Thread_name_and_Args
{
public:Thread_name_and_Args(const std::string name, void* args): _name(name), _args(args){}
public:std::string _name;void* _args;
};// 线程接口的封装
class Thread
{
public:Thread(const std::string name, pfunc func, void* args): _NA(name, args), _func(func){}// 创建线程void CreateThread(){pthread_create(_tid, nullptr, _func, _NA);}// 等待线程void JoinThread(){pthread_join(_tid, nullptr);}const std::string getName()const{return _NA._name;}~Thread(){}private:pthread_t _tid; // 线程idThread_name_and_Args _NA; // 线程名称和回调函数参数pfunc _func; // 回调函数的指针
};
#endiftest.cc #include LogMessage.hpp
#include Thread.hpp
#include ThreadPool.hpp
#include caculator.hpp#include unistd.hint main()
{srand((unsigned int)time(nullptr));loadCal();ThreadPoolCalculator* tp new ThreadPoolCalculator();// 主线程创建所有从线程tp-RunAllThread();// 主线程派发任务while(1){sleep(1);int x rand() % 200 1;int y rand() % 100 1;int index rand() % 4;LogMessage(NORMAL, _F, _L, 主线程派发任务 :: %d %c %d ?, x, kv[index].first, y);tp-PushTask(Calculator(x, y, kv[index].second, index));}// 析构就会等待线程delete tp;return 0;
}前面也说了这里的线程池应该放到后面的网络再讲的所以这里的线程池只是一个简单的任务分发没有什么经过网络接收任务啥的比较简陋不过新手想搞懂也得花点时间的。
单例模式
关于单例模式我前面有一篇博客讲过了不过是在C的博客中讲的那时候还没写关于线程的博客这里再讲单例模式就是为了和线程结合一下但是不会再细说了只是把线程池中的代码改成单例模式的如果想要了解单例模式的话可以看我这篇【C】特殊类的设计。
单例模式有两种一种是饿汉模式一种是懒汉模式其中懒汉模式有线程不安全问题这里我就用懒汉模式来修改一下上面的线程池并对懒汉模式进行优化变为线程安全的懒汉。
其实上面的线程池就可以说是单例模式了整块代码下来就创建了一个线程池对象但是仅凭这一点也不能说是单例因为还可以再创建对象或者拷贝对象这两点不能满足。所以需要把这两点也加上。代码中要改的地方不多。
对于ThreadPool做的第一步工作
然后创建一个静态的指针
然后再提供一个接口来为这个指针初始化和获取这个指针
然后再改一下主线程部分
这样就是一个懒汉模式运行
其实运行结果就和上面的一样。
还可以不保存类中的那个指针直接用静态的接口去调用类中的函数
也可以的。
但是因为只有一个主线程这里的场景不会造成线程不安全的问题但是当多个线程同时调用懒汉模式下的接口时就会出问题再来看一下这个接口
此处的判空是有问题的因为这里没有加锁当多个线程同时进入时可能一个线程进入了if中但是_threadPoolPtr还没修改但此时多个线程就已经判断了_threadPoolPtr为空了那么就会有多个线程同时进到if中这样问题就大了因为多个线程都会执行new操作但是最终只会有一个new出来的空间被_threadPoolPtr所指剩下new出来的空间就找不到了进而就导致了内存泄漏。
所以说我们要把这里改一改加个锁 但是此时还有问题只要该接口被调用了调用其的线程都必须要执行一次加锁和解锁的操作这样就算_threadPoolPtr开了空间还是会进行判断效率就会很低。
所以还得优化优化
再套上一层if判断如果不为空就不会再进行加锁了也是只有第一次为空的时候会出现多个线程申请锁的情况剩下的情况都不会申请锁这样效率就高多了。
STL,智能指针和线程安全
STL中的容器是否是线程安全的?
不是。
原因如下
STL 的设计初衷是将性能挖掘到极致, 而一旦涉及到加锁保证线程安全, 会对性能造成巨大的影响而且对于不同的容器, 加锁方式的不同, 性能可能也不同(例如hash表的锁表和锁桶)。
因此 STL 默认不是线程安全. 如果需要在多线程环境下使用, 往往需要调用者自行保证线程安全
智能指针是否是线程安全的? 对于 unique_ptr, 由于只是在当前代码块范围内生效, 因此不涉及线程安全问题。 对于 shared_ptr, 多个对象需要共用一个引用计数变量, 所以会存在线程安全问题. 但是标准库实现的时候考虑到了这个问题, 基于原子操作(CAS)的方式保证 shared_ptr 能够高效, 原子的操作引用计数。
其他常见的各种锁
悲观锁在每次取数据时总是担心数据会被其他线程修改所以会在取数据前先加锁读锁写锁行锁等当其他线程想要访问数据时被阻塞挂起。 我们前面讲的锁都是悲观锁。乐观锁每次取数据时候总是乐观的认为数据不会被其他线程修改因此不上锁。但是在更新数据前会判断其他数据在更新前有没有对数据进行修改。主要采用两种方式版本号机制和CAS操作。 这里的乐观锁和下面的CAS操作在JAVA中讲的更多一点这里就不详细解释啥了。 CAS操作当需要更新数据时判断当前内存值和之前取得的值是否相等。如果相等则用新值更新。若不等则失败失败则重试一般是一个自旋的过程即不断重试。
自旋锁公平锁非公平锁。
说说自旋锁。 其和我们前面悲观锁的区别就一点会进行轮询检测。先不说是干啥的。
先来讲个小例子 假如你朋友家跟你家不在一块你去TA家楼下找TA叫TA出去玩。两种场景 第一种 此时TA说TA不能去因为TA妈妈让TA写作业但TA得一个小时才能写完你俩约定好一个小时候再在TA家楼下集合此时有一个小时的时间你会干什么应该不会站在TA家楼下干等着吧。或许你会先找个网吧坐一会或者去奶茶店买杯奶茶啥的反正这一个小时就是在等TA。 第二种 TA说等TA几分钟TA正在吃饭吃完了马上就下来你答应了此时你应该是直接在TA家楼下等就行不会说再跑去网吧或者买奶茶了吧跑到半路有可能人家就下来了。三分钟后你问TA好了没TA说再等几分钟马上就好你又答应了继续等。又过了一分钟你问好了没TA又说马上你又打答应了又过了…… 第一种方式等的时间比较长你就会决定去网吧或奶茶店休息去。 第二种方式你是直接在人家楼下等过一段时间问一下好了没没好久再过一段时间问一下知道人家好了。
那么第二种就是轮询检测不断地去询问资源是否准备就绪而不是进阻塞队列中干等。这就是自旋锁。
其实和悲观锁差不了多少也不是什么重点就说说其接口 初始化和销毁的 上锁的 解锁的 其实就和互斥锁的接口差不多用起来只是吧mutex换成spin就行了。
读者写者问题
读写锁
在编写多线程的时候有一种情况是十分常见的。
那就是有些公共数据修改的机会比较少。相比较改写它们读的机会反而高的多。
通常而言在读的过程中往往伴随着查找的操作中间耗时很长。给这种代码段加锁会极大地降低我们程序的效率。
那么有没有一种方法可以专门处理这种多读少写的情况呢 有那就是读写锁。 其实读写锁这里也遵循321原则321原则不懂的同学在我的第三篇中。 三种关系读读共享、写写互斥、读写同步与互斥。 两种角色读者和写者。 一种交易场所某种数据结构。
说一下读写者问题和生消模型的本质区别 消费者会取走数据但读者不会。
当有读者在读的时候写者不能写读者会有一个计数器来统计当前读者人数每次读前都要使该计数器加一读后都要使该计数器减一且加一和减一都要使原子的。
大致流程如下
所以总体看来是读者优先的。当读写同时开始执行时必须让读者先读。但是这样可能会导致写者饥饿问题因为可能会有读者不断在读此时写者就会一直得不到资源从而无法进行写操作。
但是也可设置写者优先当读写同时开始执行必须让写者进行如果后续有读者要读则必须要等写者写完后才能进去读。
接口介绍 初始化和销毁 上锁分两种 上写锁 上读锁 解锁只有一种读锁和写锁都能解 这里就不展开讲了。
到此结束。。。