1. 什么是生产消费者模型
生产者-消费者模型(producer-consumer model) 是多线程编程中的经典并发控制模型,主要用于协调多个线程之间的数据访问,防止竞争条件(race condition)和资源浪费,提高程序的并发能力。 抛开概念,我们用生活中的例子来举例——超市就是最好的例子。 超市充当着生产商和消费者的中间资源。 超市从生产商进货,生产商需要向超市提供货物,消费者在超市购物,超市需要向消费者提供商品。 如此一来,超市就成立生产者和消费者之间的桥梁。消费者就和生产者有一定的隔离,解决了生产者与消费者之间的强耦合。 得益于超市做缓冲区,整个生产消费的过程十分的高效,即便消费者没有在超市找到想要的商品,也可以借助超市向生产者进行反馈,从而做到生产对应的商品,也就是允许生产消费步调不一致。 生产消费者模型的本质就是:忙闲不均. 同时我们要知道超市不可能只面向单一的生产消费者,无论是哪一个,超市都会面向多个。也就是说,超市会被多个生产者消费者看到。 那么生产者、消费者间排列组合有什么关系呢? 生产者与生产者 还是以超市为例,多个生产者间存在互斥的关系,每个生产者都希望自己的产品能更多的出现到超市中,可以超市的空间始终是有限的,一个产品多了势必会影响到另一份的产品。比如可口与百事,统一与康师傅。 由此再一次得出结论:生产者与生产者存在互斥关系 消费者与消费者 假设超市准备打烊了,此时进来了两名顾客,他们看上了同一件商品,但是该商品以及卖的只省一件了,那么两名顾客必然就存在竞争的关系,也就是互斥。 由此得出:消费者与消费者间存在互斥关系. 生产者与消费者 依旧是同一个超市,某天顾客a进入超市打算购买商品a,但是此时的商品a已经卖完了,于是呢顾客a就去通知超市去备货,然后顾客a就离开了,走了一段路后,顾客a心想一定要买到商品a,于是又返回超市询问老板,商品a到了没有,可是还没有到。于是一整天的时间顾客a就在超市间往返,超市老板看他这么辛苦便要了顾客a的联系方式,声称一有货就通知他,于是顾客便回家安心等待了。 这个故事告诉我们什么呢? 超市老板添加顾客a的联系方式是为了将商品信息同步给生产者,这表明了生产者与消费者之间存在同步关系.同时,在超市的备货期间顾客是不能进行消费的,这表明了生产者与消费者之间存在互斥关系 由此得出:生产者与消费者间存在同步、互斥关系.
1.1 生产消费者模型的特点生产消费者模型的最根本特点——321原则 3种关系:
生产者与生产者:互斥消费者与消费者:互斥生产者与消费者:互斥与同步 2种角色:生产者消费者 1个交易场所通常是一个特点的缓冲区(阻塞队列、环形队列) 生产者与消费者间的同步关系:生产者不断地生产,交易场所堆满商品后,需要通知消费者进行消费。消费者不断地消费,教育场所为空时,需要通知生产者进行生产。1.2 生产消费者模型的优点提高并发性能 生产者和消费者可以独立运行,提升系统吞吐量。解耦数据生成和处理 生产者不需要关心消费者如何处理数据,消费者也不需要关心数据如何生成。适用于缓冲处理 适用于消息队列、日志处理、任务分发等场景。1.3 生产消费者的核心该模型通常包括三个核心组件:
生产者(Producer) 负责生产数据(任务、消息等)。产生的数据通常会放入共享缓冲区。消费者(Consumer) 负责从共享缓冲区中取出数据进行处理。处理完毕后,消费者可以继续等待新的数据到来。共享缓冲区(Buffer) 生产者与消费者共享的数据存储区域。需要同步机制(如互斥锁、条件变量)来保护数据一致性。2. 基于堵塞队列实现生产消费者模型阻塞队列Blocking Queue是一种特殊的队列,在具有先进先出的基础上,还拥有队列大小的固定的这一特点。 堵塞队列可以为满,也可以为空。
当堵塞队列为满时:无法入队->无法生产(堵塞)当堵塞队列为空时:无法出队->无法消费(阻塞)首先我们肯定要创建一个堵塞队列的类。 那么这个堵塞队列应该具有什么样的属性呢? 从功能上出发,我们的堵塞队列需要完成的任务有:生产者向堵塞队列中入队,消费者从堵塞队列中出队,也就是说我们的类必须具有插入Push和删除Pop操作。同时因为阻塞队列是定长的,我能需要判断阻塞队列是否为空为满,那么IsFull和IsEmpty操作也是必要的。 了解我方法后,类的属性其实也出来了。因为阻塞队列是一个队列,我们可以调用STL的queue来充当属性之一,除此之外还有定长的属性我还需要定义一个变量cap来确定队列的长度。最后因为阻塞队列是一个公共资源,我们需要加锁mutex也是必要的,同时我们因为我们在队列中有数据时需要通知消费者在队列中没有数据时选哟通知生产者,也就表明我们还需要两个条件变量cond。 分析完后,阻塞队列具有的属性就包括了以下内容: 方法:
isFull() 判断阻塞队列是否为满。isEmpty() 判断阻塞队列是否为空。push() 将数据入队。pop() 将数据出队。 属性:queue 队列本体int 阻塞队列长度pthread_mutex_t 互斥锁pthread_cond_t 同步消费者pthread_cond_t 同步生产者2.2 实现单生产消费模型阻塞队列BlockQueue.hpp
代码语言:javascript代码运行次数:0运行复制#pragma once#include <mutex>#include <pthread.h>#include <unistd.h>#include <queue>#define NUM 5using namespace std;namespace yui{ template<class T> class BlockQueue{ private: bool isFull(){ return _blockQueue.size() == _cap; } bool isEmpty(){ return _blockQueue.empty(); } public: BlockQueue(int cap = NUM):_cap(cap) { pthread_mutex_init(&_mutex,nullptr); pthread_cond_init(&_producer,nullptr); pthread_cond_init(&_consumer,nullptr); } void push(const T& inData){ //访问公共资,加锁 pthread_mutex_lock(&_mutex); //开始判断阻塞队列是否为满 while(isFull()){ //如果队列为满,生产者就必须开始等待 // pthread_cond_signal(&_consumer); pthread_cond_wait(&_producer,&_mutex); } //运行到这里队列肯定没满,将数据入队,通知消费者消费。 _blockQueue.push(inData); pthread_cond_signal(&_consumer); pthread_mutex_unlock(&_mutex);//解锁 } void pop(T* outData){ //加锁 pthread_mutex_lock(&_mutex); while(isEmpty()){ // pthread_cond_signal(&_producer); pthread_cond_wait(&_consumer,&_mutex); } *outData = _blockQueue.front(); _blockQueue.pop(); pthread_cond_signal(&_producer); pthread_mutex_unlock(&_mutex); } ~BlockQueue() { pthread_mutex_destroy(&_mutex); pthread_cond_destroy(&_producer); pthread_cond_destroy(&_consumer); } private: queue<T> _blockQueue; // 阻塞队列内核 int _cap; // 队列长度 pthread_mutex_t _mutex; //互斥锁 pthread_cond_t _producer; pthread_cond_t _consumer; };}
main.cc
代码语言:javascript代码运行次数:0运行复制#include <iostream>#include <cstdlib>#include <ctime>#include "BlockQueue.hpp"using namespace std;void *runProducer(void *arg){ yui::BlockQueue<int> *bq = static_cast<yui::BlockQueue<int> *>(arg); while (true) { //sleep(1); int num = rand() % 100; bq->push(num); cout << "生产者生产了一个数据:" << num << endl; cout << "--------------------------" << endl; } pthread_exit((void *)0); // 退出线程}void *runConsumer(void *arg){ yui::BlockQueue<int> *bq = static_cast<yui::BlockQueue<int> *>(arg); while (true) { // sleep(1); int num = 0; bq->pop(&num); cout << "消费者消费了一个数据:" << num << endl; cout << "-------------------------" << endl; } pthread_exit((void *)0); // 退出线程}int main(){ srand((unsigned int)time(nullptr)); yui::BlockQueue<int> *bq = new yui::BlockQueue<int>(5); // 创建阻塞队列 // 创建生产消费者线程 pthread_t producer; pthread_t consumer; if (pthread_create(&producer, nullptr, runProducer, bq) < 0) { perror("线程创建失败"); return 1; } if (pthread_create(&consumer, nullptr, runConsumer, bq) < 0) { perror("线程创建失败"); return 1; } if(pthread_join(producer,nullptr)<0){ perror("线程回收失败!"); return 1; } if(pthread_join(consumer,nullptr)<0){ perror("线程回收失败!"); return 1; } delete bq; return 0;}
运行结果:
通过上面的运行结果,我们可以看到生产者疯狂地生产,消费者疯狂地消费。这样地结果不方便我们观察堵塞队列地特点,为此我们可以通过休眠的方式来仔细观察。 方法1:消费者每隔一秒消费一次,生产者疯狂生产 修改代码:
代码语言:javascript代码运行次数:0运行复制void *runProducer(void *arg){ yui::BlockQueue<int> *bq = static_cast<yui::BlockQueue<int> *>(arg); while (true) { //sleep(1); int num = rand() % 100; bq->push(num); cout << "生产者生产了一个数据:" << num << endl; cout << "--------------------------" << endl; } pthread_exit((void *)0); // 退出线程}void *runConsumer(void *arg){ yui::BlockQueue<int> *bq = static_cast<yui::BlockQueue<int> *>(arg); while (true) { sleep(1); int num = 0; bq->pop(&num); cout << "消费者消费了一个数据:" << num << endl; cout << "-------------------------" << endl; } pthread_exit((void *)0); // 退出线程}
运行结果:
阻塞队列大小是5,由此生产者生产5次后,就进入堵塞状态了,之后生产者与消费者步调一致,消费者每消费一个生产者就生产一个。 注意:此时消费的数据,是阻塞队列中队头的数据,也就是最先生产的数据。 策略2:生产者每隔一秒生产一次,消费者疯狂消费 预期结果为 刚开始阻塞队列为空,消费者无法进行消费,只能阻塞等待,一秒后,生产者生产了一个数据,并立即通知消费者进行消费,两者协同工作,消费者消费的就是生产者刚刚生产的数据 修改代码:
代码语言:javascript代码运行次数:0运行复制void *runProducer(void *arg){ yui::BlockQueue<int> *bq = static_cast<yui::BlockQueue<int> *>(arg); while (true) { sleep(1); int num = rand() % 100; bq->push(num); cout << "生产者生产了一个数据:" << num << endl; cout << "--------------------------" << endl; } pthread_exit((void *)0); // 退出线程}void *runConsumer(void *arg){ yui::BlockQueue<int> *bq = static_cast<yui::BlockQueue<int> *>(arg); while (true) { // sleep(1); int num = 0; bq->pop(&num); cout << "消费者消费了一个数据:" << num << endl; cout << "-------------------------" << endl; } pthread_exit((void *)0); // 退出线程}
符合预期。
2.3 多生产多消费模型堵塞队列关于多生产消费模型,我们需要对上面的代码进行大改吗? 答案是完全不需要,我们刚刚写的代码就可以满足多生产消费的情境。 可能有些读者会认为需要使用到pthread_cond_broadcast来唤醒所有线程。其实是不必的,假设在只生产了一个数据的情况下,唤醒所有的线程,会导致只有一个线程进行合法操作,其他线程都是非法操作了。
代码语言:javascript代码运行次数:0运行复制int main(){ srand((unsigned int)time(nullptr)); yui::BlockQueue<int> *bq = new yui::BlockQueue<int>(5); // 创建阻塞队列 // 创建生产消费者线程 pthread_t pro[2]; pthread_t con[3]; for(int i = 0; i < 2; i++) pthread_create(pro + i, nullptr, runProducer, bq); for(int i = 0; i < 3; i++) pthread_create(con + i, nullptr, runConsumer, bq); for(int i = 0; i < 2; i++) pthread_join(pro[i], nullptr); for(int i = 0; i < 3; i++) pthread_join(con[i], nullptr); delete bq; return 0;}
Task.hpp
代码语言:javascript代码运行次数:0运行复制#pragma onceenum{ ZERODIVERROR = 1, CHARERROR};template<class T>class Task{ public: Task(T a,T b,char ch):_a(a),_b(b),_ch(ch) {} Task(){} ~Task() {} int calculator(T& ret){ int flag = 0; switch (_ch) { case '+': ret = _a+_b; break; case '-': ret = _a-_b; break; case '*': ret = _a*_b; break; case '/': if(_b != 0) ret = _a/_b; else flag = ZERODIVERROR; break; default: flag = CHARERROR; break; } return flag; } public: T _a; T _b; char _ch;};
BlockQueue.hpp
代码语言:javascript代码运行次数:0运行复制#pragma once#include <mutex>#include <pthread.h>#include <unistd.h>#include <queue>#define NUM 5using namespace std;namespace yui{ template<class T> class BlockQueue{ private: bool isFull(){ return _blockQueue.size() == _cap; } bool isEmpty(){ return _blockQueue.empty(); } public: BlockQueue(int cap = NUM):_cap(cap) { pthread_mutex_init(&_mutex,nullptr); pthread_cond_init(&_producer,nullptr); pthread_cond_init(&_consumer,nullptr); } void push(const T& inData){ //访问公共资,加锁 pthread_mutex_lock(&_mutex); //开始判断阻塞队列是否为满 while(isFull()){ //如果队列为满,生产者就必须开始等待 // pthread_cond_signal(&_consumer); pthread_cond_wait(&_producer,&_mutex); } //运行到这里队列肯定没满,将数据入队,通知消费者消费。 _blockQueue.push(inData); pthread_cond_signal(&_consumer); pthread_mutex_unlock(&_mutex);//解锁 } void pop(T* outData){ //加锁 pthread_mutex_lock(&_mutex); while(isEmpty()){ // pthread_cond_signal(&_producer); pthread_cond_wait(&_consumer,&_mutex); } *outData = _blockQueue.front(); _blockQueue.pop(); pthread_cond_signal(&_producer); pthread_mutex_unlock(&_mutex); } ~BlockQueue() { pthread_mutex_destroy(&_mutex); pthread_cond_destroy(&_producer); pthread_cond_destroy(&_consumer); } private: queue<T> _blockQueue; // 阻塞队列内核 int _cap; // 队列长度 pthread_mutex_t _mutex; //互斥锁 pthread_cond_t _producer; pthread_cond_t _consumer; };}
main.cc
代码语言:javascript代码运行次数:0运行复制#include <iostream>#include <cstdlib>#include <ctime>#include "BlockQueue.hpp"#include "task.hpp"using namespace std;char chs[4] = {'+', '-', '*', '/'};void *runProducer(void *arg){ yui::BlockQueue<Task<int>> *bq = static_cast<yui::BlockQueue<Task<int>> *>(arg); while (true) { sleep(1); int a = rand() % 100; int b = rand() % 100; char c = chs[rand() % 4]; bq->push(Task<int>(a, b, c)); cout << "生产者生产了一个问题:" << a << c << b << "= ?" << endl; cout << "--------------------------" << endl; } pthread_exit((void *)0); // 退出线程}void *runConsumer(void *arg){ yui::BlockQueue<Task<int>> *bq = static_cast<yui::BlockQueue<Task<int>> *>(arg); while (true) { // sleep(1); Task<int> *tmp = new Task<int>; bq->pop(tmp); int ret = 0; if (tmp->calculator(ret) == 0) { cout << "消费者解决问题得:" << tmp->_a << tmp->_ch << tmp->_b << " = " << ret << endl; cout << "--------------------------" << endl; } else if (tmp->calculator(ret) == ZERODIVERROR) { cout << "问题存在错误:" << tmp->_a << tmp->_ch << tmp->_b << " = ?" << "存在除0错误" << endl; cout << "--------------------------" << endl; } else if (tmp->calculator(ret) == CHARERROR) { cout << "问题存在错误:" << tmp->_a << tmp->_ch << tmp->_b << " = ?" << "存在未定义字符:"<<tmp->_ch<< endl; cout << "--------------------------" << endl; } } pthread_exit((void *)0); // 退出线程}int main(){ srand((unsigned int)time(nullptr)); yui::BlockQueue<Task<int>> *bq = new yui::BlockQueue<Task<int>>(5); // 创建阻塞队列 // 创建生产消费者线程 pthread_t producer; pthread_t consumer; if (pthread_create(&producer, nullptr, runProducer, bq) < 0) { perror("线程创建失败"); return 1; } if (pthread_create(&consumer, nullptr, runConsumer, bq) < 0) { perror("线程创建失败"); return 1; } if (pthread_join(producer, nullptr) < 0) { perror("线程回收失败!"); return 1; } if (pthread_join(consumer, nullptr) < 0) { perror("线程回收失败!"); return 1; } delete bq; return 0;}
这只是生产消费者模型的一个非常简单的应用,在以后的编程中,你可以把Task类中的任务写成其他,比如网络请求、并行IO等等。
3. 结语希望本文对你理解生产消费模型有所帮助 往期Linux文章:linux 往期python文章:python
以上就是【Linux】:多线程中的生产消费者模型的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号