zoukankan      html  css  js  c++  java
  • ActiveMQ in Action(2)

    关键字: activemq

    2.2 Transport
        ActiveMQ目前支持的transport有:VM Transport、TCP Transport、SSL Transport、Peer Transport、UDP Transport、Multicast Transport、HTTP and HTTPS Transport、Failover Transport、Fanout Transport、Discovery Transport、ZeroConf Transport等。以下简单介绍其中的几种,更多请参考Apache官方文档。

     

    2.2.1 VM Transport
        VM transport允许在VM内部通信,从而避免了网络传输的开销。这时候采用的连接不是socket连接,而是直接地方法调用。 第一个创建VM 连接的客户会启动一个embed VM broker,接下来所有使用相同的broker name的VM连接都会使用这个broker。当这个broker上所有的连接都关闭的时候,这个broker也会自动关闭。
        以下是配置语法:

       vm://brokerName?transportOptions

       例如:vm://broker1?marshal=false&broker.persistent=false

       Transport Options的可选值如下:

    Option Name Default Value Description
    Marshal false If true, forces each command sent over the transport to be marshlled and unmarshlled using a WireFormat
    wireFormat default The name of the WireFormat to use
    wireFormat.*   All the properties with this prefix are used to configure the wireFormat
    create true If the broker should be created on demand if it does not allready exist. Only supported in ActiveMQ 4.1
    broker.*   All the properties with this prefix are used to configure the broker. See Configuring Wire Formats for more information

     

       以下是高级配置语法:

       vm:(broker:(tcp://localhost)?brokerOptions)?transportOptions

       vm:broker:(tcp://localhost)?brokerOptions

        例如:vm:(broker:(tcp://localhost:6000)?persistent=false)?marshal=false

        Transport Options的可选值如下:

    Option Name Default Value Description
    marshal false If true, forces each command sent over the transport to be marshlled and unmarshlled using a WireFormat
    wireFormat default The name of the WireFormat to use
    wireFormat.*   All the properties with this prefix are used to configure the wireFormat

     

       使用配置文件的配置语法:   
        vm://localhost?brokerConfig=xbean:activemq.xml
        例如:vm:// localhost?brokerConfig=xbean:com/test/activemq.xml

     

       使用Spring的配置:

    Xml代码 复制代码
    1. <bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">  
    2.   <property name="config" value="classpath:org/apache/activemq/xbean/activemq.xml" />  
    3.   <property name="start" value="true" />  
    4. </bean>  
    5.   
    6. <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" depends-on="broker">  
    7.   <property name="brokerURL" value="vm://localhost"/>  
    8. </bean>  
       如果persistent是true,那么ActiveMQ会在当前目录下创建一个缺省值是activemq-data的目录用于持久化保存数据。需要注意的是,如果程序中启动了多个不同名字的VM broker,那么可能会有如下警告:Failed to start jmx connector: Cannot bind to URL [rmi://localhost:1099/jmxrmi]: javax.naming.NameAlreadyBoundException…可以通过在transportOptions中追加broker.useJmx=false来禁用JMX来避免这个警告。

     

    2.2.2 TCP Transport
        TCP transport 允许客户端通过TCP socket连接到远程的broker。以下是配置语法:
        tcp://hostname:port?transportOptions
        Transport Options的可选值如下:

    Option Name Default Value Description
    minmumWireFormatVersion 0 The minimum version wireformat that is allowed
    trace false Causes all commands that are sent over the transport to be logged
    useLocalHost true When true, it causes the local machines name to resolve to "localhost".
    socketBufferSize 64 * 1024 Sets the socket buffer size in bytes
    soTimeout 0 sets the socket timeout in milliseconds
    connectionTimeout 30000 A non-zero value specifies the connection timeout in milliseconds. A zero value means wait forever for the connection to be established. Negative values are ignored.
    wireFormat default The name of the WireFormat to use
    wireFormat.*   All the properties with this prefix are used to configure the wireFormat. See Configuring Wire Formats for more information

       例如:tcp://localhost:61616?trace=false

     

    2.2.3 Failover Transport
        Failover Transport是一种重新连接的机制,它工作于其它transport的上层,用于建立可靠的传输。它的配置语法允许制定任意多个复合的URI。Failover transport会自动选择其中的一个URI来尝试建立连接。如果没有成功,那么会选择一个其它的URI来建立一个新的连接。以下是配置语法:
        failover:(uri1,...,uriN)?transportOptions
        failover:uri1,...,uriN
        Transport Options的可选值如下:

    Option Name Default Value Description
    initialReconnectDelay 10 How long to wait before the first reconnect attempt (in ms)
    maxReconnectDelay 30000 The maximum amount of time we ever wait between reconnect attempts (in ms)
    useExponentialBackOff true Should an exponential backoff be used between reconnect attempts
    backOffMultiplier 2 The exponent used in the exponential backoff attempts
    maxReconnectAttempts 0 If not 0, then this is the maximum number of reconnect attempts before an error is sent back to the client
    randomize true use a random algorithm to choose the URI to use for reconnect from the list provided
    backup false initialize and hold a second transport connection - to enable fast failover

       例如:failover:(tcp://localhost:61616,tcp://remotehost:61616)?initialReconnectDelay=100

     

    2.2.4 Discovery transport
        Discovery transport是可靠的tranport。它使用Discovery transport来定位用来连接的URI列表。以下是配置语法:
        discovery:(discoveryAgentURI)?transportOptions
        discovery:discoveryAgentURI
        Transport Options的可选值如下:

    Option Name Default Value Description
    initialReconnectDelay 10 How long to wait before the first reconnect attempt
    maxReconnectDelay 30000 The maximum amount of time we ever wait between reconnect attempts
    useExponentialBackOff true Should an exponential backoff be used btween reconnect attempts
    backOffMultiplier 2 The exponent used in the exponential backoff attempts
    maxReconnectAttempts 0 If not 0, then this is the maximum number of reconnect attempts before an error is sent back to the client

       例如:discovery:(multicast://default)?initialReconnectDelay=100   
        为了使用Discovery来发现broker,需要为broker启用discovery agent。 以下是XML配置文件中的一个例子:

    Xml代码 复制代码
    1. <broker name="foo">  
    2.    <transportConnectors>  
    3.       <transportConnector uri="tcp://localhost:0" discoveryUri="multicast://default"/>  
    4.     </transportConnectors>  
    5.     ...   
    6. </broker>  
       在使用Failover Transport或Discovery transport等能够自动重连的transport的时候,需要注意的是:设想有两个broker,它们都启用AMQ Message Store作为持久化存储,有一个producer和一个consumer连接到某个queue。当因其中一个broker失效时而切换到另一个broker的时候,如果失效的broker的queue中还有未被consumer消费的消息,那么这个queue里的消息仍然滞留在失效broker的中,直到失效的broker被修复并重新切换回这个被修复的broker后,之前被保留的消息才会被consumer消费掉。如果被处理的消息有时序限制,那么应用程序就需要处理这个问题。另外也可以通过ActiveMQ集群来解决这个问题。

       在transport重连的时候,可以在connection上注册TransportListener来获得回调,例如:

    Java代码 复制代码
    1. (ActiveMQConnection)connection).addTransportListener(new TransportListener() {   
    2.     public void onCommand(Object cmd) {   
    3.     }   
    4.   
    5.     public void onException(IOException exp) {   
    6.     }   
    7.   
    8.     public void transportInterupted() {   
    9.         // The transport has suffered an interruption from which it hopes to recover.  
    10.     }   
    11.   
    12.     public void transportResumed() {   
    13.         // The transport has resumed after an interruption.   
    14.     }   
    15. });  
  • 相关阅读:
    eclipse、idea安装lombok插件
    ContextLoaderListener加载过程
    web.xml 文件中一般包括 servlet, spring, filter, listenr的配置的加载顺序
    springboot
    root cause org.apache.ibatis.ognl.OgnlException: source is null for getProperty(null, "XXX")
    Java MyBatis 插入数据库返回主键
    Mybatis异常There is no getter for property named 'XXX' in 'class com.xxx.xxx.UserAccountDTO
    web项目,ftl文件中的路径引入问题
    mybatis No enum const class org.apache.ibatis.type.JdbcType.Integer
    Restful、Jersey和JAX-RS
  • 原文地址:https://www.cnblogs.com/ainima/p/6331704.html
Copyright © 2011-2022 走看看