zoukankan      html  css  js  c++  java
  • MQTT协议笔记之mqtt.io项目TCP协议支持

    前言

    MQTT定义了物联网传输协议,其标准倾向于原始TCP实现。构建于TCP的上层协议堆栈,诸如HTTP等,在空间上多了一些处理路径,稍微耗费了CPU和内存,虽看似微乎其微,但对很多处理能力不足的嵌入式设备而言,选择原始的TCP却是最好的选择。

    但单纯TCP不是所有物件联网的最佳选择,提供构建与TCP基础之上的传统的HTTP通信支持,尤其是浏览器、性能富裕的桌面涉及领域,还是企业最 可信赖、最可控的传输方式之一。支持多种多样的连接通道,让目前所有一切皆可联网,除了原始TCP Socket,还要支持构建于其之上的HTTP、HTML5 Websocket,就很有必要。

    mqtt.io,Pub/Sub中间件,也可以称之为推送服务器,涵盖所有主流桌面系统、浏览器平台,并且倾斜 于移动互联网,以及物联网的广阔适应天地。使用一句英文概括可能更为合适:"Make everything connect”,让所有物件都可连接。其业务目标,可用下图概括:

    mqtt.io致力于做下一代支持所有主流桌面平台、所有主流浏览器、所有可联网物件都可以联网的PUB/SUB消息推送系统。

    构建此系统,在于降低传统企业各自分散的推送系统,统一运营,统一管理,节省人员、运维开支。

    注意事项

    1. mqtt.io是一个项目名称,没有官网,http://www.mqtt.io,和这个项目没有一毛钱关系。
    2. 项目地址:https://github.com/yongboy/mqtt.io
    3. 项目名称启发于 http://socket.io http://netty.io 等知名framework。
    4. 目前只实现QoS 0基本特性,实现概览,后期会根据反馈,做出一些调整

    依赖

    1. netty 4,目前JAVA IO界明星
    2. mqtt-library 二进制和MQTT对象的转换,这种苦活累活都是它来做,真心让人喜欢。

    数据流转

    解码器

    用于转换二进制流到JAVA对象的过程:

    123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
    package io.mqtt.handler.coder;
     
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToMessageDecoder;
     
    import java.io.ByteArrayInputStream;
    import java.util.List;
     
    import org.meqantt.message.Message;
    import org.meqantt.message.MessageInputStream;
     
    public class MqttMessageNewDecoder extends MessageToMessageDecoder<ByteBuf> {
     
    @Override
    public void decode(ChannelHandlerContext ctx, ByteBuf buf,
    List<Object> out) throws Exception {
    if (buf.readableBytes() < 2) {
    return;
    }
    buf.markReaderIndex();
    buf.readByte(); // read away header
    int msgLength = 0;
    int multiplier = 1;
    int digit;
    int lengthSize = 0;
    do {
    lengthSize++;
    digit = buf.readByte();
    msgLength += (digit & 0x7f) * multiplier;
    multiplier *= 128;
    if ((digit & 0x80) > 0 && !buf.isReadable()) {
    buf.resetReaderIndex();
    return;
    }
    } while ((digit & 0x80) > 0);
    if (buf.readableBytes() < msgLength) {
    buf.resetReaderIndex();
    return;
    }
    byte[] data = new byte[1 + lengthSize + msgLength];
    buf.resetReaderIndex();
    buf.readBytes(data);
    MessageInputStream mis = new MessageInputStream(
    new ByteArrayInputStream(data));
    Message msg = mis.readMessage();
    mis.close();
     
    out.add(msg);
    }
    }

    编码器

    对所有要写入网卡缓冲区的JAVA对象转换成二进制:

    12345678910111213141516171819202122232425
    package io.mqtt.handler.coder;
     
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandler.Sharable;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToMessageEncoder;
     
    import java.util.List;
     
    import org.meqantt.message.Message;
     
    @Sharable
    public class MqttMessageNewEncoder extends MessageToMessageEncoder<Object> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg,
    List<Object> out) throws Exception {
    if (!(msg instanceof Message)) {
    return;
    }
     
    byte[] data = ((Message) msg).toBytes();
     
    out.add(Unpooled.wrappedBuffer(data));
    }
    }

    借助于mqtt-library项目,编解码不复杂。

    MQTT的消息处理

    1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
    package io.mqtt.handler;
     
    import io.mqtt.processer.ConnectProcesser;
    import io.mqtt.processer.DisConnectProcesser;
    import io.mqtt.processer.PingReqProcesser;
    import io.mqtt.processer.Processer;
    import io.mqtt.processer.PublishProcesser;
    import io.mqtt.processer.SubscribeProcesser;
    import io.mqtt.processer.UnsubscribeProcesser;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.handler.timeout.ReadTimeoutException;
     
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
     
    import org.meqantt.message.ConnAckMessage;
    import org.meqantt.message.ConnAckMessage.ConnectionStatus;
    import org.meqantt.message.DisconnectMessage;
    import org.meqantt.message.Message;
    import org.meqantt.message.Message.Type;
    import org.meqantt.message.PingRespMessage;
     
    public class MqttMessageHandler extends ChannelInboundHandlerAdapter {
    private static PingRespMessage PINGRESP = new PingRespMessage();
     
    private static final Map<Message.Type, Processer> processers;
    static {
    Map<Message.Type, Processer> map = new HashMap<Message.Type, Processer>(
    6);
     
    map.put(Type.CONNECT, new ConnectProcesser());
    map.put(Type.PUBLISH, new PublishProcesser());
    map.put(Type.SUBSCRIBE, new SubscribeProcesser());
    map.put(Type.UNSUBSCRIBE, new UnsubscribeProcesser());
    map.put(Type.PINGREQ, new PingReqProcesser());
    map.put(Type.DISCONNECT, new DisConnectProcesser());
     
    processers = Collections.unmodifiableMap(map);
    }
     
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable e)
    throws Exception {
    try {
    if (e.getCause() instanceof ReadTimeoutException) {
    ctx.write(PINGRESP).addListener(
    ChannelFutureListener.CLOSE_ON_FAILURE);
    } else {
    ctx.channel().close();
    }
    } catch (Throwable t) {
    t.printStackTrace();
    ctx.channel().close();
    }
     
    e.printStackTrace();
    }
     
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object obj)
    throws Exception {
    Message msg = (Message) obj;
    Processer p = processers.get(msg.getType());
    if (p == null) {
    return;
    }
    Message rmsg = p.proc(msg, ctx);
    if (rmsg == null) {
    return;
    }
     
    if (rmsg instanceof ConnAckMessage
    && ((ConnAckMessage) rmsg).getStatus() != ConnectionStatus.ACCEPTED) {
    ctx.write(rmsg).addListener(ChannelFutureListener.CLOSE);
    } else if (rmsg instanceof DisconnectMessage) {
    ctx.write(rmsg).addListener(ChannelFutureListener.CLOSE);
    } else {
    ctx.write(rmsg).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
    }
    }
     
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    ctx.flush();
    }
    }

    更具体的可以查看项目。

    小结

    简单介绍了一个简单的不能再简单的MQTT Server,只具有最基本的QoS 0类型的消息订阅等。

    后面,对HTML 5 Websocket,会在现有基础代码之上,不做多大改动,增加对MQTT Over WebSocket的支持。

  • 相关阅读:
    搭建armlinuxgcc交叉编译工具链环境(Android原生(JNI)开发环境搭建)
    linux vi命令详解
    Android手机在开发调试时logcat不显示输出信息的解决办法
    2012的总结和13的展望
    Gvim编码学习笔记
    vue自定义过滤器格式化时间为年、月、日、小时、分钟、刚刚 J
    学校网站群建设理念
    何为真正网站群?
    手机网站——移动互联网新趋势
    建站是浮云,We7很给力
  • 原文地址:https://www.cnblogs.com/yudar/p/4642398.html
Copyright © 2011-2022 走看看