zoukankan      html  css  js  c++  java
  • mica-mqtt 1.0.2 发布,完善 stater 和 example

    一、简介

    mica-mqtt 1.0.2 发布,完善 stater 和 example

    mica mqtt

    mica-mqtt 基于 t-io 实现的简单、低延迟、高性能 的 mqtt 物联网开源组件。使用详见 mica-mqtt gitee 源码 mica-mqtt-example 模块。

    mica-mqtt: 基于 t-io 实现的低延迟、高性能的 mqtt 组件。 记得右上角点个star 关注更新!

    二、功能

    • 支持 MQTT v3.1、v3.1.1 以及 v5.0 协议。
    • 支持 MQTT client 客户端。
    • 支持 MQTT server 服务端。
    • 支持 MQTT 遗嘱消息。
    • 支持 MQTT 保留消息。
    • 支持自定义消息(mq)处理转发实现集群。
    • MQTT 客户端 阿里云 mqtt 连接 demo。
    • 支持 GraalVM 编译成本机可执行程序。
    • 支持 Spring boot 项目快速接入(mica-mqtt-spring-boot-starter)。
    • mica-mqtt-spring-boot-starter 支持对接 Prometheus + Grafana。

    三、待办

    • 添加 websocket 支持(已预研成功)。
    • 优化处理 mqtt session,以及支持 v5.0

    四、更新记录

    •  文档添加集群处理步骤说明,添加遗嘱消息、保留消息的使用场景。
    • ✨ 去除演示中的 qos2 参数,性能损耗大避免误用。
    • ✨ 遗嘱、保留消息内部消息转发抽象。
    • ✨ 添加 mica-mqtt-spring-boot-example 。感谢 wsq( @冷月宫主 )pr。
    • ✨ mica-mqtt-spring-boot-starter 支持客户端接入和服务端优化。感谢 wsq( @冷月宫主 )pr。
    • ✨ mica-mqtt-spring-boot-starter 服务端支持指标收集。可对接 Prometheus + Grafana 监控。
    • ✨ mqtt server 接受连接时,先判断该 clientId 是否存在其它连接,有则解绑并关闭其他连接。
    • ⬆️ 升级 mica-auto 到 2.1.3 修复 ide 多模块增量编译问题。

    五、Spring boot 快速接入

    5.1 添加依赖

    <dependency>
        <groupId>net.dreamlu</groupId>
        <artifactId>mica-mqtt-spring-boot-starter</artifactId>
        <version>1.0.2</version>
    </dependency>

    5.2 服务端 yml 配置

    mqtt:
      server:
        enabled: true               # 是否开启,默认:true
        ip: 127.0.0.1               # 服务端 ip 默认:127.0.0.1
        port: 5883                  # 端口,默认:1883
        name: Mica-Mqtt-Server      # 名称,默认:Mica-Mqtt-Server
        buffer-allocator: HEAP      # 堆内存和堆外内存,默认:堆内存
        heartbeat-timeout: 120000   # 心跳超时,单位毫秒,默认: 1000 * 120
        read-buffer-size: 8092      # 接收数据的 buffer size,默认:8092
        max-bytes-in-message: 8092  # 消息解析最大 bytes 长度,默认:8092
        debug: true                 # 如果开启 prometheus 指标收集建议关闭

    5.3 服务端可实现接口(注册成 Spring Bean 即可)

    接口

    是否必须

    说明

    IMqttServerAuthHandler

    用于客户端认证

    IMqttMessageListener

    消息监听

    IMqttConnectStatusListener

    连接状态监听

    IMqttSessionManager

    session 管理

    IMqttMessageStore

    集群是,单机否

    遗嘱和保留消息存储

    AbstractMqttMessageDispatcher

    集群是,单机否

    消息转发,(遗嘱、保留消息转发)

    IpStatListener

    t-io ip 状态监听

    5.4 服务端自定义配置(可选)

    @Configuration(proxyBeanMethods = false)
    public class MqttServerCustomizerConfiguration {
    
        @Bean
        public MqttServerCustomizer activeRecordPluginCustomizer() {
            return new MqttServerCustomizer() {
                @Override
                public void customize(MqttServerCreator creator) {
                    // 此处可自定义配置 creator,会覆盖 yml 中的配置
                    System.out.println("----------------MqttServerCustomizer-----------------");
                }
            };
        }
    
    }

    5.5 MqttServerTemplate 使用示例

    import net.dreamlu.iot.mqtt.codec.MqttQoS;
    import net.dreamlu.iot.mqtt.spring.server.MqttServerTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    import java.nio.ByteBuffer;
    
    /**
     * @author wsq
     */
    @Service
    public class ServerService {
        @Autowired
        private MqttServerTemplate server;
    
        public boolean publish(String body) {
            server.publishAll("/test/123", ByteBuffer.wrap(body.getBytes()));
            return true;
        }
    }

    5.6 基于 mq 消息广播集群处理

    • 实现 IMqttConnectStatusListener 处理设备状态存储。
    • 实现 IMqttMessageListener 将消息转发到 mq,业务按需处理 mq 消息。
    • 实现 IMqttMessageStore 存储遗嘱和保留消息。
    • 实现 AbstractMqttMessageDispatcher 将消息发往 mq,mq 再广播回 mqtt 集群,mqtt 将消息发送到设备。
    • 业务消息发送到 mq,mq 广播到 mqtt 集群,mqtt 将消息发送到设备。

    5.7 Prometheus + Grafana 监控对接

    得益于 t-io 良好的设计,监控指标直接对接的 t-io stat,目前支持下列指标,后期会不断完善。

    支持得指标

    说明

    mqtt_connections_accepted

    共接受过连接数

    mqtt_connections_closed

    关闭过的连接数

    mqtt_connections_size

    当前连接数

    mqtt_messages_handled_packets

    已处理消息数

    mqtt_messages_handled_bytes

    已处理消息字节数

    mqtt_messages_received_packets

    已接收消息数

    mqtt_messages_received_bytes

    已处理消息字节数

    mqtt_messages_send_packets

    已发送消息数

    mqtt_messages_send_bytes

    已发送消息字节数

    mica-mqtt 1.0.2 发布,完善 stater 和 example

     

    关于 
    mica-mqtt-spring-boot-starter 客户端等更多使用方式请查看 gitee 
    mica-mqtt-spring-boot-starter 模块 readme 文档。

    六、普通 java 项目接入

    6.1 maven 依赖

     <dependency>
       <groupId>net.dreamlu</groupId>
       <artifactId>mica-mqtt-core</artifactId>
       <version>1.0.2</version>
     </dependency>

    6.2 mica-mqtt 客户端

     // 初始化 mqtt 客户端
     MqttClient client = MqttClient.create()
         .ip("127.0.0.1")
         .port(1883)                     // 默认:1883
         .username("admin")
         .password("123456")
         .version(MqttVersion.MQTT_5)    // 默认:3_1_1
         .clientId("xxxxxx")             // 默认:MICA-MQTT- 前缀和 36进制的纳秒数
         .connect();                     // 连接
     
         // 消息订阅,同类方法 subxxx
         client.subQos0("/test/#", (topic, payload) -> {
             logger.info(topic + '	' + ByteBufferUtil.toString(payload));
         });
         // 取消订阅
         client.unSubscribe("/test/#");
     
         // 发送消息
         client.publish("/test/client", ByteBuffer.wrap("mica最牛皮".getBytes(StandardCharsets.UTF_8)));
     
         // 断开连接
         client.disconnect();
         // 重连
         client.reconnect();
         // 停止
         client.stop();

    6.3 mica-mqtt 服务端

     // 注意:为了能接受更多链接(降低内存),请添加 jvm 参数 -Xss129k
     MqttServer mqttServer = MqttServer.create()
         // 默认:127.0.0.1
         .ip("127.0.0.1")
         // 默认:1883
         .port(1883)
         // 默认为: 8092(mqtt 默认最大消息大小),为了降低内存可以减小小此参数,如果消息过大 t-io 会尝试解析多次(建议根据实际业务情况而定)
         .readBufferSize(512)
         // 自定义认证
         .authHandler((clientId, userName, password) -> true)
         // 消息监听
         .messageListener((clientId, topic, mqttQoS, payload) -> {
             logger.info("clientId:{} topic:{} mqttQoS:{} message:{}", clientId, topic, mqttQoS, ByteBufferUtil.toString(payload));
         })
         // ssl 配置
         .useSsl("", "", "")
         // 自定义客户端上下线监听
         .connectStatusListener(new IMqttConnectStatusListener() {
             @Override
             public void online(String clientId) {
     
             }
     
             @Override
             public void offline(String clientId) {
     
             }
         })
         // 自定义消息转发,可用 mq 广播实现集群化处理
         .messageDispatcher(new IMqttMessageDispatcher() {
             @Override
             public void config(MqttServer mqttServer) {
     
             }
     
             @Override
             public boolean send(Message message) {
                 return false;
             }
     
             @Override
             public boolean send(String clientId, Message message) {
                 return false;
             }
         })
         .debug() // 开启 t-io debug 信息日志
         .start();
     
     // 发送给某个客户端
     mqttServer.publish("clientId","/test/123", ByteBuffer.wrap("mica最牛皮".getBytes()));
     
     // 发送给所有在线监听这个 topic 的客户端
     mqttServer.publishAll("/test/123", ByteBuffer.wrap("mica最牛皮".getBytes()));
     
     // 停止服务
     mqttServer.stop();

    七、效果演示

    mica-mqtt 1.0.2 发布,完善 stater 和 example

    mica-mqtt-example 效果演示

    JUST DO IT!
  • 相关阅读:
    Elasticsearch的介绍与安装配置启动问题
    代码发布项目
    gitpython模块
    Paramiko模块
    gojs插件的介绍与使用
    django中如何实现websocket,真正通过websocket实现群聊功能
    如何实现服务端主动给客户端推送消息,websocket详解,以及django如何使用websocket问题
    简单爬取汽车之家新闻(requests模块+bs4)
    http协议版本,响应状态码,正反向代理的区别,与伪静态
    web开发经验——富头像上传编辑器的使用
  • 原文地址:https://www.cnblogs.com/caicz/p/15127908.html
Copyright © 2011-2022 走看看