zoukankan      html  css  js  c++  java
  • 4.如何实现用MQTT通过服务器实现订阅者和发布者的通讯

    1.本例子意在用moquette服务器来作为消息转发,通过订阅者订阅消息,发布者发布消息,然后发布者的消息可以通过服务器转发给订阅者

    服务器例子:

    https://github.com/andsel/moquette

    核心代码为:

        /*
     * Copyright (c) 2012-2015 The original author or authors
     * ------------------------------------------------------
     * All rights reserved. This program and the accompanying materials
     * are made available under the terms of the Eclipse Public License v1.0
     * and Apache License v2.0 which accompanies this distribution.
     *
     * The Eclipse Public License is available at
     * http://www.eclipse.org/legal/epl-v10.html
     *
     * The Apache License v2.0 is available at
     * http://www.opensource.org/licenses/apache2.0.php
     *
     * You may elect to redistribute this code under either of these licenses.
     */
    package io.moquette.testembedded;
    
    import io.moquette.interception.AbstractInterceptHandler;
    import io.moquette.interception.InterceptHandler;
    import io.moquette.interception.messages.*;
    import io.moquette.parser.proto.messages.AbstractMessage;
    import io.moquette.parser.proto.messages.PublishMessage;
    import io.moquette.server.Server;
    import io.moquette.server.config.IConfig;
    import io.moquette.server.config.ClasspathConfig;
    
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.List;
    
    import static java.util.Arrays.asList;
    
        /**
         * Launches an embedded Moquette broker, registers a listener that prints every
         * published message, and after a fixed delay publishes one message from inside
         * the broker itself.
         */
        public class EmbeddedLauncher {

        /** Delay before the broker publishes its own message, in milliseconds. */
        private static final long SELF_PUBLISH_DELAY_MILLIS = 20000;

        /**
         * Intercept handler that prints the topic and payload of each published message.
         */
        static class PublisherListener extends AbstractInterceptHandler {

            /**
             * Prints the topic name and the payload decoded as UTF-8 text.
             *
             * @param msg the intercepted publish message
             */
            @Override
            public void onPublish(InterceptPublishMessage msg) {
                // Decode only the readable bytes of the buffer: array() would return the whole
                // backing array regardless of position/limit and fails for read-only or direct
                // buffers. duplicate() keeps the position of the original buffer untouched.
                String content = StandardCharsets.UTF_8.decode(msg.getPayload().duplicate()).toString();
                System.out.println("Received on topic: " + msg.getTopicName() + " content: " + content);
            }
        }

        /**
         * Starts the broker, installs a shutdown hook that stops it, waits
         * {@link #SELF_PUBLISH_DELAY_MILLIS} and then publishes a retained message
         * on the topic {@code /exit}.
         *
         * @param args unused
         * @throws InterruptedException if the waiting thread is interrupted
         * @throws IOException declared by the broker start-up call
         */
        public static void main(String[] args) throws InterruptedException, IOException {
            final IConfig classPathConfig = new ClasspathConfig();

            final Server mqttBroker = new Server();
            List<? extends InterceptHandler> userHandlers = asList(new PublisherListener());
            mqttBroker.startServer(classPathConfig, userHandlers);

            System.out.println("Broker started press [CTRL+C] to stop");
            // Bind a shutdown hook so the broker is stopped when the JVM exits
            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    System.out.println("Stopping broker");
                    mqttBroker.stopServer();
                    System.out.println("Broker stopped");
                }
            });

            Thread.sleep(SELF_PUBLISH_DELAY_MILLIS);
            System.out.println("Before self publish");
            PublishMessage message = new PublishMessage();
            message.setTopicName("/exit");
            message.setRetainFlag(true);
            // Other available levels: QOSType.MOST_ONE and QOSType.LEAST_ONE
            message.setQos(AbstractMessage.QOSType.EXACTLY_ONCE);
            // Encode explicitly so the payload does not depend on the platform default charset
            message.setPayload(ByteBuffer.wrap("Hello World!!".getBytes(StandardCharsets.UTF_8)));
            mqttBroker.internalPublish(message);
            System.out.println("After self publish");
        }
    }

    配置文件:

    ##############################################
    #  Moquette configuration file. 
    #
    #  The syntax is the same as mosquitto.conf
    # 
    ##############################################
    
    port 1883
    
    #websocket_port 8080
    
    host 127.0.0.1
    
    #Password file
    password_file password_file.conf
    
    #ssl_port 8883
    #jks_path serverkeystore.jks
    #key_store_password passw0rdsrv
    #key_manager_password passw0rdsrv
    
    allow_anonymous true

    配置端口为1883,而ip为127.0.0.1

    启动服务器:

    效果为:

    2.客户端源码

    https://github.com/eclipse/paho.mqtt.java

    核心代码:

    1)订阅者源码

    /*******************************************************************************
     * Copyright (c) 2009, 2014 IBM Corp.
     *
     * All rights reserved. This program and the accompanying materials
     * are made available under the terms of the Eclipse Public License v1.0
     * and Eclipse Distribution License v1.0 which accompany this distribution. 
     *
     * The Eclipse Public License is available at 
     *    http://www.eclipse.org/legal/epl-v10.html
     * and the Eclipse Distribution License is available at 
     *   http://www.eclipse.org/org/documents/edl-v10.php.
     *
     * Contributors:
     *    Dave Locke - initial API and implementation and/or initial documentation
     */
    
    package org.eclipse.paho.sample.mqttv3app;
    
    import java.io.IOException;
    import java.sql.Timestamp;
    
    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttCallback;
    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
    
    /**
     * A sample application that demonstrates how to use the Paho MQTT v3.1 Client blocking API.
     *
     * It can be run from the command line in one of two modes:
     *  - as a publisher, sending a single message to a topic on the server
     *  - as a subscriber, listening for messages from the server
     *
     *  There are three versions of the sample that implement the same features
     *  but do so using different programming styles:
     *  <ol>
     *  <li>Sample (this one) which uses the API which blocks until the operation completes</li>
     *  <li>SampleAsyncWait shows how to use the asynchronous API with waiters that block until
     *  an action completes</li>
     *  <li>SampleAsyncCallBack shows how to use the asynchronous API where events are
     *  used to notify the application when an action completes</li>
     *  </ol>
     *
     *  If the application is run with the -h parameter then info is displayed that
     *  describes all of the options / parameters.
     */
    public class Sample implements MqttCallback {
    
        /**
         * The main entry point of the sample.
         *
         * Parses the command-line arguments, validates them and then performs the
         * requested action: publish a single message, or subscribe and wait.
         * Any invalid input prints a diagnostic followed by the help text and returns.
         *
         * @param args command-line options; run with -h to list them
         */
        public static void main(String[] args) {

            // Default settings:
            boolean quietMode    = false;
            String action        = "subscribe";
            String topic         = "";
            String message       = "this is a subscriber,to subscribe message";
            int qos              = 2;
            String broker        = "127.0.0.1";
            int port             = 1883;
            String clientId      = null;
            String subTopic      = "Sample/#";
            String pubTopic      = "Sample/Java/v3";
            boolean cleanSession = true;            // Non durable subscriptions
            boolean ssl          = false;
            String password      = null;
            String userName      = null;

            // Parse the arguments -
            for (int i=0; i<args.length; i++) {
                // Check this is a valid argument
                if (args[i].length() == 2 && args[i].startsWith("-")) {
                    char arg = args[i].charAt(1);
                    // Handle arguments that take no-value
                    switch(arg) {
                        case 'h': case '?':    printHelp(); return;
                        case 'q': quietMode = true;    continue;
                    }

                    // Now handle the arguments that take a value and
                    // ensure one is specified. startsWith is used instead of
                    // charAt(0) so that an empty-string value does not throw.
                    if (i == args.length -1 || args[i+1].startsWith("-")) {
                        System.out.println("Missing value for argument: "+args[i]);
                        printHelp();
                        return;
                    }

                    // Remember the switch itself: i is advanced past it when the value is read
                    String option = args[i];
                    try {
                        switch(arg) {
                            case 'a': action = args[++i];                 break;
                            case 't': topic = args[++i];                  break;
                            case 'm': message = args[++i];                break;
                            case 's': qos = Integer.parseInt(args[++i]);  break;
                            case 'b': broker = args[++i];                 break;
                            case 'p': port = Integer.parseInt(args[++i]); break;
                            case 'i': clientId = args[++i];               break;
                            case 'c': cleanSession = Boolean.parseBoolean(args[++i]);  break;
                            case 'k': System.getProperties().put("javax.net.ssl.keyStore", args[++i]); break;
                            case 'w': System.getProperties().put("javax.net.ssl.keyStorePassword", args[++i]); break;
                            case 'r': System.getProperties().put("javax.net.ssl.trustStore", args[++i]); break;
                            case 'v': ssl = Boolean.parseBoolean(args[++i]); break;
                            case 'u': userName = args[++i];               break;
                            case 'z': password = args[++i];               break;
                            default:
                                System.out.println("Unrecognised argument: "+args[i]);
                                printHelp();
                                return;
                        }
                    } catch (NumberFormatException nfe) {
                        // -s and -p need an integer; report it like any other bad argument
                        System.out.println("Invalid numeric value for argument: "+option);
                        printHelp();
                        return;
                    }
                } else {
                    System.out.println("Unrecognised argument: "+args[i]);
                    printHelp();
                    return;
                }
            }

            // Validate the provided arguments
            if (!action.equals("publish") && !action.equals("subscribe")) {
                System.out.println("Invalid action: "+action);
                printHelp();
                return;
            }
            if (qos < 0 || qos > 2) {
                System.out.println("Invalid QoS: "+qos);
                printHelp();
                return;
            }
            if (topic.equals("")) {
                // Set the default topic according to the specified action
                if (action.equals("publish")) {
                    topic = pubTopic;
                } else {
                    topic = subTopic;
                }
            }

            String protocol = ssl ? "ssl://" : "tcp://";
            String url = protocol + broker + ":" + port;

            if (clientId == null || clientId.equals("")) {
                clientId = "SampleJavaV3_"+action;
            }

            // With a valid set of arguments, the real work of
            // driving the client API can begin
            try {
                // Create an instance of this class
                Sample sampleClient = new Sample(url, clientId, cleanSession, quietMode,userName,password);

                // Perform the requested action
                if (action.equals("publish")) {
                    sampleClient.publish(topic,qos,message.getBytes());
                } else if (action.equals("subscribe")) {
                    sampleClient.subscribe(topic,qos);
                }
            } catch(MqttException me) {
                // Display full details of any exception that occurs
                System.out.println("reason "+me.getReasonCode());
                System.out.println("msg "+me.getMessage());
                System.out.println("loc "+me.getLocalizedMessage());
                System.out.println("cause "+me.getCause());
                System.out.println("excep "+me);
                me.printStackTrace();
            }
        }
    
        // Private instance variables

        // Blocking-mode MQTT client, created in the constructor
        private MqttClient             client;
        // URL of the server to connect to, as passed to the constructor
        private String                 brokerUrl;
        // When true, log(...) prints nothing
        private boolean             quietMode;
        // Connection options handed to client.connect(...)
        private MqttConnectOptions     conOpt;
        // Clean-session flag applied to the connection options
        private boolean             clean;
        // Password set on the connection options when not null
        private String password;
        // User name set on the connection options when not null
        private String userName;
    
        /**
         * Creates the sample client wrapper: stores the settings, prepares the
         * connection options and builds a blocking MQTT client that reports back
         * to this object.
         *
         * @param brokerUrl the url of the server to connect to
         * @param clientId the client id to connect with
         * @param cleanSession clear state at end of connection or not (durable or non-durable subscriptions)
         * @param quietMode whether debug should be printed to standard out
         * @param userName the username to connect with, or null for none
         * @param password the password for the user, or null for none
         * @throws MqttException declared for callers; a set-up failure inside the
         *         constructor is printed and terminates the JVM with exit code 1
         */
        public Sample(String brokerUrl, String clientId, boolean cleanSession, boolean quietMode, String userName, String password) throws MqttException {
            this.brokerUrl = brokerUrl;
            this.quietMode = quietMode;
            this.clean     = cleanSession;
            this.password  = password;
            this.userName  = userName;

            // In-flight messages are kept in the JVM's temporary directory until they
            // have been delivered to the server. A real application ought to pick a
            // location where they are not likely to get deleted or tampered with.
            MqttDefaultFilePersistence persistence =
                    new MqttDefaultFilePersistence(System.getProperty("java.io.tmpdir"));

            try {
                // Connection parameters such as cleanSession and credentials
                conOpt = new MqttConnectOptions();
                conOpt.setCleanSession(this.clean);
                if (this.password != null) {
                    conOpt.setPassword(this.password.toCharArray());
                }
                if (this.userName != null) {
                    conOpt.setUserName(this.userName);
                }

                // Blocking-mode client with this wrapper as its callback handler
                client = new MqttClient(this.brokerUrl, clientId, persistence);
                client.setCallback(this);

            } catch (MqttException setupFailure) {
                setupFailure.printStackTrace();
                log("Unable to set up client: " + setupFailure);
                System.exit(1);
            }
        }
    
        /**
         * Publish / send a message to an MQTT server
         * @param topicName the name of the topic to publish to
         * @param qos the quality of service to delivery the message at (0,1,2)
         * @param payload the set of bytes to send to the MQTT server
         * @throws MqttException
         */
        public void publish(String topicName, int qos, byte[] payload) throws MqttException {
    
            // Connect to the MQTT server
            log("Connecting to "+brokerUrl + " with client ID "+client.getClientId());
            client.connect(conOpt);
            log("Connected");
    
            String time = new Timestamp(System.currentTimeMillis()).toString();
            log("Publishing at: "+time+ " to topic ""+topicName+"" qos "+qos);
    
            // Create and configure a message
               MqttMessage message = new MqttMessage(payload);
            message.setQos(qos);
    
            // Send the message to the server, control is not returned until
            // it has been delivered to the server meeting the specified
            // quality of service.
            client.publish(topicName, message);
    
            // Disconnect the client
            client.disconnect();
            log("Disconnected");
        }
    
        /**
         * Subscribe to a topic on an MQTT server
         * Once subscribed this method waits for the messages to arrive from the server
         * that match the subscription. It continues listening for messages until the enter key is
         * pressed.
         * @param topicName to subscribe to (can be wild carded)
         * @param qos the maximum quality of service to receive messages at for this subscription
         * @throws MqttException
         */
        public void subscribe(String topicName, int qos) throws MqttException {
    
            // Connect to the MQTT server
            client.connect(conOpt);
            log("Connected to "+brokerUrl+" with client ID "+client.getClientId());
    
            // Subscribe to the requested topic
            // The QoS specified is the maximum level that messages will be sent to the client at.
            // For instance if QoS 1 is specified, any messages originally published at QoS 2 will
            // be downgraded to 1 when delivering to the client but messages published at 1 and 0
            // will be received at the same level they were published at.
            log("Subscribing to topic ""+topicName+"" qos "+qos);
            client.subscribe(topicName, qos);
    
            // Continue waiting for messages until the Enter is pressed
            log("Press <Enter> to exit");
            try {
                System.in.read();
            } catch (IOException e) {
                //If we can't read we'll just exit
            }
    
            // Disconnect the client from the server
            client.disconnect();
            log("Disconnected");
        }
    
        /**
         * Writes a line to standard out unless quiet mode is enabled.
         *
         * @param message the message to log
         */
        private void log(String message) {
            if (quietMode) {
                return;
            }
            System.out.println(message);
        }
    
        /****************************************************************/
        /* Methods to implement the MqttCallback interface              */
        /****************************************************************/
    
        /**
         * Reports a lost connection and terminates the JVM with exit code 1.
         * No reconnection is attempted by this sample.
         *
         * @param cause the reason reported for the loss of connection
         * @see MqttCallback#connectionLost(Throwable)
         */
        public void connectionLost(Throwable cause) {
            String notice = "Connection to " + brokerUrl + " lost!" + cause;
            log(notice);
            System.exit(1);
        }
    
        /**
         * Delivery-complete callback of {@link MqttCallback}; this sample takes no action.
         *
         * @param token the delivery token supplied by the client library
         * @see MqttCallback#deliveryComplete(IMqttDeliveryToken)
         */
        public void deliveryComplete(IMqttDeliveryToken token) {
            // Intentionally empty. publish() in this class uses the blocking
            // client.publish(...) call, which per its own comment does not return
            // until the message has been delivered, so there is nothing left to do
            // when this notification arrives.
            //
            // NOTE(review): an application using asynchronous delivery would
            // presumably use the token here to track completion - confirm against
            // the Paho MqttCallback documentation.
        }
    
        /**
         * Prints one line per received message: arrival time, topic, payload as
         * text and quality of service, separated by tabs.
         *
         * @param topic the topic the message was published on
         * @param message the received message
         * @throws MqttException declared for the callback contract; not thrown by this body
         * @see MqttCallback#messageArrived(String, MqttMessage)
         */
        public void messageArrived(String topic, MqttMessage message) throws MqttException {
            String time = new Timestamp(System.currentTimeMillis()).toString();

            StringBuilder line = new StringBuilder();
            line.append("Time:\t").append(time);
            line.append("  Topic:\t").append(topic);
            line.append("  Message:\t").append(new String(message.getPayload()));
            line.append("  QoS:\t").append(message.getQos());

            System.out.println(line.toString());
        }
    
        /****************************************************************/
        /* End of MqttCallback methods                                  */
        /****************************************************************/
    
           static void printHelp() {
              System.out.println(
                  "Syntax:
    
    " +
                      "    Sample [-h] [-a publish|subscribe] [-t <topic>] [-m <message text>]
    " +
                      "            [-s 0|1|2] -b <hostname|IP address>] [-p <brokerport>] [-i <clientID>]
    
    " +
                      "    -h  Print this help text and quit
    " +
                      "    -q  Quiet mode (default is false)
    " +
                      "    -a  Perform the relevant action (default is publish)
    " +
                      "    -t  Publish/subscribe to <topic> instead of the default
    " +
                      "            (publish: "Sample/Java/v3", subscribe: "Sample/#")
    " +
                      "    -m  Use <message text> instead of the default
    " +
                      "            ("Message from MQTTv3 Java client")
    " +
                      "    -s  Use this QoS instead of the default (2)
    " +
                      "    -b  Use this name/IP address instead of the default (m2m.eclipse.org)
    " +
                      "    -p  Use this port instead of the default (1883)
    
    " +
                      "    -i  Use this client ID instead of SampleJavaV3_<action>
    " +
                      "    -c  Connect to the server with a clean session (default is false)
    " +
                      "     
    
     Security Options 
    " +
                      "     -u Username 
    " +
                      "     -z Password 
    " +
                      "     
    
     SSL Options 
    " +
                      "    -v  SSL enabled; true - (default is false) " +
                      "    -k  Use this JKS format key store to verify the client
    " +
                      "    -w  Passpharse to verify certificates in the keys store
    " +
                      "    -r  Use this JKS format keystore to verify the server
    " +
                      " If javax.net.ssl properties have been set only the -v flag needs to be set
    " +
                      "Delimit strings containing spaces with ""
    
    " +
                      "Publishers transmit a single message then disconnect from the server.
    " +
                      "Subscribers remain connected to the server and receive appropriate
    " +
                      "messages until <enter> is pressed.
    
    "
                  );
        }
    
    }

    客户端-发布者

    只需在configuration里面修改传入的参数即可:

    为保证是同一个主题,则需要保证传入-a -t两个参数

    -a publish -t Sample/Java/v3

    最终运行结果:

    服务器

    订阅者:

    发布者

  • 相关阅读:
    Selenium简单测试页面加载速度的性能(Page loading performance)
    Selenium Page object Pattern usage
    Selenium如何支持测试Windows application
    UI Automation的两个成熟的框架(QTP 和Selenium)
    分享自己针对Automation做的两个成熟的框架(QTP 和Selenium)
    敏捷开发中的测试金字塔(转)
    Selenium 的基础框架类
    selenium2 run in Jenkins GUI testing not visible or browser not open but run in background浏览器后台运行不可见
    eclipse与SVN 结合(删除SVN中已经上传的问题)
    配置Jenkins的slave节点的详细步骤适合windows等其他平台
  • 原文地址:https://www.cnblogs.com/caimuqing/p/6145366.html
Copyright © 2011-2022 走看看