zoukankan      html  css  js  c++  java
  • zeromq中两个dealer 通过一个router进行通信

    发现有童鞋不是很清楚ZMQ中的“请求-回复”模式中的ROUTER怎么用,所以简单介绍一下“请求-回复”模式的使用(最后付代码)。

    一、讲一讲

    1、要使用zmq 通过一个router进行通信,你首先需要知道ZMQ中的“请求-回复”模式,不清楚的话可以先看一下下面这篇文章,连接如下:

    http://www.cnblogs.com/fengbohello/p/4354989.html

    在“请求-回复”模式中,router是一个比较特殊的 socket类型,它会把它接收到的第一个消息作为消息来源的标志,也就是消息源的identity;而在使用ZMQ_ROUTER类型的socket发送消息的时候呢,会把这个socket发送的第一个消息作为目的地址,及消息的目的identity。示意图如下

    1、identity 为 aaa 的 socket 发送了一个消息,这个消息由两部分组成,一个是目的 socket的identity,名字为bbb,另一个是真正的消息,就是"hello"。

    2、当aaa 发送的消息被其连接的router接收到之后呢,就不仅仅是刚刚的消息了,ZMQ的底层会偷偷的增加一个消息,那就是 aaa 的identity,所以在 router 看来呢,它接收到的其实是三部分的消息,第一个是消息的来源,第二个是目的地址(bbb 的 identity),第三部分就是真正要传达的信息。

    3、当router接收到这么一个消息的时候,会发现,这个消息来源于aaa,并且是发向bbb的,所以router就会发送如下消息:首先发送一个 bbb,表示要发给的目的地址的identity是bbb,然后发送aaa,最后是信息hello。

    4、identity 为 bbb的dealer 接收到消息之后,就只有aaa和hello了。router发送的时候不是首先发送了一个bbb吗,去哪里了呢?这次被ZMQ偷偷的拿走了。这就是router的神奇之处,它会看到ZMQ_DEALER和ZMQ_REQ/ZMQ_REP不能看到的东西

    现在再把“请求-回复”模式的规则说一下:就是,ZMQ_ROUTER能够看到消息的来源,以及消息的去向,并且ZMQ_ROUTER会把接收到的第一个消息作为消息来源的identity,把发送的第一个消息作为消息目的地址的identity。

    二、下面是代码,代码由五个文件组成,还有一个makefile。

      我相信在你使用ZMQ的时候已经安装好了ZMQ的链接库,如果确实还没有安装好的话,按照下面这篇文章安装就可以了。http://www.cnblogs.com/fengbohello/p/4046686.html

      注:代码在CentOS下编译并运行通过,其它机器没有测试。

    在本页复制或者到我的百度网盘进行下载:dlr2rtr2dlr.rar  http://pan.baidu.com/s/1pJIICpt

    comm.h

    //comm.h
    #ifndef _ZMQCOMM_H_
    #define _ZMQCOMM_H_
    #include <zmq.h>
    
    #define NAME_LEN    256
    #define MSG_LEN        1024
    
    typedef struct {
        char szSrc[NAME_LEN];
        char szDst[NAME_LEN];
        char szMsg[MSG_LEN];
    }Zmqmsg;
    
    typedef struct {
        void * sock;
        int iType;
    }ZmqSock;
    
    void lockSocket();
    void unlockSocket();
    
    int  s_recv(ZmqSock * sock, Zmqmsg * zMsg);
    int  s_send(ZmqSock * sock, Zmqmsg * zMsg);
    
    #endif
    View Code

    comm.c

    //comm.c
    #include <string.h>
    #include "comm.h"
    
    void lockSocket()
    {
        // lock
    }
    void unlockSocket()
    {
        // unlock
    }
    
    int  s_recv(ZmqSock * zmqsock, Zmqmsg * zMsg)
    {
        if(NULL == zmqsock || NULL == zMsg)
        {
            return -1;
        }
        int iRet = -1;
        lockSocket();
        int iType = 0;
        int len = sizeof(iType);
        void * sock = zmqsock->sock;
        do{
            iType = zmqsock->iType;
            if(ZMQ_ROUTER == iType)
            {
                printf("router:
    ");
                errno = 0;
                if(zmq_recv(sock, zMsg->szSrc, sizeof(zMsg->szSrc), 0) < 0)
                {
                    printf("send msg faild : [%s]
    ", zmq_strerror(errno));
                    break;
                }
                printf("recv : [%s]
    ", zMsg->szSrc);
                if(zmq_recv(sock, NULL, 0, 0) < 0)
                {
                    printf("send msg faild : [%s]
    ", zmq_strerror(errno));
                    break;
                }
                printf("recv : []
    ");
            }
            else if (ZMQ_DEALER == iType)
            {
                printf("dealer:
    ");
                if(zmq_recv(sock, NULL, 0, 0) < 0)
                {
                    printf("send msg faild : [%s]
    ", zmq_strerror(errno));
                    break;
                }
                printf("recv : []
    ");
            }
            else if (ZMQ_REQ == iType)
            {
                printf("req:
    ");
            }
            else if (ZMQ_REP == iType)
            {
                printf("rep:
    ");
            }
    
            if(zmq_recv(sock, zMsg->szDst, sizeof(zMsg->szDst), 0) < 0)
            {
                printf("send msg faild : [%s]
    ", zmq_strerror(errno));
                break;
            }
            printf("recv : [%s]
    ", zMsg->szDst);
            if(zmq_recv(sock, NULL, 0, 0) < 0)
            {
                printf("send msg faild : [%s]
    ", zmq_strerror(errno));
                break;
            }
            printf("recv : []
    ");
            if(zmq_recv(sock, zMsg->szMsg, sizeof(zMsg->szMsg), 0) < 0)
            {
                printf("send msg faild : [%s]
    ", zmq_strerror(errno));
                break;
            }
            printf("recv : [%s]
    ", zMsg->szMsg);
            iRet = 0;
        }while(0);
        unlockSocket();
    
        return iRet;
    }
    
    int  s_send(ZmqSock * zmqsock, Zmqmsg * zMsg)
    {
        if(NULL == zmqsock || NULL == zMsg)
        {
            return -1;
        }
        int iRet = -1;
        lockSocket();
        int iType = zmqsock->iType;
        int len = sizeof(iType);
        void * sock = zmqsock->sock;
        do{
            if(ZMQ_ROUTER == iType)
            {
                printf("router:
    ");
                if(zmq_send(sock, zMsg->szDst, strlen(zMsg->szDst), ZMQ_SNDMORE) < 0)
                {
                    printf("send msg faild : [%s]
    ", zmq_strerror(errno));
                    break;
                }
                printf("send : [%s]
    ", zMsg->szDst);
                if(zmq_send(sock, "", 0, ZMQ_SNDMORE) < 0)
                {
                    printf("send msg faild : [%s]
    ", zmq_strerror(errno));
                    break;
                }
                printf("send : []
    ");
                if(zmq_send(sock, zMsg->szSrc, strlen(zMsg->szSrc), ZMQ_SNDMORE) < 0)
                {
                    printf("send msg faild : [%s]
    ", zmq_strerror(errno));
                    break;
                }
                printf("send : [%s]
    ", zMsg->szSrc);
            }
            else if (ZMQ_DEALER == iType)
            {
                printf("dealer:
    ");
                if(zmq_send(sock, "", 0, ZMQ_SNDMORE) < 0)
                {
                    printf("send msg faild : [%s]
    ", zmq_strerror(errno));
                    break;
                }
                printf("send : []
    ");
                if(zmq_send(sock, zMsg->szDst, strlen(zMsg->szDst), ZMQ_SNDMORE) < 0)
                {
                    printf("send msg faild : [%s]
    ", zmq_strerror(errno));
                    break;
                }
                printf("send : [%s]
    ", zMsg->szDst);
            }
            else if (ZMQ_REQ == iType || ZMQ_REP == iType)
            {
                printf("rex:
    ");
                if(zmq_send(sock, zMsg->szDst, strlen(zMsg->szDst), ZMQ_SNDMORE) < 0)
                {
                    printf("send msg faild : [%s]
    ", zmq_strerror(errno));
                    break;
                }
                printf("send : [%s]
    ", zMsg->szDst);
            }
    
            if(zmq_send(sock, "", 0, ZMQ_SNDMORE) < 0)
            {
                printf("send msg faild : [%s]
    ", zmq_strerror(errno));
                break;
            }
            printf("send : []
    ");
            if(zmq_send(sock, zMsg->szMsg, strlen(zMsg->szMsg), 0) < 0)
            {
                printf("send msg faild : [%s]
    ", zmq_strerror(errno));
                break;
            }
            printf("send : [%s]
    ", zMsg->szMsg);
            iRet = 0;
        }while(0);
        unlockSocket();
    
        return iRet;
    }
    View Code

    recv.c

    //recv.c
    //包含zmq的头文件 
    #include <zmq.h>
    #include <stdio.h>
    #include <string.h>
    #include "comm.h"
    
    int main(int argc, char * argv[])
    {
        void * pCtx = NULL;
        void * pSock = NULL;
        const char * pAddr = "ipc://drd.ipc";
    
        //创建context,zmq的socket 需要在context上进行创建 
        if((pCtx = zmq_ctx_new()) == NULL)
        {
            return 0;
        }
        //创建zmq socket ,socket目前有6中属性 ,这里使用dealer方式
        //具体使用方式请参考zmq官方文档(zmq手册) 
        if((pSock = zmq_socket(pCtx, ZMQ_DEALER)) == NULL)
        {
            zmq_ctx_destroy(pCtx);
            return 0;
        }
        int iRcvTimeout = 5000;// millsecond
        //设置zmq的接收超时时间为5秒 
        if(zmq_setsockopt(pSock, ZMQ_RCVTIMEO, &iRcvTimeout, sizeof(iRcvTimeout)) < 0)
        {
            zmq_close(pSock);
            zmq_ctx_destroy(pCtx);
            return 0;
        }
        char * pName = "recv";
        if(zmq_setsockopt(pSock, ZMQ_IDENTITY, pName, strlen(pName)) < 0)
        {
            zmq_close(pSock);
            zmq_ctx_destroy(pCtx);
            return 0;
        }
        //绑定地址 ipc://drd.ipc 
        //也就是使用ipc协议进行通信,地址为当前目录下的drd.ipc
        if(zmq_connect(pSock, pAddr) < 0)
        {
            zmq_close(pSock);
            zmq_ctx_destroy(pCtx);
            return 0;
        }
        printf("bind at : %s
    ", pAddr);
        ZmqSock zmqsock;
        zmqsock.iType = ZMQ_DEALER;
        zmqsock.sock = pSock;
        while(1)
        {
            printf("waitting...
    ");
            errno = 0;
            //循环等待接收到来的消息,当超过5秒没有接到消息时,
            //recv函数返回错误信息 ,并使用zmq_strerror函数进行错误定位 
            Zmqmsg zmsg;
            memset(&zmsg, 0, sizeof(zmsg));
            if(s_recv(&zmqsock, &zmsg) < 0)
            {
                printf("error = %s
    ", zmq_strerror(errno));
                continue;
            }
            printf("------------------------
    ");
        }
    
        zmq_close(pSock);
        zmq_ctx_destroy(pCtx);
        return 0;
    }
    View Code

    router.c

    //router.c
    //包含zmq的头文件 
    #include <zmq.h>
    #include <stdio.h>
    #include <string.h>
    #include "comm.h"
    
    int main(int argc, char * argv[])
    {
        void * pCtx = NULL;
        void * pSock = NULL;
        const char * pAddr = "ipc://drd.ipc";
    
        //创建context,zmq的socket 需要在context上进行创建 
        if((pCtx = zmq_ctx_new()) == NULL)
        {
            return 0;
        }
        //创建zmq socket ,socket目前有6中属性 ,这里使用dealer方式
        //具体使用方式请参考zmq官方文档(zmq手册) 
        if((pSock = zmq_socket(pCtx, ZMQ_ROUTER)) == NULL)
        {
            zmq_ctx_destroy(pCtx);
            return 0;
        }
        int iRcvTimeout = -1;// millsecond
        //设置zmq的接收超时时间为5秒 
        if(zmq_setsockopt(pSock, ZMQ_RCVTIMEO, &iRcvTimeout, sizeof(iRcvTimeout)) < 0)
        {
            zmq_close(pSock);
            zmq_ctx_destroy(pCtx);
            return 0;
        }
        char * pName = "router";
        if(zmq_setsockopt(pSock, ZMQ_IDENTITY, pName, strlen(pName)) < 0)
        {
            zmq_close(pSock);
            zmq_ctx_destroy(pCtx);
            return 0;
        }
        //绑定地址 ipc://drd.ipc 
        //也就是使用ipc协议进行通信,地址为当前目录下的drd.ipc
        if(zmq_bind(pSock, pAddr) < 0)
        {
            zmq_close(pSock);
            zmq_ctx_destroy(pCtx);
            return 0;
        }
        printf("bind at : %s
    ", pAddr);
        ZmqSock zmqsock;
        zmqsock.iType = ZMQ_ROUTER;
        zmqsock.sock = pSock;
        while(1)
        {
            printf("waitting...
    ");
            errno = 0;
            //循环等待接收到来的消息,当超过5秒没有接到消息时,
            //recv函数返回错误信息 ,并使用zmq_strerror函数进行错误定位 
            Zmqmsg zmsg;
            memset(&zmsg, 0, sizeof(zmsg));
            if(s_recv(&zmqsock, &zmsg) < 0)
            {
                printf("error = %s
    ", zmq_strerror(errno));
                continue;
            }
            if(s_send(&zmqsock, &zmsg) < 0)
            {
                printf("error = %s
    ", zmq_strerror(errno));
                continue;
            }
        }
    
        zmq_close(pSock);
        zmq_ctx_destroy(pCtx);
        return 0;
    }
    View Code

    send.c

    //send.c
    //包含zmq的头文件 
    #include <zmq.h>
    #include <stdio.h>
    #include <string.h>
    #include "comm.h"
    
    int main(int argc, char * argv[])
    {
        void * pCtx = NULL;
        void * pSock = NULL;
        //使用ipc协议进行通信,需要连接的目标机器IP地址为drd.ipc
        const char * pAddr = "ipc://drd.ipc";
    
        //创建context 
        if((pCtx = zmq_ctx_new()) == NULL)
        {
            return 0;
        }
        //创建socket 
        if((pSock = zmq_socket(pCtx, ZMQ_DEALER)) == NULL)
        {
            zmq_ctx_destroy(pCtx);
            return 0;
        }
        int iSndTimeout = 5000;// millsecond
        //设置接收超时 
        if(zmq_setsockopt(pSock, ZMQ_RCVTIMEO, &iSndTimeout, sizeof(iSndTimeout)) < 0)
        {
            zmq_close(pSock);
            zmq_ctx_destroy(pCtx);
            return 0;
        }
        char * pName = "send";
        if(zmq_setsockopt(pSock, ZMQ_IDENTITY, pName, strlen(pName)) < 0)
        {
            zmq_close(pSock);
            zmq_ctx_destroy(pCtx);
            return 0;
        }
        if(zmq_connect(pSock, pAddr) < 0)
        {
            zmq_close(pSock);
            zmq_ctx_destroy(pCtx);
            return 0;
        }
        printf("connect to [%s]
    ", pAddr);
        ZmqSock zmqsock;
        zmqsock.iType = ZMQ_DEALER;
        zmqsock.sock = pSock;
        //循环发送消息 
        while(1)
        {
            static int i = 0;
            Zmqmsg zmsg;
            memset(&zmsg, 0, sizeof(zmsg));
            snprintf(zmsg.szDst, sizeof(zmsg.szDst), "recv");
            snprintf(zmsg.szMsg, sizeof(zmsg.szMsg), "hello world : %3d", i++);
            printf("Enter to send...
    ");
            if(s_send(&zmqsock, &zmsg) < 0)
            {
                fprintf(stderr, "send message faild
    ");
            }
            printf("------------------------
    ");
            getchar();
        }
    
        zmq_close(pSock);
        zmq_ctx_destroy(pCtx);
        return 0;
    }
    View Code

    Makefile

    .PHONY : dummy clean
    
    CFLAGS    = -Wall
    LDFLAGS    = -lzmq -lpthread
    
    CC        = gcc -g
    CXX        = g++ -g
    MAKEF    = make -f Makefile
    CPA        = cp -a
    MAKE    = $(CC)
    
    subdir-list         = $(patsubst %,_subdir_%,$(SUB_DIRS))
    subdir-clean-list     = $(patsubst %,_subdir_clean_%,$(SUB_DIRS))
    
    %.o: %.c
        $(MAKE) -o $@ -c $< $(CFLAGS)
    
    %.o: %.cpp
        $(MAKE) -o $@ -c $< $(CFLAGS)
    
    %.os: %.c
        $(MAKE) -fPIC -c $< -o $@ $(CFLAGS) 
    
    %.os: %.cpp
        $(MAKE) -fPIC -c $< -o $@ $(CFLAGS) 
    
    ALL_FILES    = recv send router
    
    all : $(ALL_FILES)
    
    recv : recv.o comm.o
        $(MAKE) -o $@ $(LDFLAGS) $^
    
    send : send.o comm.o
        $(MAKE) -o $@ $(LDFLAGS) $^
    
    router : router.o comm.o
        $(MAKE) -o $@ $(LDFLAGS) $^
    
    clean : 
        rm -rf *.o
        rm -rf $(ALL_FILES)
    View Code

    作者 :风波

    mail : fengbohello@qq.com

  • 相关阅读:
    [转]C#、VB.NET使用HttpWebRequest访问https地址(SSL)的实现
    C#设置System.Net.ServicePointManager.DefaultConnectionLimit,突破Http协议的并发连接数限制
    [转]WebBrowser控件禁用超链接转向、脚本错误提示、默认右键菜单和快捷键
    [转]C#打印DataGridView的例子源码
    c# TreeView 父节点选中/不选时子节点都同步选中/不选
    C#中PictureBox异步加载图片
    [转]FusionCharts 3.1 破解版 – 非常好用的Flash图表控件
    配合JavaScript拖动页面中控件
    在ThinkPad T400上安装win2003 所遇问题
    C# 抛弃MoveTo来实现文件重命名
  • 原文地址:https://www.cnblogs.com/fengbohello/p/4743868.html
Copyright © 2011-2022 走看看