zoukankan      html  css  js  c++  java
  • RabbitMQ基于Stomp实现与MQTT客户端通信

    请参照RabbitMQ应用SpringBoot集成RabbitMQ并实现消息确认机制

    详情参照官方文档：https://www.rabbitmq.com/stomp.html 以及示例代码：https://github.com/rabbitmq/rabbitmq-web-stomp-examples

    安装插件

    rabbitmq-plugins enable rabbitmq_stomp  
    rabbitmq-plugins enable rabbitmq_web_stomp_examples  

    官方说明

    MQTT配置


    网页STOMP配置

    注意监听的主题及持久化配置

    <!DOCTYPE html>
    <html><head>
      <script src="jquery-1.9.1.min.js"></script>
      <script src="stomp.js"></script>
      <style>
          .box {
              width: 440px;
              float: left;
              margin: 0 20px 0 20px;
          }
    
          .box div, .box input {
              border: 1px solid;
              -moz-border-radius: 4px;
              border-radius: 4px;
              width: 100%;
              padding: 5px;
              margin: 3px 0 10px 0;
          }
    
          .box div {
              border-color: grey;
              height: 300px;
              overflow: auto;
          }
    
          div code {
              display: block;
          }
    
          #first div code {
              -moz-border-radius: 2px;
              border-radius: 2px;
              border: 1px solid #eee;
              margin-bottom: 5px;
          }
    
          #second div {
              font-size: 0.8em;
          }
      </style>
      <title>RabbitMQ Web STOMP Examples : Temporary Queue</title>
      <link href="main.css" rel="stylesheet" type="text/css"/>
    </head><body lang="en">
    
        <div id="first" class="box">
          <h2>Received</h2>
          <div></div>
          <form><input autocomplete="off" placeholder="Type here..."></input></form>
        </div>
    
        <div id="second" class="box">
          <h2>Logs</h2>
          <div></div>
        </div>
    
        <script>
          // Open a raw WebSocket to the Web-STOMP endpoint and wrap it in a STOMP client.
          var ws = new WebSocket('ws://127.0.0.1:15674/ws');
          var client = Stomp.over(ws);

          // Append every debug line emitted by the STOMP client to the "Logs" pane.
          client.debug = function(e) {
            $('#second div').append($("<code>").text(e));
          };

          // Fallback receive callback: stomp.js calls it for MESSAGE frames whose
          // subscription id has no registered handler (e.g. temporary reply queues).
          client.onreceive = function(m) {
            $('#first div').append($("<code>").text(m.body));
          };

          // Handle returned by client.subscribe(); declared here so the assignment
          // in on_connect does not create an implicit global.
          var subscription;

          // Runs once the broker has answered CONNECT: subscribe to the topic pattern.
          var on_connect = function(x) {
              // Extra SUBSCRIBE headers: fixed subscription id plus the
              // "durable" / "auto-delete" settings for the subscription.
              var headers = {
                id: '1234',
                durable: true,
                'auto-delete': false
              };

              subscription = client.subscribe("/exchange/amq.topic/topic.baqgl.#", function(m) {
                // Show the reversed message text. Sending it back to the destination
                // named in the "reply-to" header is left disabled below.
                var reversedText = m.body.split("").reverse().join("");
                alert(reversedText);
                //client.send(m.headers['reply-to'], {"content-type":"text/plain"}, reversedText);
              }, headers);
          };

          // Called with an ERROR frame or a connection-lost message; log it
          // instead of discarding it so failures can be diagnosed.
          var on_error = function(error) {
            console.log('error', error);
          };

          // login, passcode, connect callback, error callback, virtual host
          client.connect('guest', 'guest', on_connect, on_error, '/');

          // Publish the typed text to the topic exchange, then clear the input.
          $('#first form').submit(function() {
            var text = $('#first form input').val();
            if (text) {
                client.send('/exchange/amq.topic/topic.baqgl.admin.1', {'reply-to': '/temp-queue/foo'}, text);
                alert(text);
                $('#first form input').val("");
              }
              // Returning false stops the browser from submitting the form.
              return false;
          });
        </script>
    </body></html>

    读取持久化配置

    官方说明






    stomp.js

    // Generated by CoffeeScript 1.7.1
    
    /*
       Stomp Over WebSocket http://www.jmesnil.net/stomp-websocket/doc/ | Apache License V2.0
    
       Copyright (C) 2010-2013 [Jeff Mesnil](http://jmesnil.net/)
       Copyright (C) 2012 [FuseSource, Inc.](http://fusesource.com)
     */
    
    (function() {
      var Byte, Client, Frame, Stomp,
        __hasProp = {}.hasOwnProperty,
        __slice = [].slice;

      // STOMP control octets. These must be the real one-character strings:
      // LF (0x0A) ends a line, NULL (0x00) ends a frame. Without the backslash
      // escapes they would be the literal three-character texts "x0A" / "x00".
      Byte = {
        LF: '\x0A',
        NULL: '\x00'
      };

      // A single STOMP frame (command, headers, body) together with the
      // helpers that serialise frames to text and parse text back into frames.
      Frame = (function() {
        var unmarshallSingle;

        // Builds a frame; `headers` defaults to {} and `body` to '' when they
        // are null or undefined.
        function Frame(command, headers, body) {
          this.command = command;
          this.headers = headers != null ? headers : {};
          this.body = body != null ? body : '';
        }

        // Serialises the frame: command line, one "name:value" line per own
        // header, an automatic content-length line when there is a body, a
        // blank line, then the body. The terminating NULL is added by marshall().
        // Setting headers['content-length'] to false suppresses the automatic
        // content-length line (the marker entry itself is removed from headers).
        Frame.prototype.toString = function() {
          var lines, name, skipContentLength, value, _ref;
          lines = [this.command];
          skipContentLength = this.headers['content-length'] === false;
          if (skipContentLength) {
            delete this.headers['content-length'];
          }
          _ref = this.headers;
          for (name in _ref) {
            if (!__hasProp.call(_ref, name)) continue;
            value = _ref[name];
            lines.push("" + name + ":" + value);
          }
          if (this.body && !skipContentLength) {
            lines.push("content-length:" + (Frame.sizeOfUTF8(this.body)));
          }
          lines.push(Byte.LF + this.body);
          return lines.join(Byte.LF);
        };

        // Returns the number of bytes `s` occupies in UTF-8 (0 for empty/missing
        // input). encodeURI turns every non-ASCII byte into a "%XX" triplet, so
        // counting "%XX" groups and remaining single characters counts bytes.
        Frame.sizeOfUTF8 = function(s) {
          if (s) {
            return encodeURI(s).match(/%..|./g).length;
          } else {
            return 0;
          }
        };

        // Parses the text of exactly one frame (without its NULL terminator)
        // into a Frame instance.
        unmarshallSingle = function(data) {
          var body, command, divider, end, headerLines, headers, idx, len, line, start, trim, _i, _len, _ref;
          // The first blank line separates the command/header section from the body.
          divider = data.search(RegExp("" + Byte.LF + Byte.LF));
          headerLines = data.substring(0, divider).split(Byte.LF);
          command = headerLines.shift();
          headers = {};
          // Strips leading and trailing whitespace (\s, not the letter "s").
          trim = function(str) {
            return str.replace(/^\s+|\s+$/g, '');
          };
          // Iterate in reverse so that, for a repeated header, the FIRST
          // occurrence is written last and therefore wins.
          _ref = headerLines.reverse();
          for (_i = 0, _len = _ref.length; _i < _len; _i++) {
            line = _ref[_i];
            idx = line.indexOf(':');
            headers[trim(line.substring(0, idx))] = trim(line.substring(idx + 1));
          }
          body = '';
          // Skip the two LF characters that form the blank line.
          start = divider + 2;
          if (headers['content-length']) {
            len = parseInt(headers['content-length'], 10);
            // NOTE(review): content-length is a byte count but substring()
            // counts UTF-16 code units; the two differ for non-ASCII bodies.
            body = ('' + data).substring(start, start + len);
          } else {
            // No content-length: the body runs up to the first NULL octet,
            // or to the end of the data when there is none.
            end = data.indexOf(Byte.NULL, start);
            body = end === -1 ? data.substring(start) : data.substring(start, end);
          }
          return new Frame(command, headers, body);
        };

        // Splits a chunk of received text into frames.
        // Returns { frames: [Frame, ...], partial: '<unterminated tail>' };
        // `partial` holds trailing text that is not yet a complete frame so the
        // caller can prepend it to the next chunk.
        Frame.unmarshall = function(datas) {
          var frame, frames, last_frame, r;
          // Frames are separated by NULL followed by any number of LFs.
          frames = datas.split(RegExp("" + Byte.NULL + Byte.LF + "*"));
          r = {
            frames: [],
            partial: ''
          };
          r.frames = (function() {
            var _i, _len, _ref, _results;
            _ref = frames.slice(0, -1);
            _results = [];
            for (_i = 0, _len = _ref.length; _i < _len; _i++) {
              frame = _ref[_i];
              _results.push(unmarshallSingle(frame));
            }
            return _results;
          })();
          last_frame = frames.slice(-1)[0];
          if (last_frame === Byte.LF || (last_frame.search(RegExp("" + Byte.NULL + Byte.LF + "*$"))) !== -1) {
            r.frames.push(unmarshallSingle(last_frame));
          } else {
            r.partial = last_frame;
          }
          return r;
        };

        // Serialises a frame built from the arguments and appends the NULL
        // octet that terminates it on the wire.
        Frame.marshall = function(command, headers, body) {
          var frame;
          frame = new Frame(command, headers, body);
          return frame.toString() + Byte.NULL;
        };

        return Frame;

      })();
    
      // STOMP client that speaks the protocol over a WebSocket-like object.
      // Instances are created through Stomp.client(url) or Stomp.over(ws).
      Client = (function() {
        var now;

        // Wraps `ws`; its binaryType is switched to "arraybuffer".
        function Client(ws) {
          this.ws = ws;
          this.ws.binaryType = "arraybuffer";
          // Source of generated subscription ("sub-N") and transaction ("tx-N") ids.
          this.counter = 0;
          this.connected = false;
          // Heart-beat intervals in ms offered to the server in CONNECT;
          // a value of 0 turns the corresponding direction off.
          this.heartbeat = {
            outgoing: 10000,
            incoming: 10000
          };
          // Outgoing frames longer than this many characters are sent in chunks.
          this.maxWebSocketFrameSize = 16 * 1024;
          // Maps subscription id -> callback invoked for its MESSAGE frames.
          this.subscriptions = {};
          // Unterminated tail of previously received data, prepended to the next chunk.
          this.partialData = '';
        }

        // Default debug sink: logs to window.console when it exists.
        // Replace it to redirect the output, or set it to a non-function to silence it.
        Client.prototype.debug = function(message) {
          var _ref;
          return typeof window !== "undefined" && window !== null ? (_ref = window.console) != null ? _ref.log(message) : void 0 : void 0;
        };

        // Current time in milliseconds. The fallback must CALL valueOf();
        // returning the function itself would make every time delta NaN.
        now = function() {
          if (Date.now) {
            return Date.now();
          } else {
            return new Date().valueOf();
          }
        };

        // Marshalls a frame and writes it to the socket, splitting it into
        // pieces of at most maxWebSocketFrameSize characters.
        Client.prototype._transmit = function(command, headers, body) {
          var out;
          out = Frame.marshall(command, headers, body);
          if (typeof this.debug === "function") {
            this.debug(">>> " + out);
          }
          while (true) {
            if (out.length > this.maxWebSocketFrameSize) {
              this.ws.send(out.substring(0, this.maxWebSocketFrameSize));
              out = out.substring(this.maxWebSocketFrameSize);
              if (typeof this.debug === "function") {
                this.debug("remaining = " + out.length);
              }
            } else {
              return this.ws.send(out);
            }
          }
        };

        // Starts the heart-beat timers from the CONNECTED frame's headers.
        // Does nothing unless the negotiated version is 1.1 or 1.2.
        Client.prototype._setupHeartbeat = function(headers) {
          var beats, pingTtl, pongTtl, serverIncoming, serverOutgoing, _ref;
          if ((_ref = headers.version) !== Stomp.VERSIONS.V1_1 && _ref !== Stomp.VERSIONS.V1_2) {
            return;
          }
          // "heart-beat" is "<server outgoing>,<server incoming>". A missing
          // header is treated as "0,0" (no heart-beating) instead of throwing.
          beats = (headers['heart-beat'] || '0,0').split(",");
          serverOutgoing = parseInt(beats[0], 10);
          serverIncoming = parseInt(beats[1], 10);
          if (!(this.heartbeat.outgoing === 0 || serverIncoming === 0)) {
            pingTtl = Math.max(this.heartbeat.outgoing, serverIncoming);
            if (typeof this.debug === "function") {
              this.debug("send PING every " + pingTtl + "ms");
            }
            // A heart-beat is a single LF sent outside of any frame.
            this.pinger = Stomp.setInterval(pingTtl, (function(_this) {
              return function() {
                _this.ws.send(Byte.LF);
                return typeof _this.debug === "function" ? _this.debug(">>> PING") : void 0;
              };
            })(this));
          }
          if (!(this.heartbeat.incoming === 0 || serverOutgoing === 0)) {
            pongTtl = Math.max(this.heartbeat.incoming, serverOutgoing);
            if (typeof this.debug === "function") {
              this.debug("check PONG every " + pongTtl + "ms");
            }
            // Close the socket when nothing was received for more than two intervals.
            return this.ponger = Stomp.setInterval(pongTtl, (function(_this) {
              return function() {
                var delta;
                delta = now() - _this.serverActivity;
                if (delta > pongTtl * 2) {
                  if (typeof _this.debug === "function") {
                    _this.debug("did not receive server activity for the last " + delta + "ms");
                  }
                  return _this.ws.close();
                }
              };
            })(this));
          }
        };

        // Normalises the argument forms accepted by connect() into
        // [headers, connectCallback, errorCallback]:
        //   (headers, connectCallback)
        //   (headers, connectCallback, errorCallback)
        //   (login, passcode, connectCallback)
        //   (login, passcode, connectCallback, errorCallback)
        //   (login, passcode, connectCallback, errorCallback, host)
        Client.prototype._parseConnect = function() {
          var args, connectCallback, errorCallback, headers;
          args = 1 <= arguments.length ? __slice.call(arguments, 0) : [];
          headers = {};
          switch (args.length) {
            case 2:
              headers = args[0], connectCallback = args[1];
              break;
            case 3:
              // A function in second position means the first argument is a headers object.
              if (args[1] instanceof Function) {
                headers = args[0], connectCallback = args[1], errorCallback = args[2];
              } else {
                headers.login = args[0], headers.passcode = args[1], connectCallback = args[2];
              }
              break;
            case 4:
              headers.login = args[0], headers.passcode = args[1], connectCallback = args[2], errorCallback = args[3];
              break;
            default:
              headers.login = args[0], headers.passcode = args[1], connectCallback = args[2], errorCallback = args[3], headers.host = args[4];
          }
          return [headers, connectCallback, errorCallback];
        };

        // Installs the socket handlers and sends CONNECT once the socket opens.
        // See _parseConnect for the accepted argument forms. The error callback
        // receives ERROR frames and, on connection loss, a message string.
        Client.prototype.connect = function() {
          var args, errorCallback, headers, out;
          args = 1 <= arguments.length ? __slice.call(arguments, 0) : [];
          out = this._parseConnect.apply(this, args);
          headers = out[0], this.connectCallback = out[1], errorCallback = out[2];
          if (typeof this.debug === "function") {
            this.debug("Opening Web Socket...");
          }
          this.ws.onmessage = (function(_this) {
            return function(evt) {
              var arr, c, client, data, frame, messageID, onreceive, subscription, unmarshalledData, _i, _j, _len, _len1, _ref, _results, chars;
              if (typeof ArrayBuffer !== 'undefined' && evt.data instanceof ArrayBuffer) {
                // Binary payload: map each byte to one character.
                // NOTE(review): this is not a UTF-8 decode; multi-byte
                // characters arrive as separate characters per byte.
                arr = new Uint8Array(evt.data);
                if (typeof _this.debug === "function") {
                  _this.debug("--- got data length: " + arr.length);
                }
                chars = [];
                for (_j = 0, _len1 = arr.length; _j < _len1; _j++) {
                  c = arr[_j];
                  chars.push(String.fromCharCode(c));
                }
                data = chars.join('');
              } else {
                data = evt.data;
              }
              // Any inbound data counts as server activity for the heart-beat check.
              _this.serverActivity = now();
              if (data === Byte.LF) {
                // A lone LF is a heart-beat from the server, not a frame.
                if (typeof _this.debug === "function") {
                  _this.debug("<<< PONG");
                }
                return;
              }
              if (typeof _this.debug === "function") {
                _this.debug("<<< " + data);
              }
              unmarshalledData = Frame.unmarshall(_this.partialData + data);
              _this.partialData = unmarshalledData.partial;
              _ref = unmarshalledData.frames;
              _results = [];
              for (_i = 0, _len = _ref.length; _i < _len; _i++) {
                frame = _ref[_i];
                switch (frame.command) {
                  case "CONNECTED":
                    if (typeof _this.debug === "function") {
                      _this.debug("connected to server " + frame.headers.server);
                    }
                    _this.connected = true;
                    _this._setupHeartbeat(frame.headers);
                    _results.push(typeof _this.connectCallback === "function" ? _this.connectCallback(frame) : void 0);
                    break;
                  case "MESSAGE":
                    // Dispatch to the subscription's callback, falling back to
                    // the client-wide onreceive handler when none is registered.
                    subscription = frame.headers.subscription;
                    onreceive = _this.subscriptions[subscription] || _this.onreceive;
                    if (onreceive) {
                      client = _this;
                      messageID = frame.headers["message-id"];
                      // Convenience methods so the callback can acknowledge the message directly.
                      frame.ack = function(headers) {
                        if (headers == null) {
                          headers = {};
                        }
                        return client.ack(messageID, subscription, headers);
                      };
                      frame.nack = function(headers) {
                        if (headers == null) {
                          headers = {};
                        }
                        return client.nack(messageID, subscription, headers);
                      };
                      _results.push(onreceive(frame));
                    } else {
                      _results.push(typeof _this.debug === "function" ? _this.debug("Unhandled received MESSAGE: " + frame) : void 0);
                    }
                    break;
                  case "RECEIPT":
                    _results.push(typeof _this.onreceipt === "function" ? _this.onreceipt(frame) : void 0);
                    break;
                  case "ERROR":
                    _results.push(typeof errorCallback === "function" ? errorCallback(frame) : void 0);
                    break;
                  default:
                    _results.push(typeof _this.debug === "function" ? _this.debug("Unhandled frame: " + frame) : void 0);
                }
              }
              return _results;
            };
          })(this);
          this.ws.onclose = (function(_this) {
            return function() {
              var msg;
              msg = "Whoops! Lost connection to " + _this.ws.url;
              if (typeof _this.debug === "function") {
                _this.debug(msg);
              }
              _this._cleanUp();
              return typeof errorCallback === "function" ? errorCallback(msg) : void 0;
            };
          })(this);
          return this.ws.onopen = (function(_this) {
            return function() {
              if (typeof _this.debug === "function") {
                _this.debug('Web Socket Opened...');
              }
              headers["accept-version"] = Stomp.VERSIONS.supportedVersions();
              headers["heart-beat"] = [_this.heartbeat.outgoing, _this.heartbeat.incoming].join(',');
              return _this._transmit("CONNECT", headers);
            };
          })(this);
        };

        // Sends DISCONNECT, closes the socket and stops the heart-beat timers,
        // then invokes disconnectCallback if one was given. onclose is detached
        // first so the deliberate close is not reported to the error callback.
        Client.prototype.disconnect = function(disconnectCallback, headers) {
          if (headers == null) {
            headers = {};
          }
          this._transmit("DISCONNECT", headers);
          this.ws.onclose = null;
          this.ws.close();
          this._cleanUp();
          return typeof disconnectCallback === "function" ? disconnectCallback() : void 0;
        };

        // Marks the client as disconnected and cancels both heart-beat timers.
        Client.prototype._cleanUp = function() {
          this.connected = false;
          if (this.pinger) {
            Stomp.clearInterval(this.pinger);
          }
          if (this.ponger) {
            return Stomp.clearInterval(this.ponger);
          }
        };

        // Sends `body` to `destination`. Note: `headers` is modified in place
        // (its `destination` entry is set).
        Client.prototype.send = function(destination, headers, body) {
          if (headers == null) {
            headers = {};
          }
          if (body == null) {
            body = '';
          }
          headers.destination = destination;
          return this._transmit("SEND", headers, body);
        };

        // Subscribes to `destination`; `callback` receives each MESSAGE frame.
        // A subscription id is generated unless headers.id is provided.
        // Returns { id, unsubscribe() }. `headers` is modified in place.
        Client.prototype.subscribe = function(destination, callback, headers) {
          var client;
          if (headers == null) {
            headers = {};
          }
          if (!headers.id) {
            headers.id = "sub-" + this.counter++;
          }
          headers.destination = destination;
          this.subscriptions[headers.id] = callback;
          this._transmit("SUBSCRIBE", headers);
          client = this;
          return {
            id: headers.id,
            unsubscribe: function() {
              return client.unsubscribe(headers.id);
            }
          };
        };

        // Forgets the callback registered for `id` and sends UNSUBSCRIBE.
        Client.prototype.unsubscribe = function(id) {
          delete this.subscriptions[id];
          return this._transmit("UNSUBSCRIBE", {
            id: id
          });
        };

        // Starts a transaction, generating its id when none is given.
        // Returns { id, commit(), abort() }.
        Client.prototype.begin = function(transaction) {
          var client, txid;
          txid = transaction || "tx-" + this.counter++;
          this._transmit("BEGIN", {
            transaction: txid
          });
          client = this;
          return {
            id: txid,
            commit: function() {
              return client.commit(txid);
            },
            abort: function() {
              return client.abort(txid);
            }
          };
        };

        // Sends COMMIT for the given transaction id.
        Client.prototype.commit = function(transaction) {
          return this._transmit("COMMIT", {
            transaction: transaction
          });
        };

        // Sends ABORT for the given transaction id.
        Client.prototype.abort = function(transaction) {
          return this._transmit("ABORT", {
            transaction: transaction
          });
        };

        // Sends ACK for a message of a subscription. `headers` is modified in place.
        Client.prototype.ack = function(messageID, subscription, headers) {
          if (headers == null) {
            headers = {};
          }
          headers["message-id"] = messageID;
          headers.subscription = subscription;
          return this._transmit("ACK", headers);
        };

        // Sends NACK for a message of a subscription. `headers` is modified in place.
        Client.prototype.nack = function(messageID, subscription, headers) {
          if (headers == null) {
            headers = {};
          }
          headers["message-id"] = messageID;
          headers.subscription = subscription;
          return this._transmit("NACK", headers);
        };

        return Client;

      })();
    
      // Public namespace: protocol version constants, the two client
      // factories, and the Frame class.
      Stomp = {};

      Stomp.VERSIONS = {
        V1_0: '1.0',
        V1_1: '1.1',
        V1_2: '1.2',
        // Comma-separated list of the protocol versions offered to the server.
        supportedVersions: function() {
          return '1.1,1.0';
        }
      };

      // Creates a socket for `url` (using Stomp.WebSocketClass when it is set,
      // the global WebSocket otherwise) and returns a Client wrapping it.
      // `protocols` defaults to the STOMP 1.0 / 1.1 sub-protocol names.
      Stomp.client = function(url, protocols) {
        var SocketClass, socket;
        if (protocols === undefined || protocols === null) {
          protocols = ['v10.stomp', 'v11.stomp'];
        }
        SocketClass = Stomp.WebSocketClass || WebSocket;
        socket = new SocketClass(url, protocols);
        return new Client(socket);
      };

      // Returns a Client wrapping a socket object the caller already created.
      Stomp.over = function(ws) {
        return new Client(ws);
      };

      Stomp.Frame = Frame;
    
      // CommonJS-style environments: publish Stomp on the exports object.
      if (typeof exports !== "undefined" && exports !== null) {
        exports.Stomp = Stomp;
      }

      if (typeof window !== "undefined" && window !== null) {
        // Browser page: back the heart-beat timers with the window timers.
        // Note the (interval, callback) argument order of Stomp.setInterval.
        Stomp.setInterval = function(interval, f) {
          return window.setInterval(f, interval);
        };
        Stomp.clearInterval = function(id) {
          return window.clearInterval(id);
        };
        window.Stomp = Stomp;
      } else if (typeof exports === "undefined" || !exports) {
        // Neither `window` nor `exports` is available (e.g. a worker scope).
        // The typeof guard matters: evaluating a bare undeclared `exports`
        // throws a ReferenceError before this fallback could run.
        self.Stomp = Stomp;
      }
    
    }).call(this);


  • 相关阅读:
    atitit.nfc 身份证 银行卡 芯片卡 解决方案 attilax总结
    atitit.php 流行框架 前三甲为:Laravel、Phalcon、Symfony2 attilax 总结
    Atitit.执行cmd 命令行 php
    Atitit. 图像处理jpg图片的压缩 清理垃圾图片 java版本
    atitit。企业组织与软件工程的策略 战略 趋势 原则 attilax 大总结
    atitit. 管理哲学 大毁灭 如何防止企业的自我毁灭
    Atitit.java的浏览器插件技术 Applet japplet attilax总结
    Atitit.jquery 版本新特性attilax总结
    Atitit. 软件开发中的管理哲学一个伟大的事业必然是过程导向为主 过程导向 vs 结果导向
    (转)获取手机的IMEI号
  • 原文地址:https://www.cnblogs.com/gmhappy/p/9472419.html
Copyright © 2011-2022 走看看