zoukankan      html  css  js  c++  java
  • [多线程学习笔记] 流水线

     
    /*************************************************************************
        > File Name: pipe.c
        > Author: likeyi
        > Mail: likeyiyy@sina.com 
        > Created Time: Thu 17 Apr 2014 03:53:25 PM CST
     ************************************************************************/
    
    #include <stdio.h>
    #include <stdlib.h>
    #include <pthread.h>
    #include <string.h>
    typedef struct buffer
    {
        unsigned char * buffer;
        unsigned long length;
    }buffer_t;
    typedef struct stage_tag
    {
        pthread_mutex_t mutex;
        pthread_cond_t avail;
        pthread_cond_t ready;
        int busy;
        int data_ready;
        buffer_t * data;
        pthread_t thread;
        long total;     /* 这个用来计数,看一个线程被调用多少次,当没有空闲的线程时,就用这个来比较。*/
    }stage_t;
    static stage_t * get_tcp_thread();
    static stage_t * get_ip_thread();
    static stage_t * get_eth_thread();
    static stage_t * tcp_thread_t,* ip_thread_t,*eth_thread_t;
    
    static unsigned char * tcp_hdr = "tcp header";
    static unsigned char * ip_hdr = "ip header";
    static unsigned char * eth_hdr = "eth header";
    
    void * pipe_add_tcp_header(void * arg)
    {
        /* 每次读取文件的一行,作为数据,发出去。*/
        pthread_detach(pthread_self());
        FILE * fp;
        unsigned char data[4096];
        stage_t * ip_thread = get_ip_thread();
        if((fp = fopen("data","r")) == NULL)
        {
            printf("Error When open data file
    ");
            pthread_exit(NULL); /*NOTICE: pthread_exit 不应该是局部变量。*/
        }
        int i = 0;
        int size = strlen(tcp_hdr);
        while(1)
        {
            pthread_mutex_lock(&ip_thread->mutex);
            /*
            * 什么IP 正忙?哈哈,那么我只有等了。
            * 是的data_ready的状态说明正忙。
            * */
            while(ip_thread->data_ready)
            {
                pthread_cond_wait(&ip_thread->ready,&ip_thread->mutex); 
            }
    
            /*
            * 开始准备数据。
            * */
            if(fgets(data,sizeof(data),fp) == NULL)
            {
                break;
            }
            int size2 = strlen(data);
            unsigned char * next_data = malloc(size + size2 + 1);
            strcpy(next_data,tcp_hdr);
            strcpy(next_data + size,data);
            /* 数据封装完毕 */
            ip_thread->data->buffer = next_data;
            ip_thread->data->length = size + size2;
    
            
            ip_thread->data_ready = 1; 
    
            pthread_cond_signal(&ip_thread->avail);
            /*
            * 呵呵,某种意义上,我这个时候是不是也应该告诉我的上级,我也做好准备了?
            * 但是,我没有上级,哈哈哈。
            * */
            pthread_mutex_unlock(&ip_thread->mutex);
            
        }
        printf("tcp_thread stoped
    ");
        
    }
    void * pipe_add_ip_header(void * arg)
    {
        stage_t * ip_stage = get_ip_thread();
        pthread_mutex_lock(&ip_stage->mutex);
        
        int size = strlen(ip_hdr);
        while(1)
        {
            /* 数据没准备好 
            *  看起来用data_ready来表示是否busy很科学的样子,我这样做反而不好了。
            * */
            while(ip_stage->data_ready != 1)
            {
                pthread_cond_wait(&ip_stage->avail,&ip_stage->mutex);
            }
            /*
            * 
            * */
    
            int size2 = ip_stage->data->length;
            unsigned char * next_data = malloc(size + size2 + 2);
            strcpy(next_data,ip_hdr);
            *(next_data + size) = ' ';
            strcpy(next_data + size + 1,ip_stage->data->buffer);
            /* 数据封装完毕 */
    /*********************************************************************/
            stage_t * eth_thread = get_eth_thread();
            pthread_mutex_lock(&eth_thread->mutex);
            while(eth_thread->data_ready)
            {
                pthread_cond_wait(&eth_thread->ready,&eth_thread->mutex);
            }
            eth_thread->data->buffer = next_data;
            eth_thread->data->length = size + size2;
    
            eth_thread->data_ready = 1;
            pthread_cond_signal(&eth_thread->avail);
            pthread_mutex_unlock(&eth_thread->mutex);
    
    /*********************************************************************/
    
            /*
            * 我自己呢?data_ready = 0,并且释放信号。
            * */
            ip_stage->data_ready = 0;
            pthread_cond_signal(&ip_stage->ready);
        }
    
    }
    void * pipe_add_ethdr(void * arg)
    {
        pthread_detach(pthread_self());
        stage_t * eth_stage = get_eth_thread();
        pthread_mutex_lock(&eth_stage->mutex);
        while(1)
        {
            while(eth_stage->data_ready != 1)
            {
                pthread_cond_wait(&eth_stage->avail,&eth_stage->mutex);
            }
            printf("Data is :%s
    ",eth_stage->data->buffer);
            eth_stage->data_ready = 0;
            pthread_cond_signal(&eth_stage->ready);
        }
    }
    static stage_t * get_tcp_thread()
    {
        return tcp_thread_t; 
    }
    static stage_t * get_ip_thread()
    {
        return ip_thread_t;
    }
    static stage_t * get_eth_thread()
    {
        return eth_thread_t;
    }
    int main(int argc, char ** argv)
    {
        eth_thread_t = malloc(sizeof(stage_t));
        ip_thread_t = malloc(sizeof(stage_t));
        tcp_thread_t = malloc(sizeof(stage_t));
    
        eth_thread_t->data = malloc(sizeof(buffer_t));
        ip_thread_t->data = malloc(sizeof(buffer_t));
        tcp_thread_t->data = malloc(sizeof(buffer_t));
    
        pthread_mutex_init(&eth_thread_t->mutex,NULL);
        pthread_mutex_init(&ip_thread_t->mutex,NULL);
        pthread_mutex_init(&tcp_thread_t->mutex,NULL);
    
        pthread_cond_init(&eth_thread_t->avail,NULL);
        pthread_cond_init(&ip_thread_t->avail,NULL);
        pthread_cond_init(&tcp_thread_t->avail,NULL);
    
        pthread_cond_init(&eth_thread_t->ready,NULL);
        pthread_cond_init(&ip_thread_t->ready,NULL);
        pthread_cond_init(&tcp_thread_t->ready,NULL);
    
        pthread_create(&eth_thread_t->thread,NULL,pipe_add_ethdr,NULL);
        pthread_create(&ip_thread_t->thread,NULL,pipe_add_ip_header,NULL);
        pthread_create(&tcp_thread_t->thread,NULL,pipe_add_tcp_header,NULL);
    
        pthread_exit(NULL);
        
        
    }

    线程工作有三个基本模式:流水线,工作组,客户/服务器模型。

    三个基本模式可以相互组合,比如流水线和工作组就可以组合到一块。

    书上的例子还是可以看的,我也第一次有实体这个概念,

    通过结构体把线程,互斥量,条件变量,谓词分装到一块真是个不错的注意。

    我简化了书上的列子(或者更复杂),模拟了TCP到ether层传输的过程,每经过一层,它们都打上自己的标志,然后传给下一层。

    假设流水线上有A-BC-D四个等级,由于A是流水线的开头,D是流水线的结尾,所以它们两个有所不同,但是流水线中间的工作方式都是类似的。

    A: 它检测下一级流水线是否做好准备,假如做好准备了,就把数据发给下一层,并且告诉下一层数据准备好了。

    注意这里的概念,上级要知道下级做好准备了才会发送数据,但是发送了数据之后又必须告诉下级数据准备好了。

    看起来,上级看下级准备好了通知下级,下级一直处于接受状态,并不需啊哟上级告诉下级数据准备好了就可以,因为隐私的说,下级准备好了就可以接受数据了。

    其实不可以,因为你下级准备好,我上级不一定准备好,你不能自己取数据,你可能取得错误的数据,必须,我知道你准备好了,我这边开始准备,然后再通知你。

    那么,上级先准备好,然后再等下级可不可以呢?看似可以,但是上级的数据从哪里来的呢?上级也是流水线中的一级。

    A的执行流是:

    wait for next not busy.

    process

    tell next is ok.

    当然,由于是流水线的头,它可能更灵活一些。

    B. B首先检测数据是否可用,可用,则处理数据,然后看看C是否准备好了吗,假如C没有准备好就一直等,注意B等C的时候,B也不接受新的数据了。

       当C可用后,B把数据发给C,告诉C准备好了,然后告诉A你又可以给我发数据了。

        wait for prev send me data.

        .... (I am in busy mode in fact)

        wait for next ready.

        tell next is ok

        tell pev is not busy now.

    D:tail的处理模式是,

         wait for prev send me data

         ....(I am in busy mode)

         tell prev is not busy now.

    总结:

    流水线的上一级还是修改了下一级的数据结构,这个很不好,但是假如不修改的话,应该是上一级告诉下一级数据好了的时候,是不是应该告诉下一级去哪里取数据?

  • 相关阅读:
    代理(reGeorg)
    弱口令爆破技巧
    无法解析@NotBlank
    LC 1723. Find Minimum Time to Finish All Jobs (dp+二分)
    帝国cms 联合多张表查询
    php 根据白名单替换字符转中的链接 封装的函数
    php 正则匹配域名后的整个链接和只匹配域名
    JVM系列(一):垃圾回收之MinorGC,MajorGC和FullGC的区别
    spring事务的执行原理
    java基础系列(八):Semphore,CountDownLatch和CyclicBarrier的使用
  • 原文地址:https://www.cnblogs.com/likeyiyy/p/3671386.html
Copyright © 2011-2022 走看看