zoukankan      html  css  js  c++  java
  • 使用内存映射开发高性能进程间消息通信组件

    一、背景

      项目开发中免不了各模块或系统之间进行消息通信,目前热门的消息中间件有Redis、RabbitMQ、Kafka、RocketMQ等等。

    以上几种组件中Redis在消息队列方面表现还可以,但是如果涉及发布订阅功能,就不行了,最近项目就使用了redis的发布订阅,

    每秒只能发出几千条,虽然目前绰绰有余,但是瓶颈可以预期。

      其余的几种都是比较重量级的消息中间件,什么跨平台、分布式、集群、支持N种协议等等,很高大全,

    我们可能就只使用了其中1、2个功能。严格来说,项目中集成这几种MQ的工作量是不小的,对于中小型系统来说,可能维护MQ

    稳定的工作量都比项目还大,难度也高,所有功能用全了的程序员恐怕不多。

      从长远考虑出发,选择重量级MQ恐怕是板上钉钉的事,但是项目一开始就上这几种,我觉得那也是欠缺考虑的。如果项目

    根本不要求跨机器通信,那杀鸡就不要用牛刀了。比如,你只是在模块之间、线程之间、进程之间,或者是在同一主机的各种不同系统之间,

    其实都可以不用重量级MQ。当然你使用了也没事,看个人选择。

      最近的项目有这么个场景,采集近所有底层设备,每个设备有点3000个,总共20多万个点需要采集上来。刚开始使用了Redis的发布订阅,

    但是程序毫无疑问地挂了,根本带不起来;因为程序启动时每个点的值都是从0变成N,就需要发消息出来,那一开始消息是很多的,redis根本

    处理不完,而且有很高频率的超时断线。以至于想换RabbitMQ,后来想想还是算了,因为那样增加项目难度不说,后期维护也是个难题。

    说到底这是模块之间的通信,是主程序(Winform)调用采集C++的DLL类库,发出消息后主程序和web端订阅,在主程序与DLL这边,在DLL

    方法上增加一个回调函数就搞定了,完全不用走消息中间件,Web端要哪些点的实时值就先ASK,先请求需要看哪些点,如何在主程序这边

    发布那些点的实时值消息,这样发布订阅的数据量少了2、3个数量级不止。

    二、需求

      针对上边的业务场景,因为是模块之间的线程间通信,这样搞问题不大;如果是进程之间也要那么高频率的通信,那就不好办了,我们

    不想使用重量级MQ,又想高频率传输消息,怎么办呢?网上搜索了一番,貌似没看到有成熟的速度又快、体量又小,部署又简单的中间件。

    所以在下不才,针对这个问题抛砖引玉,做一个demo出来供大家讨论一下。

    三、原理

      应题,就是使用内存映射来做同一个机器下各种消息的通信,之前也写过一篇关于使用共享内存实现快速读写的文章,点击前往浏览

    “.net环境下跨进程、高频率读写数据”,但是内存映射比较适合做消息队列,因为消息可以持久化在本地,没读完下次进来还可以接着读。

    我预想是这样设计:

    1、发布订阅涉及到2个主要方法:Publish(string channel)、Subscribe(string channel, Callback callback);

    2、为每个channel生成一个文件:channel.db,默认每个db可以存储1000个同类型的结构体消息作为消息队列,从头部写入,尾部读出。

       每个db文件前面留一个索引区作为发布方与订阅方各自的读写位置。发布与订阅前,先读写这个索引区,因为是一对一读写,所以

          可以完美避开读写锁,大大提高性能。

    3、针对一对多需求,单独设计一个config.db文件存储种channel与其相关订阅信息,大概原理图如下:

     

    4、解决读写不加锁问题

    我们看结构体:SIndex有三个属性

    1) WriteIndex 记录发布方(Pubish)最后写入数据的位置

    2) ReadIndex 记录订阅方(Subscribe)最后读取数据的位置

    3) Over 表示WriteIndex已达到队列最大值,再WriteIndex小于等于队列最大值前,读写如下图:

    WriteIndex达到最大值后再往下写Over就要取反,如由False变为True。WriteIndex=0

    如果此时没有订阅方,那新消息就会被抛弃,因为已无空间存储。

    4) 如果ReadIndex数值到队列最大值,Over也取反,此时ReadIndex = 0,读写又变成图1所示

    5) 读写过程中并不存在互斥的情况,只要管理好读写位置,就可以避免加锁。

    四、接口设计

    4.1、主要参数定义

    #define FM_MAX_CHANNEL		100		// 暂定最多100个不同频道
    #define FM_MAX_SUBSCRIBE	3		// 暂定最多3个订阅用户
    #define FM_MAX_ROWS			1000	// 暂定最多队列大小为1000
    #define FM_DISCONNECT_TIME  5000	// 超过5000毫秒无心跳更新视为订阅断开
    #define FM_KEEP_CONN_CYCLE  1000	// 保持心跳连接的时间周期
    #define FM_NOTHING			-1		// 空白,数组为0等
    #define FM_WORD_SIZE		sizeof(WORD)	// WORD长度
    #define FM_INDEX_SIZE		sizeof(SIndex)  // SIndex长度 

    4.2、结构体

     1 // 索引
     2 typedef struct
     3 {
     4     WORD WriteIndex;
     5     WORD ReadIndex;
     6     WORD Over;         // 当W或R超过MAX一次,Over取反一次,Over默认为False
     7 }SIndex;
     8 
     9 // 内存映射参数
    10 typedef struct
    11 {
    12     HANDLE FileHandle;
    13     HANDLE FileMappingHandle;
    14     LPVOID MapViewOfFileHandle;
    15     UINT StructSize;
    16     char FileName[20];
    17     UINT SubscribeIndex;
    18     WORD Conned;
    19 }SDbConnInfo;
    20 
    21 // 频道
    22 typedef struct
    23 {
    24     char ChannelName[20];
    25     UINT StructSize;
    26     DWORD Subscribe1LastTime;
    27     DWORD Subscribe2LastTime;
    28     DWORD Subscribe3LastTime;
    29 }SChannel;
    30 
    31 // 频道与订阅映射
    32 typedef struct
    33 {
    34     char ChannelName[20];
    35     SDbConnInfo DbConnInfo[FM_MAX_SUBSCRIBE];
    36 }SChannelMapDbConnInfo;
    View Code

    4.3、主要方法

    	// 发布信息
    	template<typename T>
    	int Publish(const char *channel, T* data);
    
    	// 订阅信息
    	template<typename T>
    	void Subscribe(const char *channel, SubscribeCallBackHandle callback);
    

    五、代码实现

    5.1 、FMDBManager,主要管理内存映射相关操作,因为是读写位置不一样,所以不需要加互斥量

      1 class FMDBManager
      2 {
      3 public:
      4     FMDBManager() {};
      5     ~FMDBManager() {};
      6 
      7 public:
      8     static int Create(SDbConnInfo *info)
      9     {
     10         CString fileName(info->FileName);
     11         DWORD totalSize = (FM_MAX_ROWS * info->StructSize) + FM_INDEX_SIZE;
     12 
     13         info->FileHandle = CreateFile(fileName, (GENERIC_READ | GENERIC_WRITE), (FILE_SHARE_READ | FILE_SHARE_WRITE),
     14             NULL, OPEN_ALWAYS, FILE_FLAG_SEQUENTIAL_SCAN, NULL);
     15 
     16         info->FileMappingHandle = CreateFileMapping(info->FileHandle, NULL, PAGE_READWRITE, 0, totalSize, NULL);
     17 
     18         if(info->FileMappingHandle == NULL || info->FileMappingHandle == INVALID_HANDLE_VALUE) 
     19         {
     20             Log("");
     21             CloseHandle(info->FileHandle);
     22             return enumFail;
     23         }
     24 
     25         if(GetLastError() == ERROR_ALREADY_EXISTS) 
     26         {
     27             Log("");
     28             return enumFail;
     29         }
     30 
     31         // init
     32         info->MapViewOfFileHandle = MapViewOfFile(info->FileMappingHandle, FILE_MAP_ALL_ACCESS, 0, 0, totalSize);
     33 
     34         if(info->MapViewOfFileHandle == NULL) 
     35         {
     36             Log("");
     37             CloseHandle(info->FileMappingHandle);
     38             CloseHandle(info->FileHandle);
     39             return enumFail;
     40         }
     41 
     42         return enumSuccess;
     43     }
     44 
     45 protected:
     46     int Write(void *data, UINT order, SDbConnInfo *info)
     47     {
     48         if(info->MapViewOfFileHandle == NULL) 
     49         {
     50             Log("");
     51             return enumFail;
     52         }
     53         else
     54             memcpy((char *)info->MapViewOfFileHandle + (order * info->StructSize) + FM_INDEX_SIZE, data, info->StructSize);
     55 
     56         return enumSuccess;
     57     }
     58     int Read(void *data, UINT order, SDbConnInfo *info)
     59     {
     60         if(info->MapViewOfFileHandle == NULL) 
     61         {
     62             Log("");
     63             return enumFail;
     64         }
     65         else
     66             memcpy(data, (char *)info->MapViewOfFileHandle + (order * info->StructSize) + FM_INDEX_SIZE, info->StructSize);
     67 
     68         return enumSuccess;
     69     }
     70     int Delete(UINT order, SDbConnInfo *info)
     71     {
     72         if(info->MapViewOfFileHandle == NULL) 
     73         {
     74             Log("");
     75             return enumFail;
     76         }
     77         else
     78             memset((char *)info->MapViewOfFileHandle + (order * info->StructSize) + FM_INDEX_SIZE, 0, info->StructSize);
     79 
     80         return enumSuccess;
     81     }
     82 
     83     int WriteConfig(void *data, UINT order, UINT pos, UINT size, SDbConnInfo *info)
     84     {
     85         if(info->MapViewOfFileHandle == NULL) 
     86         {
     87             Log("");
     88             return enumFail;
     89         }
     90         else
     91             memcpy((char *)info->MapViewOfFileHandle + (order * info->StructSize) + FM_INDEX_SIZE + pos, data, size);
     92 
     93         return enumSuccess;
     94     }
     95     int WriteIndex(void *data, UINT pos, UINT size, SDbConnInfo *info)
     96     {
     97         if(info->MapViewOfFileHandle == NULL) 
     98         {
     99             Log("");
    100             return enumFail;
    101         }
    102         else
    103             memcpy((char *)info->MapViewOfFileHandle + pos, data, size);
    104 
    105         return enumSuccess;
    106     }
    107     int ReadIndex(SIndex *sIndex, SDbConnInfo *info)
    108     {
    109         if(info->MapViewOfFileHandle == NULL) 
    110         {
    111             Log("");
    112             return enumFail;
    113         }
    114         else
    115             memcpy(sIndex, (char *)info->MapViewOfFileHandle, FM_INDEX_SIZE);
    116 
    117         return enumSuccess;
    118     }
    119 };
    View Code

    5.2、FMDBClient,内存映射客户端,主要封装Publish与Subscribe方法给前端调用,屏蔽复杂性

      1 class FMDBClient : public FMDBManager
      2 {
      3 private:
      4     mutable std::mutex mut;
      5     SChannelMapDbConnInfo channelMapDbConnInfo = { 0 };
      6 
      7     bool CanWrite(SIndex *sIndex)
      8     {
      9         int nextWriteIndex = sIndex->WriteIndex + 1;
     10         if(nextWriteIndex > FM_MAX_ROWS) nextWriteIndex = 0;
     11 
     12         return nextWriteIndex != sIndex->ReadIndex;
     13     }
     14     bool CanRead(SIndex *sIndex) {
     15         if(sIndex->Over) return sIndex->ReadIndex > sIndex->WriteIndex;
     16         else return sIndex->ReadIndex + 1 <= sIndex->WriteIndex;
     17     }
     18 
     19     int GetDbConnInfo(const char *channel, int size)
     20     {
     21         int rest = enumFail;
     22 
     23         for(int i = 0; i < FM_MAX_CHANNEL; i++)
     24         {
     25             char channelNameTmp[20] = { 0 };
     26             sprintf_s(channelNameTmp, "%s", channel);
     27 
     28             if(0 == strcmp(channelNameTmp, channelMapDbConnInfoArray[i].ChannelName))
     29             {
     30                 channelMapDbConnInfo = channelMapDbConnInfoArray[i];
     31                 rest = enumSuccess;
     32                 break;
     33             }
     34         }
     35 
     36         return rest;
     37     }
     38     int SetDbConnInfo(const char *channel, UINT *subscribeIndex, SDbConnInfo *dbConnInfo)
     39     {
     40         std::lock_guard<std::mutex> lk(mut);
     41 
     42         int nextSubscribeIndex = fmdbConfig->GetNextSubscribeIndex(channel);
     43         if(nextSubscribeIndex == FM_NOTHING)
     44         {
     45             SChannel sChannel = { 0 };
     46             sprintf_s(sChannel.ChannelName, "%s", channel);
     47             sChannel.Subscribe1LastTime = GetTickCount();
     48             sChannel.StructSize = dbConnInfo->StructSize;
     49 
     50             sprintf_s(dbConnInfo->FileName, "%s.1.db", channel);
     51             if(!fmdbConfig->IsFM_NOTHING(dbConnInfo->FileName) && dbConnInfo->StructSize > 0)
     52             {
     53                 dbConnInfo->SubscribeIndex = 1;
     54                 *subscribeIndex = dbConnInfo->SubscribeIndex;
     55 
     56                 if(Create(dbConnInfo) == enumSuccess) return fmdbConfig->Insert(&sChannel);
     57                 else sprintf_s(dbConnInfo->FileName, "%s", channel); //还原名称
     58             }
     59         }
     60 
     61         if(nextSubscribeIndex > 1)
     62         {
     63             sprintf_s(dbConnInfo->FileName, "%s.%d.db", channel, nextSubscribeIndex);
     64             if(!fmdbConfig->IsFM_NOTHING(dbConnInfo->FileName) && dbConnInfo->StructSize > 0)
     65             {
     66                 dbConnInfo->SubscribeIndex = nextSubscribeIndex;
     67                 *subscribeIndex = nextSubscribeIndex;
     68 
     69                 if(Create(dbConnInfo) == enumSuccess) return fmdbConfig->Save(channel, nextSubscribeIndex);
     70                 else sprintf_s(dbConnInfo->FileName, "%s", channel); //还原名称
     71             }
     72         }
     73 
     74         return enumFail;
     75     }
     76     bool SetSubscribeConned(const char *channel, int subscribeIndex, SDbConnInfo *dbConnInfo)
     77     {
     78         int rest = enumFail;
     79 
     80         if(subscribeIndex <= 0) return rest;
     81 
     82         for(int i = 0; i < FM_MAX_CHANNEL; i++)
     83         {
     84             char channelNameTmp[20] = { 0 };
     85             sprintf_s(channelNameTmp, "%s", channel);
     86 
     87             if(0 == strcmp(channelNameTmp, channelMapDbConnInfoArray[i].ChannelName))
     88             {
     89                 channelMapDbConnInfoArray[i].DbConnInfo[subscribeIndex - 1].SubscribeIndex = dbConnInfo->SubscribeIndex; 
     90                 channelMapDbConnInfoArray[i].DbConnInfo[subscribeIndex - 1].Conned = 1;
     91                 rest = enumSuccess;
     92                 break;
     93             }
     94         }
     95 
     96         return rest;
     97     }
     98     bool IsConning(SDbConnInfo *dbConnInfo) { return true; };
     99 
    100 public:
    101     FMDBClient()
    102     {
    103         while(!fmdbConfigLoadFinish) { Sleep(200); }
    104     };
    105     ~FMDBClient() {};
    106 
    107 public:
    108     int failTimes = 0;
    109 
    110     template<typename T>
    111     int Publish(const char *channel, T* data)
    112     {
    113         int rest = enumFail;
    114 
    115         // 查找
    116         if(GetDbConnInfo(channel, sizeof(T)) == enumFail)
    117         {
    118             printf_s("发布%s失败.
    ", channel);
    119             return enumFail;
    120         }
    121 
    122         for(int i = 0; i < FM_MAX_SUBSCRIBE; i++)
    123         {
    124             if(channelMapDbConnInfo.DbConnInfo[i].FileHandle == NULL) continue;
    125 
    126             while(IsConning(&channelMapDbConnInfo.DbConnInfo[i]))
    127             {
    128                 SIndex sIndex = { 0 };
    129                 if(ReadIndex(&sIndex, &channelMapDbConnInfo.DbConnInfo[i]) == enumFail)
    130                 {
    131                     throw "映射文件加载失败";
    132                 }
    133 
    134                 if(CanWrite(&sIndex))
    135                 {
    136                     WORD writeIndex = sIndex.WriteIndex;
    137                     if(Write(data, writeIndex, &channelMapDbConnInfo.DbConnInfo[i]) == enumSuccess)
    138                     {
    139                         writeIndex++;
    140                         if(writeIndex > FM_MAX_ROWS)
    141                         {
    142                             writeIndex = 0;
    143 
    144                             WORD Over = TRUE;
    145                             WriteIndex(&Over, (FM_WORD_SIZE * 2), FM_WORD_SIZE, &channelMapDbConnInfo.DbConnInfo[i]);
    146                         }
    147 
    148                         rest = WriteIndex(&writeIndex, 0, FM_WORD_SIZE, &channelMapDbConnInfo.DbConnInfo[i]);
    149                         break;
    150                     }
    151                 }
    152                 else
    153                 {
    154                     failTimes++;
    155                 }
    156             }
    157         }
    158 
    159         return rest;
    160     }
    161 
    162     template<typename T>
    163     void Subscribe(const char *channel, SubscribeCallBackHandle callback)
    164     {
    165         SDbConnInfo dbConnInfo = { 0 };
    166         dbConnInfo.StructSize = sizeof(T);
    167 
    168         UINT subscribeIndex = 0;
    169         if(SetDbConnInfo(channel, &subscribeIndex, &dbConnInfo) == enumFail)
    170         {
    171             printf_s("订阅%s失败.
    ", channel);
    172             return;
    173         }
    174 
    175         while(IsConning(&dbConnInfo))
    176         {
    177             SetSubscribeConned(channel, subscribeIndex, &dbConnInfo);
    178 
    179             SIndex sIndex = { 0 };
    180             if(ReadIndex(&sIndex, &dbConnInfo) == enumFail) throw "映射文件加载失败";
    181             if(!CanRead(&sIndex)) continue;
    182 
    183             T t = { 0 };
    184             int readIndex = sIndex.ReadIndex;
    185             if(Read(&t, readIndex, &dbConnInfo) == enumSuccess)
    186             {
    187                 readIndex++;
    188                 if(readIndex > FM_MAX_ROWS)
    189                 {
    190                     readIndex = 0;
    191 
    192                     WORD Over = FALSE;
    193                     WriteIndex(&Over, (FM_WORD_SIZE * 2), FM_WORD_SIZE, &dbConnInfo);
    194                 }
    195 
    196                 if(WriteIndex(&readIndex, FM_WORD_SIZE, FM_WORD_SIZE, &dbConnInfo) == enumSuccess)
    197                     if(Delete(sIndex.ReadIndex, &dbConnInfo) == enumSuccess)
    198                         if(callback(&t) == enumBreak) break;
    199             }
    200         }
    201     }
    202 };
    View Code

     请注意上边控制读写的2个方法

    	bool CanWrite(SIndex *sIndex)
    	{
    		int nextWriteIndex = sIndex->WriteIndex + 1;
    		if(nextWriteIndex > FM_MAX_ROWS) nextWriteIndex = 0;
    
    		return nextWriteIndex != sIndex->ReadIndex;
    	}
    	bool CanRead(SIndex *sIndex) 
    	{
    		if(sIndex->Over) return sIndex->ReadIndex > sIndex->WriteIndex;
    		else return sIndex->ReadIndex + 1 <= sIndex->WriteIndex;
    	}
    

    我们可以分析一下,下一个WriteIndex值如果大于队列最大值 WriteIndex置0,下一个WriteIndex数值如果不等于

    正在读的位置ReadIndex就能写;如果WriteIndex没有超出最大值,只要ReadIndex小于等于WriteIndex就能读,

    如果超出,就判断ReadIndex大于WriteIndex就能读。WriteIndex与ReadIndex数值在Publish与Subscribe中维护

    5.3、建立新线程获取最新订阅的客户端信息,这个功能主要是动态地像多个Subscribe端发生消息,比如订阅发生在发布之后,

    也应该能收到消息。

     1 void Update()
     2 {
     3     while(true)
     4     {
     5         if(fmdbConfig->GetChannelArray() == enumSuccess)
     6         {
     7             for(int i = 0; i < FM_MAX_CHANNEL; i++)
     8             {
     9                 if(fmdbConfig->IsFM_NOTHING(channelMapDbConnInfoArray[i].ChannelName)) continue;
    10 
    11                 for(int j = 0; j < FM_MAX_SUBSCRIBE; j++)
    12                 {
    13                     if(channelMapDbConnInfoArray[i].DbConnInfo[j].StructSize <= 0) continue;
    14 
    15                     // KeepConned
    16                     if(channelMapDbConnInfoArray[i].DbConnInfo[j].Conned)
    17                     {
    18                         fmdbConfig->KeepConned(channelMapDbConnInfoArray[i].ChannelName,
    19                             channelMapDbConnInfoArray[i].DbConnInfo[j].SubscribeIndex);
    20 
    21                         channelMapDbConnInfoArray[i].DbConnInfo[j].Conned = 0;
    22                         //printf_s("%s.KeepConned.
    ", channelDbParsArray[i].SDbPars[j].Channel);
    23                     }
    24 
    25                     if(!fmdbConfig->Exists(channelMapDbConnInfoArray[i].DbConnInfo[j].FileName))
    26                     {
    27                         FMDBManager::Create(&channelMapDbConnInfoArray[i].DbConnInfo[j]);
    28                         fmdbConfig->AddChannel(channelMapDbConnInfoArray[i].DbConnInfo[j].FileName);
    29                     }
    30                 }
    31             }
    32         }
    33 
    34         fmdbConfigLoadFinish = true;
    35         Sleep(1000);
    36     }
    37 }
    38 thread th(Update);
    View Code

     六、Demo测试

    6.1、Producer.cpp

     1 #include "pch.h"
     2 #include "../FMDB.h"
     3 
     4 using namespace std;
     5 
     6 int main()
     7 {
     8     FMClient * client = new FMClient();
     9 
    10     int times = 0;
    11     int index = 0;
    12     int total = 0;
    13     UINT structSize = sizeof(SPerson);
    14     DWORD dwStartTmp = GetTickCount();
    15 
    16     while(TRUE)
    17     {
    18         times++;
    19         if(index == 0)
    20         {
    21             dwStartTmp = GetTickCount();
    22         }
    23 
    24         SPerson sPerson = { 0 };
    25         sPerson.Idx = index;
    26         sprintf_s(sPerson.Name, "Name.%d", index);
    27         sPerson.Age = index;
    28 
    29         if(client->Publish("Person", &sPerson) == enumSuccess)
    30         {
    31             if(index % 2 == 0) total = total + sPerson.Idx;
    32             else total = total - sPerson.Idx;
    33 
    34             index++;
    35             if(index % 50000 == 0)
    36                 printf_s("发送条数: %d, 耗时:%d 
    ", index, (GetTickCount() - dwStartTmp));
    37         }
    38 
    39         if(index >= 2000000) break;
    40     }
    41 
    42     printf_s("调用次数: %d, 成功条数: %d, 检验值: %d 
    ", times, index, total);
    43     system("pause");
    44 }

    6.2、Consumer.cpp

     1 #include "pch.h"
     2 #include "../FMDB.h"
     3 
     4 using namespace std;
     5 
     6 int index = 0;
     7 int total = 0;
     8 DWORD dwStartTmp = GetTickCount();
     9 
    10 int SubscribeCallback(void *msg)
    11 {
    12     SPerson * person = (SPerson *)msg;
    13 
    14     if(index == 0)
    15     {
    16         dwStartTmp = GetTickCount();
    17     }
    18 
    19     if(index % 2 == 0) total = total + person->Idx;
    20     else total = total - person->Idx;
    21 
    22     index++;
    23     if(index % 50000 == 0)
    24     {
    25         printf("接收条数: %d, 耗时:%d, Idx:%d, Name:%s, Age:%d
    ",
    26             index, (GetTickCount() - dwStartTmp), person->Idx, person->Name, person->Age);
    27     }
    28 
    29     if(index >= 2000000)
    30     {
    31         return enumBreak;
    32     }
    33 
    34     return enumSuccess;
    35 };
    36 
    37 int main()
    38 {
    39     FMClient * client = new FMClient();
    40     client->Subscribe<SPerson>("Person", SubscribeCallback);
    41 
    42     printf("接收条数: %d, 检验值: %d 
    ", index, total);
    43     system("pause");
    44 }

    6.3、运行,测试用例中使用了向队列发送200万条数据,消息大小128字节,订阅端也是接受到200万数据后退出,并且打印检验值。

    1) 检验值计算:0+1-2+3-4+ ---------  - 2000000 = -1000000,如果队列运行正常,那两边的检验值应该都是是 -1000000.

    2) 每5万条打印一次日志,运行情况如下

    一对一方式运行三次,分别耗时(毫秒):2886、2979、2871

    3) 一对二方式运行三次,分别耗时(毫秒):4087、4009、4040

    4)运行过程中产生的文件

    6.4、200万数据一对一耗时近3秒,貌似也不是非常快是不是?但是这就是最大速度了吗?

    当然不是哦,别忘了这是debug版本,我们切换到release版本看速度会不会有所提升。

     

    一对一运行三次耗时分别是:1224、1373、1326

    厉害了,

    SPerson结构体128字节,每秒可以处理180万数据,当然实际运用肯定达不到,因为处理其他业务逻辑也要耗时间。

    好了,为了这个demo脑壳都想疼了,思考模型,调试BUG,期间各种问题,实在茶壶煮饺子,有苦说不出。

    你看,又浪费我周末2天时间,期间就吃了一餐,今天的还没吃呢,等下去旁边山上走走,不然就要发霉了。拜拜。。。

  • 相关阅读:
    Linux 多进程锁的几种实现方案
    Linux man手册没有pthread_mutex_init的解决办法
    IP地址结构信息与字符串相互转化:inet_pton和inet_ntop, etc.
    Linux 将计算md5值功能做成md5命令
    Unix/Linux inet守护进程
    Unix/Linux syslogd守护进程 & 日志记录syslog
    UNP 学习笔记 #11 名字与地址转换
    git 使用总结
    AUPE 输出致标准错误的出错函数分析与实现 err_sys, err_quit, err_doit etc.
    Linux C常见数I/O函数比较: printf, sprintf, fprintf, write...
  • 原文地址:https://www.cnblogs.com/lanxiaoke/p/10228355.html
Copyright © 2011-2022 走看看