zoukankan      html  css  js  c++  java
  • Linux基础——多线程实现任务

    这里,我们首先要定义程序用到的数据结构:任务(TASK,包含消息内容和回写用的 fd)、保存每个客户端读/写管道 fd 的链表节点(FD),以及存放任务的循环队列(QUEUE)。

    声明代码如下:

     1 #ifndef _HEAD_H
     2 #define _HEAD_H
    #include <ctype.h>
    #include <errno.h>
    #include <signal.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>

    #include <fcntl.h>
    #include <pthread.h>
    #include <unistd.h>
    #include <sys/select.h>
    #include <sys/stat.h>
    #include <sys/time.h>
    #include <sys/types.h>
    /* Size in bytes of the message buffer carried by one TASK. */
    #define MSG_LEN 1024
    /* Maximum number of tasks the queue holds at once (the array has one extra slot). */
    #define TASK_CNT 1024

    /*
     * Synchronisation objects shared by add_task() and get_task(); they are
     * defined in the server source file.
     *   lock  - mutex held around every queue access
     *   cond1 - add_task() waits on it while the queue is full;
     *           get_task() broadcasts it
     *   cond2 - get_task() waits on it while the queue is empty;
     *           add_task() broadcasts it
     */
    extern pthread_mutex_t lock;
    extern pthread_cond_t cond1;
    extern pthread_cond_t cond2;
    /*
     * One node of the singly linked list of descriptor pairs.
     * fd_find() and fd_del() look nodes up by s_rfd; fd_find() returns s_wfd.
     */
    typedef struct tag_fd
    {
        int s_rfd;              /* descriptor read from; the lookup key */
        int s_wfd;              /* descriptor written to */
        struct tag_fd *next;    /* next node, or NULL at the end of the list */
    } FD, *pFD;
    25 typedef struct tag_task
    26 {
    27     char s_msg[MSG_LEN];
    28     int s_fd;
    29 }TASK,*pTASK;
    30 typedef struct tag_que
    31 {
    32     TASK arr[TASK_CNT+1];
    33     int front;
    34     int tail;
    35 }QUEUE,*pQUEUE;
    36 void fd_insert(pFD *phead,int rfd,int wfd);
    37 void fd_init(pFD *phead);
    38 int  fd_find(pFD phead,int rfd);
    39 void fd_del(pFD *phead,int rfd);
    40 void add_task(pQUEUE pq,pTASK pt);
    41 void get_task(pQUEUE pq,pTASK pt);
    42 void excute_task(pTASK pt);
    43 #endif
    View Code

    服务器需要记录每个已注册客户端的一对管道描述符(读端 s_rfd、写端 s_wfd),以便之后查找和删除,所以我们把这些描述符保存到一个单链表中。
    实现代码如下:

     1 #include "head.h"
     2 void fd_init(pFD *phead)
     3 {
     4     *phead= NULL;
     5 }
     6 void fd_insert(pFD *phead,int rfd,int wfd)
     7 {
     8     pFD pnew = (pFD )calloc(1,sizeof(FD));
     9     pnew->s_rfd=rfd;
    10     pnew->s_wfd=wfd;
    11     pnew->next = *phead;
    12     *phead = pnew;
    13 }
    14 int fd_find(pFD phead,int rfd)
    15 {
    16     while(phead)
    17     {
    18         if(phead->s_rfd==rfd)
    19             break;
    20         else
    21             phead = phead->next;
    22     }
    23     if(phead == NULL)
    24         return -1;
    25     else
    26         return phead->s_wfd;
    27 }
    28 
    29 void fd_del(pFD *phead,int rfd)
    30 {
    31     pFD pcur,ppre;
    32     pcur=*phead;
    33     ppre=NULL;
    34     while(pcur)
    35     {
    36         if(pcur->s_rfd == rfd)
    37             break;
    38         else
    39         {
    40             ppre=pcur;
    41             pcur = pcur ->next;
    42         }
    43     }
    44     if(ppre==NULL)
    45     {
    46         *phead=pcur->next;
    47         free(pcur);
    48         pcur=NULL;
    49     }
    50     else
    51     {
    52         ppre->next=pcur->next;
    53         free(pcur);
    54         pcur=NULL;
    55     }
    56 }
    View Code

    然后,我们还需要实现对任务的控制,例如任务的添加、获得、执行等。

    实现代码如下:

     1 #include "head.h"
     2 static int que_empty(pQUEUE pq)
     3 {
     4     return pq->front == pq->tail;
     5 }
     6 static int que_full(pQUEUE pq)
     7 {
     8     return (pq->tail+1)%(TASK_CNT+1)==pq->front;
     9 }
    10 static int que_cnt(pQUEUE pq)
    11 {
    12     return (pq->tail - pq->front +TASK_CNT+1)%(TASK_CNT + 1);
    13 }
    14 void add_task(pQUEUE pq ,pTASK pt)
    15 {
    16     pthread_mutex_lock(&lock);
    17     while(que_full(pq))
    18         pthread_cond_wait(&cond1,&lock);
    19     pq->arr[pq->tail]=*pt;
    20     pq->tail = (pq->tail+1)%(TASK_CNT+1);
    21     if(que_cnt(pq)==1)
    22         pthread_cond_broadcast(&cond2);
    23     printf("添加了一个任务!!
    ");
    24     pthread_mutex_unlock(&lock);
    25 }
    26 void get_task(pQUEUE pq ,pTASK pt)
    27 {
    28     pthread_mutex_lock(&lock);
    29     while(que_empty(pq))
    30         pthread_cond_wait(&cond2,&lock);
    31     *pt=pq->arr[pq->front];
    32     pq->front = (pq->front+1)%(TASK_CNT+1);
    33     if(que_cnt(pq)== TASK_CNT -1)
    34         pthread_cond_broadcast(&cond1);
    35     printf("获得了一个任务!!
    ");
    36     pthread_mutex_unlock(&lock);
    37 }
    38 
    39 
    40 void excute_task(pTASK pt)
    41 {
    42     char buf[1024];
    43     memset(buf,0,1024);
    44     strcpy(buf,pt->s_msg);
    45     int index;
    46     for(index=0;index < strlen(buf);index++)
    47         buf[index]=toupper(buf[index]);
    48     buf[index]='';
    49     write(pt -> s_fd,buf,strlen(buf));
    50 }
    View Code

    最后,服务器端只需在循环中调用 select,检测是否有新的客户端注册,或者已注册的客户端是否发来了消息;收到的消息被封装成任务加入队列,再由工作线程取出并执行。

    服务器实现代码如下:

      1 #include "head.h"
      2 pthread_mutex_t lock;
      3 pthread_cond_t cond1,cond2;
      4 void* hand(void* arg)
      5 {
      6     pthread_detach(pthread_self());
      7     TASK task;
      8     pQUEUE pq = (pQUEUE)arg;
      9     while(1)
     10     {
     11         get_task(pq,&task);
     12         excute_task(&task);
     13         sleep(1);
     14     }
     15 }
     16 int main(int argc,char *argv[])
     17 {
     18     if(argc != 3)
     19     {
     20         perror("参数错误!!
    ");
     21         exit(1);
     22     }
     23     signal(SIGINT,SIG_IGN);
     24     signal(SIGPIPE,SIG_IGN);
     25     signal(SIGQUIT,SIG_IGN);
     26     QUEUE que;
     27     int fd;
     28     fd_set read_set,revc;
     29     pFD list;
     30     memset(&que,0,sizeof(QUEUE));
     31     fd_init(&list);
     32     int cnt = atoi(argv[2]);
     33     pthread_t *arr=(pthread_t *)calloc(cnt,sizeof(pthread_t));
     34     pthread_mutex_init(&lock,NULL);
     35     pthread_cond_init(&cond1,NULL);
     36     pthread_cond_init(&cond2,NULL);
     37     int index=0;
     38     while(cnt > 0)
     39     {
     40         pthread_create(arr+index,NULL,hand,(void*)&que);
     41         cnt--;
     42         index++;
     43     }
     44     fd = open(argv[1],O_RDONLY);
     45     if(fd == -1)
     46     {
     47         perror("管道打开失败!!
    ");
     48         exit(1);
     49     }
     50     struct timeval tm;
     51     int ret;
     52     FD_ZERO(&read_set);
     53     FD_ZERO(&revc);
     54     FD_SET(fd,&read_set);
     55     while(1)
     56     {
     57         tm.tv_sec=0;
     58         tm.tv_usec=1000;
     59         revc = read_set;
     60         ret=select(1024,&revc,NULL,NULL,&tm);
     61         if(ret == 0)
     62             continue;
     63         else if(ret > 0)
     64         {
     65             if(FD_ISSET(fd,&revc))
     66             {
     67                 char buf[32];
     68                 memset(buf,0,32);
     69                 if(read(fd,buf,32)==0)
     70                     continue;
     71                 else
     72                 {
     73                     char name[32];
     74                     int r_fd,w_fd;
     75                     buf[strlen(buf)-1]='';
     76                     memset(name,0,32);
     77                     sprintf(name,"r.%s",buf);
     78                     w_fd=open(name,O_WRONLY);
     79                     memset(name,0,32);
     80                     sprintf(name,"w.%s",buf);
     81                     r_fd=open(name,O_RDONLY);
     82                     fd_insert(&list,r_fd,w_fd);
     83                     FD_SET(r_fd,&read_set);
     84                 }
     85             }
     86         }
     87         pFD pcur=list;
     88         while(pcur)
     89         {
     90             if(FD_ISSET(pcur->s_rfd,&revc))
     91             {
     92                 char buf[1024];
     93                 memset(buf,0,1024);
     94                 if(read(pcur->s_rfd,buf,1024)==0)
     95                 {
     96                     FD_CLR(pcur->s_rfd,&read_set);
     97                     int i=pcur->s_rfd;
     98                     pcur=pcur->next;
     99                     fd_del(&list,i);
    100                 }
    101                 else
    102                 {
    103                     TASK tk;
    104                     memset(&tk,0,sizeof(TASK));
    105                     tk.s_fd=pcur->s_wfd;
    106                     strcpy(tk.s_msg,buf);
    107                     add_task(&que,&tk);
    108                     pcur=pcur->next;
    109                 }
    110             }
    111             else
    112                 pcur=pcur->next;
    113         }
    114     }
    115     pthread_mutex_destory(&lock);
    116     pthread_cond_destory(&cond1);
    117     pthread_cond_destory(&cond2);
    118     return 0;    
    119 }
    View Code

    客户端实现代码如下:

     1 #include <stdio.h>
     2 #include <stdlib.h>
     3 #include <fcntl.h>
     4 #include <unistd.h>
     5 #include <string.h>
     6 #include <sys/stat.h>
     7 #include <sys/types.h>
     8 int main(int argc,char *argv[])
     9 {
    10     int fd_server,send,revc;
    11     char rname[32],wname[32];
    12     memset(rname,0,32);
    13     memset(wname,0,32);
    14     sprintf(rname,"r.%d",getpid());
    15     sprintf(wname,"w.%d",getpid());
    16     mkfifo(rname,0666);
    17     mkfifo(wname,0666);
    18     fd_server=open(argv[1],O_WRONLY);
    19     char msg[1024];
    20     memset(msg,0,1024);
    21     sprintf(msg,"%d
    ",getpid());
    22     write(fd_server,msg,strlen(msg));
    23     revc=open(rname,O_RDONLY);
    24     send=open(wname,O_WRONLY);
    25     while(memset(msg,0,1024),fgets(msg,1024,stdin))
    26     {
    27         write(send,msg,strlen(msg));
    28         memset(msg,0,1024);
    29         read(revc,msg,1024);
    30         write(1,msg,strlen(msg));
    31     }
    32     close(fd_server);
    33     close(send);
    34     close(revc);
    35     unlink(rname);
    36     unlink(wname);
    37     return 0;
    38 }
    View Code
  • 相关阅读:
    re模块和分组 random模块
    javascript中===和==的区别
    基于jQuery封装一个瀑布流插件
    javascript中天气接口案例
    jQuery中样式和属性模块简单分析
    jQuery中事件模块介绍
    jQueryDOM操作模块(二)
    jQueryDOM操作模块
    jQuery基本选择器模块(二)
    jQuery基本选择器模块
  • 原文地址:https://www.cnblogs.com/gjn135120/p/4009313.html
Copyright © 2011-2022 走看看