两种消息模式
消息列队有两种消息模式,一种是点对点的消息模式,还有一种就是订阅的模式.;下面来说说这两种模式。
1、点对点的消息模式
点对点的模式主要建立在一个队列上面,当连接一个列队的时候,发送端不需要知道接收端是否正在接收,可以直接向ActiveMQ发送消息,发送的消息,将会先进入队列中,如果有接收端在监听,则会发向接收端,如果没有接收端接收,则会保存在activemq服务器,直到接收端接收消息,点对点的消息模式可以有多个发送端,多个接收端,但是一条消息,只会被一个接收端给接收到,哪个接收端先连上ActiveMQ,则会先接收到,而后来的接收端则接收不到那条消息
2、订阅模式
订阅/发布模式,同样可以有着多个发送端与多个接收端,但是接收端与发送端存在时间上的依赖,就是如果发送端发送消息的时候,接收端并没有监听消息,那么ActiveMQ将不会保存消息,将会认为消息已经发送,换一种说法,就是发送端发送消息的时候,接收端不在线,是接收不到消息的,哪怕以后监听消息,同样也是接收不到的。这个模式还有一个特点,那就是,发送端发送的消息,将会被所有的接收端给接收到,不类似点对点,一条消息只会被一个接收端给接收到
什么情况下使用ActiveMQ
紧耦合应用系统存在许多问题,但是,要将紧耦合系统重构成松耦合系统是一件值得但比较繁琐的事情。使用松耦合的主要优势体现在将同步改为异步。使用异步通信,应用程序将从接收者反馈的等待中解放出来,其他的任务可以得到执行,这样提高了应用程序的效率。
只要是两个应用程序间需要通信的情况,都可以考虑使用JMS,不论这种通信是在本地的(就是通信的两个应用程序在同一台主机上),还是分布在不同机器上。尽管是在同一个主机上的两个应用程序需要通信也可以使用ActiveMQ。ActiveMQ可以确保消息投递成功并采用异步方式通信。
多个需要通信的应用程序在同一个机器上的情况下,您可以考虑在执行机上独立运行ActiveMQ或者将ActiveMQ嵌入到Java应用服务中。无论采用哪种方式,都可以确保应用程序能够发送和接收消息。您可以选择订阅模式(pub/sub)或者采用PTP(point to point)模式,这两种模式都无需等待执行反馈信息。每一个应用程序都可以简单的将消息发送给ActiveMQ,然后继续做其他的工作;应用程序无需阻塞式等待消息的返回。
对于分布在多台主机上的应用程序来说,可以使用多种布置策略。主要包括单一ActiveMQ实例和多ActiveMQ实例。单一ActiveMQ实例是一个简单解决方案。所有的应用程序都向同一个ActiveMQ中介发送和接收消息,这与上面提到的单机多服务雷同。单一的ActiveMQ可以布置到一台单独的主机上,也可以和其中的一些服务布置在一起。重要的是,所有的应用必须能够直接与ActiveMQ中介进行交互,所以,你必须考虑到你的网络设计。
第二种情况比较复杂,但是有ActiveMQ来负责远程通信,而不是应用程序自身。在这种场景下,每一个应用程序都会实例化一个ActiveMQ(无论是嵌入式的还是独立式的),应用程序从其本地的ActiveMQ发送和接收消息。之后这些ActiveMQ实例将会以一种联合的方式协同工作。消息将会基于每一个应用的要求在多个ActiveMQ中介间传递到远程的处理者。在ActiveMQ中,这种模式被称为netWork of brokers。采用这种模式对于处理大量的ActiveMQ消息是可行的,但是,我们往往需要减轻网络拓扑的复杂性,这样直接将消息投递到远程接收者的ActiveMQ是不可行的。在后一种情况下,不同的协议使用可以使ActiveMQ更轻松的传递消息。
ActiveMQ配置传输连接
ActiveMQ提供了广泛的连接模式,包括HTTP/S、JGroups、JXTA、muticast、SSL、TCP、UDP、XMPP等。提供了如此多的连接模式表明了ActiveMQ具有较高的灵活性。
配置格式如下:
1 <transportConnectors> 2 3 <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --> 4 5 <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireformat.maxFrameSize=104857600"/> 6 7 <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireformat.maxFrameSize=104857600"/> 8 <transportConnector name="ssl" uri="ssl://0.0.0.0:61617?maximumConnections=1000&wireformat.maxFrameSize=104857600"/> 9 <transportConnector name="stomp" uri="stomp://0.0.0.0:61618?maximumConnections=1000&wireformat.maxFrameSize=104857600"/> 10 <transportConnector name="xmpp" uri="xmpp://0.0.0.0:61619?maximumConnections=1000&wireformat.maxFrameSize=104857600"/> 11 </transportConnectors>
生产者和消费着可以使用不同的传输协议来传输信息。比如生产者用nio协议生产消息,消费者用tcp协议接收消息。
ActiveMQ配置网络连接
当应用到Broker的集群时,Borker与Broker的通信就用到了网络连接。
配置格式如下:
1 <networkConnectors> 2 <!-- 动态连接方式 3 <networkConnector name="default-nc" uri="multicast://default" 4 dynamicOnly="true" networkTTL="3" prefetchSize="1" decreaseNetworkConsumerPriority="true" 5 /> --> 6 <!-- 静态连接方式 <networkConnector name="host1 and host2" uri="static://(tcp://host1:61616,tcp://host2:61616)"/> --> 7 </networkConnectors>
ActiveMQ持久化存储模式
ActiveMq主要实现了如下几种存储:
1.4.1. AMQ消息存储—默认的消息存储
它是一种基于文件存储的消息数据库并且不依赖第三方数据库。配置如下
<amqPersistenceAdapter directory="${activemq.base}/data" maxFileLength="32mb"/>
1.4.2. KahaDB 消息存储—提供容量的提升和恢复能力
它是一种新的消息存储机制,配置如下
<kahaDB directory="${activemq.data}/kahadb" />
1.4.3. JDBC 消息存储—消息基于JDBC存储
1 <persistenceAdapter> 2 <jdbcPersistenceAdapter dataSource="#mysql-ds" /> 3 </persistenceAdapter> 4 <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> 5 <property name="driverClassName" value="com.mysql.jdbc.Driver" /> 6 <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true" /> 7 <property name="username" value="activemq" /> <property name="password" value="activemq" /> 8 <property name="maxActive" value="200" /> <property name="poolPreparedStatements" value="true" /> 9 </bean>
1.4.4. Memory 消息存储—基于内容的消息存储
ActiveMQ支持将消息保存到内存中,这种情况没有动态的缓存存在。
这种情况的配置很简单,只要将Broker的“prsistent” 属性设置为“false”即可。
ActiveMQ拦截器使用
在ActiveMQ中使用拦截器和过滤器的使用多采用插件的形式实现,继承BrokerFilter实现BrokerPlugin接口类。BrokerFilter实质一个实现Broker接口的类。
日志拦截
日志拦截器是Broker的一个拦截器,默认的日志级别为INFO。你如你想改变日志的级别。这个日志拦截器支持Commons-log和Log4j两种日志。
<plugins> <loggingBrokerPlugin logAll="true" logConnectionEvents="false"/> </plugins>
部分参数如下:
属性名称 | 默认值 | 描述 |
logAll | false | 记录所有事件的日志 |
logMessageEvents | false | 记录消息事件日志 |
logConnectionEvents | true | 记录连接事件日志 |
logTransactionEvents | false | 记录消息事务事件日志 |
logConsumerEvents | false | 记录消息消费者事件日志 |
logProducerEvents | false | 记录消息生产者事件日志 |
logInternalEvents | false |
ActiveMQ安全配置
ActiveMQ也可以对各个主题和队列设置用户名和密码,配置如下:
1 <plugins> 2 <!-- Configure authentication; Username, passwords and groups --> 3 <simpleAuthenticationPlugin> 4 <users> 5 <authenticationUser username="system" password="manager" groups="users,admins" /> 6 <authenticationUser username="user" password="password" groups="users" /> 7 <authenticationUser username="guest" password="password" groups="guests" /> 8 <authenticationUser username="testUser" password="123456" groups="testGroup" /> 9 </users> 10 </simpleAuthenticationPlugin> 11 <!-- Lets configure a destination based authorization mechanism --> 12 <authorizationPlugin> 13 <map> 14 <authorizationMap> 15 <authorizationEntries> 16 <authorizationEntry queue="queue.group.uum" read="users" write="users" admin="users" /> 17 <authorizationEntry queue=">" read="admins" write="admins" admin="admins" /> 18 <authorizationEntry queue="USERS.>" read="users" write="users" admin="users" /> 19 <authorizationEntry queue="GUEST.>" read="guests" write="guests,users" admin="guests,users" /> 20 <authorizationEntry queue="TEST.Q" read="guests" write="guests" /> 21 <authorizationEntry queue="test" read="testGroup" write="testGroup" /> 22 <authorizationEntry topic=">" read="admins" write="admins" admin="admins" /> 23 <authorizationEntry topic="USERS.>" read="users" write="users" admin="users" /> 24 <authorizationEntry topic="GUEST.>" read="guests" write="guests,users" admin="guests,users" /> 25 <authorizationEntry topic="ActiveMQ.Advisory.>" read="guests,users ,testGroup" write="guests,users ,testGroup " admin="guests,users ,testGroup " /> 26 </authorizationEntries> 27 </authorizationMap> 28 </map> 29 </authorizationPlugin> 30 </plugins>
ActiveMQ Async Sends
Acivemq 支持异步和同步发送消息。在 ActiveMQ4.0 以上,所有的异步或同步对
于 Consumer 来说是变得可配置了。默认是在 ConnectionFactory、Connection、
Connection URI等方面配置对于一个基于 Destination 的Consumer来说。
众所周之,如果你想传递给 Slow Consumer 那么你可能使用异步的消息传递,但是对于 Fast Consumer 你可能使用同步发送消息。(这样可以避免同步和上下文切换额外的增加Queue 堵塞花费。如果对于一个 Slow Consumer,你使用同步发送消息可能出现Producer 堵塞等显现。
ActiveMQ默认设置 dispatcheAsync=true是最好的性能设置。如果你处理的是
Slow Consumer 则使用 dispatcheAsync=true,反之,那你使用的是 Fast Consumer则使用dispatcheAsync=false。
用Connection URI 来配置Async如下:
ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");
用ConnectionFactory 配置Async如下:
((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);
用Connection 配置Async 如下:
((ActiveMQConnection)connection).setUseAsyncSend(true);