查 网站接入服务提供者名称,免费一级域名网站,wordpress 说说页面,WordPress云笔记#x1f3ac; 个人主页#xff1a;谁在夜里看海.
#x1f4d6; 个人专栏#xff1a;《C系列》《Linux系列》《算法系列》
⛰️ 道阻且长#xff0c;行则将至 目录
#x1f4da;一、线程概念
#x1f4d6; 回顾进程
#x1f4d6; 引入线程
#x1f4d6; 总结 个人主页谁在夜里看海. 个人专栏《C系列》《Linux系列》《算法系列》
⛰️ 道阻且长行则将至 目录
一、线程概念 回顾进程 引入线程 总结 Linux下的线程
二、线程操作phread
1.线程创建
语法
示例
2.线程退出
语法
示例
3.线程取消
语法
示例
4.线程等待
语法
示例
5.分离线程
语法
示例
三、线程互斥 引入 互斥量的接口
1.初始化
2.销毁
3.加锁
4.解锁
示例 死锁
四个必要条件
示例
解决办法
四、线程同步 概念
1.条件变量
基本概念
语法 示例生产消费者模型
2.信号量
基本概念
语法
示例基于环形队列的PC-model
五、线程池 概念 实现
1.初始化
2.任务队列
3.线程同步
4.工作线程
5.线程池退出
总结 一、线程概念
线程与进程密不可分要理解线程的概念我们先来回顾一下进程 回顾进程
我们说进程是操作系统资源分配的基本单位是一个程序在计算机上的运行实例对于这句话我们分两点进行阐述
① 一个进程被创建时系统会它开辟一段虚拟地址空间每个进程都有各自的虚拟内存空间这保证了进程间的独立性。操作系统会为每个进程分配各种资源因此说进程是资源分配的基本单位
② 当程序被加载到内存并开始执行时它就变成了一个进程。换句话说程序是静态的文件而进程是程序运行的动态实体。 引入线程
程序被加载到内存中是要执行一些操作的 但如果系统仅支持单一的进程模型每个程序只能顺序执行只有一个执行流这样在面对一些复杂任务以及高并发情况时并不能满足需求。
此时我们可以采用多进程模型执行但是同样也会产生一些问题由于每个进程都需要独立的虚拟内存空间如果大量创建进程会导致巨大的内存开销对于一些高并发的小任务如大量 I/O 操作并不需要让每个操作都独占一个进程这样太浪费资源了那有没有什么办法可以让这些并发操作在一个进程下执行呢
于是就引入了多线程的概念线程是进程内的执行单元多个线程可以在同一个进程中并发执行。由于线程共享进程的内存空间因此相比多进程所需的内存和资源消耗更少并且线程之间的上下文切换比进程之间的上下文切换也要更快这使得多线程模型在I/O密集型、高并发等场景下具有非常大的优势。 总结
如果说进程是操作系统资源分配的基本单元那么线程就是资源调度的基本单元也是程序执行的基本单元。我们可以把进程看作工厂资源就是工厂内部的原材料与各种机械设备而线程就是工厂里的工人。
因此如果进程只有一个执行流那它也是一个线程称之为主线程我们在进程中创建线程其实就是在主线程下创建其他线程线程与进程的关系如下图 说完了线程的概念我们下面来说一下线程的操作。 Linux下的线程
线程和进程是两个独立的概念在现代操作系统下例如Windows线程和进程都是通过内核进行管理和调度和进程一样每个线程也有独立的执行上下文并且可以在内核调度器中进行独立的调度。操作系统的内核负责线程的创建、销毁和调度。
然而在Linux中线程并不是一个独立实体而是通过轻量级进程LWP, Light Weight Process来实现的Linux用进程的概念来管理线程其主要原因有下
Linux内核本身是设计为面向进程的管理模式即所有的任务和执行单元都被视为“进程”而线程的概念是在Linux设计成型之后才提出的因此为了避免增加内核复杂度Linux选择将线程视为特殊类型的进程即LWP而非引入一个全新的抽象概念。
除了通过内核直接管理线程我们还可以使用线程库来对线程进行管理线程库提供了用户空间的接口能过方便地管理和操作线程而LWP的实现方式与POSIX线程库的设计要求高度一致因此在Linux下我们通常使用POSIX线程库pthread实现对线程的创建、销毁、同步等操作。
二、线程操作phread
POSIX线程库通常称为 pthreads即 POSIX Threads是基于 POSIX 标准的一组接口用于在程序中创建和管理线程它提供了一个标准的、跨平台的线程管理框架广泛应用于多种操作系统中包括 UNIX、Linux、macOS 以及一些类 Unix 系统。
pthreads 提供了以下主要功能
1.线程创建
线程的创建操作通过 pthread_create() 函数完成。该函数用来创建一个新的线程并指定其执行的函数和相关的参数。
语法
int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void *), void *arg);① 参数1thread这是一个指向 pthread_t 类型的指针用来返回新创建线程的标识符
❓pthread_t 类型该怎么理解
✅首先我们要清楚虽然线程之间共享进程的资源但是线程在被创建时系统还是会给它分配独立的线程栈、寄存器等状态信息以确保线程的独立执行而 pthread_t 类型的变量则用于存储线程的唯一标识符该标识符指向一个线程控制块TCB这个控制块包含了线程的执行状态、栈地址、调度信息等通过pthread_t操作系统能够高效地查找和管理线程的上下文。
由于线程控制块TCB存放在虚拟地址空间的共享区中因此pthread_t实际上存放的就是一个地址指向共享区上的一块空间。 ② 参数2attr这是一个指向 pthread_attr_t 类型的指针指定线程的属性。如果为NULL则使用线程的默认属性。
③ 参数3start_routine这是一个函数指针指向线程执行的函数。线程启动时将从该函数开始执行。
④ 参数4arg这是传递给 start_routine 函数的参数。它是一个 void* 类型的指针可以传递任何类型的数据在函数体内部通过强制类型转换对数据接收。
⑤ 返回值成功时返回 0失败时返回错误码信息。
示例
这里我们创建一个线程并循环打印线程 tid
#include stdio.h
#include pthread.h
#include unistd.h
#include sys/syscall.hlong gettid(void)
{return syscall(SYS_gettid);
}void *func1(void* arg)
{int i 1;while(i){printf(i am pthread 1, tid is %ld\n, gettid());sleep(1);}
}int main()
{pthread_t tid;if(pthread_create(tid, NULL, func1, NULL) ! 0)perror(pthread_create:);printf(thread 1 tid is %lu\n,tid);int i 1;while(i){printf(i am main pthread, tid is %ld\n, gettid());sleep(1);}return 0;
}
运行结果 2.线程退出
线程退出操作通过 pthread_exit() 完成。当线程执行完自己的任务时通常会调用 pthread_exit() 来退出确保线程的资源得到正确释放。
语法
void pthread_exit(void *retval);参数retval表示线程的返回值主线程可以通过 pthread_join() 获取这个返回值pthread_join()是线程等待函数后面会讲解。
返回值因为线程终止时无法获取自身的返回值因此该函数没有返回值。
示例
#include pthread.h
#include stdio.hvoid* thread_func(void *arg) {printf(Thread is exiting...\n);pthread_exit(NULL);
}int main() {pthread_t thread;// 创建线程pthread_create(thread, NULL, thread_func, NULL);// 等待线程退出pthread_join(thread, NULL);printf(Main thread finished.\n);return 0;
}3.线程取消
终止线程还可以通过 pthread_cancel()pthread_cancel() 用于请求取消某个线程的执行。它是异步的不会立即终止目标线程而是向目标线程发送取消请求目标线程在合适的地方检查这个请求并决定是否退出。
使用流程
① 调用 pthread_cancel() 向指定线程发送取消请求。
② 目标线程检查取消请求如果设置了 取消点即线程可以响应取消请求的地方线程就会终止。
③ 如果线程不在取消点处它可能会继续运行直到它进入一个取消点。
语法
int pthread_cancel(pthread_t thread);① 参数1thread这是目标线程的标识符即 pthread_create() 返回的线程ID。
② 返回值成功时返回 0失败时返回错误码ESRCH 表示线程不存在EINVAL 表示线程不支持取消等。
示例
取消线程的执行
#include pthread.h
#include stdio.h
#include unistd.hvoid* thread_func(void *arg) {printf(Thread started...\n);// 模拟长时间运行的任务for (int i 0; i 5; i) {printf(Thread running: %d\n, i);sleep(1);}printf(Thread finished.\n);return NULL;
}int main() {pthread_t thread;// 创建线程pthread_create(thread, NULL, thread_func, NULL);// 睡眠2秒后请求取消线程sleep(2);printf(Requesting thread cancellation...\n);pthread_cancel(thread);// 等待线程结束pthread_join(thread, NULL);printf(Main thread finished.\n);return 0;
}在上述示例中主线程创建了一个子线程该子线程执行一个循环每秒打印一次 Thread running。主线程睡眠 2 秒后调用 pthread_cancel(thread) 请求取消子线程。此时子线程如果处于取消点可能会退出。子线程完成后主线程通过 pthread_join() 等待子线程终止。
运行结果 4.线程等待
线程等待操作通过 pthread_join() 完成。调用 pthread_join() 函数可以让主线程等待子线程执行完毕。
语法
int pthread_join(pthread_t thread, void **retval);① 参数1thread这是要等待的线程的标识符即 pthread_create() 返回的线程ID。
② 参数2retval这是一个指向 void* 类型的指针用来接收线程的返回值返回值类型为void* 所以此处为void** 。如果不需要返回值可以传递 NULL。
③ 返回值成功时返回 0失败时返回错误码。
示例
#include stdio.h
#include stdlib.h
#include string.h
#include unistd.h
#include pthread.hvoid *thread1(void *arg)
{printf(thread1 returning...\n);int *p (int*)malloc(sizeof(int));*p 1;return (void*)p;
}void *thread2(void *arg)
{printf(thread2 exiting...\n);int *p (int*)malloc(sizeof(int));*p 2;pthread_exit((void*)p);
}void *thread3(void *arg)
{while(1){printf(thread3 running...\n);sleep(1);}return NULL;
}int main()
{pthread_t tid;void *ret;// thread1 returnpthread_create(tid, NULL, thread1, NULL);pthread_join(tid, ret);printf(thread return, tid is %p, return code is %d\n, tid, *(int*)ret);free(ret);// thread2 exitpthread_create(tid, NULL, thread2, NULL); pthread_join(tid, ret);printf(thread exit, tid is %p, return code is %d\n, tid, *(int*)ret);free(ret);// thread2 exitpthread_create(tid, NULL, thread3, NULL);sleep(3);pthread_cancel(tid);pthread_join(tid, ret);if(ret PTHREAD_CANCELED)printf(thread exit, tid is %p, return code is PTHREAD_CANCELED\n, tid);return 0;
}
我们分别创建3个线程每创建一个线程及时对线程等待回收资源接着创建下一个线程由于上一个线程的资源被回收因此下一个线程可以复用上一个线程的资源体现在它们的TCB地址是一样的运行结果 5.分离线程
默认情况下新创建的线程是 joinable 的支持被等待线程退出后需要对其进行pthread_join操作否则无法释放资源从而造成系统泄漏。
但如果我们并不关心线程的返回值此时join是一种负担这个时候我们可以告诉系统当线程退出时自动释放线程资源也就是将线程分离
语法
可以是线程组内其他线程对目标线程进行分离
int pthread_detach(pthread_t thread);
① 参数thread这是线程标识符即通过 pthread_create() 创建的线程ID。
② 返回值成功时返回 0失败时返回错误码。
也可以是线程自己分离:
pthread_detach(pthread_self());
pthread_self() 函数用于获取线程自身标识符。
注意❗️
不能同时调用 pthread_detach() 和 pthread_join()一个线程不能既是joinable又是分离的
示例
#include stdio.h
#include pthread.hvoid *pthread(void *arg)
{pthread_detach(pthread_self());printf(%s\n,(char*)arg);return NULL;
}int main()
{pthread_t tid;pthread_create(tid, NULL, pthread, pthread1 running...);int ret 0;sleep(1); // 让线程先分离再进行等待if(pthread_join(tid, NULL) 0){printf(pthread wait success\n); // 说明线程joinableret 0;}else{printf(pthread wait failed\n); // 说明线程unjoinableret 1; }return ret;
}
线程分离之后调用 pthread_join() 等待该线程就会失败 三、线程互斥 引入
理解线程互斥之前我们先了解两个概念临界资源和临界区多线程执行流共享的资源就叫做临界资源每个线程内部访问临界资源的代码就叫临界区。
而线程互斥就是在任何时刻只能有一个执行流进入临界区访问临界资源。为什么要这么规定呢因为多个线程同时对临界资源访问时可能会导致资源竞争问题举个例子
当多个线程同时要对变量i进行操作时由于i操作在底层分为3步① 从内存提取i到寄存器中② 在寄存器中完成操作③ 将后的值重新写入到内存中。对应三条汇编指令
① load将共享变量 i 从内存加载到寄存器中
② update更新寄存器里面的值执行1操作
③ store将新值从寄存器写回共享变量 i 的内存地址。
这就会导致一些问题比如线程1和线程2同时将i提取到各自的寄存器中线程1先完成了操作并重写会内存但是由于这一步并不会更新到线程2的寄存器中因此虽然变量i被执行了两次操作但实际上只有一次。
为了更直观地展示资源竞争现象我们模拟了一个抢票的流程
#include stdio.h
#include pthread.hint ticket 100;
pthread_mutex_t mutex;void *buy(void *arg)
{// pthread_detach(pthread_self());char *id (char*)arg;while(1){if(ticket 0){usleep(1000);printf(%s sells ticket: %d\n, id, ticket);--ticket;}else{break;}}
}int main()
{pthread_t t1, t2, t3, t4;pthread_mutex_init(mutex, NULL);pthread_create(t1, NULL, buy, thread 1);pthread_create(t2, NULL, buy, thread 2);pthread_create(t3, NULL, buy, thread 3);pthread_create(t4, NULL, buy, thread 4);pthread_join(t1, NULL);pthread_join(t2, NULL);pthread_join(t3, NULL);pthread_join(t4, NULL);pthread_mutex_destroy(mutex);// sleep(30);return 0;
}
这里的临界资源为总票数 ticket我们创建了4个线程来模拟抢票过程进入临界区对ticket进行--操作此时会出现什么结果呢 我们发现ticket最终变成了负数理论上 ticket 变为0时就应该终止了这就是由于非原子行操作ticket--所造成的资源竞争问题。
要解决上面问题需要做到三点
① 代码必须要有互斥行为当代码进入临界区执行时不允许其他线程进入该临界区。
② 如果多个线程同时要求执行临界区的代码并且临界区没有线程在执行那么只能允许一个线程进入该临界区。
③ 如果线程不在临界区中执行那么该线程不能阻止其他线程进入临界区。
要做到这三点本质上就是需要一把锁。Linux上提供的这把锁叫互斥量。 互斥量的接口
1.初始化
初始化互斥量有两种方法
静态分配 pthread_mutex_t mutex PTHREAD_MUTEX_INITIALIZER
动态分配
int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr);① 参数1mutex指向互斥量对象的指针传入一个未初始化的 pthread_mutex_t 类型变量。
② 参数2attr用于设置互斥量属性通常设置为 NULL表示使用默认的互斥量属性。
③ 返回值成功时返回0失败时返回错误码。
❗️注意如果使用 PTHREAD_MUTEX_INITIALIZER 静态分配互斥量就不需要调用pthread_mutex_init 函数进行初始化。
2.销毁
线程在不再需要互斥量时可以调用 pthread_mutex_destroy() 来销毁互斥量。销毁互斥量之前必须确保没有任何线程持有该互斥量的锁。
int pthread_mutex_destroy(pthread_mutex_t *mutex);❗️注意如果使用 PTHREAD_MUTEX_INITIALIZER 静态分配互斥量就不需要调用pthread_mutex_destroy 函数对其销毁。
3.加锁 线程在访问共享资源之前需要通过 pthread_mutex_lock() 获得互斥量的锁。若其他线程已持有该互斥量的锁调用线程会被阻塞直到互斥量被释放。
int pthread_mutex_lock(pthread_mutex_t *mutex);① 参数mutex指向要锁定的互斥量对象的指针。
② 返回值成功时返回0失败时返回错误码。
4.解锁 当线程完成对共享资源的访问后需要释放互斥量的锁。其他线程可以在解锁后获得该锁。
int pthread_mutex_unlock(pthread_mutex_t *mutex);① 参数mutex指向要锁定的互斥量对象的指针。
② 返回值成功时返回0失败时返回错误码。
示例 我们运用互斥锁来改进上面的售票模拟代码
#include stdio.h
#include pthread.hint ticket 100;
pthread_mutex_t mutex;void *buy(void *arg)
{// pthread_detach(pthread_self());char *id (char*)arg;while(1){pthread_mutex_lock(mutex); // 访问临界区前加锁if(ticket 0){usleep(1000);printf(%s sells ticket: %d\n, id, ticket);--ticket;pthread_mutex_unlock(mutex); // 操作完成后解锁}else{pthread_mutex_unlock(mutex);break;}}
}int main()
{pthread_t t1, t2, t3, t4;pthread_mutex_init(mutex, NULL);pthread_create(t1, NULL, buy, thread 1);pthread_create(t2, NULL, buy, thread 2);pthread_create(t3, NULL, buy, thread 3);pthread_create(t4, NULL, buy, thread 4);pthread_join(t1, NULL);pthread_join(t2, NULL);pthread_join(t3, NULL);pthread_join(t4, NULL);pthread_mutex_destroy(mutex);// sleep(30);return 0;
}
❗️注意这里我们需要在判断 ticket 语句之前就进行加锁而不是在ticket前加锁因为对ticket的判断语句也属于临界区。执行结果 ticket在减到0时所有线程停止访问并退出符合预期。 死锁
使用互斥锁实现线程间互斥时如果多个线程之间在获取资源时发生了相互依赖的情况导致个线程在互相等待对方释放资源从而形成了一个无限等待的状态导致死锁的发生。
四个必要条件
要发生死锁必须满足以下四个必要条件
① 互斥条件一个资源只能由一个线程占用。如果有其他线程请求这个资源那它必须等待
② 占用且等待一个线程至少占有了一个资源并且等待获取被其他线程占用的资源
③ 非抢占条件当线程持有某个资源时不能强行剥夺其资源只能等待线程自己释放资源
④ 循环等待线程形成一个环形等待关系其中每个线程都在等待下一个线程释放资源。
示例
假设有两个互斥锁 mutex1 和 mutex2
#include pthread.h
#include stdio.hpthread_mutex_t mutex1 PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t mutex2 PTHREAD_MUTEX_INITIALIZER;void* thread1_func(void* arg) {pthread_mutex_lock(mutex1);printf(Thread 1: Locked mutex1\n);// 模拟一些工作sleep(1);pthread_mutex_lock(mutex2);printf(Thread 1: Locked mutex2\n);pthread_mutex_unlock(mutex2);pthread_mutex_unlock(mutex1);return NULL;
}void* thread2_func(void* arg) {pthread_mutex_lock(mutex2);printf(Thread 2: Locked mutex2\n);// 模拟一些工作sleep(1);pthread_mutex_lock(mutex1);printf(Thread 2: Locked mutex1\n);pthread_mutex_unlock(mutex1);pthread_mutex_unlock(mutex2);return NULL;
}int main() {pthread_t t1, t2;pthread_create(t1, NULL, thread1_func, NULL);pthread_create(t2, NULL, thread2_func, NULL);pthread_join(t1, NULL);pthread_join(t2, NULL);return 0;
}在这个例子中
① 线程1锁住了 mutex1然后等待 mutex2。
② 线程2锁住了 mutex2然后等待 mutex1。
这样两个线程就互相等待对方释放锁造成了死锁 解决办法
为了避免死锁可以采取以下策略
① 避免占有且等待尝试在一个线程内一次性获取所有必需的锁而不是按顺序逐个获取资源。这样线程不会在持有一个资源的同时等待其他资源。
② 锁的顺序确保所有线程都按相同的顺序请求锁。这样可以避免循环等待。例如如果线程都按 mutex1 - mutex2 的顺序请求锁则可以避免死锁。
③ 死锁检测和恢复设计死锁检测算法定期检查是否存在死锁现象。一旦检测到死锁可以通过中止线程、回滚或其他方法来恢复。
④ 使用超时机制在请求锁时可以设置超时。如果线程在一段时间内未能获取到锁可以放弃并重新尝试避免无限等待。
⑤ 使用非阻塞锁使用诸如 pthread_mutex_trylock 等非阻塞式锁操作如果锁被占用线程会立即返回而不是等待这样也能避免死锁。
四、线程同步 概念
上面讲到线程互斥保证了任意时刻只有一个线程访问共享资源避免了资源竞争和冲突但是这并不能控制线程执行的顺序。因此它适用于线程之间相对独立、不需要互相协调的场景例如抢票系统。在这种情况下多个线程访问共享资源总票数时只需要保证同一时刻只有一个线程修改票数谁抢到算谁的不需要额外的线程协作。
但是线程之间有时需要相互协作例如生产消费者模型。在这种情况下消费者线程需要等待生产者线程生产数据才能进行消费换句话说消费线程与生产线程之间具有一定的执行顺序此时如果只用线程互斥并不能满足需求于是就需要引入线程同步机制
线程同步是在线程互斥的基础上进一步控制线程的执行顺序确保线程之间在特定的时刻能够按规定的顺序访问资源。线程同步使用于需要线程协作的场景例如生产消费者模型、读者写者模型等等。线程同步不仅确保线程在访问共享资源时不会发生冲突还控制了线程之间的执行顺序保证程序逻辑的正确性。
下面我们来介绍线程同步的实现方法。
1.条件变量
线程可以通过条件变量等待特定条件的发生并在条件满足时被唤醒。条件变量常用于生产者消费者模型生产者线程生产数据消费者线程消费数据确保它们在正确的时机执行。
基本概念
条件变量Condition Variable允许线程在特定条件下进入等待状态并在条件满足时被唤醒。它通常与 互斥锁 一起使用以保证对共享资源的安全访问。
语法
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);① pthread_cond_wait
让调用线程阻塞直到被其他线程通过 pthread_cond_signal 或 pthread_cond_broadcast 唤醒。它需要一个互斥锁作为参数以确保在等待条件时对共享资源的访问是安全的。
② pthread_cond_signal 唤醒一个在条件变量上等待的线程。如果有多个线程在等待系统随机选择一个线程唤醒。
③ pthread_cond_broadcast 唤醒所有在条件变量上等待的线程。
❓ 如何理解条件变量的工作原理
✅ 当线程调用 pthread_cond_wait 时它会释放与条件变量相关联的互斥锁并进入等待状态。当另一个线程调用 pthread_cond_signal 或 pthread_cond_broadcast 时等待的线程会被唤醒。唤醒后线程会重新获取互斥锁然后继续执行保证了线程之间的协调避免了数据竞争。 示例生产消费者模型
#include iostream
using namespace std;
#include queue
#include pthread.h
#include stdlib.h
#include unistd.h#define NUM 8class BlockQueue{
private:queueint q;int cap NUM;pthread_mutex_t lock;pthread_cond_t full;pthread_cond_t empty;void QueueLock(){pthread_mutex_lock(lock);}void QueueUnlock(){pthread_mutex_unlock(lock);}void ProductWait(){pthread_cond_wait(full, lock);}void ConsumeWait(){pthread_cond_wait(empty, lock);}void NotifyProcduct(){pthread_cond_signal(full);}void NotifyConsume(){pthread_cond_signal(empty);}bool IsEmpty(){if(q.size() 0) return true;else return false;}bool IsFull(){if(q.size() cap) return true;else return false;}public:BlockQueue(){pthread_mutex_init(lock, nullptr);pthread_cond_init(full, nullptr);pthread_cond_init(empty, nullptr);}~BlockQueue(){pthread_mutex_destroy(lock);pthread_cond_destroy(full);pthread_cond_destroy(empty);}void PushData(const int data){QueueLock();while(IsFull()){cout Queue is full, product is waiting... endl;ProductWait();}q.push(data);NotifyConsume();QueueUnlock();}void PopData(int data){QueueLock();while(IsEmpty()){NotifyProcduct();cout Queue is empty, consume is waiting... endl;ConsumeWait();}data q.front();q.pop();NotifyProcduct();QueueUnlock();}
};void *Procduct(void *arg){BlockQueue *q (BlockQueue*)arg;srand((unsigned long)time(nullptr));while(1){int data rand() % 1024;q-PushData(data);cout Produce data done: data endl;// sleep(1);}
}void *Consume(void *arg){BlockQueue *q (BlockQueue*)arg;while(1){int data;q-PopData(data);cout Consume data done: data endl;// sleep(1);}
}int main()
{BlockQueue q;pthread_t t1, t2;pthread_create(t1, nullptr, Procduct, q);pthread_create(t2, nullptr, Consume, q);pthread_join(t1, nullptr);pthread_join(t2, nullptr);return 0;
}
我们创建了一个消费者线程和一个生产者线程共享资源为容量8的队列
① 当队列为空时消费线程阻塞等待生产线程生产数据一旦生产者生产了数据就通知唤醒消费线程消费数据
② 当队列为满时生产线程阻塞等待消费线程消费数据一旦消费线程消费了数据就通知唤醒生产线程生产数据......
部分运行结果 2.信号量
上面的生产消费者模型是基于queue队列模拟实现其空间大小是动态分配的我们判断生产消费线程阻塞等待的情况分别是队列为满与队列为空都可以直接用内置函数size()进行判断。但是如果数据存储的空间大小是一定的队列为空为满就不好直接判断此时需要借助信号量来实时记录当前空间大小。
基本概念
信号量Semaphore维护一个计数值线程在访问资源前需要检查信号量的值。当信号量的值大于零时线程可以进入临界区并访问资源当信号量的值为零时线程会被阻塞直到信号量的值增加。在生产消费者模型中信号量表示为当前剩余空间以及数据量大小。
语法
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);① pthread_cond_wait 让调用线程阻塞直到被其他线程通过 pthread_cond_signal 或 pthread_cond_broadcast 唤醒。它需要一个互斥锁作为参数以确保在等待条件时对共享资源的访问是安全的。
② pthread_cond_signal 唤醒一个在条件变量上等待的线程。如果有多个线程在等待系统随机选择一个线程唤醒。
③ pthread_cond_broadcast 唤醒所有在条件变量上等待的线程。 ❓ 如何理解条件变量的工作原理
✅ 当线程调用 pthread_cond_wait 时它会释放与条件变量相关联的互斥锁并进入等待状态。当另一个线程调用 pthread_cond_signal 或 pthread_cond_broadcast 时等待的线程会被唤醒。唤醒后线程会重新获取互斥锁然后继续执行同样保证了线程之间的协调避免了数据竞争。
❗️注意信号量的操作 sem_wait() 和 sem_post()在实现时是由操作系统提供的原子操作。也就是说信号量的计数值是通过原子操作进行修改的这些操作是不可中断的保证在多线程环境下操作信号量时不会出现竞争因此在使用信号量时不需要用互斥锁来保护对信号量的修改。
示例基于环形队列的PC-model
#include iostream
using namespace std;
#include stdlib.h
#include unistd.h
#include semaphore.h
#include pthread.h
#include vector#define NUM 8class RingQueue{
private:vectorint q;int cap NUM;sem_t data_sem;sem_t space_sem;int product_step;int consume_step;public:RingQueue(){q.resize(cap);sem_init(data_sem, 0, 0);sem_init(space_sem, 0, cap);product_step 0;consume_step 0;}~RingQueue(){sem_destroy(data_sem);sem_destroy(space_sem);}void PushData(const int data){sem_wait(space_sem);q[product_step] data;product_step;product_step % cap;sem_post(data_sem);}void PopData(int data){sem_wait(data_sem);data q[consume_step];consume_step;consume_step % cap;sem_post(space_sem);}
};void *Produce(void *arg){RingQueue *q (RingQueue*)arg;srand((unsigned long)time(nullptr));while(1){int data rand() % 1024;q-PushData(data);cout Produce data done: data endl;// sleep(1);}
}
void *Consume(void *arg){RingQueue *q (RingQueue*)arg;while(1){int data;q-PopData(data);cout Consume data done: data endl;// sleep(1);}
}int main()
{RingQueue q;pthread_t t1, t2;pthread_create(t1, nullptr, Produce, q);pthread_create(t2, nullptr, Consume, q);pthread_join(t1, nullptr);pthread_join(t2, nullptr);return 0;
}
创建了一个消费者线程和一个生产者线程共享源为容量8的环形队列
① 当数据信号量为0时表示当前没有数据消费线程阻塞等待生产线程生产数据之后增加数据信号量此时消费线程会被唤醒
② 当空间信号量为0时表示当前没有空间生产线程阻塞等待消费线程消费数据之后增加空间信号量此时生产线程会被唤醒......
部分运行结果 五、线程池
相比于多进程多线程大大减少了空间开辟和释放的开销但并不意味着多线程的创建和销毁的开销可以忽略不计在面对超高并发情况线程的频繁创建和销毁还是会带来巨大的开销如果线程一次性创建过多甚至会导致资源耗尽系统崩溃的情况如春运期间铁路12306的访问量非常大频繁创建和销毁线程的开销会变得十分严重。为了应对这种情况便引入了线程池技术 概念
线程池是一种线程管理机制它通过维护一个预先创建的线程集合线程池来避免频繁地创建和销毁线程所带来的性能开销。线程池管理一定数量的工作线程将任务提交给空闲线程来执行执行完成后线程会返回池中等待下一个任务。这种方式可以提高系统性能尤其在高并发场景下通过复用线程来减少线程创建和销毁的成本。
线程池通常管理着以下几个重要组件
① 线程池中的工作线程执行实际的任务
② 任务队列存放等待执行的任务
③ 核心线程数线程池中始终保持活跃的线程数量
④ 核心线程数线程池中始终保持活跃的线程数量
⑤ 线程池大小的动态调整根据负载和任务量线程池可能会动态增减线程数量 实现
下面介绍简易线程池的具体实现
1.初始化
在创建线程池时线程池会创建一定数量的线程这些线程将用来执行任务。线程池的初始化通过 PoolInit() 函数完成。
bool PoolInit() {pthread_t tid;for (int i 0; i _thread_max; i) {int ret pthread_create(tid, NULL, thr_start, this);if (ret ! 0) {cout create pool thread error endl;return false;}}return true;
}① PoolInit()初始化线程池创建指定数量的工作线程_thread_max。每个线程执行 thr_start() 函数。
② pthread_create()用来创建线程每个线程将运行 thr_start 函数。thr_start 会一直循环地从任务队列中取出任务并执行直到线程池退出。
2.任务队列
任务队列是线程池中的关键组件之一。它用于存储待处理的任务工作线程从队列中取出任务并执行。
queueThreadTask * _task_queue;这里使用了 std::queue 来实现任务队列。ThreadTask 是任务对象每个任务都包含一个待处理的数据和对应的处理函数当任务被推送到队列时工作线程会从队列中弹出并执行。
队列相关操作① PushTask将任务加入队列② PopTask从队列中弹出一个任务。
bool PushTask(ThreadTask *tt) {LockQueue();if (_tp_isquit) {UnLockQueue();return false;}_task_queue.push(tt);WakeUpOne();UnLockQueue();return true;
}bool PopTask(ThreadTask **tt) {*tt _task_queue.front();_task_queue.pop();return true;
}3.线程同步
线程池中的多个线程需要访问共享资源即任务队列因此需要使用 互斥锁 来保护共享资源的访问。条件变量 用于线程间的同步控制线程何时等待何时被唤醒。
pthread_mutex_t _lock;
pthread_cond_t _cond;① _lock互斥锁用来保护任务队列 _task_queue避免多个线程同时修改队列
② _cond条件变量用来通知线程池中的工作线程何时可以执行任务。
线程同步操作
① LockQueue 和 UnLockQueue用于加锁和解锁任务队列确保对任务队列的访问是线程安全的
② WakeUpOne 和 WakeUpAll分别唤醒一个等待的线程或所有等待的线程
③ ThreadWait让当前线程阻塞直到任务队列中有任务或线程池退出
void LockQueue() {pthread_mutex_lock(_lock);
}void UnLockQueue() {pthread_mutex_unlock(_lock);
}void WakeUpOne() {pthread_cond_signal(_cond);
}void WakeUpAll() {pthread_cond_broadcast(_cond);
}void ThreadWait() {if (_tp_isquit) {ThreadQuit();}pthread_cond_wait(_cond, _lock);
}4.工作线程
每个工作线程都执行 thr_start 函数该函数包含一个无限循环不断从任务队列中取出任务并执行。直到线程池退出。
static void *thr_start(void *arg) {ThreadPool *tp (ThreadPool*)arg;while (1) {tp-LockQueue();while (tp-IsEmpty()) {tp-ThreadWait();}ThreadTask *tt;tp-PopTask(tt);tp-UnLockQueue();tt-Run();delete tt;}return NULL;
}① 每个工作线程在 thr_start 中会一直等待任务
② 如果任务队列为空线程会通过 ThreadWait 阻塞直到有任务被推送到队列中
③ 任务一旦加入队列线程会被唤醒并执行任务
④ 行完任务后线程会回到等待状态直到线程池退出。
5.线程池退出
当线程池不再接受新任务并且所有任务执行完毕时线程池需要退出。线程池的退出通过 PoolQuit 函数来完成。
bool PoolQuit() {LockQueue();_tp_isquit true;UnLockQueue();while (_thread_cur 0) {WakeUpAll();usleep(1000);}return true;
}① PoolQuit标志 _tp_isquit 被设置为 true然后唤醒所有线程并让它们检查退出条件
② 工作线程在执行 ThreadWait 时会检查 _tp_isquit如果为 true则退出。
总结
1.线程池的初始化线程池通过 PoolInit 创建多个工作线程这些线程通过执行 thr_start 来不断地获取并处理任务。
2.任务队列任务队列是线程池的重要组成部分用于存储待处理的任务任务由主线程推送到队列工作线程从队列中取出并执行。
3.线程同步机制互斥锁和条件变量用于确保线程安全地访问任务队列并协调工作线程的等待和唤醒。
4.工作线程执行任务每个工作线程都从队列中获取任务并执行任务直到线程池退出。
5.线程池退出当线程池不再接收任务时调用 PoolQuit 函数退出线程池并通知所有工作线程退出。
完整代码如下
// threadpool.hpp
#ifndef __M_TP_H__
#define __M_TP_H__
#include iostream
using namespace std;
#include queue
#include pthread.h
#include stdlib.h
#include unistd.h#define MAX_THREAD 5
typedef bool (*hander_t)(int);
class ThreadTask
{
private:int _data;hander_t _handler;
public:ThreadTask():_data(-1), _handler(NULL){}ThreadTask(int data, hander_t handler){_data data;_handler handler;}void SetTask(int data, hander_t handler){_data data;_handler handler; }void Run(){_handler(_data);}
};class ThreadPool
{
private:int _thread_max;int _thread_cur;bool _tp_isquit;queueThreadTask * _task_queue;pthread_mutex_t _lock;pthread_cond_t _cond;
private:void LockQueue(){pthread_mutex_lock(_lock);}void UnLockQueue(){pthread_mutex_unlock(_lock);}void WakeUpOne(){pthread_cond_signal(_cond);}void WakeUpAll(){pthread_cond_broadcast(_cond);}void ThreadQuit(){_thread_cur--;UnLockQueue();pthread_exit(NULL);}void ThreadWait(){if(_tp_isquit){ThreadQuit();}pthread_cond_wait(_cond, _lock);}bool IsEmpty(){return _task_queue.empty();}static void *thr_start(void *arg){ThreadPool *tp (ThreadPool*)arg;while(1){tp-LockQueue();while(tp-IsEmpty()){tp-ThreadWait();}ThreadTask *tt;tp-PopTask(tt);tp-UnLockQueue();tt-Run();delete tt;}return NULL;}
public:ThreadPool(int maxMAX_THREAD):_thread_max(max), _thread_cur(max),_tp_isquit(false){pthread_mutex_init(_lock, NULL);pthread_cond_init(_cond, NULL);}~ThreadPool(){pthread_mutex_destroy(_lock);pthread_cond_destroy(_cond);}bool PoolInit(){pthread_t tid;for(int i 0; i _thread_max; i){int ret pthread_create(tid, NULL, thr_start, this);if(ret ! 0){cout create pool thread error endl;return false;}}return true;}bool PushTask(ThreadTask *tt){LockQueue();if(_tp_isquit){UnLockQueue();return false;}_task_queue.push(tt);WakeUpOne();UnLockQueue();return true;}bool PopTask(ThreadTask **tt){*tt _task_queue.front();_task_queue.pop();return true;}bool PoolQuit(){LockQueue();_tp_isquit true;UnLockQueue();while(_thread_cur 0){WakeUpAll();usleep(1000);}return true;}
};
#endif
// main.cpp
#include threadpool.hppbool handler(int data)
{srand(time(NULL));int n rand() % 5;printf(Thread: %p Run Tast: %d--sleep %d sec\n, pthread_self(), data, n);sleep(n);return true;
}int main()
{ThreadPool pool;pool.PoolInit();for(int i 0; i 10; i){ThreadTask *tt new ThreadTask(i, handler);pool.PushTask(tt);}pool.PoolQuit();return 0;
}
运行结果 以上就是【深入理解线程概念、操作、互斥与同步机制、线程池实现】的全部内容欢迎指正~
码文不易还请多多关注支持这是我持续创作的最大动力