zoukankan      html  css  js  c++  java
  • posix多线程--三种基本线程编程模型

    本文介绍了三种构建线程解决方案的方式。

    一、流水线:每个线程执行同一种操作,并把操作结果传递给下一步骤的线程。

    代码示例如下:
    向流水线送入一个int值(示例代码中固定为5),每个线程将该值加1,并将结果传给下一个线程。

    #include<pthread.h>
    #include<stdint.h>
    #include<stdio.h>
    #include<stdlib.h>
    #include<string.h>
    #include<unistd.h>
    /*
     * One stage of the pipeline.  Stages form a singly linked list; a value
     * arrives in `data`, and `ready` tells the stage's thread that it is there.
     */
    typedef struct stage_tag {
        pthread_mutex_t   mutex;   /* passed to pthread_cond_wait() together with cond */
        pthread_cond_t    cond;    /* signalled by pipe_send() */
        int               data;    /* value handed to this stage */
        int               ready;   /* set to 1 by pipe_send() */
        pthread_t         tid;     /* thread started for this stage */
        struct stage_tag *next;    /* following stage, NULL for the last one */
    } stage_t;
    /* The pipeline as a whole: the linked list of stages and its size. */
    typedef struct pipe_tag {
        pthread_mutex_t mutex;    /* NOTE(review): not locked anywhere in the code shown */
        stage_t        *head;     /* first stage, where input is sent */
        stage_t        *tail;     /* last stage, where the result ends up */
        int             stages;   /* stage count requested from create_pipe() */
    } pipe_t;
    /*
     * Hand `data` to `stage` and wake the thread waiting on that stage.
     *
     * The stage mutex is held while `data` and `ready` are written, so the
     * update cannot race with the stage thread that tests `ready` around
     * pthread_cond_wait().
     *
     * stage: destination stage (must not be NULL).
     * data:  value to deliver.
     */
    void pipe_send(stage_t *stage,int data)
    {
            pthread_mutex_lock(&stage->mutex);
            stage->data = data;
            stage->ready = 1;
            pthread_cond_signal(&stage->cond);
            pthread_mutex_unlock(&stage->mutex);
    }
    /*
     * Thread body for one pipeline stage.
     *
     * Waits until a value has been delivered to the stage (`ready` non-zero),
     * adds 1 to it and passes the result to the next stage, if there is one.
     *
     * arg: the stage_t this thread serves.
     * Returns NULL.
     */
    void *thread_route(void *arg)
    {
            stage_t *stage = (stage_t *)arg;
            stage_t *next;
            int data;

            /* pthread_cond_wait() requires the mutex to be locked by the caller */
            pthread_mutex_lock(&stage->mutex);
            while(!stage->ready)
            {
                    pthread_cond_wait(&stage->cond,&stage->mutex);
            }
            data = stage->data+1;
            next = stage->next;
            pthread_mutex_unlock(&stage->mutex);

            /* this stage's lock is released first, so two stage locks are never held at once */
            if(next!=NULL)
            {
                    pipe_send(next,data);
            }
            return NULL;
    }
    /*
     * Build the pipeline: allocate stages+1 linked stage nodes and start one
     * thread for every node except the last.  The last node has no thread; it
     * only receives the final value.
     *
     * pipe:   caller-owned structure that is filled in.
     * stages: number of worker stages (threads) to create.
     *
     * Allocation or thread-creation failure is reported on stderr and ends the
     * process, because the function has no way to return an error.
     */
    void create_pipe(pipe_t *pipe,int stages)
    {
            int i;
            int err;
            stage_t *stage;
            stage_t *last = NULL;   /* must start as NULL: it is tested on the first pass */

            pthread_mutex_init(&pipe->mutex,NULL);
            pipe->stages = stages;
            pipe->head = NULL;
            pipe->tail = NULL;

            for(i=0;i<=stages;i++)
            {
                    stage = malloc(sizeof *stage);
                    if(stage==NULL)
                    {
                            perror("malloc");
                            exit(EXIT_FAILURE);
                    }
                    pthread_mutex_init(&stage->mutex,NULL);
                    pthread_cond_init(&stage->cond,NULL);
                    stage->data = i;
                    stage->ready = 0;       /* nothing delivered yet */
                    stage->next = NULL;
                    if(last==NULL)
                            pipe->head = stage;
                    else
                            last->next = stage;
                    last = stage;
            }
            pipe->tail = last;

            /* every node that has a successor gets a thread; the tail does not */
            for(stage=pipe->head;stage!=NULL&&stage->next!=NULL;stage=stage->next)
            {
                    err = pthread_create(&stage->tid,NULL,thread_route,(void *)stage);
                    if(err!=0)
                    {
                            fprintf(stderr,"pthread_create failed: %d\n",err);
                            exit(EXIT_FAILURE);
                    }
                    printf("stage %d\n",stage->data);
            }
    }
    int main(void)
    {
            pipe_t my_pipe;
            long value,result;
            char line[128];
            create_pipe(&my_pipe,10);
            pipe_send(my_pipe.head,5);
            sleep(10);
            printf("result is %d
    ",my_pipe.tail->data);
            return 0;
    } 
    View Code

    二、工作组:数据由一组线程分别独立地处理。

    代码示例如下:
    程序有两个参数:filepath:文件或目录路径;search:待查找字符串

    程序将文件路径排队给工作组,工作组线程判断该路径是文件还是目录,如果是文件,它将在文件中搜索字符串;如果是目录,它将使用readdir_r查找该目录中的所有子目录和文件,并将每一项添加到工作队列。

    #include<stdio.h>                                                                
    #include<pthread.h>
    #include<dirent.h>
    #include<sys/stat.h>
    /* One queued unit of work: a path to examine and the string to look for. */
    typedef struct work_tag {
        struct work_tag *next;     /* next item in the crew's queue */
        char            *path;     /* file or directory to process */
        char            *search;   /* text looked for inside regular files */
    } work_t, *work_p;
    /* Per-thread record for one member of the work crew. */
    typedef struct worker_tag {
        int              index;   /* position in the crew's workers array */
        pthread_t        tid;     /* thread running this worker */
        struct crew_tag *crew;    /* crew the worker belongs to */
    } worker_t, *worker_p;
    /* Shared state of the work crew: the work queue plus its worker threads. */
    typedef struct crew_tag {
        pthread_mutex_t mutex;        /* held while the queue or work_count is accessed */
        pthread_cond_t  cond;         /* signalled when work is queued */
        pthread_cond_t  done;         /* broadcast when work_count drops to zero or below */
        long            work_count;   /* items queued or still being processed */
        work_t         *first;        /* head of the queue, NULL when empty */
        work_t         *last;         /* tail of the queue, NULL when empty */
        worker_t        workers[4];   /* the crew's worker threads */
    } crew_t, *crew_p;
    void *thread_route(void *arg)
    {
    worker_p worker = (worker_t *)arg;
            crew_p crew = worker->crew;
    struct dirent *entry;
            entry = (struct dirent*)malloc(sizeof(struct dirent)+sizeof(size_t));   
    pthread_mutex_lock(&crew->mutex);
    while(crew->work_count ==0)
            {
                    pthread_cond_wait(&crew->cond,&crew->mutex);
            }
            pthread_mutex_unlock(&crew->mutex);
            printf("worker is running: %d
    ",worker->index);
            while(1)
            {
                    pthread_mutex_lock(&crew->mutex);
                    while(crew->first==NULL)
                    {
                            pthread_cond_wait(&crew->cond,&crew->mutex);
                    }
                    printf("worker %d woke %#lx %d
    ",worker->index,crew->first,crew->work_count);
                    work_p work = crew->first;
                    crew->first = work->next;
                    if(crew->first==NULL)
                            crew->last = NULL;
                    printf("worker %d took %#lx,leave first %#lx,last %#lx
    ",worker->index,work,crew->first,crew->last);
                    pthread_mutex_unlock(&crew->mutex);
                    struct stat filestat;
                    lstat(work->path,&filestat);
                    if(S_ISLNK(filestat.st_mode))
    printf("worker %d:%s is a link,skipping.
    ",worker->index,work->path);
                    else if(S_ISDIR(filestat.st_mode)){
                            DIR *dir;
                            struct dirent *result;
                            dir = opendir(work->path);
                            while(1){
                                    readdir_r(dir,entry,&result);
                    if(result==NULL)
                                            break;
                                    if(strcmp(entry->d_name,".")==0)
                                            continue;
                                    if(strcmp(entry->d_name,"..")==0)
                            
                                            continue;
                                    work_p new_work = (work_p)malloc(sizeof(work_t));
                    printf("test
    ");
                     path_max = pathconf (work->path, _PC_PATH_MAX);
    new_work->path = (char*)malloc (path_max);
    strcpy (new_work->path, work->path);
                    strcat (new_work->path, "/");
                    strcat (new_work->path, entry->d_name);                 
    //       char *new_dir = strcat(work->path,entry->d_name);
                                    //new_work->path = new_dir;
                                     new_work->search = work->search;
                                     new_work->next = NULL;
                                     pthread_mutex_lock(&crew->mutex);
                                     if(crew->first==NULL)
    {
                                            crew->first = new_work;
                                            crew->last = new_work;
                                     }
                                     else{
                                            crew->last->next = new_work;
                                            crew->last = new_work;
                                     }
                                     crew->work_count++;
                                     printf("worker %d add work %#lx,first %#lx,last %#lx,%d
    ",worker->index,new_work,crew->first,crew->last,crew->work_count);
                                     pthread_cond_signal(&crew->cond);
                                     pthread_mutex_unlock(&crew->mutex);
                            }
                            closedir(dir);
                    }
                    else if(S_ISREG(filestat.st_mode)){
                            FILE *file;
                            char buffer[256];
                            file = fopen(work->path,"r");
                            fgets(buffer,sizeof(buffer),file);
                            char *search_ptr;
                            search_ptr = strstr(buffer,work->search);
                            if(search_ptr!=NULL){
                                    printf("worker %d found "%s" in %s
     ",worker->index,work->search,work->path);              
                                     }
                            fclose(file);
                            
                    }
                    else{
                            printf("worker %d:%s format is error.
    ",worker->index,work->path);
                    }
                    free(work->path);
                    free(work);
    
                    pthread_mutex_lock(&crew->mutex);
                    crew->work_count--;
                    printf("worker %d decremented work to %d
    ",worker->index,crew->work_count);
                    if(crew->work_count<=0){
                            pthread_cond_broadcast(&crew->done);
                    }                                                                
                    pthread_mutex_unlock(&crew->mutex);
            }
            free(entry);
            return NULL;
    }
    /*
     * Initialise `crew` (empty queue, zero work count, mutex and both
     * condition variables) and start one thread per slot of crew->workers.
     *
     * The worker count is taken from the array itself, so it follows the
     * structure definition.  A thread that cannot be started is reported on
     * stderr; the remaining workers are still created.
     */
    void crew_create(crew_t *crew)
    {
            size_t worker_count = sizeof(crew->workers)/sizeof(crew->workers[0]);
            size_t worker_index;

            crew->work_count = 0;
            crew->first = NULL;
            crew->last = NULL;
            pthread_mutex_init(&crew->mutex,NULL);
            pthread_cond_init(&crew->cond,NULL);
            pthread_cond_init(&crew->done,NULL);
            for(worker_index=0;worker_index<worker_count;worker_index++){
                    worker_t *worker = &crew->workers[worker_index];
                    int err;

                    worker->index = (int)worker_index;
                    worker->crew = crew;
                    err = pthread_create(&worker->tid,NULL,thread_route,(void *)worker);
                    if(err!=0)
                            fprintf(stderr,"crew_create: cannot start worker %d (error %d)\n",
                                    worker->index,err);
            }
    }
    /*
     * Queue `filepath` as work for the crew and block until all work,
     * including items the workers add themselves, has been processed.
     *
     * crew:     crew set up by crew_create().
     * filepath: file or directory to start from.  It is copied to the heap
     *           because the worker releases each item's path with free().
     * search:   string to look for; the pointer is shared, not copied, and
     *           must stay valid until this function returns.
     *
     * If memory cannot be allocated, the error is reported and the function
     * returns without queuing anything.
     */
    void crew_start(crew_t *crew,char *filepath,char *search)
    {
            size_t path_size = strlen(filepath)+1;
            work_p work = malloc(sizeof *work);

            if(work==NULL)
            {
                    perror("malloc");
                    return;
            }
            work->path = malloc(path_size);
            if(work->path==NULL)
            {
                    perror("malloc");
                    free(work);
                    return;
            }
            memcpy(work->path,filepath,path_size);
            work->search = search;
            work->next = NULL;

            pthread_mutex_lock(&crew->mutex);
            /* append rather than overwrite, so anything already queued is kept */
            if(crew->first==NULL)
                    crew->first = work;
            else
                    crew->last->next = work;
            crew->last = work;
            crew->work_count++;
            pthread_cond_signal(&crew->cond);
            while(crew->work_count>0)
            {
                    pthread_cond_wait(&crew->done,&crew->mutex);
            }
            printf("crew is done!\n");
            pthread_mutex_unlock(&crew->mutex);
    }
    /*
     * Usage: program [filepath search]
     *
     * With two arguments, searches `filepath` (file or directory) for
     * `search`.  With none, falls back to the built-in example values.
     */
    int main(int argc,char *argv[])
    {
            crew_t crew;
            char *filepath = "/home/ubuntu/programs";
            char *search = "errno";

            if(argc==3)
            {
                    filepath = argv[1];
                    search = argv[2];
            }
            else if(argc!=1)
            {
                    fprintf(stderr,"usage: %s [filepath search]\n",argv[0]);
                    return 1;
            }
            crew_create(&crew);
            crew_start(&crew,filepath,search);
            return 0;
    }
    View Code

    三、客户端/服务器:客户端线程将工作排队,交给一个服务器线程去处理。客户端或者以同步方式等待服务器执行,或异步执行并在后面需要时查找结果。

    代码示例如下:
    一组线程都需要从stdin中读取输入,这将导致提示-读(prompt-and-read)操作可能有些混乱。一个方法是使用flockfile和funlockfile函数来锁住stdin和stdout;另一个方式是使用服务器线程,将客户端读写操作排队,由服务器线程依次处理读写操作队列。

    #include<stdio.h>                                                                
    #include<pthread.h>
    /* Operation codes carried in a client request. */
    enum {
            REQ_READ  = 1,  /* prompt for and read a line */
            REQ_WRITE = 2,  /* print a line */
            REQ_QUIT  = 3   /* stop the server thread */
    };
    /*
     * One request queued for the server thread.
     *
     * The condition variable member is named `cond`, the name the rest of the
     * program uses (client->cond); declaring it as a second `mutex` member was
     * a duplicate-member compile error.
     */
    typedef struct client_tag
    {
            struct client_tag *next;        /* next request in the server queue */
            int oper;                       /* REQ_READ, REQ_WRITE or REQ_QUIT */
            int sync;                       /* non-zero: the requester waits for completion */
            int done_flag;                  /* tested by a waiting synchronous requester */
            char prompt[32];                /* prompt shown before a read */
            char text[128];                 /* line to write, or the line that was read */
            pthread_mutex_t mutex;          /* NOTE(review): not referenced by the code shown */
            pthread_cond_t cond;            /* waited on by a synchronous requester */
    }client_t;
    /* Queue of pending client requests together with its synchronisation. */
    typedef struct server_tag {
        client_t       *first;   /* oldest pending request, NULL when the queue is empty */
        client_t       *last;    /* newest pending request, NULL when the queue is empty */
        pthread_mutex_t mutex;   /* held while the queue is accessed */
        pthread_cond_t  cond;    /* signalled when a request is queued */
    } server_t;
    server_t server={NULL,NULL,PTHREAD_MUTEX_INITIALIZER,PTHREAD_COND_INITIALIZER};
    pthread_mutex_t main_mutex = PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t main_cond = PTHREAD_COND_INITIALIZER;
    int thread_count = 4;
    /*
     * Queue a request for the server thread.
     *
     * oper:   REQ_READ, REQ_WRITE or REQ_QUIT.
     * sync:   non-zero to block until the server has finished the request.
     * prompt: prompt shown for a read, or NULL; truncated to 31 characters.
     * string: for REQ_WRITE, the text to print (truncated to 127 characters);
     *         for a synchronous REQ_READ, a buffer of at least 128 bytes that
     *         receives the line read (empty string if nothing was read).
     *
     * Ownership: a synchronous request is freed here once the server has
     * marked it done.  An asynchronous request is left for the server to free.
     * If the request cannot be allocated, the error is reported, a read buffer
     * is set to the empty string, and nothing is queued.
     */
    void client_request(int oper,int sync,const char *prompt,char *string)
    {
            client_t *client = malloc(sizeof *client);

            if(client==NULL)
            {
                    perror("malloc");
                    if(oper==REQ_READ&&string!=NULL)
                            string[0] = '\0';
                    return;
            }
            client->next = NULL;
            client->oper = oper;
            client->sync = sync;
            client->done_flag = 0;
            /* both buffers start empty so the server never reads uninitialised bytes */
            client->prompt[0] = '\0';
            client->text[0] = '\0';
            if(prompt!=NULL)
                    snprintf(client->prompt,sizeof(client->prompt),"%s",prompt);
            if(oper==REQ_WRITE&&string!=NULL)
                    snprintf(client->text,sizeof(client->text),"%s",string);
            if(sync)
                    pthread_cond_init(&client->cond,NULL);

            pthread_mutex_lock(&server.mutex);
            if(server.first==NULL)
                    server.first = client;
            else
                    server.last->next = client;
            server.last = client;
            pthread_cond_signal(&server.cond);
            if(sync)
            {
                    /* done_flag is protected by server.mutex */
                    while(!client->done_flag)
                    {
                            pthread_cond_wait(&client->cond,&server.mutex);
                    }
                    if(oper==REQ_READ&&string!=NULL)
                            strcpy(string,client->text);
                    pthread_cond_destroy(&client->cond);
                    free(client);
            }
            pthread_mutex_unlock(&server.mutex);
    }
    void *client_route(void *arg)
    {
            int index = (int)arg;
            int loops;
            char prompt[32];
            char string[128],formatted[128];
            sprintf(prompt,"Client %d>",index);
            while(1)
            {
                    client_request(REQ_READ,1,prompt,string);
                    if(strlen(string)==0)
                            break;
                    for(loops=0;loops<4;loops++)
                    {
                            sprintf(formatted,"(%d#%d) %s",index,loops,string);
                            client_request(REQ_WRITE,0,NULL,formatted);
                            sleep(1);
                    }                 
     }
    }
    void *server_route(void *arg)
    {
            client_t *client;
            int oper;
            while(1)
            {
                    pthread_mutex_lock(&server.mutex);
                    while(server.first==NULL)
                    {
                            pthread_cond_wait(&server.cond,&server.mutex);
                    }
                    client = server.first;
                    server.first = client.next;
                    if(server.first==NULL)
                            server.last = NULL;
                    pthread_mutex_unlock(&server.mutex);
                    oper = client->oper;
                    switch(oper){
                            case REQ_QUIT:
                                    break;
                            case REQ_READ:
                                    if(strlen(client->prompt)>0)
                                            printf(client->prompt);
                                    fgets(client->text,128,stdin);
                                    break;
                            case REQ_WRITE:       
                                    puts(client->text);
                                    break;
                            default:
                                    break;
                    }
                    free(client);
                    if(oper==REQ_QUIT)
                            break;
            }
            return NULL;
    }
    /*
     * Demo driver: starts the server thread and thread_count client threads,
     * waits until thread_count has dropped to zero, then asks the server to
     * quit and waits for it to finish.
     */
    int main(void)
    {
            pthread_t sid;
            pthread_t cid;
            int i;
            int err;
            /* snapshot: the shared thread_count is decremented while the loop below runs */
            int clients = thread_count;

            err = pthread_create(&sid,NULL,server_route,NULL);
            if(err!=0)
            {
                    fprintf(stderr,"cannot start server thread (error %d)\n",err);
                    return 1;
            }

            for(i=0;i<clients;i++)
            {
                    err = pthread_create(&cid,NULL,client_route,(void *)(intptr_t)i);
                    if(err!=0)
                    {
                            fprintf(stderr,"cannot start client %d (error %d)\n",i,err);
                            /* a client that never started must not be waited for */
                            pthread_mutex_lock(&main_mutex);
                            thread_count--;
                            pthread_mutex_unlock(&main_mutex);
                    }
            }
            pthread_mutex_lock(&main_mutex);
            while(thread_count>0)
            {
                    pthread_cond_wait(&main_cond,&main_mutex);
            }
            pthread_mutex_unlock(&main_mutex);
            printf("Done!\n");
            client_request(REQ_QUIT,1,NULL,NULL);
            pthread_join(sid,NULL);
            return 0;
    }
    View Code

    参考资料:《POSIX多线程程序设计》 pp.81-110

  • 相关阅读:
    Windows Server 2012配置开机启动项
    Windows Server 2019 SSH Server
    NOIP2017 senior A 模拟赛 7.7 T1 棋盘
    Noip 2015 senior 复赛 Day2 子串
    Noip 2015 senior复赛 题解
    Noip 2014 senior Day2 解方程(equation)
    Noip 2014 senior Day2 寻找道路(road)
    Noip 2014 senior Day2 无线网络发射器选址(wireless)
    Noip2014senior复赛 飞扬的小鸟
    Noip 2014 senior 复赛 联合权值(link)
  • 原文地址:https://www.cnblogs.com/shijingjing07/p/5395623.html
Copyright © 2011-2022 走看看