zoukankan      html  css  js  c++  java
  • RabbitMQ基于Stomp实现与MQTT客户端通信

    请参照RabbitMQ应用SpringBoot集成RabbitMQ并实现消息确认机制

    详情参照官方文档 https://www.rabbitmq.com/stomp.html 以及示例代码 https://github.com/rabbitmq/rabbitmq-web-stomp-examples

    安装插件

    rabbitmq-plugins enable rabbitmq_stomp  
    rabbitmq-plugins enable rabbitmq_web_stomp_examples  

    官方说明

    MQTT配置


    网页STOMP配置

    注意监听的主题及持久化配置

    <!DOCTYPE html>
    <html><head>
      <script src="jquery-1.9.1.min.js"></script>
      <script src="stomp.js"></script>
      <style>
          /* Two fixed-width columns floated side by side. */
          .box {
              width: 440px;
              float: left;
              margin: 0 20px 0 20px;
          }

          /* Message/log panes and the text input fill their column. */
          .box div, .box input {
              border: 1px solid;
              -moz-border-radius: 4px;
              border-radius: 4px;
              width: 100%;
              padding: 5px;
              margin: 3px 0 10px 0;
          }

          /* Panes have a fixed height and scroll when content overflows. */
          .box div {
              border-color: grey;
              height: 300px;
              overflow: auto;
          }

          /* Each appended <code> entry goes on its own line. */
          div code {
              display: block;
          }

          /* Received messages are drawn as individual bordered entries. */
          #first div code {
              -moz-border-radius: 2px;
              border-radius: 2px;
              border: 1px solid #eee;
              margin-bottom: 5px;
          }

          /* Log output uses a smaller font. */
          #second div {
              font-size: 0.8em;
          }
      </style>
      <title>RabbitMQ Web STOMP Examples : Temporary Queue</title>
      <link href="main.css" rel="stylesheet" type="text/css"/>
    </head><body lang="en">
    
        <!-- Left column: received messages plus the form used to publish text. -->
        <div id="first" class="box">
          <h2>Received</h2>
          <div></div>
          <form><input autocomplete="off" placeholder="Type here..."></form>
        </div>
    
        <div id="second" class="box">
          <h2>Logs</h2>
          <div></div>
        </div>
    
        <script>
          // Open the WebSocket used by this example and wrap it in a STOMP client.
          var ws = new WebSocket('ws://127.0.0.1:15674/ws');
          var client = Stomp.over(ws);

          // Show stomp.js debug output in the "Logs" box.
          client.debug = function(e) {
            $('#second div').append($("<code>").text(e));
          };

          // Fallback handler: stomp.js calls client.onreceive for MESSAGE frames
          // whose subscription id has no registered callback.
          client.onreceive = function(m) {
            $('#first div').append($("<code>").text(m.body));
          };

          // Handle returned by client.subscribe ({id, unsubscribe}). Declared here so
          // that on_connect does not create an implicit global.
          var subscription = null;

          var on_connect = function(x) {
              // Subscription headers: a fixed subscription id plus the durable /
              // auto-delete settings the article calls out
              // (NOTE(review): exact queue semantics are defined by the RabbitMQ
              // STOMP plugin - confirm against its documentation).
              var headers = {
                'id': '1234',
                'durable': true,
                'auto-delete': false
              };

              subscription = client.subscribe("/exchange/amq.topic/topic.baqgl.#", function(m) {
                // Show the reversed message text; replying to the "reply-to"
                // destination is left disabled below.
                var reversedText = m.body.split("").reverse().join("");
                alert(reversedText);
                //client.send(m.headers['reply-to'], {"content-type":"text/plain"}, reversedText);
              }, headers);
          };

          // stomp.js passes either an ERROR frame or a "lost connection" message;
          // log it instead of discarding it.
          var on_error = function(error) {
            console.log('error', error);
          };
          client.connect('guest', 'guest', on_connect, on_error, '/');

          // Publish the typed text to the topic exchange, then clear the input.
          $('#first form').submit(function() {
            var text = $('#first form input').val();
            if (text) {
                client.send('/exchange/amq.topic/topic.baqgl.admin.1', {'reply-to': '/temp-queue/foo'}, text);
                alert(text);
                $('#first form input').val("");
            }
            // Prevent the browser from submitting the form and reloading the page.
            return false;
          });
        </script>
    </body></html>

    读取持久化配置

    官方说明






    stomp.js

    // Generated by CoffeeScript 1.7.1
    
    /*
       Stomp Over WebSocket http://www.jmesnil.net/stomp-websocket/doc/ | Apache License V2.0
    
       Copyright (C) 2010-2013 [Jeff Mesnil](http://jmesnil.net/)
       Copyright (C) 2012 [FuseSource, Inc.](http://fusesource.com)
     */
    
    (function() {
      var Byte, Client, Frame, Stomp,
        __hasProp = {}.hasOwnProperty,
        __slice = [].slice;

      // Single control characters used on the wire: LF separates frame lines,
      // NULL terminates a frame. The escapes must be real "\x.." sequences;
      // without the backslash they become three-character strings.
      Byte = {
        LF: '\x0A',
        NULL: '\x00'
      };
    
      /*
         Frame: a single STOMP frame (command, headers, body) together with the
         helpers that serialize frames to, and parse them from, their wire text.
       */
      Frame = (function() {
        var hasOwn = Object.prototype.hasOwnProperty;

        // Strip leading and trailing whitespace from a header name or value.
        // The character class must be "\s"; a bare "s" strips the letter s and
        // corrupts header names such as "subscription".
        function trim(str) {
          return str.replace(/^\s+|\s+$/g, '');
        }

        /*
           Build a frame.
             command - frame command, e.g. "SEND"
             headers - header map (defaults to an empty object)
             body    - frame body (defaults to the empty string)
         */
        function Frame(command, headers, body) {
          this.command = command;
          this.headers = headers != null ? headers : {};
          this.body = body != null ? body : '';
        }

        /*
           Serialize the frame WITHOUT the trailing NULL byte.
           A header 'content-length': false suppresses the automatic
           content-length header; that marker is deleted from this.headers.
         */
        Frame.prototype.toString = function() {
          var lines = [this.command];
          var skipContentLength = this.headers['content-length'] === false;
          var name;
          if (skipContentLength) {
            delete this.headers['content-length'];
          }
          for (name in this.headers) {
            if (hasOwn.call(this.headers, name)) {
              lines.push(name + ":" + this.headers[name]);
            }
          }
          if (this.body && !skipContentLength) {
            lines.push("content-length:" + Frame.sizeOfUTF8(this.body));
          }
          // The extra LF in front of the body yields the blank line that
          // separates headers from body once everything is joined.
          lines.push(Byte.LF + this.body);
          return lines.join(Byte.LF);
        };

        /*
           Size of s in UTF-8 bytes (0 for an empty/missing string): every
           "%XX" escape produced by encodeURI and every unescaped character
           counts as one byte.
         */
        Frame.sizeOfUTF8 = function(s) {
          if (!s) {
            return 0;
          }
          return encodeURI(s).match(/%..|./g).length;
        };

        // Parse the text of exactly one frame (terminator already removed).
        function unmarshallSingle(data) {
          var divider = data.indexOf(Byte.LF + Byte.LF);
          var headerLines = data.substring(0, divider).split(Byte.LF);
          var command = headerLines.shift();
          var headers = {};
          var start = divider + 2;
          var body, end, i, idx, len, line;

          // Walk the header lines last-to-first so that, when a header name is
          // repeated, the first occurrence in the frame is the one that is kept.
          for (i = headerLines.length - 1; i >= 0; i--) {
            line = headerLines[i];
            idx = line.indexOf(':');
            headers[trim(line.substring(0, idx))] = trim(line.substring(idx + 1));
          }

          if (headers['content-length']) {
            // NOTE(review): content-length is written as a UTF-8 byte count
            // (see toString) but is applied here as a character count; the two
            // differ for non-ASCII bodies.
            len = parseInt(headers['content-length'], 10);
            body = ('' + data).substring(start, start + len);
          } else {
            // No content-length: the body runs up to the first NULL, or to the
            // end of the data when there is none.
            end = data.indexOf(Byte.NULL, start);
            body = end === -1 ? data.substring(start) : data.substring(start, end);
          }
          return new Frame(command, headers, body);
        }

        /*
           Split a chunk of received text into frames.
           Returns { frames: [Frame, ...], partial: '<unterminated tail>' }; the
           caller is expected to prepend `partial` to the next chunk.
         */
        Frame.unmarshall = function(datas) {
          var chunks = datas.split(new RegExp(Byte.NULL + Byte.LF + "*"));
          var lastChunk = chunks[chunks.length - 1];
          var result = {
            frames: [],
            partial: ''
          };
          var i;
          // Every chunk except the last was followed by a terminator.
          for (i = 0; i < chunks.length - 1; i++) {
            result.frames.push(unmarshallSingle(chunks[i]));
          }
          if (lastChunk === Byte.LF || lastChunk.search(new RegExp(Byte.NULL + Byte.LF + "*$")) !== -1) {
            result.frames.push(unmarshallSingle(lastChunk));
          } else {
            result.partial = lastChunk;
          }
          return result;
        };

        // Serialize a frame and append the terminating NULL byte.
        Frame.marshall = function(command, headers, body) {
          return new Frame(command, headers, body).toString() + Byte.NULL;
        };

        return Frame;

      })();
    
      /*
         Client: a STOMP session layered on a WebSocket-like object, i.e. anything
         offering send(), close(), a `url` property and the onopen / onmessage /
         onclose handler slots.
       */
      Client = (function() {
        var slice = Array.prototype.slice;

        // Current time in milliseconds. The fallback must CALL valueOf();
        // returning the method itself made every heart-beat delta NaN.
        function now() {
          if (Date.now) {
            return Date.now();
          }
          return new Date().valueOf();
        }

        // Forward a message to client.debug when a debug function is installed.
        function trace(client, message) {
          if (typeof client.debug === "function") {
            return client.debug(message);
          }
        }

        // Give a MESSAGE frame ack()/nack() helpers. The ids are captured per
        // frame, so acknowledging an earlier frame after later frames have
        // arrived still sends that frame's own message-id and subscription.
        function attachAcknowledgers(client, frame) {
          var subscription = frame.headers.subscription;
          var messageID = frame.headers["message-id"];
          frame.ack = function(headers) {
            if (headers == null) {
              headers = {};
            }
            return client.ack(messageID, subscription, headers);
          };
          frame.nack = function(headers) {
            if (headers == null) {
              headers = {};
            }
            return client.nack(messageID, subscription, headers);
          };
        }

        // Route one parsed frame to the matching callback.
        function dispatch(client, frame, errorCallback) {
          var handler;
          switch (frame.command) {
            case "CONNECTED":
              trace(client, "connected to server " + frame.headers.server);
              client.connected = true;
              client._setupHeartbeat(frame.headers);
              if (typeof client.connectCallback === "function") {
                client.connectCallback(frame);
              }
              break;
            case "MESSAGE":
              // Per-subscription callback first, client.onreceive as fallback.
              handler = client.subscriptions[frame.headers.subscription] || client.onreceive;
              if (handler) {
                attachAcknowledgers(client, frame);
                handler(frame);
              } else {
                trace(client, "Unhandled received MESSAGE: " + frame);
              }
              break;
            case "RECEIPT":
              if (typeof client.onreceipt === "function") {
                client.onreceipt(frame);
              }
              break;
            case "ERROR":
              if (typeof errorCallback === "function") {
                errorCallback(frame);
              }
              break;
            default:
              trace(client, "Unhandled frame: " + frame);
          }
        }

        /*
           ws - the WebSocket-like transport. Its binaryType is switched to
                "arraybuffer"; binary messages are decoded in connect().
         */
        function Client(ws) {
          this.ws = ws;
          this.ws.binaryType = "arraybuffer";
          // Used to generate subscription ("sub-N") and transaction ("tx-N") ids.
          this.counter = 0;
          this.connected = false;
          // Heart-beat intervals in ms offered to the server; 0 disables a side.
          this.heartbeat = {
            outgoing: 10000,
            incoming: 10000
          };
          // Outgoing frames longer than this are sent in several pieces.
          this.maxWebSocketFrameSize = 16 * 1024;
          // subscription id -> message callback
          this.subscriptions = {};
          // Unterminated tail of the last received chunk.
          this.partialData = '';
        }

        // Default debug sink: window.console.log when available. Assign
        // another function (or a non-function) to redirect or silence it.
        Client.prototype.debug = function(message) {
          if (typeof window !== "undefined" && window !== null && window.console != null) {
            return window.console.log(message);
          }
        };

        // Marshal a frame and send it, split into pieces of at most
        // maxWebSocketFrameSize characters.
        Client.prototype._transmit = function(command, headers, body) {
          var out = Frame.marshall(command, headers, body);
          trace(this, ">>> " + out);
          while (out.length > this.maxWebSocketFrameSize) {
            this.ws.send(out.substring(0, this.maxWebSocketFrameSize));
            out = out.substring(this.maxWebSocketFrameSize);
            trace(this, "remaining = " + out.length);
          }
          return this.ws.send(out);
        };

        /*
           Start the heart-beat timers from the CONNECTED frame's headers.
           Nothing happens unless the negotiated version is 1.1 or 1.2. A
           missing or malformed "heart-beat" header is treated as "0,0"
           (no heart-beating) instead of throwing.
         */
        Client.prototype._setupHeartbeat = function(headers) {
          var self = this;
          var version = headers.version;
          var parts, pingInterval, pongInterval, serverIncoming, serverOutgoing;
          if (version !== Stomp.VERSIONS.V1_1 && version !== Stomp.VERSIONS.V1_2) {
            return;
          }
          parts = String(headers['heart-beat'] || '0,0').split(",");
          serverOutgoing = parseInt(parts[0], 10) || 0;
          serverIncoming = parseInt(parts[1], 10) || 0;

          if (this.heartbeat.outgoing !== 0 && serverIncoming !== 0) {
            pingInterval = Math.max(this.heartbeat.outgoing, serverIncoming);
            trace(this, "send PING every " + pingInterval + "ms");
            this.pinger = Stomp.setInterval(pingInterval, function() {
              self.ws.send(Byte.LF);
              trace(self, ">>> PING");
            });
          }

          if (this.heartbeat.incoming !== 0 && serverOutgoing !== 0) {
            pongInterval = Math.max(this.heartbeat.incoming, serverOutgoing);
            trace(this, "check PONG every " + pongInterval + "ms");
            this.ponger = Stomp.setInterval(pongInterval, function() {
              var delta = now() - self.serverActivity;
              // Two missed intervals without any server traffic: give up.
              if (delta > pongInterval * 2) {
                trace(self, "did not receive server activity for the last " + delta + "ms");
                self.ws.close();
              }
            });
            return this.ponger;
          }
        };

        /*
           Normalize the argument forms accepted by connect():
             (headers, connectCallback)
             (headers, connectCallback, errorCallback)
             (login, passcode, connectCallback)
             (login, passcode, connectCallback, errorCallback)
             (login, passcode, connectCallback, errorCallback, host)
           Returns [headers, connectCallback, errorCallback].
         */
        Client.prototype._parseConnect = function() {
          var args = slice.call(arguments, 0);
          var headers = {};
          var connectCallback, errorCallback;
          switch (args.length) {
            case 2:
              headers = args[0];
              connectCallback = args[1];
              break;
            case 3:
              if (args[1] instanceof Function) {
                headers = args[0];
                connectCallback = args[1];
                errorCallback = args[2];
              } else {
                headers.login = args[0];
                headers.passcode = args[1];
                connectCallback = args[2];
              }
              break;
            case 4:
              headers.login = args[0];
              headers.passcode = args[1];
              connectCallback = args[2];
              errorCallback = args[3];
              break;
            default:
              headers.login = args[0];
              headers.passcode = args[1];
              connectCallback = args[2];
              errorCallback = args[3];
              headers.host = args[4];
          }
          return [headers, connectCallback, errorCallback];
        };

        /*
           Install the WebSocket handlers; the CONNECT frame is sent once the
           socket opens. See _parseConnect for the accepted arguments.
           errorCallback receives either an ERROR frame or, when the socket
           closes, a "Whoops! Lost connection..." string.
           Returns the installed onopen handler.
         */
        Client.prototype.connect = function() {
          var self = this;
          var parsed = this._parseConnect.apply(this, slice.call(arguments, 0));
          var headers = parsed[0];
          var errorCallback = parsed[2];
          var onOpen;
          this.connectCallback = parsed[1];
          trace(this, "Opening Web Socket...");

          this.ws.onmessage = function(evt) {
            var bytes, chars, data, frames, i, unmarshalled;
            if (typeof ArrayBuffer !== 'undefined' && evt.data instanceof ArrayBuffer) {
              // Binary message: turn each byte into one character.
              bytes = new Uint8Array(evt.data);
              trace(self, "--- got data length: " + bytes.length);
              chars = [];
              for (i = 0; i < bytes.length; i++) {
                chars.push(String.fromCharCode(bytes[i]));
              }
              data = chars.join('');
            } else {
              data = evt.data;
            }
            self.serverActivity = now();
            // A lone LF is the server's heart-beat, not a frame.
            if (data === Byte.LF) {
              trace(self, "<<< PONG");
              return;
            }
            trace(self, "<<< " + data);
            unmarshalled = Frame.unmarshall(self.partialData + data);
            self.partialData = unmarshalled.partial;
            frames = unmarshalled.frames;
            for (i = 0; i < frames.length; i++) {
              dispatch(self, frames[i], errorCallback);
            }
          };

          this.ws.onclose = function() {
            var msg = "Whoops! Lost connection to " + self.ws.url;
            trace(self, msg);
            self._cleanUp();
            if (typeof errorCallback === "function") {
              return errorCallback(msg);
            }
          };

          onOpen = function() {
            trace(self, 'Web Socket Opened...');
            headers["accept-version"] = Stomp.VERSIONS.supportedVersions();
            headers["heart-beat"] = [self.heartbeat.outgoing, self.heartbeat.incoming].join(',');
            return self._transmit("CONNECT", headers);
          };
          this.ws.onopen = onOpen;
          return onOpen;
        };

        // Send DISCONNECT, close the socket without triggering the
        // lost-connection error path, then invoke disconnectCallback (if any).
        Client.prototype.disconnect = function(disconnectCallback, headers) {
          if (headers == null) {
            headers = {};
          }
          this._transmit("DISCONNECT", headers);
          // Detach onclose first so a deliberate close is not reported as an error.
          this.ws.onclose = null;
          this.ws.close();
          this._cleanUp();
          if (typeof disconnectCallback === "function") {
            return disconnectCallback();
          }
        };

        // Mark the session as disconnected and stop both heart-beat timers.
        Client.prototype._cleanUp = function() {
          this.connected = false;
          if (this.pinger) {
            Stomp.clearInterval(this.pinger);
          }
          if (this.ponger) {
            return Stomp.clearInterval(this.ponger);
          }
        };

        // Send `body` to `destination`. Note: `destination` is written into the
        // caller's headers object.
        Client.prototype.send = function(destination, headers, body) {
          if (headers == null) {
            headers = {};
          }
          if (body == null) {
            body = '';
          }
          headers.destination = destination;
          return this._transmit("SEND", headers, body);
        };

        /*
           Subscribe `callback` to `destination`.
           headers.id is used as the subscription id when present, otherwise
           "sub-N" is generated. Returns { id, unsubscribe() }.
         */
        Client.prototype.subscribe = function(destination, callback, headers) {
          var client = this;
          if (headers == null) {
            headers = {};
          }
          if (!headers.id) {
            headers.id = "sub-" + this.counter++;
          }
          headers.destination = destination;
          this.subscriptions[headers.id] = callback;
          this._transmit("SUBSCRIBE", headers);
          return {
            id: headers.id,
            unsubscribe: function() {
              return client.unsubscribe(headers.id);
            }
          };
        };

        // Drop the local callback for `id` and send UNSUBSCRIBE.
        Client.prototype.unsubscribe = function(id) {
          delete this.subscriptions[id];
          return this._transmit("UNSUBSCRIBE", {
            id: id
          });
        };

        // Begin a transaction (id generated as "tx-N" when omitted).
        // Returns { id, commit(), abort() }.
        Client.prototype.begin = function(transaction) {
          var client = this;
          var txid = transaction || "tx-" + this.counter++;
          this._transmit("BEGIN", {
            transaction: txid
          });
          return {
            id: txid,
            commit: function() {
              return client.commit(txid);
            },
            abort: function() {
              return client.abort(txid);
            }
          };
        };

        // Send COMMIT for the given transaction id.
        Client.prototype.commit = function(transaction) {
          return this._transmit("COMMIT", {
            transaction: transaction
          });
        };

        // Send ABORT for the given transaction id.
        Client.prototype.abort = function(transaction) {
          return this._transmit("ABORT", {
            transaction: transaction
          });
        };

        // Send ACK for a message; the ids are written into `headers`.
        Client.prototype.ack = function(messageID, subscription, headers) {
          if (headers == null) {
            headers = {};
          }
          headers["message-id"] = messageID;
          headers.subscription = subscription;
          return this._transmit("ACK", headers);
        };

        // Send NACK for a message; the ids are written into `headers`.
        Client.prototype.nack = function(messageID, subscription, headers) {
          if (headers == null) {
            headers = {};
          }
          headers["message-id"] = messageID;
          headers.subscription = subscription;
          return this._transmit("NACK", headers);
        };

        return Client;

      })();
    
      /*
         Stomp: the public namespace object. It exposes the protocol version
         constants, two factories that produce Client instances, and the Frame
         class.
       */
      Stomp = (function() {
        var versions = {
          V1_0: '1.0',
          V1_1: '1.1',
          V1_2: '1.2',
          // Value sent in the "accept-version" header of the CONNECT frame.
          supportedVersions: function() {
            return '1.1,1.0';
          }
        };

        // Open a new socket to `url` and wrap it in a Client. The constructor
        // is Stomp.WebSocketClass when set, otherwise the global WebSocket.
        function createClient(url, protocols) {
          var SocketCtor = Stomp.WebSocketClass || WebSocket;
          var subprotocols = protocols == null ? ['v10.stomp', 'v11.stomp'] : protocols;
          return new Client(new SocketCtor(url, subprotocols));
        }

        // Wrap an already-created WebSocket-like object in a Client.
        function wrapSocket(ws) {
          return new Client(ws);
        }

        return {
          VERSIONS: versions,
          client: createClient,
          over: wrapSocket,
          Frame: Frame
        };
      })();
    
      // CommonJS-style environments: publish Stomp on `exports`.
      if (typeof exports !== "undefined" && exports !== null) {
        exports.Stomp = Stomp;
      }

      if (typeof window !== "undefined" && window !== null) {
        // Browser page: timers come from window. Note the (interval, callback)
        // argument order used by the Stomp.setInterval wrapper.
        Stomp.setInterval = function(interval, f) {
          return window.setInterval(f, interval);
        };
        Stomp.clearInterval = function(id) {
          return window.clearInterval(id);
        };
        window.Stomp = Stomp;
      } else {
        // No window (e.g. a worker or a CommonJS host): fall back to the global
        // timer functions when they exist, so heart-beating has timers to use.
        if (typeof setInterval === "function" && typeof clearInterval === "function") {
          Stomp.setInterval = function(interval, f) {
            return setInterval(f, interval);
          };
          Stomp.clearInterval = function(id) {
            return clearInterval(id);
          };
        }
        // `typeof` keeps this check from throwing a ReferenceError when
        // `exports` is not declared at all; `self` is guarded the same way.
        if ((typeof exports === "undefined" || exports === null) &&
            typeof self !== "undefined" && self !== null) {
          self.Stomp = Stomp;
        }
      }
    
    }).call(this);


  • 相关阅读:
    修改ESMAP中的LOGO
    AppCompatActivity隐藏顶部任务栏
    Android原生开发,view标签导致RecyleView Adapter 中报错的奇葩问题
    用UPUPW做服务器,通过thinkphp5完美搭建APP后台
    hhh
    Redis 学习
    无刷新文件上传
    mongoDB 学习
    MySQL读写分离-架构
    Mysql 通用二进制包安装
  • 原文地址:https://www.cnblogs.com/gmhappy/p/9472419.html
Copyright © 2011-2022 走看看