zoukankan      html  css  js  c++  java
  • 使用内存映射开发高性能进程间消息通信组件

    一、背景

      项目开发中免不了各模块或系统之间进行消息通信,目前热门的消息中间件有Redis、RabbitMQ、Kafka、RocketMQ等等。

    以上几种组件中Redis在消息队列方面表现还可以,但是如果涉及发布订阅功能,就不行了,最近项目就使用了redis的发布订阅,

    每秒只能发出几千条,虽然目前绰绰有余,但是瓶颈可以预期。

      其余的几种都是比较重量级的消息中间件,什么跨平台、分布式、集群、支持N种协议等等,很高大全,

    我们可能就只使用了其中1、2个功能。严格来说,项目中集成这几种MQ的工作量是不小的,对于中小型系统来说,可能维护MQ

    稳定的工作量都比项目还大,难度也高,所有功能用全了的程序员恐怕不多。

      从长远考虑出发,选择重量级MQ恐怕是板上钉钉的事,但是项目一开始就上这几种,我觉得那也是欠缺考虑的。如果项目

    根本不要求跨机器通信,那杀鸡就不要用牛刀了。比如,你只是在模块之间、线程之间、进程之间,或者是在同一主机的各种不同系统之间,

    其实都可以不用重量级MQ。当然你使用了也没事,看个人选择。

      最近的项目有这么个场景,采集近所有底层设备,每个设备有点3000个,总共20多万个点需要采集上来。刚开始使用了Redis的发布订阅,

    但是程序毫无疑问地挂了,根本带不起来;因为程序启动时每个点的值都是从0变成N,就需要发消息出来,那一开始消息是很多的,redis根本

    处理不完,而且有很高频率的超时断线。以至于想换RabbitMQ,后来想想还是算了,因为那样增加项目难度不说,后期维护也是个难题。

    说到底这是模块之间的通信,是主程序(Winform)调用采集C++的DLL类库,发出消息后主程序和web端订阅,在主程序与DLL这边,在DLL

    方法上增加一个回调函数就搞定了,完全不用走消息中间件,Web端要哪些点的实时值就先ASK,先请求需要看哪些点,如何在主程序这边

    发布那些点的实时值消息,这样发布订阅的数据量少了2、3个数量级不止。

    二、需求

      针对上边的业务场景,因为是模块之间的线程间通信,这样搞问题不大;如果是进程之间也要那么高频率的通信,那就不好办了,我们

    不想使用重量级MQ,又想高频率传输消息,怎么办呢?网上搜索了一番,貌似没看到有成熟的速度又快、体量又小,部署又简单的中间件。

    所以在下不才,针对这个问题抛砖引玉,做一个demo出来供大家讨论一下。

    三、原理

      应题,就是使用内存映射来做同一个机器下各种消息的通信,之前也写过一篇关于使用共享内存实现快速读写的文章,点击前往浏览

    “.net环境下跨进程、高频率读写数据”,但是内存映射比较适合做消息队列,因为消息可以持久化在本地,没读完下次进来还可以接着读。

    我预想是这样设计:

    1、发布订阅涉及到2个主要方法:Publish(string channel)、Subscribe(string channel, Callback callback);

    2、为每个channel生成一个文件:channel.db,默认每个db可以存储1000个同类型的结构体消息作为消息队列,从头部写入,尾部读出。

       每个db文件前面留一个索引区作为发布方与订阅方各自的读写位置。发布与订阅前,先读写这个索引区,因为是一对一读写,所以

          可以完美避开读写锁,大大提高性能。

    3、针对一对多需求,单独设计一个config.db文件存储种channel与其相关订阅信息,大概原理图如下:

     

    4、解决读写不加锁问题

    我们看结构体:SIndex有三个属性

    1) WriteIndex 记录发布方(Pubish)最后写入数据的位置

    2) ReadIndex 记录订阅方(Subscribe)最后读取数据的位置

    3) Over 表示WriteIndex已达到队列最大值,再WriteIndex小于等于队列最大值前,读写如下图:

    WriteIndex达到最大值后再往下写Over就要取反,如由False变为True。WriteIndex=0

    如果此时没有订阅方,那新消息就会被抛弃,因为已无空间存储。

    4) 如果ReadIndex数值到队列最大值,Over也取反,此时ReadIndex = 0,读写又变成图1所示

    5) 读写过程中并不存在互斥的情况,只要管理好读写位置,就可以避免加锁。

    四、接口设计

    4.1、主要参数定义

    #define FM_MAX_CHANNEL		100		// 暂定最多100个不同频道
    #define FM_MAX_SUBSCRIBE	3		// 暂定最多3个订阅用户
    #define FM_MAX_ROWS			1000	// 暂定最多队列大小为1000
    #define FM_DISCONNECT_TIME  5000	// 超过5000毫秒无心跳更新视为订阅断开
    #define FM_KEEP_CONN_CYCLE  1000	// 保持心跳连接的时间周期
    #define FM_NOTHING			-1		// 空白,数组为0等
    #define FM_WORD_SIZE		sizeof(WORD)	// WORD长度
    #define FM_INDEX_SIZE		sizeof(SIndex)  // SIndex长度 

    4.2、结构体

     1 // 索引
     2 typedef struct
     3 {
     4     WORD WriteIndex;
     5     WORD ReadIndex;
     6     WORD Over;         // 当W或R超过MAX一次,Over取反一次,Over默认为False
     7 }SIndex;
     8 
     9 // 内存映射参数
    10 typedef struct
    11 {
    12     HANDLE FileHandle;
    13     HANDLE FileMappingHandle;
    14     LPVOID MapViewOfFileHandle;
    15     UINT StructSize;
    16     char FileName[20];
    17     UINT SubscribeIndex;
    18     WORD Conned;
    19 }SDbConnInfo;
    20 
    21 // 频道
    22 typedef struct
    23 {
    24     char ChannelName[20];
    25     UINT StructSize;
    26     DWORD Subscribe1LastTime;
    27     DWORD Subscribe2LastTime;
    28     DWORD Subscribe3LastTime;
    29 }SChannel;
    30 
    31 // 频道与订阅映射
    32 typedef struct
    33 {
    34     char ChannelName[20];
    35     SDbConnInfo DbConnInfo[FM_MAX_SUBSCRIBE];
    36 }SChannelMapDbConnInfo;
    View Code

    4.3、主要方法

    	// 发布信息
    	template<typename T>
    	int Publish(const char *channel, T* data);
    
    	// 订阅信息
    	template<typename T>
    	void Subscribe(const char *channel, SubscribeCallBackHandle callback);
    

    五、代码实现

    5.1 、FMDBManager,主要管理内存映射相关操作,因为是读写位置不一样,所以不需要加互斥量

      1 class FMDBManager
      2 {
      3 public:
      4     FMDBManager() {};
      5     ~FMDBManager() {};
      6 
      7 public:
      8     static int Create(SDbConnInfo *info)
      9     {
     10         CString fileName(info->FileName);
     11         DWORD totalSize = (FM_MAX_ROWS * info->StructSize) + FM_INDEX_SIZE;
     12 
     13         info->FileHandle = CreateFile(fileName, (GENERIC_READ | GENERIC_WRITE), (FILE_SHARE_READ | FILE_SHARE_WRITE),
     14             NULL, OPEN_ALWAYS, FILE_FLAG_SEQUENTIAL_SCAN, NULL);
     15 
     16         info->FileMappingHandle = CreateFileMapping(info->FileHandle, NULL, PAGE_READWRITE, 0, totalSize, NULL);
     17 
     18         if(info->FileMappingHandle == NULL || info->FileMappingHandle == INVALID_HANDLE_VALUE) 
     19         {
     20             Log("");
     21             CloseHandle(info->FileHandle);
     22             return enumFail;
     23         }
     24 
     25         if(GetLastError() == ERROR_ALREADY_EXISTS) 
     26         {
     27             Log("");
     28             return enumFail;
     29         }
     30 
     31         // init
     32         info->MapViewOfFileHandle = MapViewOfFile(info->FileMappingHandle, FILE_MAP_ALL_ACCESS, 0, 0, totalSize);
     33 
     34         if(info->MapViewOfFileHandle == NULL) 
     35         {
     36             Log("");
     37             CloseHandle(info->FileMappingHandle);
     38             CloseHandle(info->FileHandle);
     39             return enumFail;
     40         }
     41 
     42         return enumSuccess;
     43     }
     44 
     45 protected:
     46     int Write(void *data, UINT order, SDbConnInfo *info)
     47     {
     48         if(info->MapViewOfFileHandle == NULL) 
     49         {
     50             Log("");
     51             return enumFail;
     52         }
     53         else
     54             memcpy((char *)info->MapViewOfFileHandle + (order * info->StructSize) + FM_INDEX_SIZE, data, info->StructSize);
     55 
     56         return enumSuccess;
     57     }
     58     int Read(void *data, UINT order, SDbConnInfo *info)
     59     {
     60         if(info->MapViewOfFileHandle == NULL) 
     61         {
     62             Log("");
     63             return enumFail;
     64         }
     65         else
     66             memcpy(data, (char *)info->MapViewOfFileHandle + (order * info->StructSize) + FM_INDEX_SIZE, info->StructSize);
     67 
     68         return enumSuccess;
     69     }
     70     int Delete(UINT order, SDbConnInfo *info)
     71     {
     72         if(info->MapViewOfFileHandle == NULL) 
     73         {
     74             Log("");
     75             return enumFail;
     76         }
     77         else
     78             memset((char *)info->MapViewOfFileHandle + (order * info->StructSize) + FM_INDEX_SIZE, 0, info->StructSize);
     79 
     80         return enumSuccess;
     81     }
     82 
     83     int WriteConfig(void *data, UINT order, UINT pos, UINT size, SDbConnInfo *info)
     84     {
     85         if(info->MapViewOfFileHandle == NULL) 
     86         {
     87             Log("");
     88             return enumFail;
     89         }
     90         else
     91             memcpy((char *)info->MapViewOfFileHandle + (order * info->StructSize) + FM_INDEX_SIZE + pos, data, size);
     92 
     93         return enumSuccess;
     94     }
     95     int WriteIndex(void *data, UINT pos, UINT size, SDbConnInfo *info)
     96     {
     97         if(info->MapViewOfFileHandle == NULL) 
     98         {
     99             Log("");
    100             return enumFail;
    101         }
    102         else
    103             memcpy((char *)info->MapViewOfFileHandle + pos, data, size);
    104 
    105         return enumSuccess;
    106     }
    107     int ReadIndex(SIndex *sIndex, SDbConnInfo *info)
    108     {
    109         if(info->MapViewOfFileHandle == NULL) 
    110         {
    111             Log("");
    112             return enumFail;
    113         }
    114         else
    115             memcpy(sIndex, (char *)info->MapViewOfFileHandle, FM_INDEX_SIZE);
    116 
    117         return enumSuccess;
    118     }
    119 };
    View Code

    5.2、FMDBClient,内存映射客户端,主要封装Publish与Subscribe方法给前端调用,屏蔽复杂性

      1 class FMDBClient : public FMDBManager
      2 {
      3 private:
      4     mutable std::mutex mut;
      5     SChannelMapDbConnInfo channelMapDbConnInfo = { 0 };
      6 
      7     bool CanWrite(SIndex *sIndex)
      8     {
      9         int nextWriteIndex = sIndex->WriteIndex + 1;
     10         if(nextWriteIndex > FM_MAX_ROWS) nextWriteIndex = 0;
     11 
     12         return nextWriteIndex != sIndex->ReadIndex;
     13     }
     14     bool CanRead(SIndex *sIndex) {
     15         if(sIndex->Over) return sIndex->ReadIndex > sIndex->WriteIndex;
     16         else return sIndex->ReadIndex + 1 <= sIndex->WriteIndex;
     17     }
     18 
     19     int GetDbConnInfo(const char *channel, int size)
     20     {
     21         int rest = enumFail;
     22 
     23         for(int i = 0; i < FM_MAX_CHANNEL; i++)
     24         {
     25             char channelNameTmp[20] = { 0 };
     26             sprintf_s(channelNameTmp, "%s", channel);
     27 
     28             if(0 == strcmp(channelNameTmp, channelMapDbConnInfoArray[i].ChannelName))
     29             {
     30                 channelMapDbConnInfo = channelMapDbConnInfoArray[i];
     31                 rest = enumSuccess;
     32                 break;
     33             }
     34         }
     35 
     36         return rest;
     37     }
     38     int SetDbConnInfo(const char *channel, UINT *subscribeIndex, SDbConnInfo *dbConnInfo)
     39     {
     40         std::lock_guard<std::mutex> lk(mut);
     41 
     42         int nextSubscribeIndex = fmdbConfig->GetNextSubscribeIndex(channel);
     43         if(nextSubscribeIndex == FM_NOTHING)
     44         {
     45             SChannel sChannel = { 0 };
     46             sprintf_s(sChannel.ChannelName, "%s", channel);
     47             sChannel.Subscribe1LastTime = GetTickCount();
     48             sChannel.StructSize = dbConnInfo->StructSize;
     49 
     50             sprintf_s(dbConnInfo->FileName, "%s.1.db", channel);
     51             if(!fmdbConfig->IsFM_NOTHING(dbConnInfo->FileName) && dbConnInfo->StructSize > 0)
     52             {
     53                 dbConnInfo->SubscribeIndex = 1;
     54                 *subscribeIndex = dbConnInfo->SubscribeIndex;
     55 
     56                 if(Create(dbConnInfo) == enumSuccess) return fmdbConfig->Insert(&sChannel);
     57                 else sprintf_s(dbConnInfo->FileName, "%s", channel); //还原名称
     58             }
     59         }
     60 
     61         if(nextSubscribeIndex > 1)
     62         {
     63             sprintf_s(dbConnInfo->FileName, "%s.%d.db", channel, nextSubscribeIndex);
     64             if(!fmdbConfig->IsFM_NOTHING(dbConnInfo->FileName) && dbConnInfo->StructSize > 0)
     65             {
     66                 dbConnInfo->SubscribeIndex = nextSubscribeIndex;
     67                 *subscribeIndex = nextSubscribeIndex;
     68 
     69                 if(Create(dbConnInfo) == enumSuccess) return fmdbConfig->Save(channel, nextSubscribeIndex);
     70                 else sprintf_s(dbConnInfo->FileName, "%s", channel); //还原名称
     71             }
     72         }
     73 
     74         return enumFail;
     75     }
     76     bool SetSubscribeConned(const char *channel, int subscribeIndex, SDbConnInfo *dbConnInfo)
     77     {
     78         int rest = enumFail;
     79 
     80         if(subscribeIndex <= 0) return rest;
     81 
     82         for(int i = 0; i < FM_MAX_CHANNEL; i++)
     83         {
     84             char channelNameTmp[20] = { 0 };
     85             sprintf_s(channelNameTmp, "%s", channel);
     86 
     87             if(0 == strcmp(channelNameTmp, channelMapDbConnInfoArray[i].ChannelName))
     88             {
     89                 channelMapDbConnInfoArray[i].DbConnInfo[subscribeIndex - 1].SubscribeIndex = dbConnInfo->SubscribeIndex; 
     90                 channelMapDbConnInfoArray[i].DbConnInfo[subscribeIndex - 1].Conned = 1;
     91                 rest = enumSuccess;
     92                 break;
     93             }
     94         }
     95 
     96         return rest;
     97     }
     98     bool IsConning(SDbConnInfo *dbConnInfo) { return true; };
     99 
    100 public:
    101     FMDBClient()
    102     {
    103         while(!fmdbConfigLoadFinish) { Sleep(200); }
    104     };
    105     ~FMDBClient() {};
    106 
    107 public:
    108     int failTimes = 0;
    109 
    110     template<typename T>
    111     int Publish(const char *channel, T* data)
    112     {
    113         int rest = enumFail;
    114 
    115         // 查找
    116         if(GetDbConnInfo(channel, sizeof(T)) == enumFail)
    117         {
    118             printf_s("发布%s失败.
    ", channel);
    119             return enumFail;
    120         }
    121 
    122         for(int i = 0; i < FM_MAX_SUBSCRIBE; i++)
    123         {
    124             if(channelMapDbConnInfo.DbConnInfo[i].FileHandle == NULL) continue;
    125 
    126             while(IsConning(&channelMapDbConnInfo.DbConnInfo[i]))
    127             {
    128                 SIndex sIndex = { 0 };
    129                 if(ReadIndex(&sIndex, &channelMapDbConnInfo.DbConnInfo[i]) == enumFail)
    130                 {
    131                     throw "映射文件加载失败";
    132                 }
    133 
    134                 if(CanWrite(&sIndex))
    135                 {
    136                     WORD writeIndex = sIndex.WriteIndex;
    137                     if(Write(data, writeIndex, &channelMapDbConnInfo.DbConnInfo[i]) == enumSuccess)
    138                     {
    139                         writeIndex++;
    140                         if(writeIndex > FM_MAX_ROWS)
    141                         {
    142                             writeIndex = 0;
    143 
    144                             WORD Over = TRUE;
    145                             WriteIndex(&Over, (FM_WORD_SIZE * 2), FM_WORD_SIZE, &channelMapDbConnInfo.DbConnInfo[i]);
    146                         }
    147 
    148                         rest = WriteIndex(&writeIndex, 0, FM_WORD_SIZE, &channelMapDbConnInfo.DbConnInfo[i]);
    149                         break;
    150                     }
    151                 }
    152                 else
    153                 {
    154                     failTimes++;
    155                 }
    156             }
    157         }
    158 
    159         return rest;
    160     }
    161 
    162     template<typename T>
    163     void Subscribe(const char *channel, SubscribeCallBackHandle callback)
    164     {
    165         SDbConnInfo dbConnInfo = { 0 };
    166         dbConnInfo.StructSize = sizeof(T);
    167 
    168         UINT subscribeIndex = 0;
    169         if(SetDbConnInfo(channel, &subscribeIndex, &dbConnInfo) == enumFail)
    170         {
    171             printf_s("订阅%s失败.
    ", channel);
    172             return;
    173         }
    174 
    175         while(IsConning(&dbConnInfo))
    176         {
    177             SetSubscribeConned(channel, subscribeIndex, &dbConnInfo);
    178 
    179             SIndex sIndex = { 0 };
    180             if(ReadIndex(&sIndex, &dbConnInfo) == enumFail) throw "映射文件加载失败";
    181             if(!CanRead(&sIndex)) continue;
    182 
    183             T t = { 0 };
    184             int readIndex = sIndex.ReadIndex;
    185             if(Read(&t, readIndex, &dbConnInfo) == enumSuccess)
    186             {
    187                 readIndex++;
    188                 if(readIndex > FM_MAX_ROWS)
    189                 {
    190                     readIndex = 0;
    191 
    192                     WORD Over = FALSE;
    193                     WriteIndex(&Over, (FM_WORD_SIZE * 2), FM_WORD_SIZE, &dbConnInfo);
    194                 }
    195 
    196                 if(WriteIndex(&readIndex, FM_WORD_SIZE, FM_WORD_SIZE, &dbConnInfo) == enumSuccess)
    197                     if(Delete(sIndex.ReadIndex, &dbConnInfo) == enumSuccess)
    198                         if(callback(&t) == enumBreak) break;
    199             }
    200         }
    201     }
    202 };
    View Code

     请注意上边控制读写的2个方法

    	bool CanWrite(SIndex *sIndex)
    	{
    		int nextWriteIndex = sIndex->WriteIndex + 1;
    		if(nextWriteIndex > FM_MAX_ROWS) nextWriteIndex = 0;
    
    		return nextWriteIndex != sIndex->ReadIndex;
    	}
    	bool CanRead(SIndex *sIndex) 
    	{
    		if(sIndex->Over) return sIndex->ReadIndex > sIndex->WriteIndex;
    		else return sIndex->ReadIndex + 1 <= sIndex->WriteIndex;
    	}
    

    我们可以分析一下,下一个WriteIndex值如果大于队列最大值 WriteIndex置0,下一个WriteIndex数值如果不等于

    正在读的位置ReadIndex就能写;如果WriteIndex没有超出最大值,只要ReadIndex小于等于WriteIndex就能读,

    如果超出,就判断ReadIndex大于WriteIndex就能读。WriteIndex与ReadIndex数值在Publish与Subscribe中维护

    5.3、建立新线程获取最新订阅的客户端信息,这个功能主要是动态地像多个Subscribe端发生消息,比如订阅发生在发布之后,

    也应该能收到消息。

     1 void Update()
     2 {
     3     while(true)
     4     {
     5         if(fmdbConfig->GetChannelArray() == enumSuccess)
     6         {
     7             for(int i = 0; i < FM_MAX_CHANNEL; i++)
     8             {
     9                 if(fmdbConfig->IsFM_NOTHING(channelMapDbConnInfoArray[i].ChannelName)) continue;
    10 
    11                 for(int j = 0; j < FM_MAX_SUBSCRIBE; j++)
    12                 {
    13                     if(channelMapDbConnInfoArray[i].DbConnInfo[j].StructSize <= 0) continue;
    14 
    15                     // KeepConned
    16                     if(channelMapDbConnInfoArray[i].DbConnInfo[j].Conned)
    17                     {
    18                         fmdbConfig->KeepConned(channelMapDbConnInfoArray[i].ChannelName,
    19                             channelMapDbConnInfoArray[i].DbConnInfo[j].SubscribeIndex);
    20 
    21                         channelMapDbConnInfoArray[i].DbConnInfo[j].Conned = 0;
    22                         //printf_s("%s.KeepConned.
    ", channelDbParsArray[i].SDbPars[j].Channel);
    23                     }
    24 
    25                     if(!fmdbConfig->Exists(channelMapDbConnInfoArray[i].DbConnInfo[j].FileName))
    26                     {
    27                         FMDBManager::Create(&channelMapDbConnInfoArray[i].DbConnInfo[j]);
    28                         fmdbConfig->AddChannel(channelMapDbConnInfoArray[i].DbConnInfo[j].FileName);
    29                     }
    30                 }
    31             }
    32         }
    33 
    34         fmdbConfigLoadFinish = true;
    35         Sleep(1000);
    36     }
    37 }
    38 thread th(Update);
    View Code

     六、Demo测试

    6.1、Producer.cpp

     1 #include "pch.h"
     2 #include "../FMDB.h"
     3 
     4 using namespace std;
     5 
     6 int main()
     7 {
     8     FMClient * client = new FMClient();
     9 
    10     int times = 0;
    11     int index = 0;
    12     int total = 0;
    13     UINT structSize = sizeof(SPerson);
    14     DWORD dwStartTmp = GetTickCount();
    15 
    16     while(TRUE)
    17     {
    18         times++;
    19         if(index == 0)
    20         {
    21             dwStartTmp = GetTickCount();
    22         }
    23 
    24         SPerson sPerson = { 0 };
    25         sPerson.Idx = index;
    26         sprintf_s(sPerson.Name, "Name.%d", index);
    27         sPerson.Age = index;
    28 
    29         if(client->Publish("Person", &sPerson) == enumSuccess)
    30         {
    31             if(index % 2 == 0) total = total + sPerson.Idx;
    32             else total = total - sPerson.Idx;
    33 
    34             index++;
    35             if(index % 50000 == 0)
    36                 printf_s("发送条数: %d, 耗时:%d 
    ", index, (GetTickCount() - dwStartTmp));
    37         }
    38 
    39         if(index >= 2000000) break;
    40     }
    41 
    42     printf_s("调用次数: %d, 成功条数: %d, 检验值: %d 
    ", times, index, total);
    43     system("pause");
    44 }

    6.2、Consumer.cpp

     1 #include "pch.h"
     2 #include "../FMDB.h"
     3 
     4 using namespace std;
     5 
     6 int index = 0;
     7 int total = 0;
     8 DWORD dwStartTmp = GetTickCount();
     9 
    10 int SubscribeCallback(void *msg)
    11 {
    12     SPerson * person = (SPerson *)msg;
    13 
    14     if(index == 0)
    15     {
    16         dwStartTmp = GetTickCount();
    17     }
    18 
    19     if(index % 2 == 0) total = total + person->Idx;
    20     else total = total - person->Idx;
    21 
    22     index++;
    23     if(index % 50000 == 0)
    24     {
    25         printf("接收条数: %d, 耗时:%d, Idx:%d, Name:%s, Age:%d
    ",
    26             index, (GetTickCount() - dwStartTmp), person->Idx, person->Name, person->Age);
    27     }
    28 
    29     if(index >= 2000000)
    30     {
    31         return enumBreak;
    32     }
    33 
    34     return enumSuccess;
    35 };
    36 
    37 int main()
    38 {
    39     FMClient * client = new FMClient();
    40     client->Subscribe<SPerson>("Person", SubscribeCallback);
    41 
    42     printf("接收条数: %d, 检验值: %d 
    ", index, total);
    43     system("pause");
    44 }

    6.3、运行,测试用例中使用了向队列发送200万条数据,消息大小128字节,订阅端也是接受到200万数据后退出,并且打印检验值。

    1) 检验值计算:0+1-2+3-4+ ---------  - 2000000 = -1000000,如果队列运行正常,那两边的检验值应该都是是 -1000000.

    2) 每5万条打印一次日志,运行情况如下

    一对一方式运行三次,分别耗时(毫秒):2886、2979、2871

    3) 一对二方式运行三次,分别耗时(毫秒):4087、4009、4040

    4)运行过程中产生的文件

    6.4、200万数据一对一耗时近3秒,貌似也不是非常快是不是?但是这就是最大速度了吗?

    当然不是哦,别忘了这是debug版本,我们切换到release版本看速度会不会有所提升。

     

    一对一运行三次耗时分别是:1224、1373、1326

    厉害了,

    SPerson结构体128字节,每秒可以处理180万数据,当然实际运用肯定达不到,因为处理其他业务逻辑也要耗时间。

    好了,为了这个demo脑壳都想疼了,思考模型,调试BUG,期间各种问题,实在茶壶煮饺子,有苦说不出。

    你看,又浪费我周末2天时间,期间就吃了一餐,今天的还没吃呢,等下去旁边山上走走,不然就要发霉了。拜拜。。。

  • 相关阅读:
    jQuery 基本选择器
    JavaScriptif while for switch流程控制 JS函数 内置对象
    JavaScrip基本语法
    数据库 存储引擎 表的操作 数值类型 时间类型 字符串类型 枚举集合 约束
    数据库基础知识 管理员 用户登录授权的操作
    粘包的产生原理 以及如何解决粘包问题
    socket TCP DPT 网络编程
    2018年年终总结
    Android技术分享
    No accelerator found
  • 原文地址:https://www.cnblogs.com/lanxiaoke/p/10228355.html
Copyright © 2011-2022 走看看