zoukankan      html  css  js  c++  java
  • 多线程同步互斥

    客户端将需要解决的task发送给服务器,服务器调用线程来解决客户端发送的task,解决完由线程负责将其发送回客户端。(用管道实现通信)

    1. server维护两个列表。一是客户端列表。二是任务列表。分别如下:

    /* 客户端列表 */    
    typedef struct tag_fds
    {
        int s_rfd ;
        int s_wfd ;
        struct tag_fds* s_next ;
    }FD_PAIR, *pFD_PAIR;
    
    /* 任务列表,相当于资源 */
    typedef struct tag_que
    {
        TASK s_arr[TASK_CNT + 1] ;
        int s_front ;
        int s_tail ;
    }QUEUE, *pQUEUE ;

    2. server端维护一个管道(为了叙述方便,暂时称为server_pipe),用于接收客户端的上线消息。client端维护两个管道,一个管道用于向server端发送所要处理的task,而另一个管道用于接收从server端返回的task result。

    3. server端可以使用select函数对所有管道的读端进行轮询。所有的读端包括:用于接收客户端task管道的读端以及server_pipe的读端。

    4. 当客户端上线时,它会将自己的进程ID(pid)通过server_pipe发送给服务器。服务器根据pid,可以构造出该客户端所创建的两个管道的名称,以此可以打开客户端的两个管道,同时将server端的读端加入监听集合,并且将该客户端加入客户端队列。

    5. 当客户端向服务器发送task时,服务器端的select监听到之后,会遍历客户端列表(根据server端接收task管道的读描述符),找到具体的客户端。我们会将服务器对应于该客户端管道的写描述符连同task任务一起打包(其实就是结构体啦),加入任务列表。之所以要将写描述符打包进一个任务,是因为方便我们的线程处理完任务后,可以直接向客户端返回结果。任务结构体如下:

    typedef struct tag_task
    {
        char s_msg[1024];     /* 客户端向服务器端发送的任务用msg存储 */
        int s_fd ;            /* s_fd为写端,用于线程处理完任务后,发送消息给客户端 */
    }TASK, *pTASK;

    6. 很显然,我们服务器端的主线程在此处就是一个生产者,负责将TASK添加到任务列表中。而主线程创造出的诸多线程则是消费者,从任务列表取出任务,处理完后发送结果至客户端。

    7. 此处服务器处理逻辑比较简单,客户端发送什么请求,我们就返回什么请求,打印在屏幕上。

    //server.h

    头文件server.h

    #ifndef __SERVER_H__
    #define __SERVER_H__
    #include <stdio.h>
    #include <stdlib.h>
    #include <unistd.h>
    #include <string.h>
    #include <sys/stat.h>
    #include <sys/types.h>
    #include <fcntl.h>
    #include <pthread.h>
    #include <sys/time.h>
    #include <sys/select.h>
    #include <signal.h>
    #define MSG_LEN 1024
    #define TASK_CNT 1024 
    extern pthread_mutex_t mutex ;
    extern pthread_cond_t cond_master ;
    extern pthread_cond_t cond_slave ;
    
    typedef struct tag_fds
    {
        int s_rfd ;
        int s_wfd ;
        struct tag_fds* s_next ;
    }FD_PAIR, *pFD_PAIR;
    
    typedef struct tag_task
    {
        char s_msg[1024];
        int s_fd ;
    }TASK, *pTASK;
    
    typedef struct tag_que
    {
        TASK s_arr[TASK_CNT + 1] ;
        int s_front ;
        int s_tail ;
    }QUEUE, *pQUEUE ;
    
    void fds_link_init(pFD_PAIR* phead);
    void fds_insert(pFD_PAIR* phead, int fd_r, int fd_w);
    int fds_find_wfd(pFD_PAIR phead, int fd_r);
    void fds_link_delete(pFD_PAIR* phead,int fd_r);
    
    void add_task(pQUEUE pq,pTASK pt );
    void get_task(pQUEUE pq, pTASK pt);
    void excute_task(pTASK pt);
    
    #endif

    //服务器主线程main.c
    #include "server.h"
    pthread_mutex_t mutex ;
    pthread_cond_t cond_master ;
    pthread_cond_t cond_slave ;
    void* slave_handler(void* arg)
    {
        pthread_detach(pthread_self());
        pQUEUE pq = (pQUEUE)arg ;
        TASK my_task ;
        while(1)
        {
            get_task(pq, &my_task);
            excute_task(&my_task);
            sleep(1);
        }
    }
    int main(int argc, char* argv[])// exe fifo_name  thd_cnt
    {
        if(argc != 3)
        {
            printf("USAGE: EXE FILENAME THD_CNT ! 
    ");
            exit(1);
        }
        signal(SIGINT, SIG_IGN);
        signal(SIGPIPE,SIG_IGN);
        signal(SIGQUIT,SIG_IGN);
        
    
        int fd_server ;
        QUEUE my_que ;
        pFD_PAIR my_list ;
    
        fd_set read_set, ready_set ;
        struct timeval tm ;
        int select_ret ;
    
        memset(&my_que, 0, sizeof(QUEUE));
        fds_link_init(&my_list);
    
    
        int slave_cnt = atoi(argv[2]);
        pthread_t * arr = (pthread_t*)calloc(slave_cnt, sizeof(pthread_t));
    
    
    
        pthread_mutex_init(&mutex, NULL);
        pthread_cond_init(&cond_master, NULL);
        pthread_cond_init(&cond_slave, NULL);
    
    
    
        int index = 0 ;
        while(slave_cnt > 0)
        {
            pthread_create(arr + index, NULL,slave_handler, (void*)&my_que );
            slave_cnt -- ;
            index ++ ;
        }
    
    
    
        fd_server = open(argv[1], O_RDONLY);
        if(fd_server == -1)
        {
            perror("open");
            exit(-1);
        }
        FD_ZERO(&read_set);
        FD_ZERO(&ready_set);
        FD_SET(fd_server, &read_set);
        while(1)
        {
            tm.tv_sec = 0 ;
            tm.tv_usec = 1000 ;
            ready_set = read_set ;
            select_ret = select(1024,&ready_set, NULL, NULL, &tm );
    
            if(select_ret == 0)
            {
                continue ;
            }else if(select_ret > 0)
            {
                if(FD_ISSET(fd_server, &ready_set))// client on r.pid w.pid
                {
                    char buf[32];
                    memset(buf, 0, 32);
                    if(read(fd_server, buf, 32) == 0)
                    {
                        continue ;
                    }else 
                    {
                        printf("a client on ! 
    ");
                        char pipe_name[32];
                        memset(pipe_name, 0, 32);
                        buf[strlen(buf) - 1] = '';
                        sprintf(pipe_name,"r.%s",buf);//clinet read
                        int wfd, rfd ;
                        wfd = open(pipe_name, O_WRONLY);
                        memset(pipe_name, 0, 32);
                        sprintf(pipe_name,"w.%s",buf);//clinet write
                        rfd = open(pipe_name, O_RDONLY);
                        fds_insert(&my_list, rfd, wfd);
                        FD_SET(rfd, &read_set);
                    }
                }
                pFD_PAIR pCur = my_list ;
                while(pCur)
                {
                    if(FD_ISSET(pCur ->s_rfd, &ready_set))// client request
                    {
                        char buf[1024] ;
                        memset(buf, 0, 1024);
                        if(read(pCur ->s_rfd, buf, 1024) == 0)//client quit
                        {
                            FD_CLR(pCur ->s_rfd, &read_set);
                            int fd_r = pCur ->s_rfd ;
                            pCur = pCur -> s_next ;
                            fds_link_delete(&my_list, fd_r);
    
                        }else 
                        {
                            TASK tk ;
                            memset(&tk, 0, sizeof(tk));
                            tk.s_fd = pCur -> s_wfd ;
                            strcpy(tk.s_msg, buf);
                            add_task(&my_que, &tk);
                            pCur = pCur ->s_next ;
                        }
                    }else
                    {
                        pCur = pCur ->s_next ;
                    }
                }
            }
    
    
    
        }
    
        pthread_mutex_destroy(&mutex);
        pthread_cond_destroy(&cond_master);
        pthread_cond_destroy(&cond_slave);
        return 0 ;
    }

    //fds_link.c
    #include "server.h"
    
    void fds_link_init(pFD_PAIR* phead)
    {
        *phead = NULL ;
    }
    
    void fds_insert(pFD_PAIR* phead, int fd_r, int fd_w)
    {
        pFD_PAIR pCur = (pFD_PAIR)calloc(1, sizeof(FD_PAIR));
        pCur ->s_rfd = fd_r ;
        pCur ->s_wfd = fd_w ;
        pCur ->s_next = *phead ;
        *phead = pCur ;
    
    }
    /*
    int fds_find_wfd(pFD_PAIR phead, int fd_r)
    {
        while(phead)
        {
            if(phead ->s_rfd == fd_r)
            {
                break ;
            }else 
            {
                phead = phead ->s_next ;
            }
        }
        if(phead == NULL)
        {
            return -1 ;
        }else 
        {
            return phead ->s_wfd ;
        }
    }                                                       */
    
    void fds_link_delete(pFD_PAIR* phead,int fd_r)
    {
        pFD_PAIR pPre , pCur ;
        pPre = NULL ;
        pCur = *phead ;
        while(pCur)
        {
            if(pCur ->s_rfd == fd_r)
            {
                break ;
            }else
            {
                pPre = pCur ;
                pCur = pCur ->s_next ;
            }
        }
        if(pPre == NULL)
        {
            *phead = pCur ->s_next ;
            free(pCur);
            pCur = NULL ;
        }else 
        {
            pPre ->s_next = pCur ->s_next ;
            free(pCur);
            pCur = NULL;
        }
    }

    //task.c

    #include "server.h"
    static int que_empty(pQUEUE pq)
    {
        return pq -> s_front == pq -> s_tail ;
    }
    static int que_full(pQUEUE pq)
    {
        return  (pq -> s_tail + 1)%(TASK_CNT + 1) == pq -> s_front ;
    }
    static int que_cnt(pQUEUE pq)
    {
        return (pq ->s_tail - pq ->s_front + TASK_CNT + 1)%(TASK_CNT + 1) ;
    }
    
    void add_task(pQUEUE pq,pTASK pt )
    {
        pthread_mutex_lock(&mutex);
        while(que_full(pq))
        {
            pthread_cond_wait(&cond_master, &mutex);
        }
        pq ->s_arr[pq ->s_tail] = *pt ;
        pq ->s_tail = (pq ->s_tail + 1)%(TASK_CNT + 1) ;
        //if(que_cnt(pq) == 1)
        {
            pthread_cond_broadcast(&cond_slave);
        }
        pthread_mutex_unlock(&mutex);
        sleep(1);
    }
    
    void get_task(pQUEUE pq, pTASK pt)
    {
        pthread_mutex_lock(&mutex);
        while(que_empty(pq))
        {
            pthread_cond_wait(&cond_slave, &mutex);
        }
        *pt = (pq ->s_arr)[pq ->s_front] ;
        pq -> s_front = (pq -> s_front + 1)%(TASK_CNT + 1);
    //    if(que_cnt(pq) == TASK_CNT - 1)
        {
            pthread_cond_broadcast(&cond_master);
        }
        pthread_mutex_unlock(&mutex);
        sleep(1);
        
    }
    
    void excute_task(pTASK pt)
    {
        write(pt ->s_fd, pt ->s_msg, strlen(pt ->s_msg));
    }


    客户端测试client.c
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <unistd.h>
    #include <fcntl.h>
    #include <sys/stat.h>
    #include <sys/types.h>
    int main(int argc, char* argv[])//exe fifo
    {
        int fd_server, fd_snd, fd_recv  ;
        char rname[32], wname[32];
        fd_server  = open(argv[1], O_WRONLY);
        memset(rname, 0, 32);
        memset(wname, 0, 32);
        sprintf(rname,"r.%d", getpid());
        sprintf(wname,"w.%d", getpid());
        mkfifo(rname,0666);
        mkfifo(wname,0666);    
        char msg[1024]="";
        sprintf(msg,"%d
    ", getpid());
        write(fd_server, msg, strlen(msg));
        fd_recv = open(rname,O_RDONLY);
        fd_snd = open(wname,O_WRONLY);
        while(memset(msg, 0, 1024), fgets(msg, 1024, stdin) != NULL)
        {
            write(fd_snd, msg, strlen(msg));
            memset(msg, 0, 1024);
            read(fd_recv, msg, 1024);
            write(1, msg, strlen(msg));
        }
        close(fd_server);
        close(fd_snd);
        close(fd_recv);
    
        return 0 ;
    }


    Makefile

    SRC_DIR := ./src
    INC_DIR := ./include 
    EXE_DIR := ./bin
    CC := gcc 
    CFLAGS := -g -o
    SRC_OBJECTS := $(wildcard $(SRC_DIR)/*.c)
    INC_OBJECTS := $(wildcard $(INC_DIR)/*.h)
    $(EXE_DIR)/main : $(SRC_OBJECTS) $(INC_OBJECTS)
        $(CC) $(CFLAGS) $@ $(SRC_OBJECTS) -I$(INC_DIR) -lpthread
  • 相关阅读:
    sql 在日期范围内搜索
    js 处理日期时间字符串显示的方法
    matlab练习程序(并行计算)
    C++程序运行时间
    matlab练习程序(KNN,K最邻近分类法)
    多媒体指令(像素处理)
    ubuntu启动/重启/停止apache
    matlab练习程序(matlab调用c/c++)
    我的vim设置
    matlab练习程序(c/c++调用matlab<engine>)
  • 原文地址:https://www.cnblogs.com/hxjbc/p/3962259.html
Copyright © 2011-2022 走看看