zoukankan      html  css  js  c++  java
  • ActiveMQ 的安装与使用

    消息中间件简介

    消息中间件(MOM:Message Orient middleware)

    消息中间件有很多的用途和优点:

    1. 将数据从一个应用程序传送到另一个应用程序,或者从软件的一个模块传送到另外一个模块。

    2. 负责建立网络通信的通道,进行数据的可靠传送。

    3. 保证数据不重发,不丢失。

    4. 能够实现跨平台操作,能够为不同操作系统上的软件集成技工数据传送服务。

    ActiveMQ安装

    MQ英文名MessageQueue,中文名也就是消息队列,是一个消息的接受和转发的容器,可用于消息推送。

    ActiveMQ是由Apache出品的,一款最流行的,能力强劲的开源消息总线。ActiveMQ是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,它非常快速,支持多种语言的客户端和协议,而且可以非常容易的嵌入到企业的应用环境中,并有许多高级功能。

    1、下载

    官方网站:http://activemq.apache.org/ 

    2、安装

    下载完毕后进行解压缩。

    目录介绍:

    bin存放的是脚本文件

    conf存放的是基本配置文件

    data存放的是日志文件

    docs存放的是说明文档

    examples存放的是简单的实例

    lib存放的是activemq所需jar包

    webapps用于存放项目的目录

    3、启动

    根据你的系统位数,选择相应的启动脚本,我的是64位,双击 E:apache-activemq-5.15.2inwin64 目录下的 activemq.bat 脚本文件,可以看下图的效果。 

    4、测试

    ActiveMQ默认使用的TCP连接端口是61616, 通过查看该端口的信息可以测试ActiveMQ是否成功启动

    netstat -an|find "61616"

    5、监控

    ActiveMQ默认启动时,启动了内置的jetty服务器,提供一个用于监控ActiveMQ的admin应用。

    地址:http://127.0.0.1:8161/admin/

    用户名和密码都是admin,可以对ActiveMQ服务进行监控。

    需要停止服务器,只需要按着Ctrl+Shift+C,之后输入Y即可。

    ActiveMQ特性

    1、多种语言和协议编写客户端。语言:Java,C,C++,C#,Ruby,Perl,Python,PHP。应用协议:OpenWire,Stomp REST,WS Notification,XMPP,AMQP。

    2、完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)。

    3、对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性。

    4、通过了常见J2EE服务器(如 Geronimo,JBoss 4,GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上。

    5、支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA。

    6、支持通过JDBC和journal提供高速的消息持久化。

    7、从设计上保证了高性能的集群,客户端-服务器,点对点。

    8、支持Ajax。

    9、支持与Axis的整合。

    10、可以很容易得调用内嵌JMS provider进行测试。

    ActiveMQ使用

    使用场景

    1、多个项目之间集成,可以实现跨平台、多语言、多项目集成。

    2、降低系统间模块的耦合度,解耦,增强软件扩展性。

    3、系统前后端隔离,屏蔽高安全区。

    使用示例

    开发时候,需要将安装包解压缩后里面的activemq-all-5.15.2.jar包加入到classpath下面,这个包包含了所有jms接口api的实现。

    点对点的消息模型,只需要一个消息生成者和消息消费者,下面我们编写代码。

    生产者

    package com.wangbo.activemq;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    /**
     * 消息生产者(发送者)
     * @author wangbo
     *
     */
    public class JMSProducer {
        
        //默认连接用户名
        private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
        //默认连接密码
        private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
        //默认连接地址
        private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
        //发送的消息数量
        private static final int SENDNUM = 10;
        
        public static void main(String[] args) {
            
            //连接工厂
            ConnectionFactory connectionFactory;
            //连接
            Connection connection = null;
            //会话,接受或者发送消息的线程
            Session session;
            //消息的目的地
            Destination destination;
            //消息的生产者
            MessageProducer messageProducer;
            //实例化连接工厂
            connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);
            
            try {
                
                //通过连接工厂获取连接
                connection = connectionFactory.createConnection();
                //启动连接
                connection.start();
                //创建Session
                session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
                //创建一个名为“HelloWorld”的消息队列
                destination = session.createQueue("HelloWorld");
                //创建消息生产者
                messageProducer = session.createProducer(destination);
                //发送消息
                sendMessage(session, messageProducer);
                
                session.commit();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
            
        }
        
        /**
         * 发送消息
         * @param session
         * @param messageProducer  消息生产者
         * @throws Exception
         */
        public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{
            for (int i = 0; i < JMSProducer.SENDNUM; i++) {
                //创建一条文本消息 
                TextMessage message = session.createTextMessage("ActiveMQ 发送消息" +i);
                System.out.println("发送消息:Activemq 发送消息" + i);
                //通过消息生产者发出消息 
                messageProducer.send(message);
            }
    
        }
    
    }

    消费者

    package com.wangbo.activemq;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    /**
     * 消息的消费者(接收者)
     * 
     * @author wangbo
     *
     */
    public class JMSConsumer {
    
        // 默认连接用户名
        private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
        // 默认连接密码
        private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
        // 默认连接地址
        private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
    
        public static void main(String[] args) {
    
            // 连接工厂
            ConnectionFactory connectionFactory;
            // 连接
            Connection connection = null;
            // 会话,接受或者发送消息的线程
            Session session;
            // 消息的目的地
            Destination destination;
            // 消息的消费者
            MessageConsumer messageConsumer;
            // 实例化连接工厂
            connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD,
                    JMSConsumer.BROKEURL);
    
            try {
    
                // 通过连接工厂获取连接
                connection = connectionFactory.createConnection();
                // 启动连接
                connection.start();
                // 创建session
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                // 创建一个连接HelloWorld的消息队列
                destination = session.createQueue("HelloWorld");
                // 创建消息消费者
                messageConsumer = session.createConsumer(destination);
    
                while (true) {
                    TextMessage textMessage = (TextMessage) messageConsumer.receive(100000);
                    if (textMessage != null) {
                        System.out.println("收到的消息:" + textMessage.getText());
                    } else {
                        break;
                    }
                }
    
            } catch (JMSException e) {
                e.printStackTrace();
            }
    
        }
    }

    运行结果

    1、首先,启动ActiveMQ

    2、运行生产者,eclipse控制台显示:

    然后看一下监控显示:

    我们可以看到创建了一个名称为HelloWorld的消息队列,队列中有10条消息未被消费,我们也可以通过Browse查看是哪些消息

    如果这些队列中的消息,被删除,消费者则无法消费。

    3、运行消费者,eclipse控制台显示:

    我们再看一下ActiveMQ服务器,Queues内容如下:

    我们可以看到HelloWorld的消息队列发生变化,多一个消息者,队列中的10条消息被消费了,点击Browse查看,已经为空了。 点击Active Consumers,我们可以看到这个消费者的详细信息: 

    参考地址:https://blog.csdn.net/jiuqiyuliang/article/category/5617711

  • 相关阅读:
    BETA版使用说明
    项目冲刺第二阶段Fifth Day
    第二阶段 项目冲刺Forth Day
    项目冲刺第二阶段Third Day
    项目冲刺第二阶段Second Day
    “渴了么”使用说明(供用户发表评论)
    项目冲刺第二阶段 每日站立会议First Day
    Alpha版总结会议
    alpha版使用说明书
    绩效考核
  • 原文地址:https://www.cnblogs.com/wbxk/p/10364614.html
Copyright © 2011-2022 走看看