zoukankan      html  css  js  c++  java
  • 13.1 多线程操作共享内存、生产者消费者模型、多线程服务器框架

      生产者消费者模型如下:

    程序如下:

      1 #include <unistd.h>
      2 #include <sys/types.h>
      3 
      4 #include <stdlib.h>
      5 #include <stdio.h>
      6 #include <errno.h>
      7 #include <string.h>
      8 
      9 #include <pthread.h>
     10 
     11 int g_Count = 0;
     12 
     13 int     nNum, nLoop;
     14 
     15 //定义锁
     16 pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
     17 
     18 //定义条件并初始化
     19 pthread_cond_t my_condition=PTHREAD_COND_INITIALIZER;
     20 
     21 #define CUSTOM_COUNT 2
     22 #define PRODUCT_COUNT 4
     23 
     24 
     25   // int pthread_mutex_lock(pthread_mutex_t *mutex);
     26   //     int pthread_mutex_trylock(pthread_mutex_t *mutex);
     27    //    int pthread_mutex_unlock(pthread_mutex_t *mutex);
     28 
     29 /*
     30        int pthread_cond_timedwait(pthread_cond_t *restrict cond,
     31               pthread_mutex_t *restrict mutex,
     32               const struct timespec *restrict abstime);
     33        int pthread_cond_wait(pthread_cond_t *restrict cond,
     34               pthread_mutex_t *restrict mutex);
     35               */
     36 
     37 //posix 线程库的函数 线程库
     38 void *consume(void* arg)
     39 {
     40     
     41     int inum = 0;
     42     inum = (int)arg;
     43     while(1)
     44     {
     45         pthread_mutex_lock(&mutex);
     46         printf("consum:%d
    ", inum);
     47         while (g_Count == 0)  //while 醒来以后需要重新判断 条件g_Count是否满足,如果不满足,再次wait
     48         {
     49             printf("consum:%d 开始等待
    ", inum);
     50             pthread_cond_wait(&my_condition, &mutex); //api做了三件事情 //pthread_cond_wait假醒
     51             printf("consum:%d 醒来
    ", inum);
     52         }
     53     
     54         printf("consum:%d 消费产品begin
    ", inum);
     55         g_Count--; //消费产品
     56         printf("consum:%d 消费产品end
    ", inum);
     57         
     58         pthread_mutex_unlock(&mutex);
     59     
     60         sleep(1);
     61     }
     62 
     63     pthread_exit(0);
     64 
     65 } 
     66 
     67 //生产者线程
     68 //
     69 void *produce(void* arg)
     70 {
     71     int inum = 0;
     72     inum = (int)arg;
     73     
     74     while(1)
     75     {
     76     
     77     /*
     78         //因为是很多生产者调用produce,要保护全局变量g_Count,所以加锁 
     79         pthread_mutex_lock(&mutex);
     80         if (g_Count > 20)
     81         {
     82             printf("produce:%d 产品太多,需要控制,休眠
    ", inum);
     83             pthread_mutex_unlock(&mutex);
     84             sleep(1);
     85             continue;
     86         }
     87         else
     88         {
     89             pthread_mutex_unlock(&mutex);
     90         }
     91         */
     92     
     93         pthread_mutex_lock(&mutex);
     94         printf("产品数量:%d
    ", g_Count);
     95         printf("produce:%d 生产产品begin
    ", inum);
     96         g_Count++;
     97         //只要我生产出一个产品,就告诉消费者去消费
     98         printf("produce:%d 生产产品end
    ", inum);
     99         
    100         printf("produce:%d 发条件signal begin
    ", inum);
    101         pthread_cond_signal(&my_condition); //通知,在条件上等待的线程 
    102         printf("produce:%d 发条件signal end
    ", inum);
    103         
    104         pthread_mutex_unlock(&mutex);
    105         sleep(1);
    106     }
    107     
    108     pthread_exit(0);
    109 
    110 } 
    111 
    112 //结论:return arg 和 pthread_exit()的结果都可以让pthread_join 接过来
    113 int main()
    114 {
    115     int        i =0;
    116     pthread_t    tidArray[CUSTOM_COUNT+PRODUCT_COUNT+10];
    117     
    118     //创建消费者线程
    119     for (i=0; i<CUSTOM_COUNT; i++)
    120     {
    121         pthread_create(&tidArray[i], NULL, consume, (void *)i);
    122     }
    123     
    124     sleep(1);
    125     //创建生产线程
    126     for (i=0; i<PRODUCT_COUNT; i++)
    127     {
    128         pthread_create(&tidArray[i+CUSTOM_COUNT], NULL, produce, (void*)i);
    129     }
    130     
    131     
    132     
    133     for (i=0; i<CUSTOM_COUNT+PRODUCT_COUNT; i++)
    134     {
    135         pthread_join(tidArray[i], NULL); //等待线程结束。。。
    136     }
    137     
    138     
    139     printf("进程也要结束1233
    ");
    140     
    141     return 0;
    142 }

    执行结果如下:

     条件等待模型如下:

    多线程访问共享内存,通过信号量同步(线程互斥锁默认只在本进程内有效,不同进程中的线程无法用它进行同步与互斥,因此这里使用 System V 信号量):

      1 #include <sys/types.h>
      2 #include <unistd.h>
      3 
      4 #include <stdlib.h>
      5 #include <stdio.h>
      6 #include <string.h>
      7 
      8 #include <signal.h>
      9 #include <errno.h>
     10 #include <signal.h>
     11 #include <sys/wait.h>
     12 
     13 #include "myipc_sem.h"
     14 #include "myipc_shm.h"
     15 
     16 #include <pthread.h>
     17 
     18 int g_key = 0x3333;
     19 
     20 void TestFunc(int loopnum)
     21 {
     22     printf("loopnum:%d
    ", loopnum);
     23     
     24     int ncount = 0;
     25     int ret = 0;
     26     int shmhdl = 0;
     27     int *addr = NULL;
     28     
     29     int semid = 0;
     30     sem_open(g_key, &semid);
     31 
     32 
     33      sem_p(semid); //临界区开始
     34     //
     35         ret = IPC_CreatShm(".", 0, &shmhdl);
     36         
     37         ret =IPC_MapShm(shmhdl, (void **)&addr);
     38         *((int *)addr) =  *((int *)addr)  + 1;
     39         ncount = *((int *)addr);
     40         printf("ncount:%d
    ", ncount);
     41         //addr[0] = addr[0] +1;
     42         ret =IPC_UnMapShm(addr);
     43         //sleep(2);
     44         
     45     sem_v(semid);  //临界区开始
     46     //
     47     printf("进程正常退出:%d
    ", getpid());
     48 }    
     49 
     50 
     51 
     /*
      * POSIX thread entry point: runs TestFunc once and ends the thread.
      * arg is unused. Returns NULL, which is what pthread_join would receive.
      */
     void *thread_routine(void *arg)
     {
         (void)arg;

         printf("thread_routine start\n");
         TestFunc(1);

         /* Returning from the start routine ends the thread like pthread_exit(NULL). */
         return NULL;
     }
     60 
     61 int main(void )
     62 {
     63     int res;
     64     int procnum=10;
     65     int loopnum = 100;
     66     
     67     pthread_t tidArray[200];
     68 
     69 
     70     int  i=0,j = 0;
     71 
     72     printf("请输入要创建子进程的个数 : 
    ");
     73     scanf("%d", &procnum);
     74 
     75     printf("请输入让每个子进程测试多少次 :
    ");
     76     scanf("%d", &loopnum);
     77     
     78     //共享内存创建
     79     int        ret = 0;
     80     int     shmhdl = 0;
     81     ret = IPC_CreatShm(".", sizeof(int), &shmhdl);
     82     if (ret != 0)
     83     {
     84         printf("func IPC_CreatShm() err:%d 
    ", ret);
     85         return ret;
     86     }
     87     
     88     
     89     //信号量的创建
     90      int      semid = 0;
     91     ret = sem_creat(g_key, &semid);
     92     if (ret != 0)
     93     {
     94         printf("func sem_creat() err:%d,重新按照open打开信号量 
    ", ret);
     95         if (ret == SEMERR_EEXIST)
     96         {
     97             ret = sem_open(g_key, &semid);
     98             if (ret != 0)
     99             {
    100                 printf("按照打开的方式,重新获取sem失败:%d 
    ", ret);
    101                 return ret;
    102             }
    103         }
    104         else
    105         {
    106             return ret;
    107         }
    108         
    109     }
    110     
    111     int  val = 0;
    112     ret = sem_getval(semid, &val);
    113     if (ret != 0 )
    114     {
    115         printf("func sem_getval() err:%d 
    ", ret);
    116         return ret;
    117     }
    118     printf("sem val:%d
    ", val);
    119     getchar();
    120     
    121     for (i=0; i<procnum; i++)
    122     {
    123     
    124         //tmp.numId = i; //这说明 线程函数在使用了一个不断变化的内存空间。。。。
    125         pthread_create(&tidArray[i], NULL, thread_routine, NULL);
    126     }
    127     
    128     for (i=0; i<procnum; i++)
    129     {
    130         pthread_join(tidArray[i], NULL); //等待线程结束。。。
    131     }
    132 
    133 
    134     printf("父进程退出 hello...
    ");
    135     return 0;
    136 }

    多线程访问共享内存,通过线程锁实现:

      1 #include <sys/types.h>
      2 #include <unistd.h>
      3 
      4 #include <stdlib.h>
      5 #include <stdio.h>
      6 #include <string.h>
      7 
      8 #include <signal.h>
      9 #include <errno.h>
     10 #include <signal.h>
     11 #include <sys/wait.h>
     12 
     13 #include "myipc_sem.h"
     14 #include "myipc_shm.h"
     15 
     16 #include <pthread.h>
     17 
     18 pthread_mutex_t mymutex = PTHREAD_MUTEX_INITIALIZER;
     19 
     20 int g_key = 0x3333;
     21 
     22 void TestFunc(int loopnum)
     23 {
     24     printf("loopnum:%d
    ", loopnum);
     25     
     26     int ncount = 0;
     27     int ret = 0;
     28     int shmhdl = 0;
     29     int *addr = NULL;
     30     
     31     int semid = 0;
     32     sem_open(g_key, &semid);
     33 
     34 
     35      sem_p(semid); //临界区开始
     36     //
     37         ret = IPC_CreatShm(".", 0, &shmhdl);
     38         
     39         ret =IPC_MapShm(shmhdl, (void **)&addr);
     40         *((int *)addr) =  *((int *)addr)  + 1;
     41         ncount = *((int *)addr);
     42         printf("ncount:%d
    ", ncount);
     43         //addr[0] = addr[0] +1;
     44         ret =IPC_UnMapShm(addr);
     45         //sleep(2);
     46         
     47     sem_v(semid);  //临界区开始
     48     //
     49     printf("进程正常退出:%d
    ", getpid());
     50 }    
     51 
     52 void TestFunc_threadMutex(int loopnum)
     53 {
     54     printf("loopnum:%d
    ", loopnum);
     55     
     56     int ncount = 0;
     57     int ret = 0;
     58     int shmhdl = 0;
     59     int *addr = NULL;
     60     
     61     int semid = 0;
     62     sem_open(g_key, &semid);
     63 
     64 
     65      //sem_p(semid); //临界区开始
     66      pthread_mutex_lock(&mymutex);
     67     //
     68         ret = IPC_CreatShm(".", 0, &shmhdl);
     69         
     70         ret =IPC_MapShm(shmhdl, (void **)&addr);
     71         *((int *)addr) =  *((int *)addr)  + 1;
     72         ncount = *((int *)addr);
     73         printf("ncount:%d
    ", ncount);
     74         //addr[0] = addr[0] +1;
     75         ret =IPC_UnMapShm(addr);
     76         //sleep(2);
     77         
     78     //sem_v(semid);  //临界区开始
     79     pthread_mutex_unlock(&mymutex);
     80     //
     81     printf("进程正常退出:%d
    ", getpid());
     82 }    
     83 
     84 
     85 
     86 
     /*
      * POSIX thread entry point: runs the mutex-protected test once and ends
      * the thread. arg is unused. Returns NULL.
      */
     void *thread_routine(void *arg)
     {
         (void)arg;

         printf("thread_routine start\n");
         TestFunc_threadMutex(1);

         /* Returning from the start routine ends the thread like pthread_exit(NULL). */
         return NULL;
     }
     96 
     97 int main(void )
     98 {
     99     int res;
    100     int procnum=10;
    101     int loopnum = 100;
    102     
    103     pthread_t tidArray[1024*10];
    104 
    105 
    106     int  i=0,j = 0;
    107 
    108     printf("请输入要创建子进程的个数 : 
    ");
    109     scanf("%d", &procnum);
    110 
    111     printf("请输入让每个子进程测试多少次 :
    ");
    112     scanf("%d", &loopnum);
    113     
    114     //共享内存创建
    115     int        ret = 0;
    116     int     shmhdl = 0;
    117     ret = IPC_CreatShm(".", sizeof(int), &shmhdl);
    118     if (ret != 0)
    119     {
    120         printf("func IPC_CreatShm() err:%d 
    ", ret);
    121         return ret;
    122     }
    123     
    124     
    125     //信号量的创建
    126      int      semid = 0;
    127     ret = sem_creat(g_key, &semid);
    128     if (ret != 0)
    129     {
    130         printf("func sem_creat() err:%d,重新按照open打开信号量 
    ", ret);
    131         if (ret == SEMERR_EEXIST)
    132         {
    133             ret = sem_open(g_key, &semid);
    134             if (ret != 0)
    135             {
    136                 printf("按照打开的方式,重新获取sem失败:%d 
    ", ret);
    137                 return ret;
    138             }
    139         }
    140         else
    141         {
    142             return ret;
    143         }
    144         
    145     }
    146     
    147     int  val = 0;
    148     ret = sem_getval(semid, &val);
    149     if (ret != 0 )
    150     {
    151         printf("func sem_getval() err:%d 
    ", ret);
    152         return ret;
    153     }
    154     printf("sem val:%d
    ", val);
    155     getchar();
    156     
    157     for (i=0; i<procnum; i++)
    158     {
    159     
    160         //tmp.numId = i; //这说明 线程函数在使用了一个不断变化的内存空间。。。。
    161         pthread_create(&tidArray[i], NULL, thread_routine, NULL);
    162     }
    163     
    164     for (i=0; i<procnum; i++)
    165     {
    166         pthread_join(tidArray[i], NULL); //等待线程结束。。。
    167     }
    168 
    169 
    170     printf("父进程退出 hello...
    ");
    171     return 0;
    172 }

    服务器多进程模型、共享内存、信号量:

    信号量封装:

      1 #include <unistd.h>
      2 #include <sys/types.h>
      3 #include <sys/stat.h>
      4 #include <fcntl.h>
      5 #include <sys/mman.h>
      6 #include <sys/ipc.h>
      7 #include <sys/shm.h>
      8 #include <sys/sem.h>
      9 
     10 #include <stdlib.h>
     11 #include <stdio.h>
     12 #include <errno.h>
     13 #include <string.h>
     14 
     15 #include <sys/ipc.h>
     16 #include <sys/sem.h>
     17 #include "myipc_sem.h"
     18 
     19 
     /*
      * Fourth argument of semctl(). Which member is read depends on the
      * command passed to semctl().
      */
     union semun
     {
         int val;               /* value for SETVAL */
         struct semid_ds *buf;  /* buffer for IPC_STAT, IPC_SET */
         unsigned short *array; /* array for GETALL, SETALL */
         struct seminfo *__buf; /* buffer for IPC_INFO (Linux specific) */
     };
     27 
     28 //@返回值 0 正确 其他错误
     29 int sem_creat(int key, int *semid)
     30 {
     31     int     ret = 0;
     32     //int     tmpsemid = 0;
     33     
     34     if (semid == NULL)
     35     {
     36         ret = SEMERR_PARAM;
     37         printf("func sem_creat() err:%d
    ", ret);
     38         return ret;
     39     }
     40     ret = semget(key, 1, 0666| IPC_CREAT | IPC_EXCL); 
     41     if (ret == -1)
     42     {
     43             ret = errno;
     44             //perror("semget");
     45             if (errno == EEXIST) 
     46             {
     47                 ret = SEMERR_EEXIST;
     48                 printf("func sem_creat() 检测到信号量集已经存在:%d
    ", ret);
     49                 return ret;
     50             }
     51     }
     52     *semid = ret;
     53     
     54     ret = sem_setval(*semid, 1);
     55     if (ret != 0)
     56     {
     57         printf("func sem_setval() err:%d
    ", ret);
     58         return ret;
     59     }
     60     ret = 0;
     61     return ret;
     62 }   
     63 
     64 int sem_open(int key, int *semid)
     65 {
     66     int ret = 0;
     67     
     68     if (semid == NULL)
     69     {
     70         ret = SEMERR_PARAM;
     71         printf("func sem_open() err:%d
    ", ret);
     72         return ret;
     73     }
     74     
     75     ret = semget(key, 0, 0); 
     76     if (ret == -1)
     77     {
     78             ret = errno;
     79             printf("func sem_open() 失败:%d
    ", ret);
     80             return ret;    
     81     }
     82     *semid = ret;
     83     ret = 0;
     84     return ret;
     85 }  
     86 
     87 int sem_setval(int semid, int val)
     88 {
     89     int ret = 0;
     90     union semun su;
     91     su.val = val;
     92     ret = semctl(semid, 0,  SETVAL, su);
     93     return ret;
     94 }
     95 
     96 /*
     97 int sem_getval(int semid, int *val)
     98 {
     99     int ret = 0;
    100     int tmpval;
    101     if (val == NULL)
    102     {
    103         ret = SEMERR_PARAM;
    104         printf("func sem_getval() err:%d
    ", ret);
    105         return ret;
    106     }
    107     union semun su;
    108     tmpval = su.val ;
    109     ret = semctl(semid, 0, GETVAL, su);
    110     *val = tmpval  ;
    111     printf("val:%d
    ", tmpval);
    112     return ret;
    113 }
    114 */
    115 int sem_getval(int semid, int *myval)
    116 {
    117     int ret = 0;
    118     int val;
    119     union semun su;
    120     val = su.val ;
    121     //信号量 计数值
    122     ret = semctl(semid, 0, GETVAL, su);
    123     //printf("val:%d
    ", val);
    124     
    125     *myval = ret;
    126     ret = 0;
    127     return ret;
    128 }
    129 
    /*
     * P (wait) operation on semaphore 0 of the set `semid`.
     *
     * The request is handed to the kernel in a struct sembuf:
     *   sem_num = 0   index inside the set
     *   sem_op  = -1  decrement by one, blocking while the value is 0
     *   sem_flg = 0   neither SEM_UNDO nor IPC_NOWAIT
     *
     * A wait interrupted by a signal handler is restarted, so a caller never
     * proceeds as if it held the semaphore after an EINTR.
     *
     * Returns 0 on success, -1 on failure with errno set by semop.
     */
    int sem_p(int semid)
    {
        struct sembuf op = {0, -1, 0};
        int ret = 0;

        do
        {
            ret = semop(semid, &op, 1);
        } while (ret == -1 && errno == EINTR);

        return ret;
    }
    142 
    /*
     * V (post) operation: adds one to semaphore 0 of the set `semid`.
     * Returns the result of semop unchanged.
     */
    int sem_v(int semid)
    {
        struct sembuf op = {0, 1, 0}; /* sem_num, sem_op, sem_flg */

        return semop(semid, &op, 1);
    }
    150 
    151 
    152 /*
    153 int main()
    154 {
    155     
    156     //IPC_CREAT and IPC_EXCL a
    157     int semid;
    158     semid  = sem_creat(0x1111);
    159     //sem_setval(semid, 1);
    160     //sem_getval(semid);
    161     
    162     
    163     int pid = 0;
    164     pid = fork();
    165     
    166 
    167     //sem_p(semid);
    168         int i=0;
    169     //    printf("i:%d pid:%d 
    ", i++, getpid());
    170     //    sleep(3);
    171     //    printf("i:%d pid:%d 
    ", i++, getpid());
    172         
    173     //sem_v(semid);
    174     
    175     
    176     
    177     return 0;
    178 }
    179 */

    共享内存封装:

     1 #define    _OS_LINUX_
     2 
     3 #if defined _OS_LINUX_
     4 #include <stdio.h>
     5 #include <errno.h>
     6 #include <unistd.h>
     7 #include <memory.h>
     8 #include <sys/ipc.h>
     9 #include <sys/shm.h>
    10 #include <sys/sem.h>
    11 #include <sys/msg.h>
    12 #include "myipc_shm.h" 
    13 
    14 #endif
    15 
    int shmflag = 0; /* non-zero once shmkey has been computed */
    int shmkey;      /* cached ftok() key, shared by every later call */

    /***********************************************************************
      Purpose:  create, or look up, the shared memory segment.
      Params:   shmseedfile [in]  path handed to ftok() on the first call;
                                  later calls reuse the cached key and do
                                  not read this argument
                shmsize     [in]  size in bytes passed to shmget()
                shmhdl      [out] receives the shared memory id
      Returns:  0 on success; -1 on a NULL argument or ftok() failure;
                -2 when shmget() fails
    ************************************************************************/
    int IPC_CreatShm(char *shmseedfile, int shmsize, int *shmhdl)
    {
        if (shmhdl == NULL)
            return -1;

        if (shmflag == 0) /* key not derived yet */
        {
            if (shmseedfile == NULL)
                return -1;

            shmkey = ftok(shmseedfile, 'c');
            if (shmkey == -1)
            {
                perror("ftok");
                return -1;
            }

            shmflag = 1;
        }

        /* IPC_CREAT without IPC_EXCL: an existing segment for the key is reused. */
        *shmhdl = shmget(shmkey, shmsize, IPC_CREAT | 0666);
        if (*shmhdl == -1)
            return -2;

        return 0;
    }
    /***********************************************************************
      Purpose:  attach a shared memory segment to this process.
      Params:   shmhdl  [in]  shared memory id
                mapaddr [out] receives the address of the attached segment
      Returns:  0 on success; -1 when mapaddr is NULL or shmat() fails
                (*mapaddr is left untouched on failure)
    ************************************************************************/
    int
    IPC_MapShm(int shmhdl, void **mapaddr)
    {
        void *tempptr = NULL;

        if (mapaddr == NULL)
            return -1;

        /* Address NULL lets the system choose where to attach. */
        tempptr = shmat(shmhdl, NULL, SHM_RND);

        /* shmat reports failure as (void *)-1; compare pointers, never a truncated int. */
        if (tempptr == (void *)-1)
            return -1;

        *mapaddr = tempptr;
        return 0;
    }
    /***********************************************************************
      Purpose:  detach a shared memory segment from this process.
      Params:   unmapaddr [in] address of the attached segment
      Returns:  0 on success; -1 when shmdt() fails
    ************************************************************************/
    int IPC_UnMapShm(void *unmapaddr)
    {
        return (shmdt(unmapaddr) == -1) ? -1 : 0;
    }
    /***********************************************************************
      Purpose:  mark a shared memory segment for removal.
      Params:   shmhdl [in] shared memory id
      Returns:  0 on success; -1 when shmctl(IPC_RMID) fails
    ************************************************************************/
    int IPC_DelShm(int shmhdl)
    {
        return (shmctl(shmhdl, IPC_RMID, NULL) < 0) ? -1 : 0;
    }

    客户端:

      1 #include <unistd.h>
      2 #include <sys/types.h>
      3 //#include <sys/socket.h>
      4 //#include <netinet/in.h>
      5 //#include <arpa/inet.h>
      6 #include <signal.h>
      7 #include <sys/wait.h>
      8 //#include "sckutil.h"
      9 
     10 #include <stdlib.h>
     11 #include <stdio.h>
     12 #include <errno.h>
     13 #include <string.h>
     14 
     15 #include "commsocket.h"
     16 
     /*
      * SIGCHLD handler: reaps every child that has already exited so that
      * none is left as a zombie.
      *
      * errno is saved and restored because waitpid and the stdio calls may
      * change it while the interrupted code is still going to read it.
      *
      * NOTE(review): printf/fflush are not async-signal-safe; they are kept
      * because the listing relies on this diagnostic output.
      */
     void handle(int signum)
     {
         int saved_errno = errno;
         pid_t pid = 0;

         printf("recv signum:%d \n", signum);

         /* WNOHANG: stop as soon as no further exited child is pending. */
         while ((pid = waitpid(-1, NULL, WNOHANG)) > 0)
         {
             printf("退出子进程pid%d \n", (int)pid);
             fflush(stdout);
         }

         errno = saved_errno;
     }
     29 
     30 
     31 
     32 int main()
     33 {    
     34     int         ret = 0;
     35     void         *handle = NULL;
     36     //void         handle = NULL;
     37     int         connfd;
     38     int                 i = 0, j = 0;
     39     
     40     signal(SIGCHLD, handle);
     41     signal(SIGPIPE, SIG_IGN); //防止 sokcet破裂
     42     
     43     int procnum=10;
     44     int loopnum = 100;
     45     
     46     pthread_t tidArray[1024*2];
     47     
     48 
     49 
     50     printf("请输入要创建子进程的个数 : 
    ");
     51     scanf("%d", &procnum);
     52 
     53     printf("请输入让每个子进程测试多少次 :
    ");
     54     scanf("%d", &loopnum);
     55     
     56         
     57      unsigned char *data = (unsigned char *)"aaaaaafffffffffffssssss";
     58      int         datalen = 10;
     59      
     60      unsigned char out[1024];
     61      int outlen = 1024;
     62     //客户端环境初始化
     63     
     64     ret = sckCliet_init(&handle, 15, 5, 5, 10);
     65 
     66     for (i=0; i<procnum; i++)
     67     {
     68         int pid = fork();
     69         if (pid == 0)
     70         {
     71             ret = sckCliet_getconn(handle, "127.0.0.1", 8001, &connfd);
     72             for (j=0; j<loopnum; j++)
     73             {
     74                 //客户端发送报文
     75                 ret = sckClient_send(handle,  connfd, data, datalen);
     76                 if (ret != 0)
     77                 {
     78                     if (ret == Sck_ErrTimeOut)
     79                     {
     80                         continue;
     81                     }
     82                     break;
     83                 }
     84                 //printf("ccccccc
    ");
     85                     //客户端端接受报文
     86                 ret = sckClient_rev(handle, connfd, out, &outlen); //10
     87                 if (ret != 0)
     88                 {
     89                     if (ret == Sck_ErrTimeOut)
     90                     {
     91                         break; 
     92                     }
     93                     break;
     94                 }
     95                 out[outlen] = '';
     96             
     97                 printf("out: %s 
    ", out);
     98             }
     99             
    100             close(connfd);
    101             exit(0); //让子进程退出,不参与fork
    102         }
    103     }
    104     
    105 
    106     // 客户端环境释放 
    107     ret = sckClient_destroy(handle);
    108     
    109     //避免僵尸进程
    110     int mypid;
    111     while ((mypid = waitpid(-1, NULL, WNOHANG) ) > 0)
    112     {
    113         printf("退出子进程pid%d 
    ", mypid);
    114         fflush(stdout);
    115     } 
    116 
    117 }

    服务器:

      1 #include <unistd.h>
      2 #include <sys/types.h>
      3 #include <signal.h>
      4 #include <sys/wait.h>
      5 
      6 
      7 #include <stdlib.h>
      8 #include <stdio.h>
      9 #include <errno.h>
     10 #include <string.h>
     11 #include "commsocket.h"
     12 
     13 #include "myipc_sem.h"
     14 #include "myipc_shm.h"
     15 int g_key = 0x3333;
     16 
     /*
      * Prints the error text for the current errno, prefixed by m, and ends
      * the process with EXIT_FAILURE. The do/while(0) wrapper lets the macro
      * be used as one statement, including before an else.
      */
     #define ERR_EXIT(m) \
             do \
             { \
                     perror(m); \
                     exit(EXIT_FAILURE); \
             } while(0)
     23         
     24         
     25 #include <signal.h>
     26 
     /*
      * SIGCHLD handler: reaps every child that has already exited so that
      * none is left as a zombie.
      *
      * errno is saved and restored because waitpid and the stdio calls may
      * change it while the interrupted code is still going to read it.
      *
      * NOTE(review): printf/fflush are not async-signal-safe; they are kept
      * because the listing relies on this diagnostic output.
      */
     void handle(int signum)
     {
         int saved_errno = errno;
         pid_t pid = 0;

         printf("recv signum:%d \n", signum);

         /* WNOHANG: stop as soon as no further exited child is pending. */
         while ((pid = waitpid(-1, NULL, WNOHANG)) > 0)
         {
             printf("退出子进程pid%d \n", (int)pid);
             fflush(stdout);
         }

         errno = saved_errno;
     }
     39 
     40 int srvInit()
     41 {
     42     //共享内存创建
     43     int        ret = 0;
     44     int     shmhdl = 0;
     45     ret = IPC_CreatShm(".", sizeof(int), &shmhdl);
     46     if (ret != 0)
     47     {
     48         printf("func IPC_CreatShm() err:%d 
    ", ret);
     49         return ret;
     50     }
     51     
     52     
     53     //信号量的创建
     54      int      semid = 0;
     55     ret = sem_creat(g_key, &semid);
     56     if (ret != 0)
     57     {
     58         printf("func sem_creat() err:%d,重新按照open打开信号量 
    ", ret);
     59         if (ret == SEMERR_EEXIST)
     60         {
     61             ret = sem_open(g_key, &semid);
     62             if (ret != 0)
     63             {
     64                 printf("按照打开的方式,重新获取sem失败:%d 
    ", ret);
     65                 return ret;
     66             }
     67         }
     68         else
     69         {
     70             return ret;
     71         }
     72         
     73     }
     74     
     75     int  val = 0;
     76     ret = sem_getval(semid, &val);
     77     if (ret != 0 )
     78     {
     79         printf("func sem_getval() err:%d 
    ", ret);
     80         return ret;
     81     }
     82     printf("sem val:%d
    ", val);
     83     
     84 }
     85 
     86 
     87 void TestFunc()
     88 {
     89     int ncount = 0;
     90     int ret = 0;
     91     int shmhdl = 0;
     92     int *addr = NULL;
     93     
     94     int semid = 0;
     95     sem_open(g_key, &semid);
     96 
     97 
     98      sem_p(semid); //临界区开始
     99     //
    100         ret = IPC_CreatShm(".", 0, &shmhdl);
    101         
    102         ret =IPC_MapShm(shmhdl, (void **)&addr);
    103         *((int *)addr) =  *((int *)addr)  + 1;
    104         ncount = *((int *)addr);
    105         printf("ncount:%d
    ", ncount);
    106         //addr[0] = addr[0] +1;
    107         ret =IPC_UnMapShm(addr);
    108         //sleep(2);
    109         
    110     sem_v(semid);  //临界区开始
    111     //
    112     //printf("进程正常退出:%d
    ", getpid());
    113 }    
    114 
    115 int main(void)
    116 {
    117     int        ret = 0;
    118     int     listenfd;
    119     
    120     
    121     signal(SIGCHLD, handle);
    122     signal(SIGPIPE, SIG_IGN);
    123     
    124     //完成共享内存和信号量的创建
    125     srvInit();
    126     //getchar();
    127 
    128     ret = sckServer_init(8001, &listenfd);
    129     if (ret != 0)
    130     {
    131         printf("sckServer_init() err:%d 
    ", ret);
    132         return ret;
    133     }
    134     
    135     while(1)
    136     {
    137         int ret = 0;
    138         int wait_seconds = 5;
    139         int connfd = 0;
    140 
    141         ret = sckServer_accept(listenfd, &connfd,  wait_seconds);
    142         if (ret == Sck_ErrTimeOut)
    143         {
    144             //printf("timeout....
    ");
    145             continue;
    146         }
    147     
    148         int pid = fork();
    149         if (pid == 0)
    150         {
    151             unsigned char     recvbuf[1024];
    152             int     recvbuflen = 1024;
    153                 
    154             close(listenfd);
    155             while(1)
    156             {
    157                 memset(recvbuf, 0, sizeof(recvbuf));
    158                 ret = sckServer_rev(connfd, recvbuf, &recvbuflen,  wait_seconds);
    159                 if (ret != 0)
    160                 {
    161                     printf("func sckServer_rev() err:%d 
    ", ret);
    162                     break;
    163                 }
    164                 
    165                 TestFunc();
    166                 printf("recvbuf:%s
    ", recvbuf);
    167                 
    168                 //
    169                 ret = sckServer_send(connfd,  recvbuf, recvbuflen, wait_seconds);
    170                 if (ret != 0)
    171                 {
    172                     printf("func sckServer_send() err:%d 
    ", ret);
    173                     break;
    174                 }
    175             }
    176             close(connfd);
    177             exit(ret);
    178         }
    179         else if (pid > 0)
    180         {
    181             close(connfd);
    182         }
    183     }
    184     
    185 
    186     return 0;
    187 }

    服务器多线程框架:

    客户端连接封装:

      1 #include <unistd.h>
      2 #include <sys/types.h>
      3 #include <sys/socket.h>
      4 #include <netinet/in.h>
      5 #include <arpa/inet.h>
      6 #include <signal.h>
      7 #include <sys/wait.h>
      8 
      9 #include <fcntl.h>
     10 
     11 
     12 #include <stdlib.h>
     13 #include <stdio.h>
     14 #include <errno.h>
     15 #include <string.h>
     16 
     17 #include "commsocket.h"
     18 
     /*
      * State behind the opaque client handle produced by sckCliet_init().
      */
     typedef struct _SckHandle
     {
         int sockArray[100]; /* NOTE(review): presumably a pool of socket fds - not used in the visible code */
         int arrayNum;       /* set from sckCliet_init's nConNum argument */
         int sockfd;         /* socket descriptor; only assigned in commented-out code here */
         int contime;        /* set from sckCliet_init's contime argument */
         int sendtime;       /* set from sckCliet_init's sendtime argument */
         int revtime;        /* set from sckCliet_init's revtime argument */
     } SckHandle;
     29 
     /**
      * readn - read exactly count bytes unless EOF or an error comes first
      * @fd: file descriptor
      * @buf: receive buffer
      * @count: number of bytes wanted
      * Returns count on success, -1 on error, and fewer than count bytes
      * when EOF is reached early. A read interrupted by a signal is retried.
      */
     ssize_t readn(int fd, void *buf, size_t count)
     {
         char *cursor = (char *)buf;
         size_t remaining = count;

         while (remaining > 0)
         {
             ssize_t got = read(fd, cursor, remaining);

             if (got < 0)
             {
                 if (errno != EINTR)
                     return -1;
                 continue; /* interrupted: try again */
             }
             if (got == 0)
                 return count - remaining; /* EOF: report what was read so far */

             cursor += got;
             remaining -= got;
         }

         return count;
     }
     60 
     /**
      * writen - write exactly count bytes
      * @fd: file descriptor
      * @buf: send buffer (never modified)
      * @count: number of bytes to write
      * Returns count on success, -1 on error. A write interrupted by a
      * signal, or one that transfers 0 bytes, is retried.
      */
     ssize_t writen(int fd, const void *buf, size_t count)
     {
         /* Keep the const qualifier: the buffer is only read. */
         const char *bufp = buf;
         size_t nleft = count;

         while (nleft > 0)
         {
             ssize_t nwritten = write(fd, bufp, nleft);

             if (nwritten < 0)
             {
                 if (errno == EINTR)
                     continue;
                 return -1;
             }

             /* A short write just moves the window forward. */
             bufp += nwritten;
             nleft -= (size_t)nwritten;
         }

         return (ssize_t)count;
     }
     91 
     /**
      * recv_peek - look at pending socket data without removing it
      * @sockfd: socket
      * @buf: receive buffer
      * @len: maximum number of bytes to copy
      * Returns the number of bytes copied (>= 0) on success, -1 on error.
      * A call interrupted by a signal is retried.
      */
     ssize_t recv_peek(int sockfd, void *buf, size_t len)
     {
         while (1)
         {
             /* ssize_t, not int: keep the full range of recv's result. */
             ssize_t ret = recv(sockfd, buf, len, MSG_PEEK);
             if (ret == -1 && errno == EINTR)
                 continue;
             return ret;
         }
     }
    109 
    110 
    111 //函数声明
    112 //客户端环境初始化
    113 int sckCliet_init(void **handle, int contime, int sendtime, int revtime, int nConNum)
    114 {
    115     int     ret = 0;
    116     if (handle == NULL ||contime<0 || sendtime<0 || revtime<0)
    117     {
    118         ret = Sck_ErrParam;
    119         printf("func sckCliet_init() err: %d, check  (handle == NULL ||contime<0 || sendtime<0 || revtime<0)
    ", ret);
    120         return ret;
    121     }
    122     
    123     SckHandle *tmp = (SckHandle *)malloc(sizeof(SckHandle));
    124     if (tmp == NULL)
    125     {
    126         ret = Sck_ErrMalloc;
    127         printf("func sckCliet_init() err: malloc %d
    ", ret);
    128         return ret;
    129     }
    130     
    131     tmp->contime = contime;
    132     tmp->sendtime = sendtime;
    133     tmp->revtime = revtime;
    134     tmp->arrayNum = nConNum;
    135         
    136         
    137     /*
    138     int sockfd;
    139     int i = 0;
    140     for (i=0; i<1; i++)
    141     {
    142         //链表的顺序
    143         sockfd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
    144         if (sockfd < 0)
    145         {
    146             ret = errno;
    147             printf("func socket() err:  %d
    ", ret);
    148             return ret;
    149         }
    150         tmp->sockfd = sockfd;
    151     }
    152     */
    153 
    154     *handle = tmp; 
    155     return ret;
    156 }
    157 
    /**
     * activate_nonblock - switch a descriptor to non-blocking I/O
     * @fd: file descriptor
     * Returns 0 on success, -1 if either fcntl() call fails.
     */
    int activate_nonblock(int fd)
    {
        int current = fcntl(fd, F_GETFL);
        if (current == -1)
        {
            printf("func activate_nonblock() err:%d", current);
            return current;
        }

        int rc = fcntl(fd, F_SETFL, current | O_NONBLOCK);
        if (rc == -1)
            printf("func activate_nonblock() err:%d", rc);
        return rc;
    }
    183 
    /**
     * deactivate_nonblock - switch a descriptor back to blocking I/O
     * @fd: file descriptor
     * Returns 0 on success, -1 if either fcntl() call fails.
     */
    int deactivate_nonblock(int fd)
    {
        int current = fcntl(fd, F_GETFL);
        if (current == -1)
        {
            printf("func deactivate_nonblock() err:%d", current);
            return current;
        }

        int rc = fcntl(fd, F_SETFL, current & ~O_NONBLOCK);
        if (rc == -1)
            printf("func deactivate_nonblock() err:%d", rc);
        return rc;
    }
    208 
    209 
    210 
    211 /**
    212  * connect_timeout - connect
    213  * @fd: 套接字
    214  * @addr: 要连接的对方地址
    215  * @wait_seconds: 等待超时秒数,如果为0表示正常模式
    216  * 成功(未超时)返回0,失败返回-1,超时返回-1并且errno = ETIMEDOUT
    217  */
    218 static int connect_timeout(int fd, struct sockaddr_in *addr, unsigned int wait_seconds)
    219 {
    220     int ret;
    221     socklen_t addrlen = sizeof(struct sockaddr_in);
    222 
    223     if (wait_seconds > 0)
    224         activate_nonblock(fd);
    225 
    226     ret = connect(fd, (struct sockaddr*)addr, addrlen);
    227     if (ret < 0 && errno == EINPROGRESS)
    228     {
    229         //printf("11111111111111111111
    ");
    230         fd_set connect_fdset;
    231         struct timeval timeout;
    232         FD_ZERO(&connect_fdset);
    233         FD_SET(fd, &connect_fdset);
    234         timeout.tv_sec = wait_seconds;
    235         timeout.tv_usec = 0;
    236         do
    237         {
    238             // 一但连接建立,则套接字就可写  所以connect_fdset放在了写集合中
    239             ret = select(fd + 1, NULL, &connect_fdset, NULL, &timeout);
    240         } while (ret < 0 && errno == EINTR);
    241         if (ret == 0)
    242         {
    243             ret = -1;
    244             errno = ETIMEDOUT;
    245         }
    246         else if (ret < 0)
    247             return -1;
    248         else if (ret == 1)
    249         {
    250             //printf("22222222222222222
    ");
    251             /* ret返回为1(表示套接字可写),可能有两种情况,一种是连接建立成功,一种是套接字产生错误,*/
    252             /* 此时错误信息不会保存至errno变量中,因此,需要调用getsockopt来获取。 */
    253             int err;
    254             socklen_t socklen = sizeof(err);
    255             int sockoptret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &socklen);
    256             if (sockoptret == -1)
    257             {
    258                 return -1;
    259             }
    260             if (err == 0)
    261             {
    262                 //printf("3333333333333
    ");
    263                 ret = 0;
    264             }
    265             else
    266             {
    267                 //printf("4444444444444444:%d
    ", err);
    268                 errno = err;
    269                 ret = -1;
    270             }
    271         }
    272     }
    273     if (wait_seconds > 0)
    274     {
    275         deactivate_nonblock(fd);
    276     }
    277     return ret;
    278 }
    279 
    280 
    281 
    282 //
    283 int sckCliet_getconn(void *handle, char *ip, int port, int *connfd)
    284 {
    285     
    286     int ret = 0;
    287     SckHandle  *tmp = NULL;
    288     if (handle == NULL || ip==NULL || connfd==NULL || port<0 || port>65537)
    289     {
    290         ret = Sck_ErrParam;
    291         printf("func sckCliet_getconn() err: %d, check  (handle == NULL || ip==NULL || connfd==NULL || port<0 || port>65537) 
    ", ret);
    292         return ret;
    293     }
    294     
    295     //
    296     int sockfd;
    297     sockfd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
    298     if (sockfd < 0)
    299     {
    300         ret = errno;
    301         printf("func socket() err:  %d
    ", ret);
    302         return ret;
    303     }
    304     
    305     struct sockaddr_in servaddr;
    306     memset(&servaddr, 0, sizeof(servaddr));
    307     servaddr.sin_family = AF_INET;
    308     servaddr.sin_port = htons(port);
    309     servaddr.sin_addr.s_addr = inet_addr(ip);
    310     
    311     tmp = (SckHandle* )handle;
    312 
    313     /*
    314     ret = connect(sockfd, (struct sockaddr*) (&servaddr), sizeof(servaddr));
    315     if (ret < 0)
    316     {
    317         ret = errno;
    318         printf("func connect() err:  %d
    ", ret);
    319         return ret;
    320     }
    321     */
    322     
    323     ret = connect_timeout(sockfd, (struct sockaddr_in*) (&servaddr), (unsigned int )tmp->contime);
    324     if (ret < 0)
    325     {
    326         if (ret==-1 && errno == ETIMEDOUT)
    327         {
    328             ret = Sck_ErrTimeOut;
    329             return ret;
    330         }
    331         else
    332         {
    333             printf("func connect_timeout() err:  %d
    ", ret);
    334         }
    335     }
    336     
    337     *connfd = sockfd;
    338    
    339        return ret;
    340     
    341 }
    342 
    343 
    344 
    /**
     * write_timeout - wait until a descriptor is writable; performs no write itself
     * @fd: file descriptor
     * @wait_seconds: timeout in seconds; 0 skips the check entirely
     * Returns 0 when writable (or when @wait_seconds is 0), -1 on failure with
     * errno set; on timeout returns -1 with errno = ETIMEDOUT. A descriptor
     * outside [0, FD_SETSIZE) cannot be used with select() and is rejected
     * with errno = EBADF.
     */
    int write_timeout(int fd, unsigned int wait_seconds)
    {
        int ret = 0;
        if (wait_seconds > 0)
        {
            fd_set write_fdset;
            struct timeval timeout;

            if (fd < 0 || fd >= FD_SETSIZE)
            {
                /* FD_SET() on such a descriptor would write outside the fd_set */
                errno = EBADF;
                return -1;
            }

            timeout.tv_sec = wait_seconds;
            timeout.tv_usec = 0;
            do
            {
                /* rebuild the set on every attempt rather than reusing it after EINTR */
                FD_ZERO(&write_fdset);
                FD_SET(fd, &write_fdset);
                ret = select(fd + 1, NULL, &write_fdset, NULL, &timeout);
            } while (ret < 0 && errno == EINTR);

            if (ret == 0)
            {
                ret = -1;
                errno = ETIMEDOUT;
            }
            else if (ret == 1)
                ret = 0;
        }

        return ret;
    }
    380 
    381 
    382 //客户端发送报文
    383 int sckClient_send(void *handle, int  connfd,  unsigned char *data, int datalen)
    384 {
    385     int     ret = 0;
    386     
    387     SckHandle  *tmp = NULL;
    388     tmp = (SckHandle *)handle;
    389     ret = write_timeout(connfd, tmp->sendtime);
    390     if (ret == 0)
    391     {
    392         int writed = 0;
    393         unsigned char *netdata = ( unsigned char *)malloc(datalen + 4);
    394         if ( netdata == NULL)
    395         {
    396             ret = Sck_ErrMalloc;
    397             printf("func sckClient_send() mlloc Err:%d
     ", ret);
    398             return ret;
    399         }
    400         int netlen = htonl(datalen);
    401         memcpy(netdata, &netlen, 4);
    402         memcpy(netdata+4, data, datalen);
    403         
    404         writed = writen(connfd, netdata, datalen + 4);
    405         if (writed < (datalen + 4) )
    406         {
    407             if (netdata != NULL) 
    408             {
    409                 free(netdata);
    410                 netdata = NULL;
    411             }
    412             return writed;
    413         }
    414           
    415     }
    416     
    417     if (ret < 0)
    418     {
    419         //失败返回-1,超时返回-1并且errno = ETIMEDOUT
    420         if (ret == -1 && errno == ETIMEDOUT)
    421         {
    422             ret = Sck_ErrTimeOut;
    423             printf("func sckClient_send() mlloc Err:%d
     ", ret);
    424             return ret;
    425         }
    426         return ret;
    427     }
    428     
    429     return ret;
    430 }
    431 
    432 
    433 
    /**
     * read_timeout - wait until a descriptor is readable; performs no read itself
     * @fd: file descriptor
     * @wait_seconds: timeout in seconds; 0 skips the check entirely
     * Returns 0 when readable (or when @wait_seconds is 0), -1 on failure with
     * errno set; on timeout returns -1 with errno = ETIMEDOUT. A descriptor
     * outside [0, FD_SETSIZE) cannot be used with select() and is rejected
     * with errno = EBADF.
     */
    int read_timeout(int fd, unsigned int wait_seconds)
    {
        int ret = 0;
        if (wait_seconds > 0)
        {
            fd_set read_fdset;
            struct timeval timeout;

            if (fd < 0 || fd >= FD_SETSIZE)
            {
                /* FD_SET() on such a descriptor would write outside the fd_set */
                errno = EBADF;
                return -1;
            }

            timeout.tv_sec = wait_seconds;
            timeout.tv_usec = 0;

            /* select() has three outcomes:
             *   0   - the timeout expired with nothing to read
             *   <0  - failure; errno == EINTR means a signal interrupted the wait
             *   >0  - number of ready descriptors (here: the one being watched)
             */
            do
            {
                /* rebuild the set on every attempt rather than reusing it after EINTR */
                FD_ZERO(&read_fdset);
                FD_SET(fd, &read_fdset);
                ret = select(fd + 1, &read_fdset, NULL, NULL, &timeout);
            } while (ret < 0 && errno == EINTR);

            if (ret == 0)
            {
                ret = -1;
                errno = ETIMEDOUT;
            }
            else if (ret == 1)
                ret = 0;
        }

        return ret;
    }
    476 
    477 //客户端端接受报文
    478 int sckClient_rev(void *handle,  int  connfd, unsigned char *out, int *outlen)
    479 {
    480     
    481     int        ret = 0;
    482     SckHandle *tmpHandle = (SckHandle *)handle;
    483     
    484     if (handle==NULL || out==NULL)
    485     {
    486         ret = Sck_ErrParam;
    487         printf("func sckClient_rev() timeout , err:%d 
    ", Sck_ErrTimeOut);
    488         return ret;
    489     }
    490     
    491     ret =  read_timeout(connfd, tmpHandle->revtime ); //bugs modify bombing
    492     if (ret != 0)
    493     {
    494         if (ret==-1 || errno == ETIMEDOUT)
    495         {
    496             ret = Sck_ErrTimeOut;
    497             printf("func sckClient_rev() timeout , err:%d 
    ", Sck_ErrTimeOut);
    498             return ret;
    499         }
    500         else
    501         {
    502             printf("func sckClient_rev() timeout , err:%d 
    ", Sck_ErrTimeOut);
    503             return ret;
    504         }    
    505     }
    506     
    507     int netdatalen = 0;
    508     ret = readn(connfd, &netdatalen,  4); //读包头 4个字节
    509     if (ret == -1)
    510     {
    511         printf("func readn() err:%d 
    ", ret);
    512         return ret;
    513     }
    514     else if (ret < 4)
    515     {
    516         ret = Sck_ErrPeerClosed;
    517         printf("func readn() err peer closed:%d 
    ", ret);
    518         return ret;
    519     }
    520     
    521     int n;
    522     n = ntohl(netdatalen);
    523     ret = readn(connfd, out, n); //根据长度读数据
    524     if (ret == -1)
    525     {
    526         printf("func readn() err:%d 
    ", ret);
    527         return ret;
    528     }
    529     else if (ret < n)
    530     {
    531         ret = Sck_ErrPeerClosed;
    532         printf("func readn() err peer closed:%d 
    ", ret);
    533         return ret;
    534     }
    535     
    536     *outlen = n;
    537     
    538     return 0;
    539 }
    540 
    /* Release the client environment created by sckCliet_init(). Always returns 0. */
    int sckClient_destroy(void *handle)
    {
        free(handle); /* free(NULL) is a no-op, so no guard is needed */
        return 0;
    }
    550 
    /* Close a connection descriptor; negative values are ignored. Always returns 0. */
    int sckCliet_closeconn(int connfd)
    {
        if (connfd < 0)
            return 0;

        close(connfd);
        return 0;
    }
    559 
    560 
    561 
    562 /////////////////////////////////////////////////////////////////////////////////////
    563 //函数声明
    /**
     * sckServer_init - create a listening TCP socket bound to INADDR_ANY:port
     * @port: port to listen on, 0..65535
     * @listenfd: out parameter; receives the listening socket on success
     * Returns 0 on success, EINVAL on bad arguments, otherwise the errno value
     * of the call that failed. The socket is closed again on every failure
     * path, so no descriptor leaks.
     */
    int sckServer_init(int port, int *listenfd)
    {
        int     ret = 0;
        int mylistenfd;
        int on = 1;
        struct sockaddr_in servaddr;

        if (listenfd == NULL || port < 0 || port > 65535)
        {
            ret = EINVAL;
            printf("func sckServer_init() err:%d, check (listenfd == NULL || port<0 || port>65535) \n", ret);
            return ret;
        }

        memset(&servaddr, 0, sizeof(servaddr));
        servaddr.sin_family = AF_INET;
        servaddr.sin_port = htons(port);
        servaddr.sin_addr.s_addr = htonl(INADDR_ANY);

        mylistenfd = socket(PF_INET, SOCK_STREAM, 0);
        if (mylistenfd < 0)
        {
            ret = errno ;
            printf("func socket() err:%d \n", ret);
            return ret;
        }

        if (setsockopt(mylistenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on) ) < 0)
        {
            ret = errno ;
            printf("func setsockopt() err:%d \n", ret);
            goto fail;
        }

        if (bind(mylistenfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) < 0)
        {
            ret = errno ;
            printf("func bind() err:%d \n", ret);
            goto fail;
        }

        if (listen(mylistenfd, SOMAXCONN) < 0)
        {
            ret = errno ;
            printf("func listen() err:%d \n", ret);
            goto fail;
        }

        *listenfd = mylistenfd;
        return 0;

    fail:
        /* ret already holds the errno of the failing call */
        close(mylistenfd);
        return ret;
    }
    615 
    /**
     * accept_timeout - accept() with an optional timeout
     * @fd: listening socket
     * @addr: out parameter for the peer address; may be NULL
     * @wait_seconds: timeout in seconds; 0 means a plain blocking accept
     * Returns the connected socket on success. Returns -1 with errno set on
     * failure; on timeout errno is ETIMEDOUT.
     */
    int accept_timeout(int fd, struct sockaddr_in *addr, unsigned int wait_seconds)
    {
        int ret;
        socklen_t addrlen = sizeof(struct sockaddr_in);

        if (wait_seconds > 0)
        {
            fd_set accept_fdset;
            struct timeval timeout;

            if (fd < 0 || fd >= FD_SETSIZE)
            {
                /* FD_SET() on such a descriptor would write outside the fd_set */
                errno = EBADF;
                return -1;
            }

            timeout.tv_sec = wait_seconds;
            timeout.tv_usec = 0;
            do
            {
                FD_ZERO(&accept_fdset);
                FD_SET(fd, &accept_fdset);
                ret = select(fd + 1, &accept_fdset, NULL, NULL, &timeout);
            } while (ret < 0 && errno == EINTR);
            if (ret == -1)
                return -1;
            else if (ret == 0)
            {
                errno = ETIMEDOUT;
                return -1;
            }
        }

        /* a readable listening socket means a handshake has completed,
           so accept() is not expected to block at this point */
        if (addr != NULL)
            ret = accept(fd, (struct sockaddr*)addr, &addrlen);
        else
            ret = accept(fd, NULL, NULL);

        if (ret == -1)
        {
            /* report -1, never the positive errno value: callers would
               mistake that for a valid descriptor */
            int saved_errno = errno;
            printf("func accept() err:%d \n", saved_errno);
            errno = saved_errno;
            return -1;
        }

        return ret;
    }
    665 
    666 int sckServer_accept(int listenfd, int *connfd,  int timeout)
    667 {
    668     int    ret = 0;
    669     
    670     ret = accept_timeout(listenfd, NULL, (unsigned int) timeout);
    671     if (ret < 0)
    672     {
    673         if (ret == -1 && errno == ETIMEDOUT)
    674         {
    675             ret = Sck_ErrTimeOut;
    676             //printf("func accept_timeout() timeout err:%d 
    ", ret);
    677             return ret;
    678         }
    679         else
    680         {
    681             ret = errno;
    682             printf("func accept_timeout() err:%d 
    ", ret);
    683             return ret;
    684         }
    685     }
    686     
    687     *connfd = ret;
    688     return 0;
    689 }
    690 //服务器端发送报文
    691 int sckServer_send(int connfd,  unsigned char *data, int datalen, int timeout)
    692 {
    693     int     ret = 0;
    694     
    695     ret = write_timeout(connfd, timeout);
    696     if (ret == 0)
    697     {
    698         int writed = 0;
    699         unsigned char *netdata = ( unsigned char *)malloc(datalen + 4);
    700         if ( netdata == NULL)
    701         {
    702             ret = Sck_ErrMalloc;
    703             printf("func sckServer_send() mlloc Err:%d
     ", ret);
    704             return ret;
    705         }
    706         int netlen = htonl(datalen);
    707         memcpy(netdata, &netlen, 4);
    708         memcpy(netdata+4, data, datalen);
    709         
    710         writed = writen(connfd, netdata, datalen + 4);
    711         if (writed < (datalen + 4) )
    712         {
    713             if (netdata != NULL) 
    714             {
    715                 free(netdata);
    716                 netdata = NULL;
    717             }
    718             return writed;
    719         }
    720           
    721     }
    722     
    723     if (ret < 0)
    724     {
    725         //失败返回-1,超时返回-1并且errno = ETIMEDOUT
    726         if (ret == -1 && errno == ETIMEDOUT)
    727         {
    728             ret = Sck_ErrTimeOut;
    729             printf("func sckServer_send() mlloc Err:%d
     ", ret);
    730             return ret;
    731         }
    732         return ret;
    733     }
    734     
    735     return ret;
    736 }
    737 //服务器端端接受报文
    738 int sckServer_rev(int  connfd, unsigned char *out, int *outlen,  int timeout)
    739 {
    740         
    741     int        ret = 0;
    742     
    743     if (out==NULL || outlen==NULL)
    744     {
    745         ret = Sck_ErrParam;
    746         printf("func sckClient_rev() timeout , err:%d 
    ", Sck_ErrTimeOut);
    747         return ret;
    748     }
    749     
    750     ret =  read_timeout(connfd, timeout); //bugs modify bombing
    751     if (ret != 0)
    752     {
    753         if (ret==-1 || errno == ETIMEDOUT)
    754         {
    755             ret = Sck_ErrTimeOut;
    756             printf("func sckClient_rev() timeout , err:%d 
    ", Sck_ErrTimeOut);
    757             return ret;
    758         }
    759         else
    760         {
    761             printf("func sckClient_rev() timeout , err:%d 
    ", Sck_ErrTimeOut);
    762             return ret;
    763         }    
    764     }
    765     
    766     int netdatalen = 0;
    767     ret = readn(connfd, &netdatalen,  4); //读包头 4个字节
    768     if (ret == -1)
    769     {
    770         printf("func readn() err:%d 
    ", ret);
    771         return ret;
    772     }
    773     else if (ret < 4)
    774     {
    775         ret = Sck_ErrPeerClosed;
    776         printf("func readn() err peer closed:%d 
    ", ret);
    777         return ret;
    778     }
    779     
    780     int n;
    781     n = ntohl(netdatalen);
    782     ret = readn(connfd, out, n); //根据长度读数据
    783     if (ret == -1)
    784     {
    785         printf("func readn() err:%d 
    ", ret);
    786         return ret;
    787     }
    788     else if (ret < n)
    789     {
    790         ret = Sck_ErrPeerClosed;
    791         printf("func readn() err peer closed:%d 
    ", ret);
    792         return ret;
    793     }
    794     
    795     *outlen = n;
    796     
    797     return 0;
    798 }
    799 
    /* Release the server environment. Nothing is allocated, so this only returns 0. */
    int sckServer_destroy(void *handle)
    {
        (void)handle; /* unused */
        return 0;
    }

    服务器:

      1 #include <unistd.h>
      2 #include <sys/types.h>
      3 #include <signal.h>
      4 #include <sys/wait.h>
      5 
      6 
      7 #include <stdlib.h>
      8 #include <stdio.h>
      9 #include <errno.h>
     10 #include <string.h>
     11 #include <pthread.h>
     12 
     13 #include "commsocket.h"
     14 
     15 #include "myipc_sem.h"
     16 #include "myipc_shm.h"
     17 
     18 pthread_mutex_t mymutex = PTHREAD_MUTEX_INITIALIZER;
     19 
     20 
     21 int g_key = 0x3333;
     22 
     /* Print the errno message for m and terminate; do/while(0) makes it one statement. */
     #define ERR_EXIT(m) \
             do \
             { \
                     perror(m); \
                     exit(EXIT_FAILURE); \
             } while(0)
     29         
     30         
     31 #include <signal.h>
     32 
     /*
      * SIGCHLD handler: reap every child that has already exited so that no
      * zombies are left behind. errno is saved and restored because waitpid()
      * and the stdio calls may change it underneath the interrupted code.
      * NOTE(review): printf()/fflush() are not async-signal-safe; kept because
      * the output is part of this demo, but production code should use write().
      */
     void handle(int signum)
     {
         int saved_errno = errno;
         int pid = 0;
         printf("recv signum:%d \n", signum);

         while ((pid = waitpid(-1, NULL, WNOHANG) ) > 0)
         {
             printf("退出子进程pid%d \n", pid);
             fflush(stdout);
         }
         errno = saved_errno;
     }
     45 
     46 int srvInit()
     47 {
     48     //共享内存创建
     49     int        ret = 0;
     50     int     shmhdl = 0;
     51     ret = IPC_CreatShm(".", sizeof(int), &shmhdl);
     52     if (ret != 0)
     53     {
     54         printf("func IPC_CreatShm() err:%d 
    ", ret);
     55         return ret;
     56     }
     57     
     58     
     59     //信号量的创建
     60      int      semid = 0;
     61     ret = sem_creat(g_key, &semid);
     62     if (ret != 0)
     63     {
     64         printf("func sem_creat() err:%d,重新按照open打开信号量 
    ", ret);
     65         if (ret == SEMERR_EEXIST)
     66         {
     67             ret = sem_open(g_key, &semid);
     68             if (ret != 0)
     69             {
     70                 printf("按照打开的方式,重新获取sem失败:%d 
    ", ret);
     71                 return ret;
     72             }
     73         }
     74         else
     75         {
     76             return ret;
     77         }
     78         
     79     }
     80     
     81     int  val = 0;
     82     ret = sem_getval(semid, &val);
     83     if (ret != 0 )
     84     {
     85         printf("func sem_getval() err:%d 
    ", ret);
     86         return ret;
     87     }
     88     printf("sem val:%d
    ", val);
     89     return 0;
     90     
     91 }
     92 
     93 
     94 void TestFunc_sem()
     95 {
     96     int ncount = 0;
     97     int ret = 0;
     98     int shmhdl = 0;
     99     int *addr = NULL;
    100     
    101     int semid = 0;
    102     sem_open(g_key, &semid);
    103 
    104 
    105      sem_p(semid); //临界区开始
    106      
    107     //
    108         ret = IPC_CreatShm(".", 0, &shmhdl);
    109         
    110         ret =IPC_MapShm(shmhdl, (void **)&addr);
    111         *((int *)addr) =  *((int *)addr)  + 1;
    112         ncount = *((int *)addr);
    113         printf("ncount:%d
    ", ncount);
    114         //addr[0] = addr[0] +1;
    115         ret =IPC_UnMapShm(addr);
    116         //sleep(2);
    117         
    118     sem_v(semid);  //临界区开始
    119     //
    120     //printf("进程正常退出:%d
    ", getpid());
    121 }    
    122 
    123 
    124 void TestFunc_mutex()
    125 {
    126     int ncount = 0;
    127     int ret = 0;
    128     int shmhdl = 0;
    129     int *addr = NULL;
    130     
    131     int semid = 0;
    132     sem_open(g_key, &semid);
    133 
    134 
    135      //sem_p(semid); //临界区开始
    136      pthread_mutex_lock(&mymutex);
    137      
    138     //
    139         ret = IPC_CreatShm(".", 0, &shmhdl);
    140         
    141         ret =IPC_MapShm(shmhdl, (void **)&addr);
    142         *((int *)addr) =  *((int *)addr)  + 1;
    143         ncount = *((int *)addr);
    144         printf("ncount:%d
    ", ncount);
    145         //addr[0] = addr[0] +1;
    146         ret =IPC_UnMapShm(addr);
    147         //sleep(2);
    148         
    149     //sem_v(semid);  //临界区开始
    150     pthread_mutex_unlock(&mymutex);
    151     //
    152     //printf("进程正常退出:%d
    ", getpid());
    153 }    
    154 
    155 
    156 void *thread_routine(void* arg)
    157 {
    158     int     connfd = 0;
    159     int     ret = 0;
    160     
    161     pthread_detach(pthread_self());
    162     
    163     if (arg == NULL)
    164     {
    165         return NULL;
    166     }
    167     connfd = *((int *)arg);
    168     free(arg);
    169 
    170     unsigned char     recvbuf[1024];
    171     int         recvbuflen = 1024;
    172     int         wait_seconds = 5;
    173         
    174     
    175     while(1)
    176     {
    177         memset(recvbuf, 0, sizeof(recvbuf));
    178         ret = sckServer_rev(connfd, recvbuf, &recvbuflen,  wait_seconds);
    179         if (ret != 0)
    180         {
    181             if (ret == Sck_ErrTimeOut)
    182             {
    183                 continue;
    184             }
    185             printf("func sckServer_rev() err:%d 
    ", ret);
    186             break;
    187         }
    188         
    189         TestFunc_mutex();
    190         printf("recvbuf:%s
    ", recvbuf);
    191         
    192         //
    193         ret = sckServer_send(connfd,  recvbuf, recvbuflen, wait_seconds);
    194         if (ret != 0)
    195         {
    196             printf("func sckServer_send() err:%d 
    ", ret);
    197             break;
    198         }
    199     }
    200     close(connfd);
    201     return NULL;    
    202 }
    203 
    204 int main(void)
    205 {
    206     int        ret = 0;
    207     int     listenfd;
    208     
    209     
    210     signal(SIGCHLD, handle);
    211     signal(SIGPIPE, SIG_IGN);
    212     
    213     //完成共享内存和信号量的创建
    214     srvInit();
    215 
    216     ret = sckServer_init(8001, &listenfd);
    217     if (ret != 0)
    218     {
    219         printf("sckServer_init() err:%d 
    ", ret);
    220         return ret;
    221     }
    222     
    223     while(1)
    224     {
    225         int ret = 0;
    226         int wait_seconds = 5;
    227         int connfd = 0;
    228 
    229         ret = sckServer_accept(listenfd, &connfd,  wait_seconds);
    230         if (ret == Sck_ErrTimeOut)
    231         {
    232             //printf("timeout....
    ");
    233             continue;
    234         }
    235         int tid = 0;
    236         
    237         int *pCon = (int *)malloc(sizeof(int)) ; 
    238         *pCon = connfd;
    239         
    240         //每来一个连接,需要启动I个线程,线程需要收发报文 
    241         pthread_create(&tid, NULL, thread_routine, (void *)pCon);
    242 
    243     }
    244     
    245 
    246     return 0;
    247 }

    客户端:

      1 #include <unistd.h>
      2 #include <sys/types.h>
      3 //#include <sys/socket.h>
      4 //#include <netinet/in.h>
      5 //#include <arpa/inet.h>
      6 #include <signal.h>
      7 #include <sys/wait.h>
      8 //#include "sckutil.h"
      9 
     10 #include <stdlib.h>
     11 #include <stdio.h>
     12 #include <errno.h>
     13 #include <string.h>
     14 
     15 #include "commsocket.h"
     16 
     /*
      * SIGCHLD handler: reap every child that has already exited so that no
      * zombies are left behind. errno is saved and restored because waitpid()
      * and the stdio calls may change it underneath the interrupted code.
      * NOTE(review): printf()/fflush() are not async-signal-safe; kept because
      * the output is part of this demo, but production code should use write().
      */
     void handle(int signum)
     {
         int saved_errno = errno;
         int pid = 0;
         printf("recv signum:%d \n", signum);

         while ((pid = waitpid(-1, NULL, WNOHANG) ) > 0)
         {
             printf("退出子进程pid%d \n", pid);
             fflush(stdout);
         }
         errno = saved_errno;
     }
     29 
     30 
     31 
     32 int main()
     33 {    
     34     int         ret = 0;
     35     void         *handle = NULL;
     36     //void         handle = NULL;
     37     int         connfd;
     38     int                 i = 0, j = 0;
     39     
     40     signal(SIGCHLD, handle);
     41     signal(SIGPIPE, SIG_IGN); //防止 sokcet破裂
     42     
     43     int procnum=10;
     44     int loopnum = 100;
     45     
     46     pthread_t tidArray[1024*2];
     47     
     48 
     49 
     50     printf("请输入要创建子进程的个数 : 
    ");
     51     scanf("%d", &procnum);
     52 
     53     printf("请输入让每个子进程测试多少次 :
    ");
     54     scanf("%d", &loopnum);
     55     
     56         
     57      unsigned char *data = (unsigned char *)"aaaaaafffffffffffssssss";
     58      int         datalen = 10;
     59      
     60      unsigned char out[1024];
     61      int outlen = 1024;
     62     //客户端环境初始化
     63     
     64     ret = sckCliet_init(&handle, 15, 5, 5, 10);
     65 
     66     for (i=0; i<procnum; i++)
     67     {
     68         int pid = fork();
     69         if (pid == 0)
     70         {
     71             ret = sckCliet_getconn(handle, "127.0.0.1", 8001, &connfd);
     72             for (j=0; j<loopnum; j++)
     73             {
     74                 //客户端发送报文
     75                 ret = sckClient_send(handle,  connfd, data, datalen);
     76                 if (ret != 0)
     77                 {
     78                     if (ret == Sck_ErrTimeOut)
     79                     {
     80                         continue;
     81                     }
     82                     break;
     83                 }
     84                 //printf("ccccccc
    ");
     85                     //客户端端接受报文
     86                 ret = sckClient_rev(handle, connfd, out, &outlen); //10
     87                 if (ret != 0)
     88                 {
     89                     if (ret == Sck_ErrTimeOut)
     90                     {
     91                         break; 
     92                     }
     93                     break;
     94                 }
     95                 out[outlen] = '';
     96             
     97                 printf("out: %s 
    ", out);
     98             }
     99             
    100             close(connfd);
    101             exit(0); //让子进程退出,不参与fork
    102         }
    103     }
    104     
    105 
    106     // 客户端环境释放 
    107     ret = sckClient_destroy(handle);
    108     
    109     //避免僵尸进程
    110     int mypid;
    111     while ((mypid = waitpid(-1, NULL, WNOHANG) ) > 0)
    112     {
    113         printf("退出子进程pid%d 
    ", mypid);
    114         fflush(stdout);
    115     } 
    116 
    117 }
  • 相关阅读:
    记录下我常用的工具
    记录下Lambda常用的表现形式
    链式编程学习之练习篇
    MySQL5.6.35部署
    jdk+Tomcat环境
    查找Linux中内存和CPU使用率最高的进程
    Linux 双网卡绑定
    saltstack 把数据返回到mysql服务器
    Python中map,filter,reduce,zip的应用
    python使用psutil获取服务器信息
  • 原文地址:https://www.cnblogs.com/wanmeishenghuo/p/9457764.html
Copyright © 2011-2022 走看看