zoukankan      html  css  js  c++  java
  • ActiveMQ 介绍安装使用入门

    安装和入门
     

    官网地址:http://activemq.apache.org/components/classic/documentation

    特性列表

    1. 面向消息的中间件(Message-oridented middleware MON)
    2. 支持多种语言(Java,C,C++,C#,Ruby,Python)
    3. 支持多种协议(HTTP,TCP,SSL,NIO,UDP)
      • OpenWire协议,Stomp,AMQPv1.0,MQTTv3.1(IoT环境)
      • in-VM,多播,JGroups,JXTA
    4. Spring的支持
    5. 支持异步通信.
    6. 一个支持JMS1.1和J2EE1.4的JMS Provider实现(瞬态,持久化,XA消息和事务),内嵌的JMS Provider进行单元测试,支持JMS客户端和Message Broker的企业集成模式。
    7. 高性能的journal使JDBC高速持久化
    8. 高级特性:Message Groups,Virtual Destinations,Wildcards和Composite Destinations
    9. 高性能集群,客户端-服务器模式,点对点通信的设计
    10. 支持REST API 提供技术无关,语言无关的web API
    11. 支持Ajax
    12. 支持CXF和Axis

    安装

    OSX

    • 使用homebrew安装,执行命令:brew install apache-activemq
    • 安装目录:/usr/local/Cellar/activemq/5.15.9

    Centos

    • 先安装firefox: yum -y install firefox
    • 去activemq官网下载安装包,并解压到指定目录

    使用

    启动和运行

    • 安装目录下的bin启动./activemq start,使用brew可以直接启动:activemq start
      启动日志输出:加载的配置路径,使用的jdk路径,pid路径等

    • activemq监控控制台:http://127.0.0.1:8161/admin,用户名密码都默认是admin

    • osx输出的activemq日志路径为:/usr/local/Cellar/activemq/5.15.9/libexec/data,日志文件activemq.log
    • 默认监听端口:61616
      • 查看方式windows:netstat -an | find “61616”`
      • linux:netstat -an | grep 61616

    配置

    • activemq的默认配置:conf/ , 详细信息
      配置列表配图:

    生产者-消费者模式

    • 运行JMS Broker : ./activemq console
    • 运行生产者,消费者./activemq producer./activemq consumer

    生产者

    • 发送自定义文本:./activemq producer --message "my message" --messageCount 1
    • 发送自定义长度消息:./activemq producer --messageSize 100 --messageCount 1
    • 发送文本消息,从url获取:./activemq producer --payloadUrl http://fubin.org.cn --messageCount 1

    • 生产消息日志

      //连接监听端口61616
      2019-05-12 14:39:15,656 | INFO  | Connecting to URL: failover://tcp://localhost:61616 as user: null | org.apache.activemq.console.command.ProducerCommand | main
      //生产消息到TEST队列
      2019-05-12 14:39:15,657 | INFO  | Producing messages to queue://TEST | org.apache.activemq.console.command.ProducerCommand | main
      //使用持久化消息
      2019-05-12 14:39:15,657 | INFO  | Using persistent messages | org.apache.activemq.console.command.ProducerCommand | main
      2019-05-12 14:39:15,657 | INFO  | Sleeping between sends 0 ms | org.apache.activemq.console.command.ProducerCommand | main
      2019-05-12 14:39:15,657 | INFO  | Running 1 parallel threads | org.apache.activemq.console.command.ProducerCommand | main
      //成功连接61616
      2019-05-12 14:39:15,872 | INFO  | Successfully connected to tcp://localhost:61616 | org.apache.activemq.transport.failover.FailoverTransport | ActiveMQ Task-1
      2019-05-12 14:39:15,905 | INFO  | producer-1 Started to calculate elapsed time ...
       | org.apache.activemq.util.ProducerThread | producer-1
       //生产一条消息
      2019-05-12 14:39:15,913 | INFO  | producer-1 Produced: 1 messages | org.apache.activemq.util.ProducerThread | producer-1
      2019-05-12 14:39:15,914 | INFO  | producer-1 Elapsed time in second : 0 s | org.apache.activemq.util.ProducerThread | producer-1
      2019-05-12 14:39:15,914 | INFO  | producer-1 Elapsed time in milli second : 9 milli seconds | org.apache.activemq.util.ProducerThread | producer-1

    消费者

    • 事务模式下消费消息:./activemq consumer --transacted true
    • 使用client acknowledgment模式:./activemq consumer --ackMode CLIENT_ACKNOWLEDGE
    • 使用持久主题订阅者:./activemq consumer --durable true --clientId example --destination topic ://TEST

    使用JMX远程监控ActiveMQ

    本地的消息队列使用控制台就可以,远程的activemq我们可以使用java自带的jconsole进行远程监控activemq。

    • 修改配置 : activemq.xml

       broker: useJmx="true"
       <managementContext createConnector="true"/>
    • 打开jconsole:jconsole

    • 连接远程activemq:

    • jmx控制台账号密码配置:conf/jmx.access ,conf/jmx.password
      • 只读:monitorRole readnoly
      • 读写:controlRole readwrite
     
     

     
     

    activemq.xml详情

    <beans
      xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
      http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
        <!--这个配置文件中允许我们使用系统配置作为变量-->
        <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
            <property name="locations">
                <value>file:${activemq.conf}/credentials.properties</value>
            </property>
        </bean>
    
       <!-- 允许访问服务端日志
       -->
        <bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
              lazy-init="false" scope="singleton"
              init-method="start" destroy-method="stop">
        </bean>
    
        <!--
            <broker>元素用来配置ActiveMQ代理服务器
        -->
        <broker xmlns="http://activemq.apache.org/schema/core" useJmx="true"  brokerName="localhost" dataDirectory="${activemq.data}">
    
            <destinationPolicy>
                <policyMap>
                  <policyEntries>
                    <policyEntry topic=">" >
                        <!-- constantPendingMessageLimitStrategy用来防止慢主题消费者阻塞生产者和影响其他消费者,通过限制保留的消息条数
                            更多信息请看:http://activemq.apache.org/slow-consumer-handling.html
                        -->
                      <pendingMessageLimitStrategy>
                        <constantPendingMessageLimitStrategy limit="1000"/>
                      </pendingMessageLimitStrategy>
                    </policyEntry>
                  </policyEntries>
                </policyMap>
            </destinationPolicy>
    
    
            <!--
                managementContext用来配置ActiveMQ默认怎样暴露给JMX,ActiveMQ使用JVM启动的MBean服务,更多信息请看:
                http://activemq.apache.org/jmx.html
            -->
            <managementContext>
                <managementContext createConnector="true"/>
            </managementContext>
    
            <!--
                为broker配置消息持久化。默认持久化积滞是使用KahaDB存储的。由KahaDB标签识别,更多信息请看:
                http://activemq.apache.org/persistence.html
            -->
            <persistenceAdapter>
                <kahaDB directory="${activemq.data}/kahadb"/>
            </persistenceAdapter>
    
    
              <!--
                systemUsage标签控制broker占用的最大空间量,在禁用缓存或减慢生产者之前使用
                更多信息请看:http://activemq.apache.org/producer-flow-control.html
              -->
              <systemUsage>
                <systemUsage>
                    <memoryUsage>
                        <memoryUsage percentOfJvmHeap="70" />
                    </memoryUsage>
                    <storeUsage>
                        <storeUsage limit="100 gb"/>
                    </storeUsage>
                    <tempUsage>
                        <tempUsage limit="50 gb"/>
                    </tempUsage>
                </systemUsage>
            </systemUsage>
    
            <!--
                传输连接器通过给定的协议公开客户端和其他brokers
                详细信息请看:http://activemq.apache.org/configuring-transports.html
            -->
            <transportConnectors>
                <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
                <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
                <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
                <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
                <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
                <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            </transportConnectors>
    
            <!-- 
            在停止时销毁spring上下文来关闭jetty -->
            <shutdownHooks>
                <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
            </shutdownHooks>
    
        </broker>
    
        <!--
            开启web控制台,REST和ajax API和例子,web控制台要求默认方式登录,你可以在jetty,xml文件中禁用它
            在${ACTIVEMQ_HOME}/conf/jetty.xml查看更详细的信息
        -->
        <import resource="jetty.xml"/>
    </beans>
     
     

     

    ActiveMQ:使用Java实现生产者和消费者

    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.Connection;
    import javax.jms.DeliveryMode;
    import javax.jms.Destination;
    import javax.jms.ExceptionListener;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer; 
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    /**
     * Hello world!
     */
    public class App {
    
        public static void main(String[] args) throws Exception {
            thread(new HelloWorldProducer(), false);
            thread(new HelloWorldProducer(), false);
            thread(new HelloWorldConsumer(), false);
            Thread.sleep(1000);
            thread(new HelloWorldConsumer(), false);
            thread(new HelloWorldProducer(), false);
            thread(new HelloWorldConsumer(), false);
            thread(new HelloWorldProducer(), false);
            Thread.sleep(1000);
            thread(new HelloWorldConsumer(), false);
            thread(new HelloWorldProducer(), false);
            thread(new HelloWorldConsumer(), false);
            thread(new HelloWorldConsumer(), false);
            thread(new HelloWorldProducer(), false);
            thread(new HelloWorldProducer(), false);
            Thread.sleep(1000);
            thread(new HelloWorldProducer(), false);
            thread(new HelloWorldConsumer(), false);
            thread(new HelloWorldConsumer(), false);
            thread(new HelloWorldProducer(), false);
            thread(new HelloWorldConsumer(), false);
            thread(new HelloWorldProducer(), false);
            thread(new HelloWorldConsumer(), false);
            thread(new HelloWorldProducer(), false);
            thread(new HelloWorldConsumer(), false);
            thread(new HelloWorldConsumer(), false);
            thread(new HelloWorldProducer(), false);
        }
    
        public static void thread(Runnable runnable, boolean daemon) {
            Thread brokerThread = new Thread(runnable);
            brokerThread.setDaemon(daemon);
            brokerThread.start();
        }
    
        public static class HelloWorldProducer implements Runnable {
            public void run() {
                try {
                    // 创建一个连接工厂
                    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
                    // 创建一个连接
                    Connection connection = connectionFactory.createConnection();
                    connection.start();
                    // 创建一个会话
                    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE;
                    // 创建一个目标:主题或队列
                    Destination destination = session.createQueue("TEST.FOO");
                    // 基于会话创建一个消息生产者给主题或队列
                    MessageProducer producer = session.createProducer(destination);
                    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                    // 创建一个消息
                    String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
                    TextMessage message = session.createTextMessage(text);
                    // 告诉生产者发送消息
                    System.out.println("Sent message: "+ message.hashCode() + " : " + Thread.currentThread().getName());
                    producer.send(message);
                    // 关闭会话和连接
                    session.close();
                    connection.close();
                }
                catch (Exception e) {
                    System.out.println("Caught: " + e);
                    e.printStackTrace();
                }
            }
        }
    
        public static class HelloWorldConsumer implements Runnable, ExceptionListener {
            public void run() {
                try {
    
                    // 创建一个连接工厂
                    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
                    // 创建一个连接
                    Connection connection = connectionFactory.createConnection();
                    connection.start();
                    connection.setExceptionListener(this);
                    // 创建一个会话
                    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE;
                    // 创建一个目标:主题或队列
                    Destination destination = session.createQueue("TEST.FOO");
                    // 基于会话创建一个消息消费者给主题或队列
                    MessageConsumer consumer = session.createConsumer(destination);
                    // 等待一个消息
                    Message message = consumer.receive(1000);
                    if (message instanceof TextMessage) {
                        TextMessage textMessage = (TextMessage) message;
                        String text = textMessage.getText();
                        System.out.println("Received: " + text);
                    } else {
                        System.out.println("Received: " + message);
                    }
                    consumer.close();
                    session.close();
                    connection.close();
                } catch (Exception e) {
                    System.out.println("Caught: " + e);
                    e.printStackTrace();
                }
            }
    
            public synchronized void onException(JMSException ex) {
                System.out.println("JMS 异常.  Shutting down client.");
            }
        }
    }
     
  • 相关阅读:
    Openlayers 3 热力图
    javaScript 新学习:Array.contains 函数
    将页面内容转为Excel下载
    Cookie 的设置和获取
    escape()、encodeURI()、encodeURIComponent()区别详解
    java 对象与二进制互转
    获取与当前类同级目录下的文件
    Windows下比较小巧的c/c++ ide
    保存到properties
    javafx 普通弹框提示
  • 原文地址:https://www.cnblogs.com/fubinhnust/p/11967693.html
Copyright © 2011-2022 走看看