基于阻塞队列及环形队列的生产消费模型

news/2024/10/7 22:27:23 标签: linux, 服务器, 算法

目录

条件变量函数

等待条件满足

阻塞队列

升级版

信号量

POSIX信号量

环形队列


条件变量函数

等待条件满足

int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex); 
 参数: 
 cond:要在这个条件变量上等待 
 mutex:互斥量,后面详细解释 

pthread_cond_wait:第二个参数必须是正在使用的互斥锁

a.pthread_cond_wait:该函数调用时,会以原子性的方式将锁释放,并将自己挂起

b.pthread_cond_wait:该函数被唤醒返回的时候,会自动从新获取锁

阻塞队列

blockqueue.hpp

#pragma once
#include<iostream>
#include<string>
#include<queue>
#include<unistd.h>
#include<pthread.h>

using namespace std;
static const int gmaxcap=5;
template<class T>
class BlockQueue
{
public:
    BlockQueue(const int& maxcap=gmaxcap)
        :_maxcap(maxcap)
    {
        pthread_mutex_init(&_mutex,nullptr);
        pthread_cond_init(&_pcond,nullptr);
        pthread_cond_init(&_ccond,nullptr);
    }
    void push(const T& in)
    {
        pthread_mutex_lock(&_mutex);
        while (is_full())
            pthread_cond_wait(&_pcond,&_mutex);//生产条件不满足
        _q.push(in);
        //阻塞队列中一定有数据
        pthread_cond_signal(&_ccond);
        pthread_mutex_unlock(&_mutex);
    }
    void pop(T* out)
    {
        pthread_mutex_lock(&_mutex);
        while(is_empty)
            pthread_cond_wait(&_ccond,&_mutex);
        *out=_q.front();
        _q.pop();
        //队列中一定有一个空位置
        pthread_cond_signal(&_pcond);
        pthread_mutex_unlock(&_mutex);
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_pcond);
        pthread_cond_destroy(&_ccond);
    }
private:
    bool is_empty()
    {
        return _q.empty();
    }
    bool is_full()
    {
        return _q.size()==_maxcap;
    }
private:
    queue<T> _q;
    int _maxcap;
    pthread_mutex_t _mutex;
    pthread_cond_t _pcond;//生产者对应的条件变量
    pthread_cond_t _ccond;
};


task.hpp

#pragma once
#include<iostream>
#include<cstdio>
#include<string>
#include<functional>

using namespace std;
class Task
{
    using func_t=function<int(int,int,char)>;
public:
    Task()
    {}
    Task(int x,int y,char op,func_t func)
        :_x(x),_y(y),_op(op),_callback(func)
    {}
    string operator()()
    {
        int result=_callback(_x,_y,_op);
        char buffer[1024];
        snprintf(buffer,sizeof buffer,"%d %c %d = %d ",_x,_op,_y,result);
        return buffer;
    }
    string toTaskString()
    {
        char buffer[1024];
        snprintf(buffer,sizeof buffer,"%d %c %d = ? ",_x,_op,_y);
        return buffer;
    }
private:
    int _x;
    int _y;
    char _op;
    func_t _callback;
};

升级版

blockqueue.hpp

#pragma once
#include<iostream>
#include<string>
#include<queue>
#include<unistd.h>
#include<pthread.h>

using namespace std;
static const int gmaxcap=5;
template<class T>
class BlockQueue
{
public:
    BlockQueue(const int& maxcap=gmaxcap)
        :_maxcap(maxcap)
    {
        pthread_mutex_init(&_mutex,nullptr);
        pthread_cond_init(&_pcond,nullptr);
        pthread_cond_init(&_ccond,nullptr);
    }
    void push(const T& in)
    {
        pthread_mutex_lock(&_mutex);
        while (is_full())
            pthread_cond_wait(&_pcond,&_mutex);//生产条件不满足
        _q.push(in);
        //阻塞队列中一定有数据
        pthread_cond_signal(&_ccond);
        pthread_mutex_unlock(&_mutex);
    }
    void pop(T* out)
    {
        pthread_mutex_lock(&_mutex);
        while(is_empty)
            pthread_cond_wait(&_ccond,&_mutex);
        *out=_q.front();
        _q.pop();
        //队列中一定有一个空位置
        pthread_cond_signal(&_pcond);
        pthread_mutex_unlock(&_mutex);
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_pcond);
        pthread_cond_destroy(&_ccond);
    }
private:
    bool is_empty()
    {
        return _q.empty();
    }
    bool is_full()
    {
        return _q.size()==_maxcap;
    }
private:
    queue<T> _q;
    int _maxcap;
    pthread_mutex_t _mutex;
    pthread_cond_t _pcond;//生产者对应的条件变量
    pthread_cond_t _ccond;
};


task.hpp

#pragma once
#include<iostream>
#include<cstdio>
#include<string>
#include<functional>

using namespace std;
class CalTask
{
    using func_t=function<int(int,int,char)>;
public:
    CalTask()
    {}
    CalTask(int x,int y,char op,func_t func)
        :_x(x),_y(y),_op(op),_callback(func)
    {}
    string operator()()
    {
        int result=_callback(_x,_y,_op);
        char buffer[1024];
        snprintf(buffer,sizeof buffer,"%d %c %d = %d ",_x,_op,_y,result);
        return buffer;
    }
    string toTaskString()
    {
        char buffer[1024];
        snprintf(buffer,sizeof buffer,"%d %c %d = ? ",_x,_op,_y);
        return buffer;
    }
private:
    int _x;
    int _y;
    char _op;
    func_t _callback;
};
const string oper="+-*/%";
int mymath(int x,int y,char op)
{
    int result=0;
    switch (op)
    {
    case '+':
        result=x+y;
        break;
    case '-':
        result=x-y;
        break;
    case '*':
        result=x*y;
        break;
    case '/':
    {
        if(y==0)
        {
            cerr<<"div zero error!"<<endl;
            result=-1;
        }    
        else result=x/y;
    }
        break;
    case '%':
    {
        if(y==0)
        {
            cerr<<"div zero error!"<<endl;
            result=-1;
        }    
        else result=x%y;
    }
        break;
    default:
        break;
    }
    return result;
}
class SaveTask
{
    typedef function<void(const string&)> func_t;
public:
    SaveTask()
    {}
    SaveTask(const string& message,func_t func)
        :_message(message),_func(func)
    {}
    void operator()()
    {
        _func(_message);
    }
private:
    string _message;
    func_t _func;
};
void Save(const string& message)
{
    const string target="./log.txt";
    FILE* fp=fopen(target.c_str(),"a+");
    if(!fp)
    {
        cerr<<"fopen error"<<endl;
        return;
    }
    fputs(message.c_str(),fp);
    fputs("\n",fp);
    fclose(fp);
}

MainCp.cc

#include"BlockQueue.hpp"
#include"task.hpp"
#include<sys/types.h>
#include<unistd.h>
#include<ctime>

//
//
template<class C,class S>
class BlockQueues
{

public:
    BlockQueue<C>* c_bq;
    BlockQueue<S>* s_bq;
};
void* consumer(void* bqs_)
{
    BlockQueue<CalTask>* bq=(static_cast<BlockQueues<CalTask,SaveTask>* >(bqs_))->c_bq;
    BlockQueue<SaveTask>* save_bq=(static_cast<BlockQueues<CalTask,SaveTask>* >(bqs_))->s_bq;
    
    while(true)
    {
        /* consumer */
        // int data;
        // bq->pop(&data);
        CalTask t;
        bq->pop(&t);
        string result=t();
        cout<<"消费数据: "<<result<<endl;

        SaveTask save(result,Save);
        save_bq->push(save);
        cout<<"推送保存任务完成..."<<endl;
        sleep(1);
    }
    return nullptr;
}
void* producter(void* bqs_)
{
    BlockQueue<CalTask>* bq=(static_cast<BlockQueues<CalTask,SaveTask>* >(bqs_))->c_bq;
    
    while (true)
    {
        //producer
        int x=rand()%10+1;
        int y=rand()%5;
        int operCode=rand()%oper.size();
        CalTask t(x,y,oper[operCode],mymath);
        bq->push(t);
        cout<<"生产任务: "<<t.toTaskString()<<endl;
        // sleep(1);
    }
    return nullptr;
}
void* saver(void* bqs_)
{
    BlockQueue<SaveTask>* save_bq=(static_cast<BlockQueues<CalTask,SaveTask>* >(bqs_))->s_bq;
    while (true)
    {
        SaveTask t;
        save_bq->pop(&t);
        t();
        cout << "推送保存任务完成..." << endl;
    }
    return nullptr;
}
int main()
{
    srand((unsigned long)time(nullptr));
    BlockQueues<CalTask,SaveTask> bqs;

    bqs.c_bq=new BlockQueue<CalTask>();
    bqs.s_bq=new BlockQueue<SaveTask>();
    pthread_t c,p,s;
    pthread_create(&c,nullptr,consumer,&bqs);
    pthread_create(&p,nullptr,producter,&bqs);
    pthread_create(&s,nullptr,saver,&bqs);
    pthread_join(c,nullptr);
    pthread_join(p,nullptr);
    pthread_join(s,nullptr);
    delete bqs.c_bq;
    delete bqs.s_bq;
    return 0;
}

./MainCp
生产任务: 9 * 0 = ? 
生产任务: 9 - 4 = ? 
生产任务: 8 - 0 = ? 
生产任务: 3 - 4 = ? 
生产任务: 6 + 1 = ? 
消费数据: 9 * 0 = 0 
推送保存任务完成...
生产任务: 2 - 2 = ? 
推送保存任务完成...
消费数据: 9 - 4 = 5 
推送保存任务完成...
生产任务: 9 - 0 = ? 
推送保存任务完成...
消费数据: 8 - 0 = 8 
推送保存任务完成...
生产任务: 6 * 3 = ? 
推送保存任务完成...
消费数据: 3 - 4 = -1 
推送保存任务完成...
生产任务: 4 * 4 = ? 
推送保存任务完成...
消费数据: 6 + 1 = 7 
推送保存任务完成...
生产任务: 5 % 4 = ? 
推送保存任务完成...
^C
zhangsan@ubuntu:~/practice-using-ubuntu/20241005/blockqueue$ cat log.txt
9 * 0 = 0 
9 - 4 = 5 
8 - 0 = 8 
3 - 4 = -1 
6 + 1 = 7 

信号量

a.信号量的本质就是计数器

b.只有拥有信号量,在未来就一定能拥有临界资源的一部分

申请信号量的本质就是:对临界资源中特点小块资源的预定机制

sem--         申请资源       P        必须保证操作的原子性

sem++       释放资源        V       必须保证操作的原子性

POSIX信号量

环形队列

RingQueue.hpp

#pragma once

#include<iostream>
#include<cassert>
#include<vector>
#include<ctime>
#include<cstdlib>
#include<semaphore.h>
#include<unistd.h>
#include<pthread.h>

static const int gcap=5;

template<class T>
class RingQueue
{
private:
    void P(sem_t& sem)
    {
        int n=sem_wait(&sem);
        assert(n==0);
    }
    void V(sem_t& sem)
    {
        int n=sem_post(&sem);
        assert(n==0);
    }
public:
    RingQueue(const int& cap=gcap):_queue(cap),_cap(cap)
    {
        int n=sem_init(&_spaceSem,0,_cap);
        assert(n==0);
        n=sem_init(&_dataSem,0,0);
        assert(n==0);
        _productorStep=_consumerStep=0;
        pthread_mutex_init(&_pmutex,nullptr);
        pthread_mutex_init(&_cmutex,nullptr);
    }
    void Push(const T& in)
    {
        
        P(_spaceSem);//productor
        pthread_mutex_lock(&_pmutex);
        _queue[_productorStep++]=in;
        _productorStep%=_cap;
        pthread_mutex_unlock(&_pmutex);//更高效
        V(_dataSem);
        
    }
    void Pop(T* out)
    {
        pthread_mutex_lock(&_cmutex);
        P(_dataSem);
        *out=_queue[_consumerStep++];
        _consumerStep%=_cap;
        V(_spaceSem);
        pthread_mutex_unlock(&_cmutex);
    }
    ~RingQueue()
    {
        sem_destroy(&_spaceSem);
        sem_destroy(&_dataSem);
        pthread_mutex_destroy(&_pmutex);
        pthread_mutex_destroy(&_cmutex);
    }
private:
    vector<T> _queue;
    int _cap;
    sem_t _spaceSem;//生产者->空间资源
    sem_t _dataSem;
    int _productorStep;
    int _consumerStep;
    pthread_mutex_t _pmutex;
    pthread_mutex_t _cmutex;
};

task.hpp

#pragma once
#include<iostream>
#include<cstdio>
#include<string>
#include<functional>

using namespace std;
class Task
{
    using func_t=function<int(int,int,char)>;
public:
    Task()
    {}
    Task(int x,int y,char op,func_t func)
        :_x(x),_y(y),_op(op),_callback(func)
    {}
    string operator()()
    {
        int result=_callback(_x,_y,_op);
        char buffer[1024];
        snprintf(buffer,sizeof buffer,"%d %c %d = %d ",_x,_op,_y,result);
        return buffer;
    }
    string toTaskString()
    {
        char buffer[1024];
        snprintf(buffer,sizeof buffer,"%d %c %d = ? ",_x,_op,_y);
        return buffer;
    }
private:
    int _x;
    int _y;
    char _op;
    func_t _callback;
};
const string oper="+-*/%";
int mymath(int x,int y,char op)
{
    int result=0;
    switch (op)
    {
    case '+':
        result=x+y;
        break;
    case '-':
        result=x-y;
        break;
    case '*':
        result=x*y;
        break;
    case '/':
    {
        if(y==0)
        {
            cerr<<"div zero error!"<<endl;
            result=-1;
        }    
        else result=x/y;
    }
        break;
    case '%':
    {
        if(y==0)
        {
            cerr<<"div zero error!"<<endl;
            result=-1;
        }    
        else result=x%y;
    }
        break;
    default:
        break;
    }
    return result;
}

main.cc

#include"RingQueue.hpp"
#include"task.hpp"

using namespace std;

void* ProductorRoutine(void* rq)
{
    RingQueue<Task>* ringqueue=static_cast<RingQueue<Task>* >(rq);
    while (true)
    {
        /* code */
        int x=rand()%100;
        int y=rand()%50;
        char op=oper[rand()%oper.size()];
        Task t(x,y,op,mymath);
        ringqueue->Push(t);
        cout<<"生产者派发了一个任务: "<<t.toTaskString()<<endl;
        sleep(1);
    }
}
void* ConsumerRoutine(void* rq)
{
    RingQueue<Task>* ringqueue=static_cast<RingQueue<Task>* >(rq);
    while (true)
    {
        /* code */
        Task t;
        ringqueue->Pop(&t);
        string result=t();
        cout<<"消费者消费了一个任务"<<result<<endl;
    }
    
}
int main()
{
    srand((unsigned int)time(nullptr));
    RingQueue<Task>* rq=new RingQueue<Task>();

    pthread_t p,c;
    pthread_create(&p,nullptr,ProductorRoutine,rq);
    pthread_create(&c,nullptr,ConsumerRoutine,rq);
    pthread_join(p,nullptr);
    pthread_join(c,nullptr);
    delete rq;
    return 0;
}


http://www.niftyadmin.cn/n/5693433.html

相关文章

MySQL数据库专栏(二)SQL语句基础操作

目录 数据库操作 创建数据库 查看数据库 选择数据库 删除数据库 数据表操作 数据表数据类型 数据表列约束 数据表索引 创建表 查看表 查看表结构 删除表 数据表的增删改操作 …

C++面试速通宝典——15

254. STL有哪些容器&#xff1f;各自特点&#xff1f; 序列容器&#xff1a; vector&#xff1a;动态数组&#xff0c;支持快速随机访问。list&#xff1a;双向链表&#xff0c;支持快速插入和删除。deque&#xff1a;双端队列&#xff0c;两端都可以快速插入和删除。 关联容器…

Pytorch中不会自动传播梯度的操作有哪些?

在 PyTorch 中&#xff0c;某些生成张量的操作本身不会创建与计算图相关联的梯度信息。这些操作通常用于初始化张量&#xff0c;并且默认情况下不需要进行梯度计算。以下是一些常见的不会自动传播梯度的张量生成操作&#xff1a; 数值初始化操作&#xff1a; torch.linspace():…

windows环境下使用socket进行tcp通信

客户端 #define WIN32 _LEAN_AND_MEAN #define _WINSOCK_DEPRECATED_NO_WARNINGS #include<iostream> #include<WinSock2.h> #include<Windows.h> #include<stdio.h>using namespace std; #pragma comment (lib, "ws2_32.lib") //#ws2_32…

Golang | Leetcode Golang题解之第459题重复的子字符串

题目&#xff1a; 题解&#xff1a; func repeatedSubstringPattern(s string) bool {return kmp(s s, s) }func kmp(query, pattern string) bool {n, m : len(query), len(pattern)fail : make([]int, m)for i : 0; i < m; i {fail[i] -1}for i : 1; i < m; i {j : …

Github 2024-10-03Go开源项目日报Top10

根据Github Trendings的统计,今日(2024-10-03统计)共有10个项目上榜。根据开发语言中项目的数量,汇总情况如下: 开发语言项目数量Go项目10TypeScript项目1快速且可扩展的多平台Web服务器 创建周期:3551 天开发语言:Go协议类型:Apache License 2.0Star数量:57434 个Fork数…

VirtulBOX Ubuntu22安装dpdk23.11

目录 依赖包安装 Python安装 numa安装 ​编辑Python pip3安装 ​编辑pyelftools安装 meson和ninja安装 ​编辑构建与编译 Meson构建DPDK ​编辑Ninja安装DPDK ​编辑VFIO-PCI驱动安装 大页内存和IOMMU配置 ​编辑VFIO-PCI加载 ​编辑VFIO-PCI驱动绑定 ​编辑dpdk…

【数据结构】什么是哈希表(散列表)?

&#x1f984;个人主页:修修修也 &#x1f38f;所属专栏:数据结构 ⚙️操作环境:Visual Studio 2022 目录 &#x1f4cc;哈希表的概念 &#x1f4cc;哈希函数的构造方法 &#x1f38f;直接定址法 &#x1f38f;除留余数法 &#x1f38f;平方取中法 &#x1f38f;折叠法 &#x…