zoukankan      html  css  js  c++  java
  • 利用ZeroMQ消息队列实现多端口并发数据处理

    这里至于ZeroMQ是什么就不过多赘述了,这是基础的知识,不懂的读者可以先去百度一下,这里也提供一个老哥的详细解释,贴上链接大家可以去看看https://www.cnblogs.com/chenny7/p/6245236.html。

    先说一下为什么要这样处理,本人是从事新能源电池检测相关工作的,属于工控的领域,有大量的指令下发(通过上位机的控制),指令下发以后会接收到下位机(电源模块)上传的数据,是一对多的模式(一个上位机,面对着多个电源模块)。这里就有一个并发的问题,同时要接收多个电源模块上传的数据,而且是1秒钟一条数据,这个并发量相对来说比较大,为了性能考虑,防止拥塞这里用到了ZeroMQ消息队列,ZMQ对这一块的处理相对合理。

    这里只是一个模拟的程序,相当于一个demo,实际开发工程中要比这复制的多,因为上抛的数据还分很多种,比如记录数据,告警数据,针床的数据等等。

    用QT做了一个简单的界面,这里的姓名  年龄  学号模拟的是上位机控制指令,当然实际的情况是上位机下发有相对应的协议,这个协议必须是上位机和下位机定义好。

    首先是定义初始化上下文,这是ZeroMQ中必须要做的工作。

    void ZeroMQComm::InitCtx()
    {
        //1.建上下文
        m_ctx = zmq_init(1);
        if (!m_ctx)
        {
            printf("build zmq_init():%s
    ", zmq_strerror(errno));
            return;
        }
        else
        {
            printf("创建上下文成功!
    ");
        }
    }

    然后初始化socket,其实可以把ZMQ看成是对socket的进一步封装,这里用到了3个端口,5000,5001,5002。为什么要用这三个端口呢?每一个端口代表了不同的信息,比如5000端口是作为的PUSH端,相当于一个分发任务的角色,这个demo中

    当点击发送指令的时候就会利用这个端口来给PULL端发送数据。5001端口(作为PULL端)是接收及格数据,5002端口(作为PULL端)是接收不及格数据,这两个端口模式的是在实际的项目中的记录数据和告警数据,记录数据是通过5001端口上抛,告

    警数据是由5002端口上抛。下面代码中绑定的IP可以忽略,我是按照我测试的环境中来配置IP,读者要进行的时候需要根据自己的实际情况来!

    void ZeroMQComm::InitSockets()
    {
        //1.创建发布Socket通讯对象
        //该对象用于下中位机发送控制指令
        if((m_sokt5000 = zmq_socket(m_ctx, ZMQ_PUSH)) == NULL)
        {
            printf("%s
    ", zmq_strerror(errno));
            zmq_close(m_sokt5000);
            zmq_ctx_term(m_ctx);
            return;
         }
        else
        {
            printf("创建发送端口socket成功!
    ");
        }
    
    
        int tmp_bet = zmq_bind(m_sokt5000, "tcp://10.168.205.73:5000");
        //printf("zmq_bind=%d
    ", tmp_bet);
        if (tmp_bet < 0)
        {
            printf("bind port %s
    ", zmq_strerror(errno));
            return;
        }
        else
        {
            printf("bind port success!
    ");
        }
        
    
        //设置发送超时时间3秒
        int iSendTimeout = 3000;
        if (zmq_setsockopt(m_sokt5000, ZMQ_SNDTIMEO, &iSendTimeout, sizeof(iSendTimeout)) < 0)
        {
            zmq_close(m_sokt5000);
            zmq_ctx_destroy(m_ctx);
            return;
        }
    
        //2.创建中位机回复Socket
        m_sokt5001 = zmq_socket(m_ctx, ZMQ_PULL);
        zmq_connect(m_sokt5001, "tcp://10.168.205.73:5001");
    
        //设置接收超时时间5秒
        int iRcvTimeout = 5000;
        if (zmq_setsockopt(m_sokt5001, ZMQ_RCVTIMEO, &iRcvTimeout, sizeof(iRcvTimeout)) < 0)
        {
            zmq_close(m_sokt5001);
            zmq_ctx_destroy(m_ctx);
            return;
        }
    
    
        //3.创建中位机警告信息Socket
        m_sokt5002 = zmq_socket(m_ctx, ZMQ_PULL);
        zmq_connect(m_sokt5002, "tcp://10.168.205.73:5002");
    
        if (zmq_setsockopt(m_sokt5002, ZMQ_RCVTIMEO, &iRcvTimeout, sizeof(iRcvTimeout)) < 0)
        {
            zmq_close(m_sokt5002);
            zmq_ctx_destroy(m_ctx);
            return;
        }
    
    
    
    }

    这里是上面界面中"发送指令"按钮的信号和槽,关于Qt的信号和槽也不赘述了,想要了解的自行去百度。

        connect(m_pBtnSendMedCom, SIGNAL(clicked()), this, SLOT(slot_sendmedcomsg()));

    槽函数的实现是获取到姓名 年龄 学号然后通过封装的接口下发(从PUSH端到PULL端),对于每一个QLineEdit都了相应容错处理,不能为空,然后通过QJsonObject以json的形式下发数据,m_VirUpComZMQ就是封装的一个ZMQ的类,在最后通过SendCmd5000()

    接口下发数据。

    void CVirUpComWgt::slot_sendmedcomsg()
    {
        QString GetNameStr = m_pLeditName->text();
        if (GetNameStr == "")
        {
            //printf("姓名不能为空!
    ");
            QMessageBox::warning(NULL, QString::fromLocal8Bit("警告"), QString::fromLocal8Bit("姓名不能为空"));
            return;
        }
    
        QString GetAgeStr = m_pLeditAge->text();
    
        if (GetAgeStr == "")
        {
            //printf("姓名不能为空!
    ");
            QMessageBox::warning(NULL, QString::fromLocal8Bit("警告"), QString::fromLocal8Bit("年龄不能为空"));
            return;
        }
    
        QString GetStuNoStr = m_pLeditStuno->text();
    
        if (GetStuNoStr == "")
        {
            //printf("姓名不能为空!
    ");
            QMessageBox::warning(NULL, QString::fromLocal8Bit("警告"), QString::fromLocal8Bit("学号不能为空"));
            return;
        }
    
        QJsonObject basemsg_json;
        basemsg_json.insert("name", GetNameStr.toLocal8Bit().data());
        basemsg_json.insert("age", GetAgeStr.toInt());
        basemsg_json.insert("stuno", GetStuNoStr.toInt());
    
        QJsonDocument document;
        document.setObject(basemsg_json);
        QByteArray basemsg_array = document.toJson(QJsonDocument::Compact);
    
        QString BaseMsgJsonStr(basemsg_array);
        qDebug() << QString::fromLocal8Bit("简单的QtJson数据:") << BaseMsgJsonStr;
    
        //发送指令
        m_VirUpComZMQ->SendCmd5000(BaseMsgJsonStr.toLocal8Bit().data());
    
        
    }

    在模拟的这个上位机程序中还要对接收模块做处理,当下发了指令以后会接收到上抛的数据,这些数据会通过5001端口和5002端口来接收,前面已经提到过,它们是PULL端。这里要创建两个线程来专门接收上抛的数据。

    void ZeroMQComm::CreateCommThread()
    {
        DWORD dwThreadID = 0;
        m_handle5001 = CreateThread(NULL, 0, ZeroMQComm::Recv5001Msg, this, 0, &dwThreadID);
        m_handle5002 = CreateThread(NULL, 0, ZeroMQComm::Recv5002Msg, this, 0, &dwThreadID);
    }

    定时器的处理是为了定时刷新界面上接收到的数据,表示数据一直在发送中,这个就相当于实际项目中,电源模块不停的给上位机上抛数据。

        m_timer = new QTimer(this);
        connect(m_timer, SIGNAL(timeout()), this, SLOT(slot_updaterecordmsg()));
        m_timer->start(50);

    到此,上位机的模拟程序就差不多完成了。一些核心的东西没有贴出来,大概的思路已经给读者梳理情况了,其实也没那么难,自己动手去写一下很快就能明白。

    模拟下位机的程序是完全可以倒推出来的。

    比如下位机肯定也需要做上下文初始化和socket的初始化。这是ZMQ更古不变的原则,请参考上面模拟上位机的操作。

    其次,下位机这边肯定也需要一个线程来接收上位机下发的指令

    void CVirMedComZMQ::CreateRcvUpComDataThread()
    {
        DWORD dwThreadID = 0;
        m_handle5000 = CreateThread(NULL, 0, CVirMedComZMQ::Recv5000Msg, this, 0, &dwThreadID);
    }

    接收到了上位机的数据以后直接上抛数据给上位机,这里做的相对简单了一些,只是做了指针的判空处理,实际项目中肯定对于接收到了上位机的数据是要做一定处理的。当接收到的数据不为空的情况下,直接通过5001端口和5002端口发送数据到上位机,

    上位机的接收端口也是5001(PULL端)和5002(PULL端)。

    DWORD WINAPI CVirMedComZMQ::Recv5000Msg(LPVOID para)
    {
        CVirMedComZMQ* VirMedCom = (CVirMedComZMQ*)(para);
        if (!VirMedCom)
        {
            return 0;
        }
    
        while (true)
        {
            bool isAcceptMsg = VirMedCom->Is5000AcceptMsg();
            void* sokt5000 = VirMedCom->Get5000Socket();
            while (isAcceptMsg && sokt5000)
            {
                char* msg = s_recv(sokt5000);
                if (msg != NULL)
                {
                    VirMedCom->RcvUpComData(msg);
                    while (true)
                    {
                        //通过5001端口发送及格门数数据
                        srand(clock());
                        QString PssNumStr = QString::number(rand() % 100);
                        QJsonObject passnum_json;
                        passnum_json.insert("Pass", PssNumStr.toInt());
                        QJsonDocument pass_document;
                        pass_document.setObject(passnum_json);
                        QByteArray passnum_array = pass_document.toJson(QJsonDocument::Compact);
                        QString PassNumJsonStr(passnum_array);
                        qDebug() << QString::fromLocal8Bit("发送及格门数数据:") << PassNumJsonStr;
                        VirMedCom->Send5001BaseData(PassNumJsonStr.toLocal8Bit().data());
    
                        //通过5002端口发送不及格门数数据
                        QString FailNumStr = QString::number(rand() % 100);
                        QJsonObject failnum_json;
                        failnum_json.insert("Fail", FailNumStr.toInt());
                        QJsonDocument fail_document;
                        fail_document.setObject(failnum_json);
                        QByteArray failnum_array = fail_document.toJson(QJsonDocument::Compact);
                        QString FailNumJsonStr(failnum_array);
                        qDebug() << QString::fromLocal8Bit("发送不及格门数数据:") << FailNumJsonStr;
                        VirMedCom->Send5002WarnData(FailNumJsonStr.toLocal8Bit().data());
                    }
                }
                free(msg);
            }
            s_sleep(1);
        }
    }

    到此,整个模拟的上下位机已经完成,程序中还有很多地方不完善,读者有问题可以一起讨论。

  • 相关阅读:
    缓存问题
    基情探测器心得
    新手最常见的误解和错误
    C语言书籍推荐
    初学者编程实战指南 (4) 由一个简单的例子学习抽象
    数据结构的动画演示
    利用IDE使你的代码风格好看一些
    初学者编程实战指南 (2) 避免逻辑的重复
    入门编程语言的选择问题
    关于ACM集训队
  • 原文地址:https://www.cnblogs.com/joorey/p/14785608.html
Copyright © 2011-2022 走看看