zoukankan      html  css  js  c++  java
  • ThingsBoard 二次开发之源码分析 5-如何接收 MQTT 连接

    thingsboard聚集地

    Thingsboard 话题讨论区:https://forum.iotschool.com/topics/node8

    欢迎大家加入thingsboard 二次开发讨论群:121202538

    thingsboard交流QQ群 121202538

    ThingsBoard源码分析5-如何接收MQTT连接

    1. MQTT server

    需要接收设备的MQTT连接,那么thingsboard中必然有MQTT服务器,MQTT服务器创建的类是MqttTransportService

    基于netty的mqtt server,添加了MqttTransportServerInitializer的处理类,并向ChannelPipeline添加了netty的MqttDecoderMqttEncoder让我们可以忽略MQTT消息的编解码工作,重要的是添加了MqttTransportHandler

    2. MqttTransportHandler处理连接

    此例中,我们首先需要创建租户,租户管理员,并添加设备,使用MQTT Box模拟硬件设备,拷贝ACCESS TOKEN做为MQTT Box的Username开始连接我们的thingsboard后台

    mqtt消息处理流程

    如果图片看不清楚,请点击:

    由于没有使用ssl,收到连接请求以后,便会调用

    private void processAuthTokenConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {
        String userName = msg.payload().userName();
        log.info("[{}] Processing connect msg for client with user name: {}!", sessionId, userName);
        if (StringUtils.isEmpty(userName)) {
            ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD));
            ctx.close();
        } else {
            //取出userName,构造protobuf的类(方便传输与解析),交给transportService处理。此时会使用到源码解析第三篇DefaultTransportService的解析的相关信息了解process的处理。参阅下方①的详细解析。
            transportService.process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(userName).build(),
                    new TransportServiceCallback<ValidateDeviceCredentialsResponseMsg>() {
                        @Override
                        public void onSuccess(ValidateDeviceCredentialsResponseMsg msg) {
                            onValidateDeviceResponse(msg, ctx);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            log.trace("[{}] Failed to process credentials: {}", address, userName, e);
                            ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE));
                            ctx.close();
                        }
                    });
        }
    }
    
    1. DefaultTransportServiceprocess方法构造了异步任务,成功调用onSuccessConsumer,失败调用onFailureConsumer

    2. 将验证用户的任务交由transportApiRequestTemplate.send

    public ListenableFuture<Response> send(Request request) {
        if (tickSize > maxPendingRequests) {
            return Futures.immediateFailedFuture(new RuntimeException("Pending request map is full!"));
        }
        UUID requestId = UUID.randomUUID();
        request.getHeaders().put(REQUEST_ID_HEADER, uuidToBytes(requestId));
        //由第三篇文章的分析得出,此topic时tb_transport.api.responses.localHostName
        request.getHeaders().put(RESPONSE_TOPIC_HEADER, stringToBytes(responseTemplate.getTopic()));
        request.getHeaders().put(REQUEST_TIME, longToBytes(System.currentTimeMillis()));
        //参阅第一篇基础知识的介绍,来自谷歌的库,settableFuture,可设置结果的完成
        SettableFuture<Response> future = SettableFuture.create();
        ResponseMetaData<Response> responseMetaData = new ResponseMetaData<>(tickTs + maxRequestTimeout, future);
        //将future放到pendingRequests中②
        pendingRequests.putIfAbsent(requestId, responseMetaData);
        log.trace("[{}] Sending request, key [{}], expTime [{}]", requestId, request.getKey(), responseMetaData.expTime);
        //将消息发送给消息队列topic是tb_transport.api.requests
        requestTemplate.send(TopicPartitionInfo.builder().topic(requestTemplate.getDefaultTopic()).build(), request, new TbQueueCallback() {
            @Override
            public void onSuccess(TbQueueMsgMetadata metadata) {
                log.trace("[{}] Request sent: {}", requestId, metadata);
            }
    
            @Override
            public void onFailure(Throwable t) {
                pendingRequests.remove(requestId);
                future.setException(t);
            }
        });
        return future;
    }
    
    1. 根据第三篇TbCoreTransportApiService的分析,我们发现DefaultTbQueueResponseTemplate的成员变量requestTemplateconsumer刚好是订阅的tb_transport.api.requests的消息:
    ......
    requests.forEach(request -> {
        long currentTime = System.currentTimeMillis();
        long requestTime = bytesToLong(request.getHeaders().get(REQUEST_TIME));
        if (requestTime + requestTimeout >= currentTime) {
            byte[] requestIdHeader = request.getHeaders().get(REQUEST_ID_HEADER);
            if (requestIdHeader == null) {
                log.error("[{}] Missing requestId in header", request);
                return;
            }
           	//获取response的topic,可以做到消息从哪来,处理好以后回哪里去,此时的topic是tb_transport.api.responses.localHostName
            byte[] responseTopicHeader = request.getHeaders().get(RESPONSE_TOPIC_HEADER);
            if (responseTopicHeader == null) {
                log.error("[{}] Missing response topic in header", request);
                return;
            }
            UUID requestId = bytesToUuid(requestIdHeader);
            String responseTopic = bytesToString(responseTopicHeader);
            try {
                pendingRequestCount.getAndIncrement();
                //调用handler进行处理消息
                AsyncCallbackTemplate.withCallbackAndTimeout(handler.handle(request),
                        response -> {
                            pendingRequestCount.decrementAndGet();
                            response.getHeaders().put(REQUEST_ID_HEADER, uuidToBytes(requestId));
                            //handler.hande处理的结果返回给发送方topic是tb_transport.api.responses.localHostName
                            responseTemplate.send(TopicPartitionInfo.builder().topic(responseTopic).build(), response, null);
                        },
                        e -> {
                            pendingRequestCount.decrementAndGet();
                            if (e.getCause() != null && e.getCause() instanceof TimeoutException) {
                                log.warn("[{}] Timeout to process the request: {}", requestId, request, e);
                            } else {
                                log.trace("[{}] Failed to process the request: {}", requestId, request, e);
                            }
                        },
                        requestTimeout,
                        timeoutExecutor,
                        callbackExecutor);
              .......
    
    1. 具体验证逻辑:
    @Override
    public ListenableFuture<TbProtoQueueMsg<TransportApiResponseMsg>> handle(TbProtoQueueMsg<TransportApiRequestMsg> tbProtoQueueMsg) {
        TransportApiRequestMsg transportApiRequestMsg = tbProtoQueueMsg.getValue();
        // protobuf构造的类中判定是否包含需要验证的信息块
        if (transportApiRequestMsg.hasValidateTokenRequestMsg()) {
            ValidateDeviceTokenRequestMsg msg = transportApiRequestMsg.getValidateTokenRequestMsg();
            //调用validateCredentials,具体内容就是查询deviceInfo,并将结果交由第二个Function进行进一步处理
            return Futures.transform(validateCredentials(msg.getToken(), DeviceCredentialsType.ACCESS_TOKEN), value -> new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), value, tbProtoQueueMsg.getHeaders()), MoreExecutors.directExecutor());
        } 
      ......
    
    1. 当通过设备的acess token找到了deviceInfo,便会通过消息中间件将DeviceInfo发出来,topic是tb_transport.api.responses.localHostName,在第三篇的分析中,DefaultTransportServicetransportApiRequestTemplate即订阅此topic:
    List<Response> responses = responseTemplate.poll(pollInterval);
    if (responses.size() > 0) {
        log.trace("Polling responses completed, consumer records count [{}]", responses.size());
    } else {
        continue;
    }
    responses.forEach(response -> {
        byte[] requestIdHeader = response.getHeaders().get(REQUEST_ID_HEADER);
        UUID requestId;
        if (requestIdHeader == null) {
            log.error("[{}] Missing requestId in header and body", response);
        } else {
            requestId = bytesToUuid(requestIdHeader);
            log.trace("[{}] Response received: {}", requestId, response);
            //参见上②,将验证的future放入到pendingRequests中,现在通过设置的requestId取出来
            ResponseMetaData<Response> expectedResponse = pendingRequests.remove(requestId);
            if (expectedResponse == null) {
                log.trace("[{}] Invalid or stale request", requestId);
            } else {
                //设置settableFuture的结果
                expectedResponse.future.set(response);
            }
        }
    ......
    
    1. DefaultTransportServiceprocess异步请求获得了返回的结果,此时调用onSuccess回调,即调用MqttTransportHandleronValidateDeviceResponse
    private void onValidateDeviceResponse(ValidateDeviceCredentialsResponseMsg msg, ChannelHandlerContext ctx) {
        if (!msg.hasDeviceInfo()) {
            ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
            ctx.close();
        } else {
            deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo());
            sessionInfo = SessionInfoProto.newBuilder()
                    .setNodeId(context.getNodeId())
                    .setSessionIdMSB(sessionId.getMostSignificantBits())
                    .setSessionIdLSB(sessionId.getLeastSignificantBits())
                    .setDeviceIdMSB(msg.getDeviceInfo().getDeviceIdMSB())
                    .setDeviceIdLSB(msg.getDeviceInfo().getDeviceIdLSB())
                    .setTenantIdMSB(msg.getDeviceInfo().getTenantIdMSB())
                    .setTenantIdLSB(msg.getDeviceInfo().getTenantIdLSB())
                    .setDeviceName(msg.getDeviceInfo().getDeviceName())
                    .setDeviceType(msg.getDeviceInfo().getDeviceType())
                    .build();
            //创建SessionEvent.OPEN的消息,调用sendToDeviceActor方法,包含sessionInfo
            transportService.process(sessionInfo, DefaultTransportService.getSessionEventMsg(SessionEvent.OPEN), new TransportServiceCallback<Void>() {
               .......
    
    1. sendToDeviceActor的实现:
    protected void sendToDeviceActor(TransportProtos.SessionInfoProto sessionInfo, TransportToDeviceActorMsg toDeviceActorMsg, TransportServiceCallback<Void> callback) {
        //创建tpi,此时会选择一个固定的partition Id,组成的topic是tb_core, fullTopicName是tb_core.(int) 如: tb_core.1
        TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, getTenantId(sessionInfo), getDeviceId(sessionInfo));
    ......
        //使用tbCoreMsgProducer发送到消息队列,设置了toDeviceActorMsg
        tbCoreMsgProducer.send(tpi,
                new TbProtoQueueMsg<>(getRoutingKey(sessionInfo),
                        ToCoreMsg.newBuilder().setToDeviceActorMsg(toDeviceActorMsg).build()), callback != null ?
                        new TransportTbQueueCallback(callback) : null);
    }
    
    1. 此时第二篇基于DefaultTbCoreConsumerService可以知道DefaultTbCoreConsumerService 的消费者订阅该主题的消息:
    try {
        ToCoreMsg toCoreMsg = msg.getValue();
        if (toCoreMsg.hasToSubscriptionMgrMsg()) {
            log.trace("[{}] Forwarding message to subscription manager service {}", id, toCoreMsg.getToSubscriptionMgrMsg());
            forwardToSubMgrService(toCoreMsg.getToSubscriptionMgrMsg(), callback);
        } else if (toCoreMsg.hasToDeviceActorMsg()) {
            log.trace("[{}] Forwarding message to device actor {}", id, toCoreMsg.getToDeviceActorMsg());
            //交由此方法进行处理
            forwardToDeviceActor(toCoreMsg.getToDeviceActorMsg(), callback);
        }
    
    1. forwardToDeviceActor对消息的处理

      private void forwardToDeviceActor(TransportToDeviceActorMsg toDeviceActorMsg, TbCallback callback) {
          if (statsEnabled) {
              stats.log(toDeviceActorMsg);
          }
          //创建type为TRANSPORT_TO_DEVICE_ACTOR_MSG的消息,并交给AppActor处理
          actorContext.tell(new TransportToDeviceActorMsgWrapper(toDeviceActorMsg, callback));
      }
      
    2. 通过第四篇的总结3,我们可以直接去看AppActordoProcess方法对此类型消息的处理,跟踪发现AppActor将消息转给了TenantActor, TenantActor创建了DeviceActor,并将消息转给了DeviceActor;

    3. DeviceActor拿到此类型的消息,进行了如下的处理:

      protected boolean doProcess(TbActorMsg msg) {
          switch (msg.getMsgType()) {
              case TRANSPORT_TO_DEVICE_ACTOR_MSG:
                  //包装成TransportToDeviceActorMsgWrapper交由processor处理,并继续调用processSessionStateMsgs
                  processor.process(ctx, (TransportToDeviceActorMsgWrapper) msg);
                  break;
              case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:
      
    4. processSessionStateMsgs的处理:

      private void processSessionStateMsgs(SessionInfoProto sessionInfo, SessionEventMsg msg) {
          UUID sessionId = getSessionId(sessionInfo);
          if (msg.getEvent() == SessionEvent.OPEN) {
           .....
              sessions.put(sessionId, new SessionInfoMetaData(new SessionInfo(SessionType.ASYNC, sessionInfo.getNodeId())));
              if (sessions.size() == 1) {
                 // 将调用pushRuleEngineMessage(stateData, CONNECT_EVENT);
                  reportSessionOpen();
              }
              //将调用pushRuleEngineMessage(stateData, ACTIVITY_EVENT);
              systemContext.getDeviceStateService().onDeviceActivity(deviceId, System.currentTimeMillis());
              dumpSessions();
          }
      ....
      
    5. 由于CONNECT_EVENTACTIVITY_EVENT仅仅类型不同,以下暂时只分析CONNECT_EVENT

      public void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, TbMsg tbMsg, TbQueueCallback callback) {
          if (tenantId.isNullUid()) {
              if (entityId.getEntityType().equals(EntityType.TENANT)) {
                  tenantId = new TenantId(entityId.getId());
              } else {
                  log.warn("[{}][{}] Received invalid message: {}", tenantId, entityId, tbMsg);
                  return;
              }
          }
          //和第7点类似,创建的tpi的fullTopicName的例子 tb_rule_engine.main.1
          TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, entityId);
          log.trace("PUSHING msg: {} to:{}", tbMsg, tpi);
          ToRuleEngineMsg msg = ToRuleEngineMsg.newBuilder()
                  .setTenantIdMSB(tenantId.getId().getMostSignificantBits())
                  .setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
                  .setTbMsg(TbMsg.toByteString(tbMsg)).build();
          producerProvider.getRuleEngineMsgProducer().send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), callback);
          toRuleEngineMsgs.incrementAndGet();
      }
      
    6. 通过第二篇的分析DefaultTbRuleEngineConsumerService订阅了此topic: tb_rule_engine.main.1的消息,收到消息以后,调用forwardToRuleEngineActor方法,包裹成QUEUE_TO_RULE_ENGINE_MSG类型的消息,交由AppActor进行分发处理;

    7. AppActor交给TenantActor处理,TenantActor交给RootRuleChain处理,RuleChainActor交给firstRuleNode处理,也就是某一个RuleNodeActor;

    8. 打开前端RULE CHAINS的界面,会发现,MESSAGE TYPE SWITCH是接收input的第一个节点,其实数据库的配置中,rule_chain表中配置的first_rule_node_id就是TbMsgTypeSwitchNode

    9. 进入TbMsgTypeSwitchNodeonMsg方法(实际上所有的ruleNode处理消息的方法都是onMsg),发现根据messageType(此时是CONNECT_EVENT)定义了relationtype并调用ctx.tellNext(msg, relationType);

    10. 此时DefaultTbContext创建一个RuleNodeToRuleChainTellNextMsg,类型是RULE_TO_RULE_CHAIN_TELL_NEXT_MSG,交给RuleChainActor处理;

    11. 接下来将会进入到RuleChainActorMessageProcessoronTellNext方法:

      private void onTellNext(TbMsg msg, RuleNodeId originatorNodeId, Set<String> relationTypes, String failureMessage) {
          try {
              checkActive(msg);
              //消息来源
              EntityId entityId = msg.getOriginator();
              //创建一个tpi,可能会使用
              TopicPartitionInfo tpi = systemContext.resolve(ServiceType.TB_RULE_ENGINE, msg.getQueueName(), tenantId, entityId);
             //查询有关系的RuleNode,其实就是从relation表中查询,该消息来源的id,relation_type和在TbMsgTypeSwitchNode定义的relationType一直的节点id,如上Connect Event就没有找到相应的relation的RuleNodeId
              List<RuleNodeRelation> relations = nodeRoutes.get(originatorNodeId).stream()
                      .filter(r -> contains(relationTypes, r.getType()))
                      .collect(Collectors.toList());
              int relationsCount = relations.size();
             //Connect Event就没有找到相应的relation的RuleNodeId,消息通过规则引擎,已经处理完成
              if (relationsCount == 0) {
                  log.trace("[{}][{}][{}] No outbound relations to process", tenantId, entityId, msg.getId());
                  if (relationTypes.contains(TbRelationTypes.FAILURE)) {
                      RuleNodeCtx ruleNodeCtx = nodeActors.get(originatorNodeId);
                      if (ruleNodeCtx != null) {
                          msg.getCallback().onFailure(new RuleNodeException(failureMessage, ruleChainName, ruleNodeCtx.getSelf()));
                      } else {
                          log.debug("[{}] Failure during message processing by Rule Node [{}]. Enable and see debug events for more info", entityId, originatorNodeId.getId());
                          msg.getCallback().onFailure(new RuleEngineException("Failure during message processing by Rule Node [" + originatorNodeId.getId().toString() + "]"));
                      }
                  } else {
                      msg.getCallback().onSuccess();
                  }
               //举例:Post telemetry的type可以找到相应的ruleNode,实现类是:TbMsgTimeseriesNode,那么此消息将会交给TbMsgTimeseriesNode处理
              } else if (relationsCount == 1) {
                  for (RuleNodeRelation relation : relations) {
                      log.trace("[{}][{}][{}] Pushing message to single target: [{}]", tenantId, entityId, msg.getId(), relation.getOut());
                      pushToTarget(tpi, msg, relation.getOut(), relation.getType());
                  }
              } else {
                  MultipleTbQueueTbMsgCallbackWrapper callbackWrapper = new MultipleTbQueueTbMsgCallbackWrapper(relationsCount, msg.getCallback());
                  log.trace("[{}][{}][{}] Pushing message to multiple targets: [{}]", tenantId, entityId, msg.getId(), relations);
                  for (RuleNodeRelation relation : relations) {
                      EntityId target = relation.getOut();
                      putToQueue(tpi, msg, callbackWrapper, target);
                  }
              }
          } catch (RuleNodeException rne) {
              msg.getCallback().onFailure(rne);
          } catch (Exception e) {
              msg.getCallback().onFailure(new RuleEngineException("onTellNext - " + e.getMessage()));
          }
      }
      

      What's more:

      如上面的举例,比如是遥测数据Post telemetry,将会使用TbMsgTimeseriesNodeonMsg做进一步的处理,比如存储数据,再通过webSocket进行数据的更新如果有webSocket的session的话,或者其他通知消息,就不详细展开了。

    总结:

    1. 处理MQTT的连接其实就是走完了整个规则引擎的逻辑,其他类型的消息,比如遥测数据,属性更新,RPC请求发送与接收,大体流程大同小异;

    2. 在处理消息流向的时候,我们一定要清楚其订阅或者发布的主题是什么,这样我们才不会丢失方向;

    3. Actor的模型就是根据消息的类型,使用AppActor进行一步步的分发,最终交由合适的RuleNode进行处理;

    4. Protobuf类型的消息容易序列化传输与解析,所以在thingsboard中大量使用,但是生成的类可读性不是很高,可以选择直接读queue.proto文件,对类有感性的认知。

      ​ 由于作者水平有限,只是梳理了大致的流程,文章难免出现纰漏,望谅解并指正。

  • 相关阅读:
    ConvertUtils的理解
    mysql存储过程 详细注释
    线程方法
    集合的方法
    StringStringBufferStringBuilder
    Java基础知识点1
    Java基础知识点
    索引+sql优化
    Oracle数据库02
    Oracle数据库01
  • 原文地址:https://www.cnblogs.com/iotschool/p/13758357.html
Copyright © 2011-2022 走看看