zoukankan      html  css  js  c++  java
  • rabbitmq作为mqtt服务器实现websocket消息推送给浏览器

    rabbitmq的RabbitMQ Web MQTT插件可以用来支持将rabbitmq作为MQTT协议的服务器,而websocket支持mqtt协议通信实现消息推送。因为我们目前使用rabbitmq,所以采用其作为ws的服务端(原来有过activemq的做法,其原生也支持MQTT协议)。

    首先安装RabbitMQ Web MQTT插件,如下:

    rabbitmq-plugins enable rabbitmq_web_mqtt

    MQTT在15675端口下的ws命名空间暴露WebSocket端点。如下:

    http://IP:15675/ws

    Eclipse旗下的Paho JavaScript客户端可以使用MQTT协议实现ws通信,其使用如下:

    <script src="mqttws31.js"></script>
    <script>
    
        var wsbroker = location.hostname;  // mqtt websocket enabled broker
        var wsport = 15675; // port for above
        client = new Paho.MQTT.Client(wsbroker, wsport, "/ws",
            "myclientid_" + parseInt(Math.random() * 100, 10));
        client.onConnectionLost = function (responseObject) {
            debug("CONNECTION LOST - " + responseObject.errorMessage);
        };
        client.onMessageArrived = function (message) {
            debug("RECEIVE ON " + message.destinationName + " PAYLOAD " + message.payloadString);
            print_first(message.payloadString);
        };
    var sessionId = getSessionId();
    var options = {
        timeout: 3,
        keepAliveInterval: 30,
        onSuccess: function () {
            debug("CONNECTION SUCCESS");
    // 这样就可以做到点对点通信 client.subscribe(sessionId
    , {qos: 1}); }, onFailure: function (message) { debug("CONNECTION FAILURE - " + message.errorMessage); } }; if (location.protocol == "https:") { options.useSSL = true; } debug("CONNECT TO " + wsbroker + ":" + wsport); client.connect(options);

    java服务端发送消息给rabbitmq:

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    private static void testSendMqtt() throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setUsername("guest");
            factory.setPassword("guest");
            // factory.setVirtualHost("");
            factory.setHost("127.0.0.1");
            factory.setPort(5672);
            Connection conn = null;
            Channel channel = null;
            try {
                conn = factory.newConnection();
                channel = conn.createChannel();
                String sessionId = getSessionId();
                byte[] messageBodyBytes = "{'text':'Hello, world!中文'}".getBytes();
                // 这样就可以做到点对点通信了,amq.topic是默认exchange
                channel.basicPublish("amq.topic", sessionId, null, messageBodyBytes);
            }finally {
                if (channel != null) {
                    channel.close();
                }
                if (conn != null) {
                    conn.close();
                }
            }
        }

    参考:

    https://www.rabbitmq.com/web-mqtt.html

    https://www.eclipse.org/paho/clients/js/

  • 相关阅读:
    Node.js——fs常用API
    Node.js——Stream
    Node.js——Buffer
    Node.js——post方式提交的图片如何保存
    CSS——BFC
    Node.js——body方式提交数据
    Node.js——基本服务开启
    Node.js——url模块
    Node.js——render封装
    Node.js——开放静态资源原生写法
  • 原文地址:https://www.cnblogs.com/zhjh256/p/10542752.html
Copyright © 2011-2022 走看看