嘉兴网站制作厂家,政务网站建设规划,国内搜索引擎大全,有人看片吗免费观看视频生产者和消费者概念基于BlockingQueue的生产者消费者模型全部代码 生产者和消费者概念 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。 生产者和消费者彼此之间不直接通讯#xff0c;而通过这个容器来通讯#xff0c;所以生产者生产完数据之后不用等待… 生产者和消费者概念基于BlockingQueue的生产者消费者模型全部代码 生产者和消费者概念 生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。 生产者和消费者彼此之间不直接通讯而通过这个容器来通讯所以生产者生产完数据之后不用等待消费者处理直接将生产的数据放到这个容器当中消费者也不用找生产者要数据而是直接从这个容器里取数据这个容器就相当于一个缓冲区平衡了生产者和消费者的处理能力这个容器实际上就是用来给生产者和消费者解耦的。 生产者消费者模型的特点
生产者消费者模型是多线程同步与互斥的一个经典场景其特点如下
三种关系 生产者和生产者互斥关系、消费者和消费者互斥关系、生产者和消费者互斥关系、同步关系。两种角色 生产者和消费者。通常由进程或线程承担一个交易场所 通常指的是内存中的一段缓冲区。可以自己通过某种方式组织起来 生产者和生产者、消费者和消费者、生产者和消费者它们之间为什么会存在互斥关系 因为在生产者和消费者之间存在多种执行流同时访问的问题因此我们需要将他们同时访问的临界区进行加互斥保护起来 其中所有的生产者和消费者都会竞争式的申请锁因此生产者和生产者、消费者和消费者、生产者和消费者之间都存在互斥关系。 生产者和消费者之间为什么会存在同步关系 如果让生产者一直生产那么当生产者生产的数据将容器塞满后生产者再生产数据就会生产失败。反之让消费者一直消费那么当容器当中的数据被消费完后消费者再进行消费就会消费失败。
虽然这样不会造成任何数据不一致的问题但是这样会引起另一方的饥饿问题是非常低效的。我们应该让生产者和消费者访问该容器时具有一定的顺序性比如让生产者先生产然后再让消费者进行消费。
注意 互斥关系保证的是数据的正确性而同步关系是为了让多线程之间协同起来。
基于BlockingQueue的生产者消费者模型
当多个生产者消费者同时出现进行抢占线程时我们可以使用BlockingQueue来进行缓冲如图 其与普通的队列的区别在于
当队列为空时从队列获取元素的操作将会被阻塞直到队列中放入了元素。当队列满时往队列里存放元素的操作会被阻塞直到有元素从队列中取出。
知识联系 看到以上阻塞队列的描述我们很容易想到的就是管道而阻塞队列最典型的应用场景实际上就是管道的实现。 put为生产者take为消费者
全部代码 task.hpp 用于实现打印和计算 #pragma once
#include iostream
#include stringclass Task
{
public:Task(){}Task(int x,int y,char op):_x(x),_y(y),_op(op),_result(0),_exitCode(0){}void operator()(){switch(_op){case :_result _x _y;break;case -:_result _x - _y;break;case *:_result _x * _y;break;case /:{if (_y 0)_exitCode -1;else_result _x / _y;}break;case %:{if (_y 0)_exitCode -2;else_result _x % _y;}break;default:break;}}std::string formatArg(){return std::to_string(_x) _op std::to_string(_y) ; }std::string formatRes(){return std::to_string(_result) ( std::to_string(_exitCode) );}~Task(){}
private:int _x;int _y;char _op;//输入的符号int _result;int _exitCode;
};blockQueue.hpp 维护线程之间的同步 #pragma once
#include iostream
#include queue
#include pthread.hconst int gcap 5;//最大容量
templateclass T
class BlockQueue
{
public:BlockQueue(const int cap gcap):_cap(gcap){pthread_mutex_init(_mutex,nullptr);//初始化互斥量//初始化用户和生产者的条件变量pthread_cond_init(_consumerCond,nullptr);pthread_cond_init(_consumerCond,nullptr);}//判断是否为慢bool isFull(){return _q.size() _cap;}//判断是否为空bool isEmpty(){return _q.empty();}//插入void push(const T in){pthread_mutex_lock(_mutex);//细节1一定要保证在任何时候都要符合条件才进行生产while(isFull()){//1 我们只能在临界区内部判断临界区资源是否就绪注定了我们在当前一定持有锁。//2 要让线程进行休眠等待不能持有锁等待//3 注定了pthread_cond_wait要有锁的释放能力pthread_cond_wait(_productCond,_mutex);// 4. 当线程醒来的时候注定了继续从临界区内部继续运行因为我是在临界区被切走的// 5. 注定了当线程被唤醒的时候继续在pthread_cond_wait函数出向后运行又要重新申请锁申请成功才会彻底返回}// 没有满就要让他继续运行_q.push(in);//加策略pthread_cond_signal(_consumerCond);pthread_mutex_unlock(_mutex);}//取出删除void pop(T* out){pthread_mutex_lock(_mutex);while(isEmpty()) {pthread_cond_wait(_consumerCond, _mutex);}*out _q.front();_q.pop();// 加策略pthread_cond_signal(_productCond);pthread_mutex_unlock(_mutex);}~BlockQueue(){pthread_mutex_destroy(_mutex);pthread_cond_destroy(_consumerCond);pthread_cond_destroy(_productCond);}
private:std::queueT _q;int _cap;//生产容量//为什么我们这份代码只有一个锁根本原因在于//我们生产者和消费者访问的是同一个queue queue 被当作整体使用pthread_mutex_t _mutex;pthread_cond_t _consumerCond;//消费者对应的条件变量pthread_cond_t _productCond;//生产者对应的条件变量
};main.cc #include iostream
#include task.hpp
#include blockQueue.hpp
#include pthread.h
#include ctime
#include unistd.hvoid* consumer(void* args)
{BlockQueueTask* bq static_castBlockQueueTask*(args);while(1){Task t;// 1. 将数据从blockqueue中获取 -- 获取到了数据bq-pop(t);t();// 2. 结合某种业务逻辑处理数据 -- TODOstd::cout pthread_self() | consumer data: t.formatArg() t.formatRes() std::endl;}
}void *productor(void *args)
{BlockQueueTask *bq static_castBlockQueueTask *(args);std::string opers -*/%;while(1){int x rand()%20 1;int y rand()%20 1;char op opers[rand() % opers.size()];Task t(x,y,op);bq-push(t);std::cout pthread_self() | productor Task: t.formatArg() ? std::endl;}
}int main()
{BlockQueueTask*bq new BlockQueueTask();pthread_t c[2], p[3];pthread_create(c[0], nullptr, consumer, bq);pthread_create(c[1], nullptr, consumer, bq);pthread_create(p[0], nullptr, productor, bq);pthread_create(p[1], nullptr, productor, bq);pthread_create(p[2], nullptr, productor, bq);pthread_join(c[0], nullptr);pthread_join(c[1], nullptr);pthread_join(p[0], nullptr);pthread_join(p[1], nullptr);pthread_join(p[2], nullptr);delete bq;return 0;return 0;
}