zoukankan      html  css  js  c++  java
  • 4.如何实现用MQTT通过服务器实现订阅者和发布者的通讯

    1.本例子意在用moquette服务器来作为消息转发,通过订阅者订阅消息,发布者发布消息,然后发布者的消息可以通过服务器转发给订阅者

    服务器例子:

    https://github.com/andsel/moquette

    核心代码为:

        /*
     * Copyright (c) 2012-2015 The original author or authors
     * ------------------------------------------------------
     * All rights reserved. This program and the accompanying materials
     * are made available under the terms of the Eclipse Public License v1.0
     * and Apache License v2.0 which accompanies this distribution.
     *
     * The Eclipse Public License is available at
     * http://www.eclipse.org/legal/epl-v10.html
     *
     * The Apache License v2.0 is available at
     * http://www.opensource.org/licenses/apache2.0.php
     *
     * You may elect to redistribute this code under either of these licenses.
     */
    package io.moquette.testembedded;
    
    import io.moquette.interception.AbstractInterceptHandler;
    import io.moquette.interception.InterceptHandler;
    import io.moquette.interception.messages.*;
    import io.moquette.parser.proto.messages.AbstractMessage;
    import io.moquette.parser.proto.messages.PublishMessage;
    import io.moquette.server.Server;
    import io.moquette.server.config.IConfig;
    import io.moquette.server.config.ClasspathConfig;
    
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.List;
    
    import static java.util.Arrays.asList;
    
        /**
         * Starts an embedded Moquette broker configured from the classpath, registers a
         * listener that prints every published message it is notified of, and, after a
         * fixed delay, publishes one retained message on topic {@code /exit} from inside
         * the broker itself.
         */
        public class EmbeddedLauncher {

            /** Charset used both to encode the self-published payload and to decode received payloads. */
            private static final java.nio.charset.Charset PAYLOAD_CHARSET =
                    java.nio.charset.StandardCharsets.UTF_8;

            /** How long {@link #main(String[])} waits after start-up before publishing its own message. */
            private static final long SELF_PUBLISH_DELAY_MILLIS = 20000L;

            /** Intercept handler that prints the topic and payload of each publish notification. */
            static class PublisherListener extends AbstractInterceptHandler {

                @Override
                public void onPublish(InterceptPublishMessage msg) {
                    // NOTE(review): array() exposes the buffer's whole backing array; this assumes the
                    // buffer is array-backed and contains only the payload - confirm for the Moquette
                    // version in use.
                    String content = new String(msg.getPayload().array(), PAYLOAD_CHARSET);
                    System.out.println("Received on topic: " + msg.getTopicName() + " content: " + content);
                }
            }

            /**
             * Launches the broker, installs a shutdown hook that stops it, waits
             * {@link #SELF_PUBLISH_DELAY_MILLIS} and then publishes a single message internally.
             *
             * @param args ignored
             * @throws InterruptedException if the waiting thread is interrupted
             * @throws IOException declared by the broker start-up call
             */
            public static void main(String[] args) throws InterruptedException, IOException {
                final IConfig classPathConfig = new ClasspathConfig();

                final Server mqttBroker = new Server();
                List<? extends InterceptHandler> userHandlers = asList(new PublisherListener());
                mqttBroker.startServer(classPathConfig, userHandlers);

                System.out.println("Broker started press [CTRL+C] to stop");
                // Stop the broker when the JVM shuts down (e.g. on CTRL+C).
                Runtime.getRuntime().addShutdownHook(new Thread() {
                    @Override
                    public void run() {
                        System.out.println("Stopping broker");
                        mqttBroker.stopServer();
                        System.out.println("Broker stopped");
                    }
                });

                // Leave time for external clients to connect and subscribe before self-publishing.
                Thread.sleep(SELF_PUBLISH_DELAY_MILLIS);
                System.out.println("Before self publish");
                PublishMessage message = new PublishMessage();
                message.setTopicName("/exit");
                message.setRetainFlag(true);
                // The other levels previously tried here were QOSType.MOST_ONE and QOSType.LEAST_ONE.
                message.setQos(AbstractMessage.QOSType.EXACTLY_ONCE);
                message.setPayload(ByteBuffer.wrap("Hello World!!".getBytes(PAYLOAD_CHARSET)));
                mqttBroker.internalPublish(message);
                System.out.println("After self publish");
            }
        }

    配置文件:

    ##############################################
    #  Moquette configuration file. 
    #
    #  The syntax is the same as mosquitto.conf
    # 
    ##############################################
    
    port 1883
    
    #websocket_port 8080
    
    host 127.0.0.1
    
    #Password file
    password_file password_file.conf
    
    #ssl_port 8883
    #jks_path serverkeystore.jks
    #key_store_password passw0rdsrv
    #key_manager_password passw0rdsrv
    
    allow_anonymous true

    配置端口为1883,而ip为127.0.0.1

    启动服务器:

    效果为:

    2.客户端源码

    https://github.com/eclipse/paho.mqtt.java

    核心代码:

    1)订阅者源码

    /*******************************************************************************
     * Copyright (c) 2009, 2014 IBM Corp.
     *
     * All rights reserved. This program and the accompanying materials
     * are made available under the terms of the Eclipse Public License v1.0
     * and Eclipse Distribution License v1.0 which accompany this distribution. 
     *
     * The Eclipse Public License is available at 
     *    http://www.eclipse.org/legal/epl-v10.html
     * and the Eclipse Distribution License is available at 
     *   http://www.eclipse.org/org/documents/edl-v10.php.
     *
     * Contributors:
     *    Dave Locke - initial API and implementation and/or initial documentation
     */
    
    package org.eclipse.paho.sample.mqttv3app;
    
    import java.io.IOException;
    import java.sql.Timestamp;
    
    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttCallback;
    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
    
    /**
     * A sample application that demonstrates how to use the Paho MQTT v3.1 Client blocking API.
     *
     * It can be run from the command line in one of two modes:
     *  - as a publisher, sending a single message to a topic on the server
     *  - as a subscriber, listening for messages from the server
     *
     *  There are three versions of the sample that implement the same features
     *  but do so using using different programming styles:
     *  <ol>
     *  <li>Sample (this one) which uses the API which blocks until the operation completes</li>
     *  <li>SampleAsyncWait shows how to use the asynchronous API with waiters that block until
     *  an action completes</li>
     *  <li>SampleAsyncCallBack shows how to use the asynchronous API where events are
     *  used to notify the application when an action completes<li>
     *  </ol>
     *
     *  If the application is run with the -h parameter then info is displayed that
     *  describes all of the options / parameters.
     */
    public class Sample implements MqttCallback {
    
        /**
         * The main entry point of the sample.
         *
         * This method handles parsing of the arguments specified on the
         * command-line before performing the specified action.
         */
        public static void main(String[] args) {
    
            // Default settings:
            boolean quietMode     = false;
            String action         = "subscribe";
            String topic         = "";
            String message         = "this is a subscriber,to subscribe message";
            int qos             = 2;
            String broker         = "127.0.0.1";
            int port             = 1883;
            String clientId     = null;
            String subTopic        = "Sample/#";
            String pubTopic     = "Sample/Java/v3";
            boolean cleanSession = true;            // Non durable subscriptions
            boolean ssl = false;
            String password = null;
            String userName = null;
            // Parse the arguments -
            for (int i=0; i<args.length; i++) {
                // Check this is a valid argument
                if (args[i].length() == 2 && args[i].startsWith("-")) {
                    char arg = args[i].charAt(1);
                    // Handle arguments that take no-value
                    switch(arg) {
                        case 'h': case '?':    printHelp(); return;
                        case 'q': quietMode = true;    continue;
                    }
    
                    // Now handle the arguments that take a value and
                    // ensure one is specified
                    if (i == args.length -1 || args[i+1].charAt(0) == '-') {
                        System.out.println("Missing value for argument: "+args[i]);
                        printHelp();
                        return;
                    }
                    switch(arg) {
                        case 'a': action = args[++i];                 break;
                        case 't': topic = args[++i];                  break;
                        case 'm': message = args[++i];                break;
                        case 's': qos = Integer.parseInt(args[++i]);  break;
                        case 'b': broker = args[++i];                 break;
                        case 'p': port = Integer.parseInt(args[++i]); break;
                        case 'i': clientId = args[++i];                  break;
                        case 'c': cleanSession = Boolean.valueOf(args[++i]).booleanValue();  break;
                        case 'k': System.getProperties().put("javax.net.ssl.keyStore", args[++i]); break;
                        case 'w': System.getProperties().put("javax.net.ssl.keyStorePassword", args[++i]); break;
                        case 'r': System.getProperties().put("javax.net.ssl.trustStore", args[++i]); break;
                        case 'v': ssl = Boolean.valueOf(args[++i]).booleanValue(); break;
                        case 'u': userName = args[++i];               break;
                        case 'z': password = args[++i];               break;
                        default:
                            System.out.println("Unrecognised argument: "+args[i]);
                            printHelp();
                            return;
                    }
                } else {
                    System.out.println("Unrecognised argument: "+args[i]);
                    printHelp();
                    return;
                }
            }
    
            // Validate the provided arguments
            if (!action.equals("publish") && !action.equals("subscribe")) {
                System.out.println("Invalid action: "+action);
                printHelp();
                return;
            }
            if (qos < 0 || qos > 2) {
                System.out.println("Invalid QoS: "+qos);
                printHelp();
                return;
            }
            if (topic.equals("")) {
                // Set the default topic according to the specified action
                if (action.equals("publish")) {
                    topic = pubTopic;
                } else {
                    topic = subTopic;
                }
            }
    
            String protocol = "tcp://";
    
        if (ssl) {
          protocol = "ssl://";
        }
    
        String url = protocol + broker + ":" + port;
    
            if (clientId == null || clientId.equals("")) {
                clientId = "SampleJavaV3_"+action;
            }
    
            // With a valid set of arguments, the real work of
            // driving the client API can begin
            try {
                // Create an instance of this class
                Sample sampleClient = new Sample(url, clientId, cleanSession, quietMode,userName,password);
    
                // Perform the requested action
                if (action.equals("publish")) {
                    sampleClient.publish(topic,qos,message.getBytes());
                } else if (action.equals("subscribe")) {
                    sampleClient.subscribe(topic,qos);
                }
            } catch(MqttException me) {
                // Display full details of any exception that occurs
                System.out.println("reason "+me.getReasonCode());
                System.out.println("msg "+me.getMessage());
                System.out.println("loc "+me.getLocalizedMessage());
                System.out.println("cause "+me.getCause());
                System.out.println("excep "+me);
                me.printStackTrace();
            }
        }
    
        // Private instance variables
        private MqttClient             client;
        private String                 brokerUrl;
        private boolean             quietMode;
        private MqttConnectOptions     conOpt;
        private boolean             clean;
        private String password;
        private String userName;
    
        /**
         * Constructs an instance of the sample client wrapper
         * @param brokerUrl the url of the server to connect to
         * @param clientId the client id to connect with
         * @param cleanSession clear state at end of connection or not (durable or non-durable subscriptions)
         * @param quietMode whether debug should be printed to standard out
       * @param userName the username to connect with
       * @param password the password for the user
         * @throws MqttException
         */
        public Sample(String brokerUrl, String clientId, boolean cleanSession, boolean quietMode, String userName, String password) throws MqttException {
            this.brokerUrl = brokerUrl;
            this.quietMode = quietMode;
            this.clean        = cleanSession;
            this.password = password;
            this.userName = userName;
            //This sample stores in a temporary directory... where messages temporarily
            // stored until the message has been delivered to the server.
            //..a real application ought to store them somewhere
            // where they are not likely to get deleted or tampered with
            String tmpDir = System.getProperty("java.io.tmpdir");
            MqttDefaultFilePersistence dataStore = new MqttDefaultFilePersistence(tmpDir);
    
            try {
                // Construct the connection options object that contains connection parameters
                // such as cleanSession and LWT
                conOpt = new MqttConnectOptions();
                conOpt.setCleanSession(clean);
                if(password != null ) {
                  conOpt.setPassword(this.password.toCharArray());
                }
                if(userName != null) {
                  conOpt.setUserName(this.userName);
                }
    
                // Construct an MQTT blocking mode client
                client = new MqttClient(this.brokerUrl,clientId, dataStore);
    
                // Set this wrapper as the callback handler
                client.setCallback(this);
    
            } catch (MqttException e) {
                e.printStackTrace();
                log("Unable to set up client: "+e.toString());
                System.exit(1);
            }
        }
    
        /**
         * Publish / send a message to an MQTT server
         * @param topicName the name of the topic to publish to
         * @param qos the quality of service to delivery the message at (0,1,2)
         * @param payload the set of bytes to send to the MQTT server
         * @throws MqttException
         */
        public void publish(String topicName, int qos, byte[] payload) throws MqttException {
    
            // Connect to the MQTT server
            log("Connecting to "+brokerUrl + " with client ID "+client.getClientId());
            client.connect(conOpt);
            log("Connected");
    
            String time = new Timestamp(System.currentTimeMillis()).toString();
            log("Publishing at: "+time+ " to topic ""+topicName+"" qos "+qos);
    
            // Create and configure a message
               MqttMessage message = new MqttMessage(payload);
            message.setQos(qos);
    
            // Send the message to the server, control is not returned until
            // it has been delivered to the server meeting the specified
            // quality of service.
            client.publish(topicName, message);
    
            // Disconnect the client
            client.disconnect();
            log("Disconnected");
        }
    
        /**
         * Subscribe to a topic on an MQTT server
         * Once subscribed this method waits for the messages to arrive from the server
         * that match the subscription. It continues listening for messages until the enter key is
         * pressed.
         * @param topicName to subscribe to (can be wild carded)
         * @param qos the maximum quality of service to receive messages at for this subscription
         * @throws MqttException
         */
        public void subscribe(String topicName, int qos) throws MqttException {
    
            // Connect to the MQTT server
            client.connect(conOpt);
            log("Connected to "+brokerUrl+" with client ID "+client.getClientId());
    
            // Subscribe to the requested topic
            // The QoS specified is the maximum level that messages will be sent to the client at.
            // For instance if QoS 1 is specified, any messages originally published at QoS 2 will
            // be downgraded to 1 when delivering to the client but messages published at 1 and 0
            // will be received at the same level they were published at.
            log("Subscribing to topic ""+topicName+"" qos "+qos);
            client.subscribe(topicName, qos);
    
            // Continue waiting for messages until the Enter is pressed
            log("Press <Enter> to exit");
            try {
                System.in.read();
            } catch (IOException e) {
                //If we can't read we'll just exit
            }
    
            // Disconnect the client from the server
            client.disconnect();
            log("Disconnected");
        }
    
        /**
         * Utility method to handle logging. If 'quietMode' is set, this method does nothing
         * @param message the message to log
         */
        private void log(String message) {
            if (!quietMode) {
                System.out.println(message);
            }
        }
    
        /****************************************************************/
        /* Methods to implement the MqttCallback interface              */
        /****************************************************************/
    
        /**
         * @see MqttCallback#connectionLost(Throwable)
         */
        public void connectionLost(Throwable cause) {
            // Called when the connection to the server has been lost.
            // An application may choose to implement reconnection
            // logic at this point. This sample simply exits.
            log("Connection to " + brokerUrl + " lost!" + cause);
            System.exit(1);
        }
    
        /**
         * @see MqttCallback#deliveryComplete(IMqttDeliveryToken)
         */
        public void deliveryComplete(IMqttDeliveryToken token) {
            // Called when a message has been delivered to the
            // server. The token passed in here is the same one
            // that was passed to or returned from the original call to publish.
            // This allows applications to perform asynchronous
            // delivery without blocking until delivery completes.
            //
            // This sample demonstrates asynchronous deliver and
            // uses the token.waitForCompletion() call in the main thread which
            // blocks until the delivery has completed.
            // Additionally the deliveryComplete method will be called if
            // the callback is set on the client
            //
            // If the connection to the server breaks before delivery has completed
            // delivery of a message will complete after the client has re-connected.
            // The getPendingTokens method will provide tokens for any messages
            // that are still to be delivered.
        }
    
        /**
         * @see MqttCallback#messageArrived(String, MqttMessage)
         */
        public void messageArrived(String topic, MqttMessage message) throws MqttException {
            // Called when a message arrives from the server that matches any
            // subscription made by the client
            String time = new Timestamp(System.currentTimeMillis()).toString();
            System.out.println("Time:	" +time +
                               "  Topic:	" + topic +
                               "  Message:	" + new String(message.getPayload()) +
                               "  QoS:	" + message.getQos());
        }
    
        /****************************************************************/
        /* End of MqttCallback methods                                  */
        /****************************************************************/
    
           static void printHelp() {
              System.out.println(
                  "Syntax:
    
    " +
                      "    Sample [-h] [-a publish|subscribe] [-t <topic>] [-m <message text>]
    " +
                      "            [-s 0|1|2] -b <hostname|IP address>] [-p <brokerport>] [-i <clientID>]
    
    " +
                      "    -h  Print this help text and quit
    " +
                      "    -q  Quiet mode (default is false)
    " +
                      "    -a  Perform the relevant action (default is publish)
    " +
                      "    -t  Publish/subscribe to <topic> instead of the default
    " +
                      "            (publish: "Sample/Java/v3", subscribe: "Sample/#")
    " +
                      "    -m  Use <message text> instead of the default
    " +
                      "            ("Message from MQTTv3 Java client")
    " +
                      "    -s  Use this QoS instead of the default (2)
    " +
                      "    -b  Use this name/IP address instead of the default (m2m.eclipse.org)
    " +
                      "    -p  Use this port instead of the default (1883)
    
    " +
                      "    -i  Use this client ID instead of SampleJavaV3_<action>
    " +
                      "    -c  Connect to the server with a clean session (default is false)
    " +
                      "     
    
     Security Options 
    " +
                      "     -u Username 
    " +
                      "     -z Password 
    " +
                      "     
    
     SSL Options 
    " +
                      "    -v  SSL enabled; true - (default is false) " +
                      "    -k  Use this JKS format key store to verify the client
    " +
                      "    -w  Passpharse to verify certificates in the keys store
    " +
                      "    -r  Use this JKS format keystore to verify the server
    " +
                      " If javax.net.ssl properties have been set only the -v flag needs to be set
    " +
                      "Delimit strings containing spaces with ""
    
    " +
                      "Publishers transmit a single message then disconnect from the server.
    " +
                      "Subscribers remain connected to the server and receive appropriate
    " +
                      "messages until <enter> is pressed.
    
    "
                  );
        }
    
    }

    客户端-发布者

    只需在configuration里面修改传入的参数即可:

    为保证是同一个主题,则需要保证传入-a -t两个参数

    -a publish -t Sample/Java/v3

    最终运行结果:

    服务器

    订阅者:

    发布者

  • 相关阅读:
    c# 调用C++动态库 问题
    Web应用简易框架。
    【转】SVN历史版本删除(为SVN库瘦身)
    程序员7武器序
    小系统单据自动生成存储过程
    【转】数据库和数据仓库的区别
    jQuery之extend 函数
    .NET单元测试断言(Assert)
    c#类型转换操作符:as和is
    oracle 表数据合并
  • 原文地址:https://www.cnblogs.com/caimuqing/p/6145366.html
Copyright © 2011-2022 走看看