zoukankan      html  css  js  c++  java
  • ActiveMQ基础教程(二):安装与配置(单机与集群)

      因为本文会用到集群介绍,因此准备了三台虚拟机(当然读者也可以使用一个虚拟机,然后使用不同的端口来模拟实现伪集群):

        192.168.209.133 test1
        192.168.209.134 test2
        192.168.209.135 test3

      因为ActiveMQ是java编写,因此需要java的运行环境,这个不做介绍,网上有一堆的教程。

      其次,下载ActiveMQ包,官网下载地址:https://archive.apache.org/dist/activemq/ ,读者可以选择一个版本下载使用,比如,当前最新版是5.16.1,下载地址:https://archive.apache.org/dist/activemq/5.16.1/

      

      官网下载速度比较慢,可能需要一两个小时,有百度网盘会员的可以移步百度网盘:https://pan.baidu.com/s/1-ToyzCqo_ypk7FXcjgI3yg (提取码: jcd7 )

        

      单机安装与配置

      首先将tar包传到linux上去(我这里使用的是Ubuntu16.04),然后在tar包所在目录进行解压:  

       # -C 表示解压出来的文件保存目录,默认是tar包所在目录,这里我选择的是/opt目录,注意修改权限,如果你是root用户,则不需要修改
      sudo tar -zxf apache-activemq-5.16.1-bin.tar.gz -C /opt

       其实我们下载好的tar包是编译打包好的,只需解压就可以启动了:  

       # 使用后台线程启动
      sudo ./bin/activemq start
      # 使用控制台方式启动,这种方式会造成当前shell阻塞,如果想使用服务单元或者supervisor这样的工具做守护进程,那么应该采用这种启动方式
      sudo ./bin/activemq console
      # 停止应用
      sudo ./bin/activemq stop
      # 重启应用
      sudo ./bin/activemq restart

      第一次建议采用控制台console的形式启动,看时候会报错,报错则会打印异常信息,方便我们排查,启动后大概是这样的:

      

      注意输出的信息,可以看到:  

      1、ActiveMQ版本是5.16.1,使用KahaDB做持久化,版本7
      2、openwire方式连接启动,监听:tcp://test1:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600,还有ampq、stomp、mqtt、ws等分别在不同端口启动监听(注,这里的test1是在hosts中添加的,看我上面提供的三个虚拟机)
      3、Jetty服务启动,其实就是启动ActiveMQ的管理后台
      4、ActiveMQ管理后台地址:http://127.0.0.1:8161/,ActiveMQ的Jolokia API地址:http://127.0.0.1:8161/api/jolokia/,Jolokia API主要是通过API接口获取ActiveMQ的状态信息,方便第三方程序进行监控

      主要到,上面服务启动后,管理后台启动在127.0.0.1:8161,这表明我们只能在本地访问,虚拟机以外的主机是访问不了的,所以我需要修改相关配置。

      配置文件在 conf 目录下,我们常用到的配置文件就是activemq.xml以及jetty.xml,里面全是一些 java bean(有点少见的东西)。activemq.xml主要是ActiveMQ相关配置,而jetty.xml主要是管理后台的一些配置,接下来介绍一些常用的几个配置:

      1、修改Connector

      打开conf/activemq.xml,找到 transportConnectors 节点配置,注释掉多余的连接协议,只留自己需要的就行了,比如我一般用openwire,那我就将其它的注释掉。

      说明一下,为什么要这么做,主要是没必要,也可以避免端口冲突,比如,rabbitmq默认采用ampq监听5672端口,这时你会发现activemq启动不了,因为端口冲突。  

      <transportConnectors>
         <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
         <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
         <!--<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
         <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
         <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
         <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>-->
      </transportConnectors>

      需要注意的是,连接端口也是在上面的uri中去配置。

      2、查看持久化方式配置

      在conf/activemq.xml找到 persistenceAdapter 节点,这时消息持久化模式配置,可以看到默认的方式是kahaDB,数据目录是data/kahadb  

      <persistenceAdapter>
          <kahaDB directory="${activemq.data}/kahadb"/>
      </persistenceAdapter>

      这个节点只是说明一下,因为后续介绍ActiveMQ集群模式就是修改这个节点kahaDB为replicatedLevelDB进行配置,所以这个暂时可以不用动,当然如果你愿意,可以修改成AMQ、JDBC等其他方式。

      3、死信队列配置

      ActiveMQ提供了众多的策略,这里以死信队列策略配置为例来介绍。

      在conf/activemq.xml找到 destinationPolicy 节点,这里配置的是一些队列和topic的策略,当然包括死信队列了,默认策略配置如下:

        

       说明:
       policyEntry 节点是配置入口
      topic=">" 表示对所有的topic生效,我们也可以增加一个policyEntry节点,用途queue属性来表示对队列生效,如下文介绍
      pendingMessageLimitStrategy  消息限制策略,只对Topic且非持久化订阅者有效,用于当通道中有大量的消息积压时,broker可以保留的消息量,这是为了防止Topic中有慢速消费者,导致整个通道消息积压
      constantPendingMessageLimitStrategy   保留固定条数的消息,如果消息量超过了限制,将使用消息剔除策略移除消息

      ActiveMQ死信队列配置策略主要有两种:共享的死信策略(SharedDeadLetterStrategy)、单独的死信策略(IndividualDeadLetterStrategy) 

      共享的死信策略(SharedDeadLetterStrategy)   

      共享的死信策略(SharedDeadLetterStrategy)表示将所有的死信消息保存在一个共享队列中,这是ActiveMQ的默认策略
      常用配置属性:
      deadLetterQueue:共享队列名称,默认的队列名为 ActiveMQ.DLQ
      processExpired:表示是否将过期消息放入死信队列,默认为true
      processNonPersistent:表示是否将“非持久化”消息放入死信队列,默认为false

      需要注意的是,之前共享的死信策略配置可以是这样子:  

        <deadLetterStrategy>
             <sharedDeadLetterStrategy deadLetterQueue="DLQ.Queue"/>
        </deadLetterStrategy>

      但是不知道从什么时候起,这样配置会抛出异常:  

        Caused by: java.lang.IllegalStateException: Cannot convert value of type 'java.lang.String' to required type 'org.apache.activemq.command.ActiveMQDestination' for property 'deadLetterQueue': no matching editors or conversion strategy found
    	at org.springframework.beans.TypeConverterDelegate.convertIfNecessary(TypeConverterDelegate.java:307)
    	at org.springframework.beans.AbstractNestablePropertyAccessor.convertIfNecessary(AbstractNestablePropertyAccessor.java:588)
    	... 52 more

      但是我们可以使用bean去配置,如:  

       <bean id="deadLetterQueue" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg name="name" value="DLQ.Queue"/>
       </bean>
      ......
       <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">
            <destinationPolicy>
                <policyMap>
                  <policyEntries>
                    <policyEntry queue=">" >
                      <deadLetterStrategy>
                        <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy">
                           <property name="deadLetterQueue" ref="deadLetterQueue" />
                        </bean>
                      </deadLetterStrategy>
                    </policyEntry>
    </policyEntries>
    </policyMap>
    </destinationPolicy>
          ....
      </broker>

      上面的配置中,定义了一个id为deadLetterQueue的bean,然后在策略中,queue=">" 表示对所有队列生效,如果想对topic生效,改成 topic=“>” 即可,而策略时一个bean,class指向SharedDeadLetterStrategy,它的属性deadLetterQueue指向我们前面定义的那个bean的id

      其实,上面的配置就是说,对于所有的队列的死信消息,通通发到一个名为DQL.Queue的队列中去。

      单独的死信策略(IndividualDeadLetterStrategy)  

      单独的死信策略(IndividualDeadLetterStrategy)表示把死信消息放入各自的死信通道中,区分的条件时使用不同的前缀,死信通道可以是队列,也可以是topic
      常用配置属性:
      queuePrefix:当死信队列是queue时使用的前缀,默认是 ActiveMQ.DLQ.Queue.
      topicPrefix:当死信队列是topic是使用的前缀,默认是 ActiveMQ.DLQ.Topic.
      useQueueForTopicMessages:表示是否将Topic的死信消息保存在Queue中,默认为true,表示Topic的死信消息保存至Queue中
      processExpired:表示是否将过期消息放入死信队列,默认为true
      processNonPersistent:表示是否将“非持久化”消息放入死信队列,默认为false

      例如:    

            <destinationPolicy>
                <policyMap>
                  <policyEntries>
                    <policyEntry queue=">" >
                      <deadLetterStrategy>
                        <individualDeadLetterStrategy queuePrefix="queue.DLQ." useQueueForQueueMessages="false"/>
                      </deadLetterStrategy>
                    </policyEntry>
                    <policyEntry topic=">" >
                       <pendingMessageLimitStrategy>
                        <constantPendingMessageLimitStrategy limit="1000"/>
                       </pendingMessageLimitStrategy>
                       <deadLetterStrategy>
                        <individualDeadLetterStrategy topicPrefix="topic.DLQ." useQueueForQueueMessages="true"/>
                       </deadLetterStrategy>
                    </policyEntry>
                  </policyEntries>
                </policyMap>
            </destinationPolicy>

      上面的例子中,queue=">" 表示对所有队列启用策略,即对每个的队列的死信消息,保存到一个以queue.DLQ.[队列名]的topic中,比如一个名为demo的队列对应的死信通道是一个名称是queue.DLQ.demo的topic。

      同理,topic=">"表示对所有topic生效,表示将每个topic的死信消息保存在一个已topic.DLQ.[topic名]的队列中,比如一个名称为demo的topic对应的死信通道是一个名称为topic.DLQ.demo的队列

      4、管理后台启动配置

       前面说到默认情况下启动时,ActiveMQ管理后台绑定到127.0.01:8161,这表示我们只能在本地访问,这肯定是不能接受的,我们需要修改这个地址。

      打开conf/jetty.xml,找到id="jettyPort"的bean,

       

       修改这里的host成0.0.0.0,端口随意,也可以使用默认的8161:  

        <bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
                 <!-- the default port number for the web console -->
            <property name="host" value="0.0.0.0"/>
            <property name="port" value="8161"/>
        </bean>

      保存后启动ActiveMQ,然后就可以在浏览器上访问管理后台了,比如我部署的服务器IP是192.168.209.133,那么就访问http://192.168.209.133:8161,接着会要求输入账号和密码,默认都是admin(至于怎么添加用户及管理权限,这就不在本文范围内了,有空再单独用博文介绍),登录后进入欢迎也,然后在点击Manage ActiveMQ broker进入管理后台:

      

      展示的管理界面如下,至于相关页面的操作信息等等,就不做介绍了:

      

      集群部署及配置

      说起ActiveMQ的集群,其实它是一种主从架构,是一种和redis集群中的主从模式很像,就是集群只有主节点提供服务,子节点只是备用,当主节点挂了,就会选用一个子节点作为主节点继续提供服务,如:

      1、正常情况下是主节点提供服务,子节点作为备用只从主节点同步数据

      

      2、当主节点挂了,就会重新选择一个子节点作为主节点重新提供服务

      

      3、如果之后主节点又起来了,此时它将变作为子节点留作备用

      

      根据ActiveMQ官网的介绍,ActiveMQ集群有三种部署方式:基于共享文件系统的主从架构(文件系统如:SAN)、基于数据库的主从架构,基于Zookeeper的主从架构(http://activemq.apache.org/masterslave.html

      这里介绍基于Zookeeper的主从架构。

      准备部署

      1、因为是基于Zookeeper的主从架构,因此我们需要一个Zookeeper集群服务,可以参考博文:Zookeeper基础教程(二):Zookeeper安装

      我这里采用集群地址就是 192.168.209.133:2181,192.168.209.134:2181,192.168.209.135:2181 (其实就是在当前准备部署ActiveMQ的三台虚拟机上部署的Zookeeper集群)

      2、安装上面介绍的单机部署的方式,先在三个节点都安装部署完成ActiveMQ

      3、接着就到最重要的环节了。

      打开conf/activemq.xml,修改 persistenceAdapter 节点中的持久化方式为 replicatedLevelDB ,如:

      test1节点    

            <persistenceAdapter>
                <!--<kahaDB directory="${activemq.data}/kahadb"/>-->
                <replicatedLevelDB
                   directory="${activemq.data}/leveldb"
                   replicas="3"
                   bind="tcp://0.0.0.0:61619"
                   zkAddress="192.168.209.133:2181,192.168.209.134:2181,192.168.209.135:2181"
                   zkPath="/activemq"
                   hostname="test1"
                 />
            </persistenceAdapter>

      test2节点

            <persistenceAdapter>
                <!--<kahaDB directory="${activemq.data}/kahadb"/>-->
                <replicatedLevelDB
                   directory="${activemq.data}/leveldb"
                   replicas="3"
                   bind="tcp://0.0.0.0:61619"
                   zkAddress="192.168.209.133:2181,192.168.209.134:2181,192.168.209.135:2181"
                   zkPath="/activemq"
                   hostname="test2"
                 />
            </persistenceAdapter>

      test3节点

            <persistenceAdapter>
                <!--<kahaDB directory="${activemq.data}/kahadb"/>-->
                <replicatedLevelDB
                   directory="${activemq.data}/leveldb"
                   replicas="3"
                   bind="tcp://0.0.0.0:61619"
                   zkAddress="192.168.209.133:2181,192.168.209.134:2181,192.168.209.135:2181"
                   zkPath="/activemq"
                   hostname="test3"
                 />
            </persistenceAdapter>

      说明一下:  

        directory:表示的是当前节点数据文件保存的目录,不存在就会自动创建,默认值是 LevelDB,注意,这个目录是根据ActiveMQ的根目录来设置,比如上面的配置,是将数据文件保存在ActiveMQ的根目录下的 data/leveldb 目录下
        replicas:集群节点数,因为采用Zookeeper,要求这个值至少为3
       bind:集群通信地址端口配置,就是说如果当前节点变成了master节点,然后slave节点会根据这个地址端口进行数据同步。
       zkAddress:Zookeeper集群地址,默认值是 127.0.0.1:2181
       zkPath:根ZNode节点配置,你总不希望Zookeeper只为一个ActiveMQ集群服务吧?默认值是 /default
       zkPassword:如果Zookeeper有认证,那这个就是认证信息,比如我这里没有,就可以不用配置
       hostname:集群内节点通信时采用的hostname,可以认为就是节点的名字,每个节点应该设置成不一样的hostname,建议采用IP作为hostname的值,如果不设置,ActiveMQ将会自动创建一个hostname

      注意,集群要求每个节点对Zookeeper的配置都一样,比如我这里,要求每个节点的 replicas、zkAddress、zkPath 配置值是一样的。replicatedLevelDB节点更多的配置可以参考官网介绍:https://activemq.apache.org/replicated-leveldb-store

      在启动Zookeeper之后,在每个节点ActiveMQ的根目录下使用下面的命令启动ActiveMQ:  

       # 这里采用console启动,因为它会输出日志,如果报错方便我们处理,不报错的话,可以在使用start后台启动: sudo ./bin/activemq start
      sudo ./bin/activemq console

      启动之后,查看Zookeeper,在上面配置的zkPath节点下会有每个broker节点的配置:

      

      管理后台

      集群启动之后,我们可以查看管理后台,但是需要注意的是,ActiveMQ集群是master-slave模式,只有master节点对外提供服务,slave节点只是备份,因此,如果要访问master节点的管理后台。

      比如上图,我这边test3被选为master,因此我访问的是:http://192.168.209.135:8161

       

      那么你问,如果master节点挂了呢?那就对不起了,原master节点的管理后台地址就访问不了了,你必须访问新的master节点的管理后台才行,如果嫌麻烦,可以自行使用nginx做处理。

      另外,如果集群部署启动报错:java.io.IOException: com/google/common/util/concurrent/internal/InternalFutureFailureAccess

      可以参考我这里的解决方法:关于ActiveMQ+Zookeeper做集群时,解决启动报错:java.io.IOException: com/google/common/util/concurrent/internal/InternalFutureFailureAccess

      结语

      ActiveMQ安装好了,我们也可以使用程序访问试试,提示一下,单机部署的ActiveMQ和集群部署的ActiveMQ在代码中的访问地址形式可能不一样,比如我用的C#使用 Apache.NMS.ActiveMQ 第三方包访问ActiveMQ:

      单机部署时访问地址是:  

        string brokerUri = "activemq:tcp://192.168.209.133:61616";
        NMSConnectionFactory factory = new NMSConnectionFactory(brokerUri);

      集群部署时访问地址是:  

        string brokerUri = "activemq:failover:(tcp://192.168.209.133:61616,tcp://192.168.209.134:61616,tcp://192.168.209.135:61616)";
        NMSConnectionFactory factory = new NMSConnectionFactory(brokerUri);

      注意:集群部署时访问地址中的端口是conf/activemq.xml中transportConnectors节点下配置的端口,不是replicatedLevelDB节点bind属性配置的节点,bind属性配置的节点是如果当前节点是master时,它与其它slave的通信端口。

      另外,知道怎么部署了,我们还可以试下ActiveMQ众多的策略配置及用法,还可以尝试基于共享文件系统的主从架构(文件系统如:SAN)、基于数据库的主从架构搭建集群,想更深入点的话可以看看集群的负载均衡配置原理。

      ActiveMQ的安装部署就介绍到这里吧,主要是还有安装启动过程中还有几个报错,等之后再写吧。

      

    一个专注于.NetCore的技术小白
  • 相关阅读:
    关于UITapGestureRecognizer的一些问题
    自定义导航栏返回按钮
    mousewheel滚轮事件 浏览器的写法
    Apache日志不记录图片文件设置方法和来源日志的配置
    redis持久化配置
    haproxy实现负载均衡
    Keepalived高可用软件的安装与配置
    Sublime text2用户自定义配置
    判断各版本的最简写法
    Ubuntu防火墙 UFW 设置
  • 原文地址:https://www.cnblogs.com/shanfeng1000/p/14331748.html
Copyright © 2011-2022 走看看