公司里开发的一个项目需要在server端添加多线程缓冲队列,来存取数据,我也是初出茅庐没有太多经验,在网上搜集了大量资料后,终于有了一套自己的设计思路,并解决了项目里的问题,因为当时搜集资料时,发现网上这个的具体文章不是太多或者要么太复杂,要么太简陋,对于新手很难能看懂,所有我就打算将我的设计思路发出来,希望能帮助和我一样的同学朋友们,如有不足请指导!谢谢
项目需求:
两个线程,线程1接收客户端数据并有序的存入缓冲队列;线程2从缓冲队列有序的取出数据并解析插入数据库;
解决方法:
1:新建一个结构体buff_quere,内有一个枚举类型buffstatus缓冲区状态,为空 已读 已写,和一个缓存数据的数组类型buff_up_quere,接收的数据存在这里;(全局)
1 /* 缓冲区结构体 */ 2 struct buff_quere{ 3 enum buffstatus{empty,wirte,reads}bufstatus; //为空,已写,已读 4 uint8_t buff_up_quere[RX_BUFF_SIZE]; //缓冲区 5 };
2:创建结构体数组为缓冲区,需要几个就建几个;(全局)
1 static struct buff_quere buff_data1[BUFFER_QUEUE_LEN];//缓冲区1 2 static int count_len1 = 0; //缓冲区1计数器 3 static struct buff_quere buff_data2[BUFFER_QUEUE_LEN];//缓冲区2 4 static int count_len2 = 0; //缓冲区2计数器
3:线程1接收并储存数据到缓冲区;
1 int buff_nb = 1;//默认从第一个缓冲区开始储存 2 int nb_next = 0;//下一个缓冲区 3 int len=0; 4 while(true) 5 { 6 //缓冲队列 7 nb_next = buff_nb+1; 8 if(nb_next == 2){ 9 nb_next = 1; 10 } 11 switch(buff_nb){ 12 case 1: 13 printf("进入缓冲区%d进行写入缓冲区index=%d ",buff_nb,len); 14 buff_data1[len].bufstatus = wirte; 15 memcpy(&buff_data1[len].buff_up_quere, &buff_up, sizeof(buff_up)); 16 len++; 17 if (len == BUFFER_QUEUE_LEN) 18 { 19 printf("缓冲区%d已写入完毕",buff_nb); 20 while(count_len2 != 0){ 21 printf("警告:缓冲区%d还未清空,停止缓存数据 ",nb_next); 22 sleep(1); 23 } 24 printf(",转到缓冲区%d进行缓存 ",nb_next); 25 len = 0; 26 buff_nb = nb_next; 27 } 28 break; 29 case 2: 30 printf("进入缓冲区%d进行写入缓冲区index=%d ",buff_nb,len); 31 buff_data2[len].bufstatus = wirte; 32 memcpy(&buff_data2[len].buff_up_quere, &buff_up, sizeof(buff_up)); 33 len++; 34 if (len == BUFFER_QUEUE_LEN) 35 { 36 printf("缓冲区%d已写入完毕",buff_nb); 37 while(count_len1 != 0){ 38 printf("警告:缓冲区%d还未清空,停止缓存数据 ",nb_next); 39 sleep(1); 40 } 41 printf(",转到缓冲区%d进行缓存 ",nb_next); 42 len = 0; 43 buff_nb = nb_next; 44 } 45 break; 46 default: 47 continue; 48 break; 49 }
4:线程2从缓冲区获取数据并操作数据;
1 int buff_nb = 1;//获取缓冲区编号,默认为1 2 int nb_next = 0;//下一个缓冲区 3 int len=0; 4 while (true) 5 { 6 //缓冲队列 7 nb_next = buff_nb+1; 8 if(nb_next == 11){ 9 nb_next = 1; 10 } 11 switch(buff_nb){ 12 case 1: 13 if(buff_data1[len].bufstatus == wirte){ 14 printf("进入缓冲区%d进行读取缓冲区数据index=%d ",buff_nb,len); 15 buff_data1[len].bufstatus = reads; 16 memcpy(&buff_up, &buff_data1[len].buff_up_quere, sizeof(buff_data1[len].buff_up_quere)); 17 len++; 18 if (len == BUFFER_QUEUE_LEN) 19 { 20 printf("缓冲区%d已读取完毕index=%d,清空此缓冲区 ",buff_nb,len); 21 len = 0; 22 buff_nb = nb_next; 23 memset(buff_data1, 0, sizeof(buff_data1)); 24 } 25 }else{ 26 printf("缓冲区%d为空或已读,请等待... ",buff_nb); 27 sleep(1); 28 continue; 29 } 30 count_len1 = len; 31 break; 32 case 2: 33 if(buff_data2[len].bufstatus == wirte){ 34 printf("进入缓冲区%d进行读取缓冲区数据index=%d ",buff_nb,len); 35 buff_data2[len].bufstatus = reads; 36 memcpy(&buff_up, &buff_data2[len].buff_up_quere, sizeof(buff_data2[len].buff_up_quere)); 37 len++; 38 if (len == BUFFER_QUEUE_LEN) 39 { 40 printf("缓冲区%d已读取完毕index=%d,清空此缓冲区 ",buff_nb,len); 41 len = 0; 42 buff_nb = nb_next; 43 memset(buff_data2, 0, sizeof(buff_data2)); 44 } 45 }else{ 46 printf("缓冲区%d为空或已读,请等待... ",buff_nb); 47 sleep(1); 48 continue; 49 } 50 count_len2 = len; 51 break; 52 default: 53 continue; 54 break; 55 }
总结思路:当线程1将数据储存到缓冲区1(buff_data1[BUFFER_QUEUE_LEN])的时候先将缓冲区1当前储存的元素赋值一个写入的状态(buff_data1[len].bufstatus = wirte;),然后再写入(memcpy(&buff_data1[len].buff_up_quere, &buff_up, sizeof(buff_up)););当缓冲区1写满后通过count_len2是否为0判断缓冲区2是否为空,为空就切换缓冲区2并将下标设为0继续写入(len = 0;buff_nb = nb_next;),否则进行等待;
当线程2从缓冲区1(buff_data1[BUFFER_QUEUE_LEN])获取数据的时候,先判断缓冲区1里当前元素的状态是否为写入(if(buff_data1[len].bufstatus == wirte)),是则给当前元素赋值一个读取的状态(buff_data1[len].bufstatus = reads;)然后再获取(memcpy(&buff_up, &buff_data1[len].buff_up_quere, sizeof(buff_data1[len].buff_up_quere));),不是就等待(sleep(1);continue;),当获取完此缓冲区就清空掉,(memset(buff_data1, 0, sizeof(buff_data1));)并将count_len2=0,告诉线程1我清空了,你可以写了!然后读取下一个缓冲区(len = 0;buff_nb = nb_next;)。