zoukankan      html  css  js  c++  java
  • ActiveMQ的使用以及应用场景

    两种消息模式

    消息列队有两种消息模式,一种是点对点的消息模式,还有一种就是订阅的模式.;下面来说说这两种模式。

    1、点对点的消息模式

    点对点的模式主要建立在一个队列上面,当连接一个列队的时候,发送端不需要知道接收端是否正在接收,可以直接向ActiveMQ发送消息,发送的消息,将会先进入队列中,如果有接收端在监听,则会发向接收端,如果没有接收端接收,则会保存在activemq服务器,直到接收端接收消息,点对点的消息模式可以有多个发送端,多个接收端,但是一条消息,只会被一个接收端给接收到,哪个接收端先连上ActiveMQ,则会先接收到,而后来的接收端则接收不到那条消息

    2、订阅模式

    订阅/发布模式,同样可以有着多个发送端与多个接收端,但是接收端与发送端存在时间上的依赖,就是如果发送端发送消息的时候,接收端并没有监听消息,那么ActiveMQ将不会保存消息,将会认为消息已经发送,换一种说法,就是发送端发送消息的时候,接收端不在线,是接收不到消息的,哪怕以后监听消息,同样也是接收不到的。这个模式还有一个特点,那就是,发送端发送的消息,将会被所有的接收端给接收到,不类似点对点,一条消息只会被一个接收端给接收到

    什么情况下使用ActiveMQ

    紧耦合应用系统存在许多问题,但是,要将紧耦合系统重构成松耦合系统是一件值得但比较繁琐的事情。使用松耦合的主要优势体现在将同步改为异步。使用异步通信,应用程序将从接收者反馈的等待中解放出来,其他的任务可以得到执行,这样提高了应用程序的效率。

    只要是两个应用程序间需要通信的情况,都可以考虑使用JMS,不论这种通信是在本地的(就是通信的两个应用程序在同一台主机上),还是分布在不同机器上。尽管是在同一个主机上的两个应用程序需要通信也可以使用ActiveMQ。ActiveMQ可以确保消息投递成功并采用异步方式通信。

    多个需要通信的应用程序在同一个机器上的情况下,您可以考虑在执行机上独立运行ActiveMQ或者将ActiveMQ嵌入到Java应用服务中。无论采用哪种方式,都可以确保应用程序能够发送和接收消息。您可以选择订阅模式(pub/sub)或者采用PTP(point to point)模式,这两种模式都无需等待执行反馈信息。每一个应用程序都可以简单的将消息发送给ActiveMQ,然后继续做其他的工作;应用程序无需阻塞式等待消息的返回。

    对于分布在多台主机上的应用程序来说,可以使用多种布置策略。主要包括单一ActiveMQ实例和多ActiveMQ实例。单一ActiveMQ实例是一个简单解决方案。所有的应用程序都向同一个ActiveMQ中介发送和接收消息,这与上面提到的单机多服务雷同。单一的ActiveMQ可以布置到一台单独的主机上,也可以和其中的一些服务布置在一起。重要的是,所有的应用必须能够直接与ActiveMQ中介进行交互,所以,你必须考虑到你的网络设计。

    第二种情况比较复杂,但是有ActiveMQ来负责远程通信,而不是应用程序自身。在这种场景下,每一个应用程序都会实例化一个ActiveMQ(无论是嵌入式的还是独立式的),应用程序从其本地的ActiveMQ发送和接收消息。之后这些ActiveMQ实例将会以一种联合的方式协同工作。消息将会基于每一个应用的要求在多个ActiveMQ中介间传递到远程的处理者。在ActiveMQ中,这种模式被称为netWork of brokers。采用这种模式对于处理大量的ActiveMQ消息是可行的,但是,我们往往需要减轻网络拓扑的复杂性,这样直接将消息投递到远程接收者的ActiveMQ是不可行的。在后一种情况下,不同的协议使用可以使ActiveMQ更轻松的传递消息。

    ActiveMQ配置传输连接

    ActiveMQ提供了广泛的连接模式,包括HTTP/S、JGroups、JXTA、muticast、SSL、TCP、UDP、XMPP等。提供了如此多的连接模式表明了ActiveMQ具有较高的灵活性。

    配置格式如下:

     1  <transportConnectors>
     2 
     3             <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
     4 
     5             <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600"/>
     6 
     7             <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600"/>
     8             <transportConnector name="ssl" uri="ssl://0.0.0.0:61617?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600"/>
     9             <transportConnector name="stomp" uri="stomp://0.0.0.0:61618?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600"/>
    10             <transportConnector name="xmpp" uri="xmpp://0.0.0.0:61619?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600"/>
    11         </transportConnectors>
    View Code

    生产者和消费着可以使用不同的传输协议来传输信息。比如生产者用nio协议生产消息,消费者用tcp协议接收消息。

    ActiveMQ配置网络连接

    当应用到Broker的集群时,Borker与Broker的通信就用到了网络连接。

    配置格式如下:

    1 <networkConnectors>
    2 <!-- 动态连接方式
    3 <networkConnector name="default-nc" uri="multicast://default"
    4                    dynamicOnly="true" networkTTL="3" prefetchSize="1" decreaseNetworkConsumerPriority="true"
    5          /> -->
    6          <!-- 静态连接方式 <networkConnector name="host1 and host2" uri="static://(tcp://host1:61616,tcp://host2:61616)"/> -->
    7 </networkConnectors>
    View Code

     ActiveMQ持久化存储模式

    ActiveMq主要实现了如下几种存储:

    1.4.1.   AMQ消息存储—默认的消息存储

    它是一种基于文件存储的消息数据库并且不依赖第三方数据库。配置如下

    <amqPersistenceAdapter directory="${activemq.base}/data" maxFileLength="32mb"/>
    

      

    1.4.2.   KahaDB 消息存储—提供容量的提升和恢复能力

      它是一种新的消息存储机制,配置如下

    <kahaDB directory="${activemq.data}/kahadb" />
    

    1.4.3.   JDBC 消息存储—消息基于JDBC存储

    1 <persistenceAdapter>
    2     <jdbcPersistenceAdapter dataSource="#mysql-ds" />
    3 </persistenceAdapter>
    4 <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    5     <property name="driverClassName" value="com.mysql.jdbc.Driver" />
    6     <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true" />
    7     <property name="username" value="activemq" /> <property name="password" value="activemq" />
    8     <property name="maxActive" value="200" /> <property name="poolPreparedStatements" value="true" />
    9 </bean>
    View Code

    1.4.4.   Memory 消息存储—基于内容的消息存储

      ActiveMQ支持将消息保存到内存中,这种情况没有动态的缓存存在。

      这种情况的配置很简单,只要将Broker的“prsistent” 属性设置为“false”即可。

    ActiveMQ拦截器使用

    在ActiveMQ中使用拦截器和过滤器的使用多采用插件的形式实现,继承BrokerFilter实现BrokerPlugin接口类。BrokerFilter实质一个实现Broker接口的类。

    日志拦截

    日志拦截器是Broker的一个拦截器,默认的日志级别为INFO。你如你想改变日志的级别。这个日志拦截器支持Commons-log和Log4j两种日志。

    <plugins>
          <loggingBrokerPlugin logAll="true" logConnectionEvents="false"/>
    </plugins>
    

      部分参数如下:

    属性名称 默认值 描述
    logAll false 记录所有事件的日志
    logMessageEvents false 记录消息事件日志
    logConnectionEvents true 记录连接事件日志
    logTransactionEvents false 记录消息事务事件日志
    logConsumerEvents false 记录消息消费者事件日志
    logProducerEvents false 记录消息生产者事件日志
    logInternalEvents false  

    ActiveMQ安全配置

    ActiveMQ也可以对各个主题和队列设置用户名和密码,配置如下:

     1 <plugins>
     2     <!-- Configure authentication; Username, passwords and groups -->
     3     <simpleAuthenticationPlugin>
     4         <users>
     5             <authenticationUser username="system" password="manager" groups="users,admins" />
     6             <authenticationUser username="user" password="password" groups="users" />
     7             <authenticationUser username="guest" password="password" groups="guests" />
     8             <authenticationUser username="testUser" password="123456" groups="testGroup" />
     9         </users>
    10     </simpleAuthenticationPlugin>
    11     <!--  Lets configure a destination based authorization mechanism -->
    12     <authorizationPlugin>
    13         <map>
    14             <authorizationMap>
    15                 <authorizationEntries>
    16                     <authorizationEntry queue="queue.group.uum" read="users" write="users" admin="users" />
    17                     <authorizationEntry queue=">" read="admins" write="admins" admin="admins" />
    18                     <authorizationEntry queue="USERS.>" read="users" write="users" admin="users" />
    19                     <authorizationEntry queue="GUEST.>" read="guests" write="guests,users" admin="guests,users" />
    20                     <authorizationEntry queue="TEST.Q" read="guests" write="guests" />
    21                     <authorizationEntry queue="test" read="testGroup" write="testGroup" />
    22                     <authorizationEntry topic=">" read="admins" write="admins" admin="admins" />
    23                     <authorizationEntry topic="USERS.>" read="users" write="users" admin="users" />
    24                     <authorizationEntry topic="GUEST.>" read="guests" write="guests,users" admin="guests,users" />
    25                     <authorizationEntry topic="ActiveMQ.Advisory.>" read="guests,users ,testGroup" write="guests,users ,testGroup " admin="guests,users ,testGroup " />
    26                 </authorizationEntries>
    27             </authorizationMap>
    28         </map>
    29     </authorizationPlugin>
    30 </plugins>
    View Code

    ActiveMQ Async Sends

    Acivemq 支持异步和同步发送消息。在 ActiveMQ4.0 以上,所有的异步或同步对

    于 Consumer 来说是变得可配置了。默认是在 ConnectionFactory、Connection、

    Connection URI等方面配置对于一个基于 Destination  的Consumer来说。

    众所周之,如果你想传递给 Slow Consumer 那么你可能使用异步的消息传递,但是对于 Fast Consumer 你可能使用同步发送消息。(这样可以避免同步和上下文切换额外的增加Queue 堵塞花费。如果对于一个 Slow Consumer,你使用同步发送消息可能出现Producer 堵塞等显现。

    ActiveMQ默认设置 dispatcheAsync=true是最好的性能设置。如果你处理的是

    Slow Consumer 则使用 dispatcheAsync=true,反之,那你使用的是 Fast Consumer则使用dispatcheAsync=false。

    用Connection URI 来配置Async如下:

    ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");

    用ConnectionFactory 配置Async如下:

    ((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);

    用Connection 配置Async 如下:

    ((ActiveMQConnection)connection).setUseAsyncSend(true);
  • 相关阅读:
    js_css_dl.dt实现列表展开、折叠效果
    property_自己编写一个读取Property文件的Util类
    HttpClient_002_中文乱码、HttpClient中文乱码透析、总结
    HttpClient_001_初步实现项目01的servlet,与项目02的servlet,之间数据访问
    jsp:中文乱码解决
    linux命令
    js 监测from表单中的input和select,时时监测,没有输入或选择信息报错,不允许提交数据
    数据库的那些事
    待参考
    layer.open多次触发,遮罩层覆盖content的解决办法
  • 原文地址:https://www.cnblogs.com/bangguo/p/13092979.html
Copyright © 2011-2022 走看看