zoukankan      html  css  js  c++  java
  • ActiveMQ的环境搭建及使用

    一:环境搭建

    ActiveMQ官网下载mq在windows上的安装包:http://activemq.apache.org/,解压到某个磁盘下。

    运行要环境条件:jdk安装1.8,(本人这里安装版本)。

    到达解压目录,进入inwin64(测试机器是x64,如果你的机器是x86,请进入inwin32),双击运行“confactivemq.bat”。

    此时我这里抛出了一个异常:

    jvm 1    |  WARN | Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: 
      Error creating bean with name 'org.apache.activemq.xbean.XBeanBrokerService#0' defined in class path resource [activemq.xml]: Invocation of init method failed;
      nested exception is java.net.URISyntaxException: Illegal character in hostname atindex 7: ws://PC_DX_20151117:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600 jvm 1 | ERROR: java.lang.RuntimeException: Failed to execute start task. Reason: java.lang.IllegalStateException:
      BeanFactory not initialized or already closed - call 'refresh' before accessing beans via the ApplicationContext

    从上边错误信息可以得知这里是发布ws服务是失败,解决方案:修改activemq.xml文件:

            <transportConnectors>
                <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
                <!--注释掉抛出错误的默认的配置:错误原因,计算机系统原因不支持发布ws服务。-->
                <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
                <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
                <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
                <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
                <!--<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>-->
            </transportConnectors>

    备注:这里是把发布ws服务给注释掉了。

    之后,重新启动,启动成功。

    然后:http://127.0.0.1:8161/admin/ 验证服务启动情况,用户名/密码 admin/admin。界面如下:

    ActiveMQ 点对点消息实现 - 直接receive方式

    新建一个java project工程,引入activemq安装包根目录的activemq-all-5.15.3.jar包。

     消息生产者:

    package com.activemq.producer;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class JMSProducer {
        private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
        private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
        private static final String BROKEURL = "tcp://127.0.0.1:61616";//ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址
        private static final int SENDNUM = 10; // 发送的消息数量
    
        public static void main(String[] args) {
    
            ConnectionFactory connectionFactory; // 连接工厂
            Connection connection = null; // 连接
            Session session; // 会话 接受或者发送消息的线程
            Destination destination; // 消息的目的地
            MessageProducer messageProducer; // 消息生产者
    
            // 实例化连接工厂
            connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL);
    
            try {
                connection = connectionFactory.createConnection(); // 通过连接工厂获取连接
                connection.start(); // 启动连接
                session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 创建Session
                destination = session.createQueue("FirstQueue1"); // 创建消息队列
                messageProducer = session.createProducer(destination); // 创建消息生产者
                sendMessage(session, messageProducer); // 发送消息
                session.commit();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
        /**
         * 发送消息
         */
        public static void sendMessage(Session session, MessageProducer messageProducer) throws Exception {
            for (int i = 0; i < JMSProducer.SENDNUM; i++) {
                TextMessage message = session.createTextMessage("ActiveMQ 发送的消息" + i);
                System.out.println("发送消息:" + "ActiveMQ 发送的消息" + i);
                messageProducer.send(message);
            }
        }
    }

     消息消费者:

    package com.activemq.consumer.receive;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    /**
     * 消息消费者
     */
    public class JMSConsumer {
        private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
        private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
        private static final String BROKEURL = "tcp://127.0.0.1:61616"; // ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址
    
        public static void main(String[] args) {
            ConnectionFactory connectionFactory; // 连接工厂
            Connection connection = null; // 连接
            Session session; // 会话 接受或者发送消息的线程
            Destination destination; // 消息的目的地
            MessageConsumer messageConsumer; // 消息的消费者
    
            // 实例化连接工厂
            connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);
    
            try {
                connection = connectionFactory.createConnection();  // 通过连接工厂获取连接
                connection.start(); // 启动连接
                session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session
                destination = session.createQueue("FirstQueue1");  // 创建连接的消息队列
                messageConsumer = session.createConsumer(destination); // 创建消息消费者
                while (true) {
                    TextMessage textMessage = (TextMessage) messageConsumer.receive(100000);
                    if (textMessage != null) {
                        System.out.println("收到的消息:" + textMessage.getText());
                    } else {
                        break;
                    }
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    ActiveMQ 点对点消息实现 - 使用Listener监听方式 

     生产者代码同上.

     监听器类:

    package com.activemq.consumer.listener;
    
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.TextMessage;
    
    /**
     * 消息监听
     */
    public class Listener implements MessageListener {
        @Override
        public void onMessage(Message message) {
            // TODO Auto-generated method stub
            try {
                System.out.println("收到的消息:" + ((TextMessage) message).getText());
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    
    }

     消息消费者中注册监听器:

    package com.activemq.consumer.listener;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    /**
     * 消息消费者
     *
     * @author Administrator
     */
    public class JMSConsumer {
        private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
        private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
        private static final String BROKEURL = "tcp://127.0.0.1:61616";//ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址
    
        public static void main(String[] args) {
            ConnectionFactory connectionFactory; // 连接工厂
            Connection connection = null; // 连接
            Session session; // 会话 接受或者发送消息的线程
            Destination destination; // 消息的目的地
            MessageConsumer messageConsumer; // 消息的消费者
    
            // 实例化连接工厂
            connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);
    
            try {
                connection = connectionFactory.createConnection();  // 通过连接工厂获取连接
                connection.start(); // 启动连接
                session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session
                destination = session.createQueue("FirstQueue1");  // 创建连接的消息队列
                messageConsumer = session.createConsumer(destination); // 创建消息消费者
                messageConsumer.setMessageListener(new Listener()); // 注册消息监听
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    ActiveMQ 点对点消息实现 - ActiveMQ 发布-订阅消息模式实现

    一个生产者发布对应多个消费者订阅。
    消息的生产者:
    package com.activemqmulti.producer;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    /**
     * 消息生产者-消息发布者*/
    public class JMSProducer {
    
        private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
        private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
        private static final String BROKEURL ="tcp://127.0.0.1:61616"; //  ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址
        private static final int SENDNUM = 10; // 发送的消息数量
    
        public static void main(String[] args) {
            ConnectionFactory connectionFactory; // 连接工厂
            Connection connection = null; // 连接
            Session session; // 会话 接受或者发送消息的线程
            Destination destination; // 消息的目的地
            MessageProducer messageProducer; // 消息生产者
    
            // 实例化连接工厂
            connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL);
    
            try {
                connection = connectionFactory.createConnection(); // 通过连接工厂获取连接
                connection.start(); // 启动连接
                session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 创建Session
                // destination=session.createQueue("FirstQueue1");
                destination = session.createTopic("FirstTopic1"); // 创建消息队列
                messageProducer = session.createProducer(destination); // 创建消息生产者
                sendMessage(session, messageProducer); // 发送消息
                session.commit();
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } finally {
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            }
        }
    
        /**
         * 发送消息*/
        public static void sendMessage(Session session, MessageProducer messageProducer) throws Exception {
            for (int i = 0; i < JMSProducer.SENDNUM; i++) {
                TextMessage message = session.createTextMessage("ActiveMQ 发送的消息" + i);
                System.out.println("发送消息:" + "ActiveMQ 发布的消息" + i);
                messageProducer.send(message);
            }
        }
    }

    消息的监听器一:

    package com.activemqmulti.consumer;
    
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.TextMessage;
    
    /**
     * 消息监听-订阅者一
     */
    public class Listener implements MessageListener {
    
        @Override
        public void onMessage(Message message) {
            // TODO Auto-generated method stub
            try {
                System.out.println("订阅者一收到的消息:" + ((TextMessage) message).getText());
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    
    }

    消息的监听器二:

    package com.activemqmulti.consumer;
    
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.TextMessage;
    
    /**
     * 消息监听-订阅者二
     */
    public class Listener2 implements MessageListener {
    
        @Override
        public void onMessage(Message message) {
            // TODO Auto-generated method stub
            try {
                System.out.println("订阅者二收到的消息:" + ((TextMessage) message).getText());
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    
    }

    消息的订阅者一:

    package com.activemqmulti.consumer;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    /**
     * 消息消费者-消息订阅者一
     */
    public class JMSConsumer {
    
        private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
        private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
        private static final String BROKEURL ="tcp://127.0.0.1:61616"; //  ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址
    
        public static void main(String[] args) {
            ConnectionFactory connectionFactory; // 连接工厂
            Connection connection = null; // 连接
            Session session; // 会话 接受或者发送消息的线程
            Destination destination; // 消息的目的地
            MessageConsumer messageConsumer; // 消息的消费者
    
            // 实例化连接工厂
            connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);
    
            try {
                connection = connectionFactory.createConnection();  // 通过连接工厂获取连接
                connection.start(); // 启动连接
                session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session
                // destination=session.createQueue("FirstQueue1");  // 创建连接的消息队列
                destination = session.createTopic("FirstTopic1");
                messageConsumer = session.createConsumer(destination); // 创建消息消费者
                messageConsumer.setMessageListener(new Listener()); // 注册消息监听
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }

    消息的订阅者二:

    package com.activemqmulti.consumer;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    /**
     * 消息消费者-消息订阅者二
     */
    public class JMSConsumer2 {
    
        private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
        private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
        private static final String BROKEURL = "tcp://127.0.0.1:61616"; // ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址
    
        public static void main(String[] args) {
            ConnectionFactory connectionFactory; // 连接工厂
            Connection connection = null; // 连接
            Session session; // 会话 接受或者发送消息的线程
            Destination destination; // 消息的目的地
            MessageConsumer messageConsumer; // 消息的消费者
    
            // 实例化连接工厂
            connectionFactory = new ActiveMQConnectionFactory(JMSConsumer2.USERNAME, JMSConsumer2.PASSWORD, JMSConsumer2.BROKEURL);
    
            try {
                connection = connectionFactory.createConnection();  // 通过连接工厂获取连接
                connection.start(); // 启动连接
                session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session
                // destination=session.createQueue("FirstQueue1");  // 创建连接的消息队列
                destination = session.createTopic("FirstTopic1");
                messageConsumer = session.createConsumer(destination); // 创建消息消费者
                messageConsumer.setMessageListener(new Listener2()); // 注册消息监听
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
     
  • 相关阅读:
    几种比较好看的颜色代码
    浅谈Express的put与del
    关于Linux软连接
    logstash根据日志关键词报警
    linux历史命令审计
    showdoc升级问题,showdoc错误日志
    以Docker容器的形式运行GVM-11
    主机标准化配置文档
    网络设备标准化配置文档
    Zabbix日常监控之lvs监控
  • 原文地址:https://www.cnblogs.com/yy3b2007com/p/5823953.html
Copyright © 2011-2022 走看看