Linux Linux 042 生产者 - 消费者模型 小米里的大麦 2025-08-03 2025-08-03 生产者 - 消费者模型(CP 问题) 1. 生产者-消费者模型(CP 问题)是什么? 这是并发编程中最经典的问题之一,主要描述 两个线程/进程之间的数据交换协作问题 :
生产者 :不断生产数据,放入缓冲区(仓库、通道)。
消费者 :不断从缓冲区中取出数据进行处理。
但问题在于:
缓冲区有 容量限制 。
多线程并发会导致 竞争访问资源 。
所以需要设计好 同步机制 (比如互斥锁、条件变量、信号量等)保证:
生产者不能在缓冲区满的时候继续放;
消费者不能在缓冲区空的时候继续取;
多个线程操作共享资源不会冲突。
2. 什么是“解耦”?为什么要解耦? 1. 解耦的本质 解耦指的是将系统中的不同组件或模块之间的依赖关系降低,使它们能够独立地进行开发、修改和维护。在生产者 - 消费者模型中,解耦就是要让生产者和消费者之间的直接关联尽可能减少,各自可以独立地运行和变化,而不会因为一方的改变对另一方造成过大的影响。简单来说 解耦就是降低模块之间的依赖性,提高系统的可扩展性和灵活性 。
在 CP 问题中,供货商和消费者通过缓冲区(超市)进行 间接通信 ,实现了解耦。
2. 为什么要解耦?
不解耦的弊端 :供货商得知道哪个消费者要什么、什么时候要,耦合性太强,代码难维护。
解耦后的优势 :
可以任意增加/减少生产者或消费者数量(线程扩展方便)。
各自只关心“缓冲区的状态”,不需要处理对方的逻辑。
模块职责清晰,便于调试、优化和扩展。
3. 现实类比:供应链的“供货商 - 超市 - 消费者” 1. 现实类比
角色
类比
现实意义
供货商
生产者
批量生产商品的人
超市
缓冲区 Buffer / 仓库
商品流通的中转站,有货架/仓库
消费者
消费者
前来购物的顾客
2. “321 原则” 理解 CP 问题
1. “3 种关系”
生产者 vs 生产者(互斥) :不同的供货商之间存在竞争关系。例如,两家不同品牌的饮料供货商,他们都希望自己的产品能够在超市中占据更多的货架空间,获得更高的销量。为了实现这一目标,他们会在产品质量、价格、营销策略等方面展开竞争,这种竞争关系就是互斥的,因为在一定的市场份额下,一家供货商销量的增加往往意味着其他供货商销量的减少。
消费者 vs 消费者(互斥) :消费者之间也存在一定的互斥关系。比如在超市进行促销活动时,某些热门商品的数量有限,消费者之间就会为了抢购这些商品而产生竞争。例如,限量版的鞋子、热门的电子产品等,先到的消费者有更大的机会购买到,而后到的消费者可能就会错失购买机会。
生产者 vs 消费者(互斥、同步):
互斥 :供货商希望以较高的价格出售商品以获取更多利润,而消费者则希望以较低的价格购买到心仪的商品,双方在价格方面存在利益冲突,这是互斥的表现。
同步 :供货商需要根据消费者的需求来生产商品,如果生产的商品不符合消费者的需求,就会造成库存积压。而消费者的购买行为也会影响供货商的生产计划,例如当某种商品的销量大增时,供货商可能会增加该商品的生产。所以生产者和消费者之间需要保持一定的同步关系,以维持市场的供需平衡。
供货商与消费者 不需要彼此知道对方是谁 ,通过中间的缓冲区(超市)就能协作完成“商品流转” —— 这就是 解耦 。
2. “2 种角色”
角色
行为
生产者(供货商)
负责生成产品,放入缓冲区
消费者(顾客)
负责从缓冲区获取产品,进行消费
注意:
二者不直接通信,不依赖对方的状态;
只是“对缓冲区”的操作要同步协调。
3. “1 个交易场所”
缓冲区就像是“交易中转站”,相当于现实生活的“超市 ”。
1 个交易场所 :超市就是生产者(供货商)和消费者进行交易的场所。超市为供货商提供了销售渠道,将众多供货商的商品集中展示,方便消费者进行选购。
特点:
有容量上限:超市货架就这么大,不能无限放。
是中间角色:解耦了供货商和消费者之间的依赖。
4. 代码示例 —— 基于 BlockingQueue 的生产者消费者模型
1. BlockingQueue.hpp 阻塞队列类模板,提供线程安全的任务存储和同步机制。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 #pragma once #include <iostream> #include <queue> #include <pthread.h> using namespace std;template <class T >class BlockingQueue { private : queue<T> q_; int max_capacity; pthread_mutex_t mutex_; pthread_cond_t not_empty_; pthread_cond_t not_full_; public : static const int DEFAULT_MAX_CAPACITY = 10 ; BlockingQueue (int max_capacity = DEFAULT_MAX_CAPACITY) :max_capacity (max_capacity) { pthread_mutex_init (&mutex_, nullptr ); pthread_cond_init (¬_empty_, nullptr ); pthread_cond_init (¬_full_, nullptr ); } ~BlockingQueue () { pthread_mutex_destroy (&mutex_); pthread_cond_destroy (¬_empty_); pthread_cond_destroy (¬_full_); } void push (const T& item) { pthread_mutex_lock (&mutex_); while (q_.size () >= max_capacity) { pthread_cond_wait (¬_full_, &mutex_); } q_.push (item); pthread_cond_signal (¬_empty_); pthread_mutex_unlock (&mutex_); } T pop () { pthread_mutex_lock (&mutex_); while (q_.empty ()) { pthread_cond_wait (¬_empty_, &mutex_); } T item = q_.front (); q_.pop (); pthread_cond_signal (¬_full_); pthread_mutex_unlock (&mutex_); return item; } };
2. Task.hpp 任务类,封装数学运算任务的执行和结果处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 #pragma once #include <iostream> #include <string> using namespace std;enum { SUCCESS = 0 , DIV_ERROR = 1 , MOD_ERROR = 2 , }; class Task { private : int _x, _y; int _ret; char _op; int _code; public : Task (int x = 0 , int y = 0 , char op = '+' ) :_x(x), _y(y), _op(op), _ret(0 ), _code(SUCCESS) { } void run () { switch (_op) { case '+' : _ret = _x + _y; break ; case '-' : _ret = _x - _y; break ; case '*' : _ret = _x * _y; break ; case '/' : if (_y == 0 ) { _code = DIV_ERROR; _ret = 0 ; } else { _ret = _x / _y; } break ; case '%' : if (_y == 0 ) { _code = MOD_ERROR; _ret = 0 ; } else { _ret = _x % _y; } break ; default : _code = MOD_ERROR; _ret = 0 ; break ; } } void operator () () { run (); } string get_task () const { return to_string (_x) + _op + to_string (_y) + "= ???" ; } string get_ret () const { string ret = to_string (_x) + _op + to_string (_y) + "=" + to_string (_ret) + " [错误代码]:" + to_string (_code); return ret; } };
3. main.cpp 主程序,创建多个生产者线程生成随机任务,多个消费者线程处理任务并输出结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 #include "BlockingQueue.hpp" #include "Task.hpp" #include <unistd.h> #include <ctime> #include <sys/time.h> const string ops = "+-*/%" ; BlockingQueue<Task>* TaskQueue; pthread_mutex_t g_print_mutex = PTHREAD_MUTEX_INITIALIZER; long long get_timestamp () { struct timeval tv; gettimeofday (&tv, nullptr ); return tv.tv_sec * 1000 + tv.tv_usec / 1000 ; } void safe_print (const string& message) { pthread_mutex_lock (&g_print_mutex); cout << "[" << get_timestamp () << "] " << message << endl; pthread_mutex_unlock (&g_print_mutex); } void * consumer (void * arg) { int id = *(int *)arg; while (true ) { Task task = TaskQueue->pop (); task (); string s = "[消费者 " + to_string (id) + "] 完成了任务:" + task.get_task () + ",结果为:" + task.get_ret (); safe_print (s); usleep (500000 ); } pthread_exit (nullptr ); } void * producer (void * arg) { int id = *(int *)arg; srand (time (nullptr ) + id * 1000 ); while (true ) { int x = rand () % 50 + 1 ; int y = rand () % 50 + 1 ; char op = ops[rand () % ops.size ()]; Task task (x, y, op) ; string s = "[生产者 " + to_string (id) + "] 生产了任务:" + task.get_task (); safe_print (s); TaskQueue->push (task); usleep (1000000 ); } pthread_exit (nullptr ); } int main () { TaskQueue = new BlockingQueue <Task>(5 ); int ids[5 ] = { 1 , 2 , 3 , 4 , 5 }; cout << "创建 2 个消费者线程" << endl; pthread_t Consumers[2 ]; for (int i = 0 ; i < 2 ; i++) { pthread_create (&Consumers[i], nullptr , consumer, &ids[i]); } cout << "创建 3 个生产者线程" << endl; pthread_t Producers[3 ]; for (int i = 0 ; i < 3 ; i++) { pthread_create (&Producers[i], nullptr , producer, &ids[i+2 ]); } for (int i = 0 ; i < 2 ; i++) { pthread_join (Consumers[i], nullptr ); } for (int i = 0 ; i < 3 ; i++) { pthread_join (Producers[i], nullptr ); } delete TaskQueue; return 0 ; }
5. POSIX 信号量 1. 什么是 POSIX 信号量?有什么用? POSIX 信号量和 SystemV 信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但 POSIX 可以用于线程间同步,属于 POSIX 标准的一部分(定义在 <semaphore.h>
头文件中),主要用于控制对共享资源的访问,避免竞争条件,实现互斥与同步。
2. POSIX 信号量函数 同样的,这部分函数和之前的 pthread_mutex_init
系列函数十分相似,这里就不详细讲解了,类比使用即可。注意头文件是 <semaphore.h>
。
作用
函数原型
参数说明
返回值
初始化 信号量
int sem_init(sem_t *sem, int pshared, unsigned int value);
sem
:信号量指针 pshared
:0 表示用于线程间共享;非 0 表示进程间共享value
:初始值,表示资源初始数量
0 表示成功,非 0 表示失败(可用 errno
获取错误)
销毁 信号量
int sem_destroy(sem_t *sem);
sem
:信号量指针
0 成功,非 0 失败
等待 (P 操作)
int sem_wait(sem_t *sem);
sem
:信号量指,如果 sem > 0
,则减一并继续执行;值为 0,则阻塞等待
0 成功,-1 失败
非阻塞等待
int sem_trywait(sem_t *sem);
sem
:信号量指针 如果资源不足,不会阻塞,而是立即返回错误(适合用于非阻塞检测场景)
0 成功,-1 失败
发布 (增加)信号量(V 操作)
int sem_post(sem_t *sem);
sem
:信号量指针 将信号量值加 1,如果有等待线程,将唤醒其中一个
0 成功,-1 失败
这里把信号量的工作流程(P/V 操作)单拎出来进行强调:
操作
含义
行为描述
sem_wait()
P 操作 (wait)
当前线程尝试获取资源: 若信号量值 > 0,获取成功(减 1) 若 = 0,则阻塞等待
sem_post()
V 操作 (signal)
当前线程释放资源: 信号量值 +1,若有等待线程,唤醒一个
3. 代码示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 #include <iostream> #include <pthread.h> #include <semaphore.h> #include <unistd.h> using namespace std;sem_t sem;void * worker (void * arg) { int id = *(int *)arg; sem_wait (&sem); cout << "Thread:" << id << " 获取到了信号量" << endl; sleep (1 ); sem_post (&sem); cout << "Thread:" << id << " 释放了信号量" << endl; return nullptr ; } int main () { sem_init (&sem, 0 , 3 ); pthread_t threads[10 ]; int ids[10 ]; for (int i = 0 ; i < 10 ; i++) { ids[i] = i; pthread_create (&threads[i], nullptr , worker, &ids[i]); } for (int i = 0 ; i < 10 ; i++) { pthread_join (threads[i], nullptr ); } sem_destroy (&sem); return 0 ; }
6. 基于环形队列的生产消费模型 1. 什么是环形队列生产消费模型? 基于环形队列的生产者-消费者模型 也是一种常见的并发编程设计,其核心是通过环形队列(循环队列)作为缓冲区,协调 “生产者” 和 “消费者” 两个角色的工作:生产者负责生成数据并放入队列,消费者负责从队列中取出数据并处理。从而实现生产者与消费者之间的数据传递和解耦。需要注意的是双方互不直接通信,而是通过队列进行 异步协作 。
2. 特点
环形队列是一个 固定容量、首尾相接的循环缓冲区 。
使用两个指针:
head
/ front
→ 消费者读取位置。
tail
/ rear
→ 生产者写入位置。
每次写或读都会环形移动(取模)。
3. 环形队列中判空 / 判满的处理(重点) 由于 front
和 rear
都在循环移动,当队列为空或为满时,都会出现 front == rear
的情况 ,因此环形队列最关键的难点就是 如何区分“队列空”与“队列满” 。
4. 常见解决方案
方法
原理
判空条件
判满条件
优缺点
1. 多空一法(保留一个空位,即浪费一个存储单元
规定队列容量为 N
时,最多存 N-1
个元素。
head == tail
(tail + 1) % size == head
✅ 简单高效,逻辑清晰,边界状态安全 ❌ 实际可用容量为 size - 1
,浪费一个单元空间
2. 引入计数器 count
使用一个 int count
变量单独记录当前队列中元素数量。
count == 0
count == size
✅ 空间完全利用 ❌ 需要额外同步 count
变量,线程并发时处理更复杂
3. 标志(记)位法(flag)
设置一个标志位 bool full
来判断当前状态。
head == tail && !full
head == tail && full
✅ 读写位置逻辑简洁清晰 ❌ 实现复杂,易出错
5. 代码示例 1. RingQueue.hpp 基于信号量和互斥锁实现的线程安全环形队列模板类,使用两个信号量分别控制数据资源和空间资源,两个互斥锁保护生产者和消费者的并发访问位置。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 #pragma once #include <iostream> #include <vector> #include <pthread.h> #include <semaphore.h> using namespace std;const static int default_capacity = 5 ; template <class T >class RingQueue { private : void P (sem_t & sem) { sem_wait (&sem); } void V (sem_t & sem) { sem_post (&sem); } void Lock (pthread_mutex_t & mutex) { pthread_mutex_lock (&mutex); } void Unlock (pthread_mutex_t & mutex) { pthread_mutex_unlock (&mutex); } public : RingQueue (int capacity = default_capacity) :ringqueue_ (capacity), capacity_ (capacity), c_index_ (0 ), p_index_ (0 ) { sem_init (&cdata_sem_, 0 , 0 ); sem_init (&pspace_sem_, 0 , capacity_); pthread_mutex_init (&c_mutex_, nullptr ); pthread_mutex_init (&p_mutex_, nullptr ); } ~RingQueue () { sem_destroy (&cdata_sem_); sem_destroy (&pspace_sem_); pthread_mutex_destroy (&c_mutex_); pthread_mutex_destroy (&p_mutex_); } void Push (const T& in) { P (pspace_sem_); Lock (p_mutex_); ringqueue_[p_index_] = in; p_index_++; p_index_ %= capacity_; Unlock (p_mutex_); V (cdata_sem_); } void Pop (T* out) { P (cdata_sem_); Lock (c_mutex_); *out = ringqueue_[c_index_]; c_index_++; c_index_ %= capacity_; Unlock (c_mutex_); V (pspace_sem_); } private : vector<T> ringqueue_; int capacity_; int c_index_; int p_index_; sem_t cdata_sem_; sem_t pspace_sem_; pthread_mutex_t c_mutex_; pthread_mutex_t p_mutex_; };
2. Task.hpp 封装数学运算任务的类,支持加减乘除取模五种运算,包含错误处理机制,提供任务执行和结果获取功能。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 #pragma once #include <iostream> #include <string> using namespace std;const string opers = "+-*/%" ; enum { SUCCESS = 0 , DIV_ERROR = 1 , MOD_ERROR = 2 , UNKNOWN_ERROR = 3 }; class Task { public : Task () :_x(0 ), _y(0 ), _op('+' ), _ret(0 ), _code(SUCCESS) {} Task (int x, int y, char op = '+' ) :_x(x), _y(y), _op(op), _ret(0 ), _code(SUCCESS) { if (op != '+' && op != '-' && op != '*' && op != '/' && op != '%' ) { _op = '+' ; _code = UNKNOWN_ERROR; } } void run () { _ret = 0 ; _code = SUCCESS; switch (_op) { case '+' : _ret = _x + _y; break ; case '-' : _ret = _x - _y; break ; case '*' : _ret = _x * _y; break ; case '/' : if (_y == 0 ) { _code = DIV_ERROR; _ret = 0 ; } else { _ret = _x / _y; } break ; case '%' : if (_y == 0 ) { _code = MOD_ERROR; _ret = 0 ; } else { _ret = _x % _y; } break ; default : _code = UNKNOWN_ERROR; _ret = 0 ; break ; } } void operator () () { run (); } string get_task () const { return to_string (_x) + _op + to_string (_y) + "= ???" ; } string get_ret () const { string ret = to_string (_x) + _op + to_string (_y) + "=" + to_string (_ret) + " [错误代码:" + to_string (_code) + "]" ; return ret; } char get_operator () const { return _op; } int get_first_operand () const { return _x; } int get_second_operand () const { return _y; } int get_result () const { return _ret; } int get_error_code () const { return _code; } ~Task () { } private : int _x, _y; int _ret; char _op; int _code; };
3. Main.cc 创建多个生产者和消费者线程,生产者生成随机运算任务放入环形队列,消费者从队列取出任务并执行计算,演示多线程协作的生产消费模型。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 #include <iostream> #include <unistd.h> #include <ctime> #include "RingQueue.hpp" #include "Task.hpp" using namespace std;struct ThreadData { RingQueue<Task> *rq; string threadname; }; void *Productor (void *args) { ThreadData *td = static_cast <ThreadData*>(args); RingQueue<Task> *rq = td->rq; string name = td->threadname; int len = opers.size (); while (true ) { int data1 = rand () % 10 + 1 ; usleep (10 ); int data2 = rand () % 10 ; char op = opers[rand () % len]; Task t (data1, data2, op) ; rq->Push (t); cout << name << " 生产任务:" << t.get_task () << endl; sleep (1 ); } return nullptr ; } void *Consumer (void *args) { ThreadData *td = static_cast <ThreadData*>(args); RingQueue<Task> *rq = td->rq; string name = td->threadname; while (true ) { Task t; rq->Pop (&t); t (); cout << name << " 消费任务:" << t.get_task () << " 结果:" << t.get_ret () << endl; } return nullptr ; } int main () { cout << "=== 环形队列生产者-消费者模型 ===" << endl; srand (time (nullptr ) ^ getpid ()); RingQueue<Task> *rq = new RingQueue <Task>(10 ); pthread_t consumers[3 ], producers[2 ]; cout << "创建 2 个生产者线程..." << endl; for (int i = 0 ; i < 2 ; i++) { ThreadData *td = new ThreadData (); td->rq = rq; td->threadname = "[生产者-" + to_string (i) + "]" ; pthread_create (&producers[i], nullptr , Productor, td); } cout << "创建 3 个消费者线程..." << endl; for (int i = 0 ; i < 3 ; i++) { ThreadData *td = new ThreadData (); td->rq = rq; td->threadname = "[消费者-" + to_string (i) + "]" ; pthread_create (&consumers[i], nullptr , Consumer, td); } for (int i = 0 ; i < 2 ; i++) { pthread_join (producers[i], nullptr ); } for (int i = 0 ; i < 3 ; i++) { pthread_join (consumers[i], nullptr ); } delete rq; return 0 ; }