精品网站建设比较好,短视频平台推广,建设网站的效果目的及其功能,cd-wordpress生产者消费者模型 生产者消费者模型生产者消费者模型的概念生产者消费者模型的特点生产者消费者模型优点 基于BlockingQueue的生产者消费者模型基于阻塞队列的生产者消费者模型模拟实现基于阻塞队列的生产消费模型 生产者消费者模型
生产者消费者模型的概念 生产者消费者模式就… 生产者消费者模型 生产者消费者模型生产者消费者模型的概念生产者消费者模型的特点生产者消费者模型优点 基于BlockingQueue的生产者消费者模型基于阻塞队列的生产者消费者模型模拟实现基于阻塞队列的生产消费模型 生产者消费者模型
生产者消费者模型的概念 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。 生产者和消费者彼此之间不直接通讯而通过阻塞队列来进行通讯所以生产者生产完数据之后不用等待消费者处理直接扔给阻塞队列消费者不找生产者要数据而是直接从阻塞队列里取阻塞队列就相当于一个缓冲区平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。 生产者消费者模型的特点
生产者消费者模型是多线程同步与互斥的一个经典场景其特点如下
三种关系生产者与生产者竞争与互斥关系、消费者与消费者竞争与互斥关系、生产者与消费者互斥与同步关系两种角色生产者与消费者一个交易场所通常指内存中的一段缓冲区。 生产者和生产者、消费者和消费者、生产者和消费者它们之间为什么会存在互斥关系 因为生产者与消费者之间的容器会被多个访问流进行访问所以我们就需要将该临界资源使用互斥锁保护起来防止线程安全问题的发生因此所有的生产者和消费者都会竞争式的申请锁生产者和生产者、消费者和消费者、生产者和消费者之间都存在互斥关系。 生产者和消费者之间为什么会存在同步关系 如果生产者一直生产当空间被填满以后生产者就会停止生产消费者也是一样如果消费者一直消费空间中数据被消耗完了消费者也会停止消费。
虽然这样不会造成任何数据不一致的问题但是这样会引起另一方的饥饿问题是非常低效的。我们应该让生产者和消费者访问该容器时具有一定的顺序性比如让生产者先生产然后再让消费者进行消费。
注意 互斥关系保证的是数据的正确性而同步关系是为了让多线程之间协同起来。
生产者消费者模型优点
解耦支持并发支持忙闲不均
如果我们在主函数中调用某一函数那么我们必须等该函数体执行完后才继续执行主函数的后续代码因此函数调用本质上是一种紧耦合。
对应到生产者消费者模型中函数传参实际上就是生产者生产的过程而执行函数体实际上就是消费者消费的过程但生产者只负责生产数据消费者只负责消费数据在消费者消费期间生产者可以同时进行生产因此生产者消费者模型本质是一种松耦合。
基于BlockingQueue的生产者消费者模型
基于阻塞队列的生产者消费者模型
在多线程编程中阻塞队列Blocking Queue是一种常用于实现生产者和消费者模型的数据结构。 阻塞队列的特点在于
当队列为空时从队列获取元素的操作将会被阻塞直到队列中放入了元素。当队列满时往队列里存放元素的操作会被阻塞直到有元素从队列中取出。
知识联系 看到以上阻塞队列的描述我们很容易想到的就是管道而阻塞队列最典型的应用场景实际上就是管道的实现。
模拟实现基于阻塞队列的生产消费模型
我们先以单生产者单消费者为例
其中的BlockQueue就是生产者消费者模型当中的交易场所我们可以用CSTL库当中的queue进行实现下面我们进行一个简单的封装
#pragma once#include iostream
#include pthread.h
#include queue
#include unistd.h
#include mutex#define NUM 5template class T
class BlockQueue
{
private:// 判断队列是否满了bool isQueueFull(){return _bq.size() _capacity;}// 判断队列是否为空bool isQueueEmpty(){return _bq.size() 0;}public:// 构造函数BlockQueue(int capacity NUM) : _capacity(capacity){pthread_mutex_init(_mtx, nullptr);pthread_cond_init(_full, nullptr);pthread_cond_init(_empty, nullptr);}// 向阻塞队列中插入数据(生产者)void push(const T in){// 加锁pthread_mutex_lock(_mtx);while (isQueueFull()){// 如果生产者生产过程中数据满了就阻塞等待pthread_cond_wait(_full, _mtx);}_bq.push(in);// 解锁pthread_mutex_unlock(_mtx);// 唤醒消费者pthread_cond_signal(_empty);}// 向阻塞队列中获取数据(消费者)void pop(T out){// 加锁pthread_mutex_lock(_mtx);while (isQueueEmpty()){// 如果消费者消费过程中数据空了就阻塞等待pthread_cond_wait(_empty, _mtx);}out _bq.front();_bq.pop();// 解锁pthread_mutex_unlock(_mtx);// 唤醒生产者pthread_cond_signal(_full);}// 析构函数~BlockQueue(){pthread_mutex_destroy(_mtx);pthread_cond_destroy(_full);pthread_cond_destroy(_empty);}private:// 阻塞队列std::queueT _bq;// 阻塞队列最大容器个数int _capacity;// 通过互斥锁来保证队列安全pthread_mutex_t _mtx;// 用来表示_bq是否为满的条件pthread_cond_t _full;// 用来表示_bq是否为空的条件pthread_cond_t _empty;
};上述代码中需要注意以下几点
我们实现的是单生产者与单消费者的生产者消费者模型所以我们不需要维护生产者与生产者消费者与消费者的关系只需要维护生产者与消费者之间的关系我们将BlockQueue中的参数模板化就不会局限于一种类型以后就可以很好的进行复用我们将阻塞队列最大容器个数设置为5表示阻塞队列中存在5个数据以后就不会在进行生生产了此时你生产者被阻塞由于生产者与消费者都会访问阻塞队列阻塞队列即为临界资源我们需要增加互斥锁来保证线程安全的问题生产者向阻塞队列中插入数据时如果阻塞队列满了生产者就会被阻塞进行等待直到消费者获取数据完成以后阻塞队列中存在空余空间唤醒生产者进行生产同理消费者获取数据时如果阻塞队列空了消费者就会被阻塞进行等待直到生产者生产数据完成以后唤醒消费者进行消费我们需要定义两个条件变量_full和_empty描述阻塞队列的状态进而才可以判断何时运行何时等待pthread_cond_wait除了会传入一个条件变量以外还会传入一个互斥锁我们会发现我们是在临界区中进行等待的我们此时还处于持有锁状态pthread_cond_wait第二个参数意义就在于成功调用wait之后传入的锁会被自动释放当被唤醒的时候就会自动获取线程锁
判断是否满足生产消费条件时不能用if而应该用while
pthread_cond_wait函数是让当前执行流进行等待的函数是函数就意味着有可能调用失败调用失败后该执行流就会继续往后执行。其次在多消费者的情况下当生产者生产了一个数据后如果使用pthread_cond_broadcast函数唤醒消费者就会一次性唤醒多个消费者但待消费的数据只有一个此时其他消费者就被伪唤醒了。为了避免出现上述情况我们就要让线程被唤醒后再次进行判断确认是否真的满足生产消费条件因此这里必须要用while进行判断。
在主函数中我们就只需要创建一个生产者线程和一个消费者线程让生产者线程不断生产数据让消费者线程不断消费数据
#include BlockQueue.hppvoid* consumer(void* args)
{BlockQueueint* bqueue (BlockQueueint*)args;while(true){int a;bqueue-pop(a);std::cout consumer: a std::endl;sleep(1);}return nullptr;
}void* productor(void* args)
{BlockQueueint* bqueue (BlockQueueint*)args;int a 1;while(true){bqueue-push(a);std::cout productor: a std::endl;a;sleep(1);}return nullptr;
}int main()
{pthread_t c, p;BlockQueueint* bq new BlockQueueint();//创建生产者消费者线程pthread_create(c, nullptr, consumer, bq);pthread_create(p, nullptr, productor, bq);pthread_join(c, nullptr);pthread_join(p, nullptr);}当生产者与消费者步调一致时我们会发现生产者生产一个数据消费者就会消费一个数据
当生产者生产的快消费者消费的慢时阻塞队列满了就会导致生产者阻塞等待只有当消费者被唤醒以后消费掉一个数据此时生产者才会被唤醒继续生产数据 当生产者生产的慢消费者消费的快因为最开始阻塞队列中并没有数据所以消费者就会阻塞等待当生产者生产一个数据以后消费者就会被唤醒消费一个数据然后生产者继续被唤醒生产数据消费者消费数据步调保持一致 当我们满足某一条件时再唤醒对应的生产者或消费者比如当阻塞队列当中存储的数据大于队列容量的一半时再唤醒消费者线程进行消费当阻塞队列当中存储的数据小于队列容器的一半时再唤醒生产者线程进行生产。
// 向阻塞队列中插入数据(生产者)
void push(const T in)
{// 加锁pthread_mutex_lock(_mtx);while (isQueueFull()){// 如果生产者生产过程中数据满了就阻塞等待pthread_cond_wait(_full, _mtx);}_bq.push(in);if (_bq.size() _capacity / 2)// 唤醒消费者pthread_cond_signal(_empty);// 解锁pthread_mutex_unlock(_mtx);
}// 向阻塞队列中获取数据(消费者)
void pop(T out)
{// 加锁pthread_mutex_lock(_mtx);while (isQueueEmpty()){// 如果消费者消费过程中数据空了就阻塞等待pthread_cond_wait(_empty, _mtx);}out _bq.front();_bq.pop();if (_bq.size() _capacity / 2)// 唤醒生产者pthread_cond_signal(_full);// 解锁pthread_mutex_unlock(_mtx);
}我们仍然让生产者生产的快消费者消费的慢。运行代码后生产者还是一瞬间将阻塞队列打满后进行等待但此时不是消费者消费一个数据就唤醒生产者线程而是当阻塞队列当中的数据小于队列容器的一半时才会唤醒生产者线程进行生产。 基于计算任务的生产者消费者模型 当然实际使用生产者消费者模型时可不是简单的让生产者生产一个数字让消费者进行打印而已我们这样做只是为了测试代码的正确性。
由于我们将BlockingQueue当中存储的数据进行了模板化此时就可以让BlockingQueue当中存储其他类型的数据。 例如我们想要实现一个基于计算任务的生产者消费者模型此时我们只需要定义一个Task类这个类当中需要包含一个func_t成员函数
#pragma once#include iostream
#include functionaltypedef std::functionint(int, int) func_t;class Task
{
public:Task(){}Task(int x, int y, func_t func) : _x(x), _y(y), _func(func){}~Task(){}int operator()(){return _func(_x, _y);}public:int _x;int _y;func_t _func;
};同时我们也可以将锁进行一个封装采用RAII形式的加锁解锁风格创建锁对象自动调用构造函数加锁除了作用域自动调用析构函数解锁。
#pragma once#include iostream
#include pthread.hclass Mutex
{
public:Mutex(pthread_mutex_t *mtx) : _mtx(mtx){}void lock(){std::cout 需要进行加锁 std::endl;pthread_mutex_lock(_mtx);}void unlock(){std::cout 需要进行解锁 std::endl;pthread_mutex_unlock(_mtx);}~Mutex(){}private:pthread_mutex_t *_mtx;
};class LockGuard
{
public:LockGuard(Mutex mtx) :_mtx(mtx){_mtx.lock();}~LockGuard(){_mtx.unlock();}
private:Mutex _mtx;
};此时我们的BlockQueue.hpp中插入和获取数据代码就可以优化为
// 向阻塞队列中插入数据(生产者)
void push(const T in)
{LockGuard lockguard(_mtx);while (isQueueFull()){// 如果生产者生产过程中数据满了就阻塞等待pthread_cond_wait(_full, _mtx);}_bq.push(in);// 唤醒消费者pthread_cond_signal(_empty);}// 向阻塞队列中获取数据(消费者)
void pop(T out)
{LockGuard lockguard(_mtx);while (isQueueEmpty()){// 如果消费者消费过程中数据空了就阻塞等待pthread_cond_wait(_empty, _mtx);}out _bq.front();_bq.pop();// 唤醒生产者pthread_cond_signal(_full);
}运行代码当生产者向阻塞队列中写入一个数据后随即消费者就会被唤醒获取数据也就是进行计算操作 同样我们也可以创建多个线程进行计算