  • Tracing the mqttv3 source code (Part 2)

    The following tags appear in spring-mqtt.xml:

    <int-mqtt:message-driven-channel-adapter>

    <int-mqtt:outbound-channel-adapter>

    <int:channel>

    <int:transformer>

    <int:service-activator>

    <int:outbound-channel-adapter>

    Anyone familiar with Spring custom tags will know how such tags are built:

    1. First, go into spring-integration-mqtt-4.3.5 ---> META-INF and look at spring.handlers:

    http://www.springframework.org/schema/integration/mqtt=org.springframework.integration.mqtt.config.xml.MqttNamespaceHandler
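
    As a rough illustration of what this mapping is for, the sketch below loads the entry the same way a properties file would be read and looks up the handler class name by namespace URI. The class and method names are hypothetical, and the sketch assumes the colon is escaped (`http\://`) as the properties format requires for a key that contains ':'; it is not Spring's actual resolver.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class HandlerLookupSketch {

    // The spring.handlers entry quoted above, written with the escaped colon
    // that the properties format needs inside a key.
    static final String SPRING_HANDLERS =
            "http\\://www.springframework.org/schema/integration/mqtt="
            + "org.springframework.integration.mqtt.config.xml.MqttNamespaceHandler\n";

    // Returns the handler class name registered for the namespace URI, or null if none.
    static String resolveHandlerClassName(String namespaceUri) throws IOException {
        Properties mappings = new Properties();
        mappings.load(new StringReader(SPRING_HANDLERS));
        return mappings.getProperty(namespaceUri);
    }

    public static void main(String[] args) throws IOException {
        System.out.println(resolveHandlerClassName(
                "http://www.springframework.org/schema/integration/mqtt"));
    }
}
```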

    This gives us the namespace handler class MqttNamespaceHandler; go into config/xml:

    public class MqttNamespaceHandler extends AbstractIntegrationNamespaceHandler {
    
        @Override
        public void init() {
            this.registerBeanDefinitionParser("message-driven-channel-adapter",  new MqttMessageDrivenChannelAdapterParser());
            this.registerBeanDefinitionParser("outbound-channel-adapter", new MqttOutboundChannelAdapterParser());
        }
    
    }
    public class MqttMessageDrivenChannelAdapterParser extends AbstractChannelAdapterParser {

        @Override
        protected AbstractBeanDefinition doParse(Element element, ParserContext parserContext, String channelName) {
            // The bean definition wraps MqttPahoMessageDrivenChannelAdapter
            BeanDefinitionBuilder builder = BeanDefinitionBuilder
                    .genericBeanDefinition(MqttPahoMessageDrivenChannelAdapter.class);

            // MqttParserUtils sets the common builder properties: url, client-id, client-factory, converter, send-timeout
            MqttParserUtils.parseCommon(element, builder, parserContext);

            // Parse the topics, qos and recovery-interval attributes configured in spring-mqtt.xml
            builder.addConstructorArgValue(element.getAttribute("topics"));
            builder.addPropertyReference("outputChannel", channelName);
            IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "error-channel");
            IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "qos");
            IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "recovery-interval");
            return builder.getBeanDefinition();
        }

    }

    In the MqttPahoMessageDrivenChannelAdapter definition, note the line builder.addPropertyReference("outputChannel", channelName);

    Tracing channelName into AbstractChannelAdapterParser.parseInternal() shows that channelName comes from the element;
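
    To make the attribute handling in doParse() more concrete, here is a minimal stand-alone sketch that uses only the JDK. It assumes that an optional attribute is copied only when it has text and that its hyphenated name is converted to a camelCase property name (recovery-interval becomes recoveryInterval). The class, the helper methods and the sample `<adapter>` element are hypothetical stand-ins, not the Spring Integration implementation.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Element;

public class AttributeMappingSketch {

    // Converts a hyphenated attribute name to a camelCase property name.
    static String attributeNameToPropertyName(String attributeName) {
        StringBuilder result = new StringBuilder();
        boolean upperNext = false;
        for (char c : attributeName.toCharArray()) {
            if (c == '-') {
                upperNext = true;
            } else {
                result.append(upperNext ? Character.toUpperCase(c) : c);
                upperNext = false;
            }
        }
        return result.toString();
    }

    // Copies the attribute into the property map only if it is present and non-blank.
    static void setValueIfAttributeDefined(Map<String, String> properties,
                                           Element element, String attributeName) {
        String value = element.getAttribute(attributeName); // "" when the attribute is absent
        if (!value.trim().isEmpty()) {
            properties.put(attributeNameToPropertyName(attributeName), value);
        }
    }

    // Mirrors the order used in doParse(): topics first, then the optional attributes.
    static Map<String, String> parse(String xml) throws Exception {
        Element element = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)))
                .getDocumentElement();
        Map<String, String> properties = new LinkedHashMap<>();
        properties.put("topics", element.getAttribute("topics"));
        setValueIfAttributeDefined(properties, element, "qos");
        setValueIfAttributeDefined(properties, element, "recovery-interval");
        return properties;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(parse("<adapter topics=\"sensors/#\" recovery-interval=\"5000\"/>"));
    }
}
```

    In this sketch the missing qos attribute simply leaves no entry in the map, which is the point of the "if defined" helpers: the adapter keeps its own default for anything the XML does not set.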

    Similarly, go into IntegrationNamespaceHandler and find registerBeanDefinitionParser("service-activator", new ServiceActivatorParser()); then follow AbstractDelegatingConsumerEndpointParser --> AbstractConsumerEndpointParser:

    protected String getInputChannelAttributeName() {
            return "input-channel";
    }
    
    String inputChannelAttributeName = this.getInputChannelAttributeName();
    String inputChannelName = element.getAttribute(inputChannelAttributeName);
    builder.addPropertyValue("inputChannelName", inputChannelName);

    Next, looking at spring.tooling shows where the int-mqtt tag prefix comes from:

    # Tooling related information for the integration mqtt namespace
    http://www.springframework.org/schema/integration/mqttadapter@name=integration mqtt Namespace
    http://www.springframework.org/schema/integration/mqttadapter@prefix=int-mqtt
    http://www.springframework.org/schema/integration/mqttadapter@icon=org/springframework/integration/config/xml/spring-integration-mqtt.gif

    Following into spring-integration-4.3.xsd, we can see that <xsd:element name="service-activator"> includes inputOutputChannelGroup:

        <xsd:element name="service-activator">
            <xsd:annotation>
                <xsd:documentation>
                    Defines an endpoint for the 'org.springframework.integration.handler.ServiceActivatingHandler'
                    for exposing any bean reference as a service that
                    receives request Messages
                    from an 'input-channel' and may send reply
                    Messages to an 'output-channel'. The 'ref' may point to an instance
                    that
                    has either a single public method or a method with the
                    @ServiceActivator annotation. Otherwise, the 'method'
                    attribute
                    should be provided along with 'ref'.
                </xsd:documentation>
            </xsd:annotation>
            <xsd:complexType>
                <xsd:complexContent>
                    <xsd:extension base="serviceActivatorType">
                        <xsd:attributeGroup ref="inputOutputChannelGroup" />
                    </xsd:extension>
                </xsd:complexContent>
            </xsd:complexType>
        </xsd:element>
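
    The documentation above says that 'ref' may point to an instance with a single public method. A minimal sketch of such a bean is shown here; the class name MqttPayloadHandler and its behaviour are hypothetical and are not taken from the original configuration.

```java
public class MqttPayloadHandler {

    // The only public method, so a service-activator whose 'ref' points at this
    // bean would not need a 'method' attribute according to the XSD documentation.
    public String handle(String payload) {
        return "received: " + payload;
    }

    public static void main(String[] args) {
        System.out.println(new MqttPayloadHandler().handle("21.5"));
    }
}
```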

    Looking at <xsd:attributeGroup name="inputOutputChannelGroup">, we can see that it contains output-channel, input-channel, send-timeout and order:

    <xsd:attributeGroup name="inputOutputChannelGroup">
            <xsd:attribute name="output-channel" type="xsd:string">
                <xsd:annotation>
                    <xsd:appinfo>
                        <tool:annotation kind="ref">
                            <tool:expected-type type="org.springframework.messaging.MessageChannel" />
                        </tool:annotation>
                    </xsd:appinfo>
                    <xsd:documentation>
                    Identifies the Message channel where Message will be sent after it's being processed by this endpoint
                    </xsd:documentation>
                </xsd:annotation>
            </xsd:attribute>
            <xsd:attribute name="send-timeout" type="xsd:string">
                <xsd:annotation>
                    <xsd:documentation>
                        Specify the maximum amount of time in milliseconds to wait when sending a reply
                        Message to the output channel. Defaults to '-1' - blocking indefinitely.
                        It is applied only if the output channel has some 'sending' limitations, e.g. QueueChannel with
                        fixed a 'capacity'. In this case a MessageDeliveryException is thrown. The 'send-timeout'
                        is ignored in case of AbstractSubscribableChannel implementations.
                    </xsd:documentation>
                </xsd:annotation>
            </xsd:attribute>
            <xsd:attribute name="input-channel" type="xsd:string">
                <xsd:annotation>
                    <xsd:appinfo>
                        <tool:annotation kind="ref">
                            <tool:expected-type type="org.springframework.messaging.MessageChannel" />
                        </tool:annotation>
                    </xsd:appinfo>
                    <xsd:documentation>
                    The receiving Message channel of this endpoint
                    </xsd:documentation>
                </xsd:annotation>
            </xsd:attribute>
            <xsd:attribute name="order" type="xsd:string">
                <xsd:annotation>
                    <xsd:documentation><![CDATA[
    Specifies the order for invocation when this endpoint is connected as a
    subscriber to a channel. This is particularly relevant when that channel
    is using a "failover" dispatching strategy. It has no effect when this
    endpoint itself is a Polling Consumer for a channel with a queue.
                        ]]></xsd:documentation>
                </xsd:annotation>
            </xsd:attribute>
            <xsd:attributeGroup ref="smartLifeCycleAttributeGroup"/>
        </xsd:attributeGroup>

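    In the configuration file, org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory is used to configure the basic connection settings.

    DefaultMqttPahoClientFactory is injected into org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler, which publishes outgoing messages. Incoming messages are picked up in the messageArrived() callback, which on the receiving side is implemented by MqttPahoMessageDrivenChannelAdapter.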
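
    For comparison, the same wiring can be sketched in Java configuration. This is only an illustration under stated assumptions: it needs spring-integration-mqtt 4.3.x and the Paho client on the classpath, and the broker URL, credentials, client ids, topic names and bean names are all hypothetical rather than taken from the original spring-mqtt.xml.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;

@Configuration
public class MqttConfigSketch {

    // Hypothetical broker address, used by both adapters below.
    private static final String BROKER_URL = "tcp://localhost:1883";

    // Shared connection settings (the "basic connection information").
    @Bean
    public MqttPahoClientFactory clientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setUserName("guest");   // hypothetical credentials
        factory.setPassword("guest");
        return factory;
    }

    // Channel that receives messages arriving from the broker.
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    // Counterpart of <int-mqtt:message-driven-channel-adapter>: subscribes and forwards to the channel.
    @Bean
    public MqttPahoMessageDrivenChannelAdapter inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
                BROKER_URL, "demo-subscriber", clientFactory(), "sensors/#");
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    // Counterpart of <int-mqtt:outbound-channel-adapter>: publishes messages to the broker.
    @Bean
    public MqttPahoMessageHandler outbound() {
        MqttPahoMessageHandler handler =
                new MqttPahoMessageHandler(BROKER_URL, "demo-publisher", clientFactory());
        handler.setDefaultTopic("commands");
        return handler;
    }
}
```

    The outbound handler is deliberately left unconnected here; in a real application it would still have to be subscribed to a channel, for example through a service activator.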

  • Original article: https://www.cnblogs.com/wangwanchao/p/7517589.html