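Problem description:
MQTT.js provides a set of JavaScript methods for connecting to MQTT. It can be used in front-end code to connect to an MQTT server, and it can also be run as a test script.
Run as a script under Node.js, it works without problems: it connects normally and sends messages.
But when the same JS code is placed in an HTML file, the connection cannot be completed, and the following messages appear:
Client-side message:
Server-side message: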
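Troubleshooting:
According to the client message, the handshake could not be completed. According to the server message, the packet did not match the expected format when it was decoded, so the connection was rejected.
After some reading, I found that a plain socket and a WebSocket are not the same thing.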
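When the script runs under Node.js, MQTT.js connects over a plain TCP socket, whereas in the browser it connects over WebSocket, and the server has to handle these two kinds of traffic differently.
So my guess was that the MQTT broker did not have WebSocket support enabled and was treating the incoming packets as plain socket packets.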
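To make this guess concrete, here is a minimal Java sketch of why a plain MQTT decoder would reject a browser's first bytes. It relies on the MQTT fixed header keeping the packet type in the upper four bits of the first byte, with CONNECT being type 1. The class and method names are hypothetical and not part of moquette or MQTT.js, the request line is only an illustrative WebSocket upgrade request, and a real decoder performs more checks than this.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only; not part of moquette or MQTT.js.
class FirstByteSketch {

    // The MQTT fixed header stores the packet type in the upper 4 bits of byte 0.
    static int packetType(byte first) {
        return (first & 0xF0) >> 4;
    }

    // CONNECT is packet type 1 with the lower 4 flag bits set to 0,
    // and it must be the first packet a client sends.
    static boolean looksLikeMqttConnect(byte[] data) {
        return data.length > 0
                && packetType(data[0]) == 1
                && (data[0] & 0x0F) == 0;
    }

    public static void main(String[] args) {
        // First bytes from a raw TCP client: an MQTT CONNECT fixed header (0x10).
        byte[] tcpClient = {0x10, 0x0C};

        // First bytes from a browser: the HTTP request that starts the
        // WebSocket handshake (illustrative request line).
        byte[] browser = "GET /mqtt HTTP/1.1\r\n".getBytes(StandardCharsets.US_ASCII);

        System.out.println(looksLikeMqttConnect(tcpClient)); // prints true
        System.out.println(looksLikeMqttConnect(browser));   // prints false
        System.out.println(packetType(browser[0]));          // prints 4 ('G' is 0x47)
    }
}
```

Read as an MQTT header, the letter 'G' looks like packet type 4 rather than CONNECT, which is consistent with the server complaining about the packet format while the client never gets a handshake response.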
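Looking at the code, I found:
My server uses moquette. Its NettyAcceptor handles TCP and WebSocket in separate methods, and the code shows that WebSocket is disabled unless a port is configured for it: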
    private void initializePlainTCPTransport(final NettyMQTTHandler handler, IConfig props) throws IOException {
        final MoquetteIdleTimeoutHandler timeoutHandler = new MoquetteIdleTimeoutHandler();
        String host = props.getProperty(BrokerConstants.HOST_PROPERTY_NAME);
        String tcpPortProp = props.getProperty(PORT_PROPERTY_NAME, DISABLED_PORT_BIND);
        if (DISABLED_PORT_BIND.equals(tcpPortProp)) {
            LOG.info("tcp MQTT is disabled because the value for the property with key {}", BrokerConstants.PORT_PROPERTY_NAME);
            return;
        }
        int port = Integer.parseInt(tcpPortProp);
        initFactory(host, port, new PipelineInitializer() {
            @Override
            void init(ChannelPipeline pipeline) {
                pipeline.addFirst("idleStateHandler", new IdleStateHandler(0, 0, Constants.DEFAULT_CONNECT_TIMEOUT));
                pipeline.addAfter("idleStateHandler", "idleEventHandler", timeoutHandler);
                // pipeline.addLast("logger", new LoggingHandler("Netty", LogLevel.ERROR));
                pipeline.addFirst("bytemetrics", new BytesMetricsHandler(m_bytesMetricsCollector));
                pipeline.addLast("decoder", new MQTTDecoder());
                pipeline.addLast("encoder", new MQTTEncoder());
                pipeline.addLast("metrics", new MessageMetricsHandler(m_metricsCollector));
                // pipeline.addLast("messageLogger", new MQTTMessageLogger());
                pipeline.addLast("handler", handler);
            }
        });
    }

    private void initializeWebSocketTransport(final NettyMQTTHandler handler, IConfig props) throws IOException {
        String webSocketPortProp = props.getProperty(WEB_SOCKET_PORT_PROPERTY_NAME, DISABLED_PORT_BIND);
        if (DISABLED_PORT_BIND.equals(webSocketPortProp)) {
            //Do nothing no WebSocket configured
            LOG.info("WebSocket is disabled");
            return;
        } else {
            LOG.info("WebSocket is enable");
        }
        int port = Integer.valueOf(webSocketPortProp);
        final MoquetteIdleTimeoutHandler timeoutHandler = new MoquetteIdleTimeoutHandler();
        String host = props.getProperty(BrokerConstants.HOST_PROPERTY_NAME);
        initFactory(host, port, new PipelineInitializer() {
            @Override
            void init(ChannelPipeline pipeline) {
                pipeline.addLast("httpEncoder", new HttpResponseEncoder());
                pipeline.addLast("httpDecoder", new HttpRequestDecoder());
                pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
                pipeline.addLast("webSocketHandler", new WebSocketServerProtocolHandler("/mqtt", MQTT_SUBPROTOCOL_CSV_LIST));
                pipeline.addLast("ws2bytebufDecoder", new WebSocketFrameToByteBufDecoder());
                pipeline.addLast("bytebuf2wsEncoder", new ByteBufToWebSocketFrameEncoder());
                pipeline.addFirst("idleStateHandler", new IdleStateHandler(0, 0, Constants.DEFAULT_CONNECT_TIMEOUT));
                pipeline.addAfter("idleStateHandler", "idleEventHandler", timeoutHandler);
                pipeline.addFirst("bytemetrics", new BytesMetricsHandler(m_bytesMetricsCollector));
                pipeline.addLast("decoder", new MQTTDecoder());
                pipeline.addLast("encoder", new MQTTEncoder());
                pipeline.addLast("metrics", new MessageMetricsHandler(m_metricsCollector));
                pipeline.addLast("handler", handler);
            }
        });
    }
Solution:
Configure a port for WebSocket (I used 1885) so that the WebSocket service is enabled. That is all it takes.
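    // No default value here, so the WebSocket port property must be set in the configuration.
    String webSocketPortProp = props.getProperty(WEB_SOCKET_PORT_PROPERTY_NAME);
    if (DISABLED_PORT_BIND.equals(webSocketPortProp)) {
        //Do nothing no WebSocket configured
        LOG.info("WebSocket is disabled");
        return;
    } else {
        LOG.info("WebSocket is enable");
    }
    // If the property is missing, webSocketPortProp is null and Integer.valueOf throws NumberFormatException.
    int port = Integer.valueOf(webSocketPortProp);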
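As a variant, here is a minimal sketch of the same lookup that keeps a "disabled" default and only enables WebSocket when a port is actually configured. It uses plain java.util.Properties instead of moquette's IConfig. The key "websocket_port" and the sentinel "disabled" are my assumptions about the values behind WEB_SOCKET_PORT_PROPERTY_NAME and DISABLED_PORT_BIND, and the class name is hypothetical.

```java
import java.util.OptionalInt;
import java.util.Properties;

// Hypothetical stand-in for the port lookup; not moquette's actual code.
class WebSocketPortSketch {

    // Assumed values of the moquette constants; check them against your version.
    static final String WS_PORT_KEY = "websocket_port";
    static final String DISABLED = "disabled";

    // Returns the configured WebSocket port, or empty when WebSocket is disabled.
    static OptionalInt resolvePort(Properties props) {
        String raw = props.getProperty(WS_PORT_KEY, DISABLED);
        if (DISABLED.equals(raw)) {
            return OptionalInt.empty();
        }
        return OptionalInt.of(Integer.parseInt(raw.trim()));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(resolvePort(props).isPresent()); // prints false

        props.setProperty(WS_PORT_KEY, "1885");
        System.out.println(resolvePort(props).getAsInt());  // prints 1885
    }
}
```

With the port set to 1885 and the "/mqtt" path registered in the WebSocket pipeline, the browser client would then connect to a URL of the form ws://<host>:1885/mqtt rather than to the plain TCP port.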