zoukankan      html  css  js  c++  java
  • 使用ZMonitor解决jeroMQ(ZMQ)应用中PUB/SUB消息丢失的问题

    应用:使用jeroMQ作为底层通信构件。首先建立一个REQ/REP连接,当需要进行大量数据交互时再建立一个PUB/SUB连接。服务器端采用bind,客户端采用connect。

    问题:在局域网条件下,当建立PUB/SUB连接时,前面的几个重要的消息可能丢失。

    解决方法:

    (1)建立连接前,客户端通过REQ/REP连接通知服务器要进行连接,服务器把客户端的主题通知客户端:

            //***!!! 在加入前建立连接,避免消息丢失
            //***!!! 第二个参数'0'表示请求连接,'1'表示连接成功,'2'表示连接失败
            HelloMessage hello = new HelloMessage("0", "0");
            String hmess = IMessage.makeMessageFrame(RTIAmbServiceCode.HelloMessage.getIndex(),
                    MessageRole.INVOKE, hello.toJson(), federationHandle);
    
            socket.send(hmess.getBytes(ZMQ.CHARSET), 0);
            byte[] creply = socket.recv(0);

    上面的返回消息就包含了订购主题

    (2)客户端向服务端提出连接请求

                //***!!!建立回调通道
                boolean suc = true;
                if(!messageReceiver.SetMessageWare(String.valueOf(federationHandle), federateHandle, federateReference)) {
                    suc = false;
                    disconectToRTI_core();
                }
        public boolean SetMessageWare(String federationHandle, String federateHandle, FederateAmbassador federateAmb) {
            MessageWare messageWare = new MessageWare(federationHandle, federateHandle, federateAmb );
            //建立连接
            ZMQ.Socket subscriber = context.createSocket(SocketType.SUB);
            //***!!!增加连接监控
            ZMonitor monitor = new ZMonitor(context, subscriber);
            monitor.add(ZMonitor.Event.ALL);
            monitor.start();
            
            //在poller中注册
            String errorString = "ERROR: 连接错误,无法建立回调通道,.....邦员:." + federationHandle + ":" + federateHandle;
            //***!!!这里的简化是为了下面的事件判定
            if(protocol.equals("tcp")) {
                subscriber.connect("tcp://"+ rtiHost + ":" + fport);
    //            if(!subscriber.connect("tcp://"+ rtiHost + ":" + fport)) {
    //                System.out.println(errorString);
    //                return false;
    //            }
            }
            else {
                subscriber.connect("ipc://"+ faIpcAddress);
    //            if(!subscriber.connect("ipc://"+ faIpcAddress)) {
    //                SRTI_core.fireRunningStatus(errorString);
    //                System.out.println(errorString);
    //                return false;
                //}           
            }
            //***!!!监控连接事件
            int times = 0;
            ZEvent cEvent = null;
            int wait_time = RTI.getCBCwaittime();
            int once_time = RTI.getCBConcetime();
            int round = wait_time / once_time;
            do {    
                //可能的事件有CONNECTION_CLOSE, CONNECTIION_DELEY,CONNECTED, HANDSHAKE_PROTOCOL, 其中
                //CONNECTED表示已经连接,HANDSHAKE_PROTOCOL表示完成了协议握手
                cEvent = monitor.nextEvent(once_time);
                if(cEvent != null) {
                    System.out.println("MessageReceiver连接事件:" + cEvent.toString());
                    if(cEvent.type == ZMonitor.Event.HANDSHAKE_PROTOCOL) {
                        break;
                    }
                }
            }
            while(times++ != round);
            
            //***删除监控器
            try {
                monitor.close();
                monitor.destroy();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
                System.out.println("ZMonitor未能释放");
                return false;
            }
    
            //***!!!如果没有成功,返回false
     
            if(cEvent == null || cEvent.type != ZMonitor.Event.HANDSHAKE_PROTOCOL) {
                System.out.println(errorString);
                return false;
            }
            
            System.out.println("MessageWare与RTI建立PUB/SUB连接: 联邦---- " + federationHandle + "  邦员---- " + federateHandle +" , 连接端点:" + "tcp://" + rtiHost + ":" + fport);
            System.out.println("订购的主题: " + federationHandle + "." + federateHandle);
            subscriber.subscribe((federationHandle + "." + federateHandle).getBytes(ZMQ.CHARSET));        
            //subscriber.subscribe(("").getBytes(ZMQ.CHARSET));       
            int id = poller.register(subscriber, ZMQ.Poller.POLLIN);
    
            //将messageWare加入
            messageWare.socket = subscriber; 
            synchronized(messageWares) {
    
                messageWares.put(id, messageWare);
            }
            return true;
        }
     

    (3)客户端通过REQ/REP连接告诉服务端连接成功

                HelloMessage sucHelloMessage = new HelloMessage(federateHandle+"", suc == true? "1" : "2");
                String sucString = IMessage.makeMessageFrame(RTIAmbServiceCode.HelloMessage.getIndex(),
                        MessageRole.INVOKE, sucHelloMessage.toJson(), federationHandle);
    
                socket.send(sucString.getBytes(ZMQ.CHARSET), 0);
                socket.recv(0);
                if(suc == false)
                    throw new RTIinternalError("ERROR: 回调连接错误......" + federationHandle + ":" + federateHandle);
                //String crepString = new String(creply);
                System.out.println("RTIambassador:SUB回调连接建立:" + hmess);

    (4)此时服务器就可以开始发送PUB消息了

  • 相关阅读:
    kafka 官方 Quickstart
    oracle11.2 安装
    Perl参考函数/教程
    Mysql参见SHOW命令总结
    MySQL的Innodb缓存相关优化
    Oracle、Mysql和SQL Server数据库连接的URL写法
    jredis 客户端 使用
    sql基本命令-存储过程
    NoSql系列目录ElasticSearch-mongodb
    loadrunner 运行场景-Controller及Load Generators宿主主机优化
  • 原文地址:https://www.cnblogs.com/myboat/p/13739363.html
Copyright © 2011-2022 走看看