zoukankan      html  css  js  c++  java
  • 线程的工作方式-流水线

    #include <iostream>
    #include <pthread.h>
    #include <cstdio>
    #include "errors.h"
    using namespace std;
    
    typedef struct stage_tag
    {
        pthread_mutex_t mutex;        //protect data 
        pthread_cond_t  avail;        //data availiable 当前流水节点处于空闲可用状态, 等待接收数据,进行加工
        pthread_cond_t  read;        //ready for data 当前处于数据准备状态,写入当前节点
        int             data_ready; //data present 节点数据状态;1 表示准备好 等待发送 ; 0 表示没有数据
        long            data;        //data to process
        pthread_t        thread;        //thread for stage
        struct stage_tag *next;        //next satge
    }stage_t;
    
    typedef struct pipe_tag
    {
        pthread_mutex_t    mutex;        //mutex to protect pipe
        stage_t            *head;        //fist stage
        stage_t            *tail;        //final stage
        int                stages;        //number for stages
        int                active;        //active data element
    }pipe_t;
    
    int pipe_send(stage_t *stage,long data)
    {
        int status=pthread_mutex_lock(&stage->mutex);
        if(status!=0)
            return status;
        while(stage->data_ready)// 如果有数据,则下面必须等待知道收到ready信号
        {
            status=pthread_cond_wait(&stage->read,&stage->mutex);//wait ready
            if(status!=0)                    //在read上阻塞,main线程改变了条件变量的值并发信息号,解除阻塞 
            {
                pthread_mutex_unlock(&stage->mutex);
                return status;
            }
        }
        //send new data
        stage->data=data;
        stage->data_ready=1;
        //此时有了data,就可以发送avail信号了
        status=pthread_cond_signal(&stage->avail);        //signal avail
        if(status!=0)
        {
            pthread_mutex_unlock(&stage->mutex);
            return status;
        }
        status=pthread_mutex_unlock(&stage->mutex);
        return status;
    }
    
    void *pipe_stage(void *arg) 
    {
        stage_t *stage=(stage_t*)arg;
        stage_t *next_stage=stage->next;
        
        int status=pthread_mutex_lock(&stage->mutex);
        if(status!=0)
            err_abort(status,"Lock pipe stage");
        
        while(1)
        {    
          //if there's data int the pipe stage,wait for it be consumed
          //一般一个条件表达式都是在一个互斥锁的保护下被检查。
          //当条件表达式未被满足时,线程将仍然阻塞在这个条件变量上。
          //当另一个线程改变了条件的值并向条件变量发出信号时,
          //等待在这个条件变量上的一个线程或所有线程被唤醒,
          //接着都试图再次占有相应的互斥锁。阻塞在条件变量上的线程被唤醒以后,
          //直到pthread_cond_wait()函数返回之前条件的值都有可能发生变化。
          //所以函数返回以后,在锁定相应的互斥锁之前,必须重新测试条件值。
          //最好的测试方法是循环调用pthread_cond_wait函数,并把满足条件的表达式置为循环的终止条件。
          //如:pthread_mutex_lock(); 
          //    while (condition_is_false) 
          //    pthread_cond_wait(); 
          //    pthread_mutex_unlock();
            while(stage->data_ready!=1)
            {
                status=pthread_cond_wait(&stage->avail,&stage->mutex);//wait avail
                if(status!=0)
                    err_abort(status,"wait for previous stage");
            }
            pipe_send(next_stage,stage->data+1);                //pipe_send()
            stage->data_ready=0;
            status=pthread_cond_signal(&stage->read);            //signal ready
            if(status!=0)
                err_abort(status,"wake next stage");
        }
    }
    
    int pipe_create(pipe_t *pipe,int stages)
    {
        stage_t **link=&pipe->head,*new_stage,*stage;
        int status=pthread_mutex_init(&pipe->mutex,NULL);
        if(status)
            err_abort(status,"Init pipe status");
        
        pipe->stages=stages;
        pipe->active=0;
        for(int i=0;i<stages;++i)
        {
            new_stage=(stage_t*)malloc(sizeof(stage_t));
            if(new_stage==NULL)
                errno_abort("allocate stage");
            
            status=pthread_mutex_init(&new_stage->mutex,NULL);
            if(status)
                err_abort(status,"init stage mutex");
            
            status=pthread_cond_init(&new_stage->avail,NULL);
            if(status)
                err_abort(status,"init avail condition");
                
            status=pthread_cond_init(&new_stage->read,NULL);
            if(status)
                err_abort(status,"init read condition");
                
            new_stage->data_ready=0;
            *link=new_stage;
            link=&new_stage->next;
        }
        *link=(stage_t*)NULL;
        pipe->tail=new_stage;
        
        for(stage=pipe->head;stage->next!=NULL;stage=stage->next)
        {
            status=pthread_create(&stage->thread,NULL,pipe_stage,(void*)stage);
            if(status)
                err_abort(status,"Create pipe stage");        
        }
        return 0;
    }
    
    int pipe_start(pipe_t *pipe,long vaule) 
    {
        int status=pthread_mutex_lock(&pipe->mutex);
        if(status)
            err_abort(status,"lock pipe mutex");
        
        pipe->active++;//记录活动的次数 
        status=pthread_mutex_unlock(&pipe->mutex);
        if(status)
            err_abort(status,"unlock pipe mutex");
        
        pipe_send(pipe->head,vaule);                    //pipe_send()
        return 0;
    }
    
    int pipe_result(pipe_t *pipe,long *result)
    {
        stage_t *tail=pipe->tail;
        int status=pthread_mutex_lock(&pipe->mutex);
        if(status)
            err_abort(status,"lock pipe mutex");
        
        int empty=0;
        if(pipe->active<=0)
            empty=1;
        else
            pipe->active--;
        status = pthread_mutex_unlock(&pipe->mutex);
        if(status!=0)
            err_abort( status, "unlock pipe mutex" );
        if(empty)
               return 0;
        
        pthread_mutex_lock(&tail->mutex);
        while(!tail->data_ready)
            pthread_cond_wait(&tail->avail,&tail->mutex);
        
        *result=tail->data;
        tail->data_ready=0;
        pthread_cond_signal(&tail->read);
        pthread_mutex_unlock(&tail->mutex);
        return 1;
    }
    
    int main()
    {
        pipe_t my_pipe;
        pipe_create(&my_pipe,3);                            //pipe_create()
        cout<<"Enter ingter values ,or "=" for next result"<<endl;
        
        while(1)
        {
            cout<<"Data >";
            char line[128];
            if(fgets(line,sizeof(line),stdin)==NULL)
                exit(0);
            if(strlen(line)<=1)
                continue;
            
            long result,value;
            if(strlen(line)<=2&&line[0]=='=')
            {
                if(pipe_result(&my_pipe,&result))            //pipe_result()
                    cout<<"Result  is:"<<result<<endl;
                else
                    cout<<"Pipe is empty"<<endl;
            }
            else
            {
                if(sscanf(line,"%ld",&value)<1)
                    fprintf(stderr,"Enter an intger value
    ");
                else
                    pipe_start(&my_pipe,value);            //pipe_start()
            }
        }
        return 0;
    }

    流水线工作方式的图示

    流水线中线程,线程队列对每个输入进行处理,一个阶段的线程处理完成后,将会把数据交给下一阶段的线程。最后一阶段的线程产生输出结果

    如果前一阶段处理的速度高于下一阶段的线程,可以使用缓冲区作为使其同步工作的手段。

  • 相关阅读:
    企业微信应用授权
    exec存储过程示例
    jquery判断对象是否存在
    IScroll5要防止重复加载
    transitionEnd不起作用解决方法
    微信接口 output {"errMsg":"translateVoice:fail, the permission value is offline verifying"}
    javascript保留两位小数
    html取消回车刷新提交
    企业微信后台登录
    企业微信开启开发者工具
  • 原文地址:https://www.cnblogs.com/tianzeng/p/9243862.html
Copyright © 2011-2022 走看看