生产消费者模式

#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>

const int BUFFER_SIZE = 5;  // 缓冲区大小
const int TOTAL_ITEMS = 10; // 总生产项数

std::queue<int> buffer;             // 共享缓冲区
std::mutex mtx;                     // 互斥锁
std::condition_variable cond_prod;  // 生产者条件变量
std::condition_variable cond_cons;  // 消费者条件变量

// 生产者函数
void producer() {
    for (int i = 1; i <= TOTAL_ITEMS; ++i) {
        std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟生产耗时
        
        std::unique_lock<std::mutex> lock(mtx);
        
        // 等待缓冲区有空间
        cond_prod.wait(lock, [] { 
            return buffer.size() < BUFFER_SIZE; 
        });
        
        buffer.push(i);
        std::cout << "生产: " << i << " (缓冲区大小: " << buffer.size() << ")\n";
        
        lock.unlock();
        cond_cons.notify_one(); // 通知消费者
    }
}

// 消费者函数
void consumer() {
    for (int i = 0; i < TOTAL_ITEMS; ++i) {
        std::unique_lock<std::mutex> lock(mtx);
        
        // 等待缓冲区有数据
        cond_cons.wait(lock, [] { 
            return !buffer.empty(); 
        });
        
        int item = buffer.front();
        buffer.pop();
        std::cout << "消费: " << item << " (缓冲区大小: " << buffer.size() << ")\n";
        
        lock.unlock();
        cond_prod.notify_one(); // 通知生产者
        
        std::this_thread::sleep_for(std::chrono::milliseconds(150)); // 模拟消费耗时
    }
}

int main() {
    std::thread prod_thread(producer);
    std::thread cons_thread(consumer);

    prod_thread.join();
    cons_thread.join();

    std::cout << "所有任务完成!" << std::endl;
    return 0;
}

关键组件说明:

共享缓冲区:使用std::queue作为FIFO缓冲区

互斥锁:std::mutex保护共享资源的并发访问

条件变量:

cond_prod:生产者等待缓冲区有空间

cond_cons:消费者等待缓冲区有数据

工作流程:

生产者:生成数据 → 获取锁 → 检查缓冲区空间 → 缓冲区满则等待 → 放入数据 → 通知消费者

消费者:获取锁 → 检查缓冲区数据 → 缓冲区空则等待 → 取出数据 → 通知生产者

Table of Contents