zoukankan      html  css  js  c++  java
  • ActiveMQ安装配置及使用 转发 https://www.cnblogs.com/hushaojun/p/6016709.html

    ActiveMQ安装配置及使用

    ActiveMQ介绍

    ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。 
    特性列表: 
    ⒈ 多种语言和协议编写客户端。语言: Java,C,C++,C#,Ruby,Perl,Python,PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP 
    ⒉ 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务) 
    ⒊ 对spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性 
    ⒋ 通过了常见J2EE服务器(如 Geronimo,JBoss 4,GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上 
    ⒌ 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA 
    ⒍ 支持通过JDBC和journal提供高速的消息持久化 
    ⒎ 从设计上保证了高性能的集群,客户端-服务器,点对点 
    ⒏ 支持Ajax 
    ⒐ 支持与Axis的整合 
    ⒑ 可以很容易的调用内嵌JMS provider,进行测试


    (二)ActiveMQ安装、配置、启动、可视化界面

    1、安装 
    下载地址:http://activemq.apache.org/download.html 
    2、配置(conf目录下) 
    1)用户名密码设置 
    设置用户名密码 
    2)开启jmx监控 
    activemq.xml中进行如下修改 
    这里写图片描述
    注:这里的配置不是必须,根据需要自行配置 
    3、启动 
    直接运行bin目录下:activemq.bat 
    4、可视化界面 
    浏览器中:http://localhost:8161/admin/index.jsp 
    用户名,密码在:jetty-realm.properties中设置


    (三)点对点式消息队列(Queue)

    消息生产者

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class QueueProducer {
    
        public static void main(String[] args) {
            //连接信息设置
            String username = "system";
            String password = "manager";
            String brokerURL = "failover://tcp://localhost:61616";
            //连接工厂
            ConnectionFactory connectionFactory = null;
            //连接
            Connection connection = null;
            //会话 接受或者发送消息的线程
            Session session = null;
            //消息的目的地
            Destination destination = null;
            //消息生产者
            MessageProducer messageProducer = null;
            //实例化连接工厂
            connectionFactory = new ActiveMQConnectionFactory(username, password, brokerURL);
    
            try {
                //通过连接工厂获取连接
                connection = connectionFactory.createConnection();
                //启动连接
                connection.start();
                //创建session
                session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
                //创建一个名称为QueueTest的消息队列
                destination = session.createQueue("QueueTest");
                //创建消息生产者
                messageProducer = session.createProducer(destination);
                //发送消息
                TextMessage message = null;
                for (int i=0; i<10; i++) {
                    //创建要发送的文本信息
                    message = session.createTextMessage("Queue消息测试" +(i+1));
                    //通过消息生产者发出消息 
                    messageProducer.send(message);
                    System.out.println("发送成功:" + message.getText());
                }
                session.commit();
            } catch (Exception e) {
                e.printStackTrace();
            }finally{
                if(null != connection){
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
    
        }
    
    }
    

    消息消费者

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class QueueConsumer {
    
        public static void main(String[] args) {
            //连接信息设置
            String username = "system";
            String password = "manager";
            String brokerURL = "failover://tcp://localhost:61616";
            //连接工厂
            ConnectionFactory connectionFactory = null;
            //连接
            Connection connection = null;
            //会话 接受或者发送消息的线程
            Session session = null;
            //消息的目的地
            Destination destination = null;
            //消息消费者
            MessageConsumer messageConsumer = null;
            //实例化连接工厂
            connectionFactory = new ActiveMQConnectionFactory(username, password, brokerURL);
    
            try {
                //通过连接工厂获取连接
                connection = connectionFactory.createConnection();
                //启动连接
                connection.start();
                //创建session
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                //创建一个连接QueueTest的消息队列
                destination = session.createQueue("QueueTest");
                //创建消息消费者
                messageConsumer = session.createConsumer(destination);
    
                while (true) {
                    TextMessage textMessage = (TextMessage) messageConsumer.receive(100000);
                    if(textMessage != null){
                        System.out.println("成功接收消息:" + textMessage.getText());
                    }else {
                        break;
                    }
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }

    (四)主题发布订阅式(Topic)

    主题发布者

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.DeliveryMode;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.jms.Topic;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class TopicProducer {
    
        public static void main(String[] args) {
            //连接信息设置
            String username = "system";
            String password = "manager";
            String brokerURL = "failover://tcp://localhost:61616";
            //连接工厂
            ConnectionFactory connectionFactory = null;
            //连接
            Connection connection = null;
            //会话 接受或者发送消息的线程
            Session session = null;
            //消息的主题
            Topic topic = null;
            //消息生产者
            MessageProducer messageProducer = null;
            //实例化连接工厂
            connectionFactory = new ActiveMQConnectionFactory(username, password, brokerURL);
            try {
                //通过连接工厂获取连接
                connection = connectionFactory.createConnection();
                //启动连接
                connection.start();
                //创建session
                session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
                //创建名为TopicTest的主题
                topic = session.createTopic("TopicTest");  
                //创建主题生产者
                messageProducer = session.createProducer(topic);
                messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//不将数据持久化
                //发送主题
                TextMessage message = null;
                for (int i=0; i<10; i++) {
                    //创建要发送的文本信息
                    message = session.createTextMessage("Topic主题测试" +(i+1));
                    //通过主题生产者发出消息 
                    messageProducer.send(message);
                    System.out.println("发送成功:" + message.getText());
                }
                session.commit();
            } catch (Exception e) {
                e.printStackTrace();
            }finally{
                if(null != connection){
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }  
        }
    
    }

    主题订阅者

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.jms.Topic;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class TopicConsumer {
    
        public static void main(String[] args) {
            //连接信息设置
            String username = "system";
            String password = "manager";
            String brokerURL = "failover://tcp://localhost:61616";
            //连接工厂
            ConnectionFactory connectionFactory = null;
            //连接
            Connection connection = null;
            //会话 接受或者发送消息的线程
            Session session = null;
            //主题的目的地
            Topic topic = null;
            //主题消费者
            MessageConsumer messageConsumer = null;
            //实例化连接工厂
            connectionFactory = new ActiveMQConnectionFactory(username, password, brokerURL);
    
            try {
                //通过连接工厂获取连接
                connection = connectionFactory.createConnection();
                //启动连接
                connection.start();
                //创建session
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                //创建一个连接TopicTest的主题
                topic = session.createTopic("TopicTest");  
                //创建主题消费者
                messageConsumer = session.createConsumer(topic);
    
                messageConsumer.setMessageListener(new MyMessageListener());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    
    }
    
    class MyMessageListener implements MessageListener {
    
        @Override
        public void onMessage(Message message) {  
            TextMessage textMessage = (TextMessage) message;  
            try {  
                System.out.println("接收订阅主题:" + textMessage.getText());  
            } catch (JMSException e) {  
                e.printStackTrace();  
            }  
        } 
    
    }
    

    注: 
    1、代码中所需额外jar包在下载的mq文件夹中,例如我使用的:activemq-all-5.9.0.jar 
    2、对于消息队列,异步生产和消费;对于主题发布订阅要先启动订阅者进行监听,然后在发布方可接收到订阅主题 
    3、关于Queue与Topic的具体区别,详见http://blog.csdn.net/qq_21033663/article/details/52458305

  • 相关阅读:
    Microsoft Enterprise Library 5.0 系列(二) Cryptography Application Block (初级)
    Microsoft Enterprise Library 5.0 系列(五) Data Access Application Block
    Microsoft Enterprise Library 5.0 系列(八) Unity Dependency Injection and Interception
    Microsoft Enterprise Library 5.0 系列(九) Policy Injection Application Block
    Microsoft Enterprise Library 5.0 系列(三) Validation Application Block (高级)
    软件研发打油诗祝大家节日快乐
    从挖井的故事中想到开发管理中最容易忽视的几个简单道理
    ITIL管理思想的执行工具发布
    管理类软件设计“渔”之演化
    20070926日下午工作流与ITILQQ群 事件管理 讨论聊天记录
  • 原文地址:https://www.cnblogs.com/Jeely/p/10789062.html
Copyright © 2011-2022 走看看