file:
BlockQueue.hpp:
#pragma once
#include<pthread.h>
#include<cassert>
#include<queue>
const int maxsize=5;
template<class T>
class BlockQueue{
public:
BlockQueue(){
int n;
n=pthread_mutex_init(&m_mutex,nullptr);
assert(n==0);
n=pthread_cond_init(&m_cpond,nullptr);
assert(n==0);
n=pthread_cond_init(&m_ppond,nullptr);
assert(n==0);
(void)n;
}
~BlockQueue(){
pthread_mutex_destroy(&m_mutex);
pthread_cond_destroy(&m_cpond);
pthread_cond_destroy(&m_ppond);
}
void push(const T& task){
pthread_mutex_lock(&m_mutex);
while(is_full()){
pthread_cond_wait(&m_ppond,&m_mutex);
}
m_q.push(task);
pthread_cond_signal(&m_cpond);
pthread_mutex_unlock(&m_mutex);
}
void pop(T &task){
pthread_mutex_lock(&m_mutex);
while(is_empty()){
pthread_cond_wait(&m_cpond,&m_mutex);
}
task=m_q.front();
m_q.pop();
pthread_cond_signal(&m_ppond);
pthread_mutex_unlock(&m_mutex);
}
private:
bool is_empty(){
return m_q.size()==0;
}
bool is_full(){
return m_q.size()==maxsize;
}
private:
std::queue<T> m_q;
pthread_mutex_t m_mutex;
pthread_cond_t m_cpond,m_ppond;
};
Task.hpp:
#pragma once
#include<functional>
class Task{
public:
using func_t=std::function<int(int,int)>;
Task(){}
Task(const int &x,const int &y,func_t func):m_x(x),m_y(y),m_callback(func){}
~Task(){}
int operator()(){
return m_callback(m_x,m_y);
}
private:
func_t m_callback;
int m_x;
int m_y;
};
MainCp.cpp:
#include "BlockQueue.hpp"
#include "Task.hpp"
#include<ctime>
#include<unistd.h>
#include<iostream>
using func_t=std::function<int(int,int)>;
std::vector<func_t> fs;
int add(int x,int y){
return x+y;
}
int divid(int x,int y){
return x/y;
}
int mul(int x,int y){
return x*y;
}
void *comsumer(void *args){
BlockQueue<Task> *bq=static_cast<BlockQueue<Task>*>(args);
while(true){
Task t;
bq->pop(t);
std::cout<<"消費數據:"<<t()<<std::endl;
sleep(1);
}
}
void *productor(void *args){
BlockQueue<Task> *bq=static_cast<BlockQueue<Task>*>(args);
while(true){
int x=rand()%10+1;
int y=rand()%10+1;
int taskid=rand()%3;
bq->push(Task(x,y,fs[taskid]));
std::cout<<"生產數據:"<<x<<' '<<y<<' '<<taskid<<std::endl;
sleep(1);
};
}
void init(){
fs.push_back(add);
fs.push_back(divid);
fs.push_back(mul);
}
int main(){
init();
srand((unsigned int)time(nullptr)^getpid());
BlockQueue<Task> *bq=new BlockQueue<Task>();
pthread_t c,p;
pthread_create(&c,nullptr,comsumer,(void*)bq);
pthread_create(&p,nullptr,productor,(void*)bq);
pthread_join(c,nullptr);
pthread_join(p,nullptr);
}