zoukankan      html  css  js  c++  java
  • 记录:ActiveMQ实例(一)

    ps: ActiveMQ 5.10 以上版本需要使用 JDK1.8

    1.启动activemq

    1.1  进入activemq解压后的bin目录,执行命令  ./activemq start

    1.2  查看activemq进程   ps -aux | grep activemq

    1.3  访问管理界面, 地址  http:// ip地址: 8161/admin,  默认用户名和密码都是 admin

    ps : 8161 是默认端口,如果要修改默认端口,需要编辑  /conf/jetty.xml 文件 (activemq内置jetty服务器)

    2. 创建项目,导入jar包

     3. 处理文本消息

     3.1  创建 TextMessageProducer.java  (文本消息提供者)

    package cn.demo.producer;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class TextMessageProducer {
    
        /**
         * 将文本写入消息队列
         * @param text
         * @param destName
         */
        public void sendMsgToActiveMQ(String text, String destName) {
    //        链接工厂
            ConnectionFactory factory = null;
    //        链接
            Connection connection = null;
    //        会话
            Session session = null;
    //        目的队列
            Destination dest = null;
    //        消息生产者
            MessageProducer producer = null;
    //        定义消息
            Message message = null;
            
            try {
                
                /**
                 * 1. 创建连接工厂
                 * 构造方法: public ActiveMQConnectionFactory(String userName,String password)
                 * userName - 访问ActiveMQ服务的用户名, 用户名可以通过jetty-realm.properties配置文件配置.
                 * password - 访问ActiveMQ服务的密码, 密码可以通过jetty-realm.properties配置文件配置.
                 * brokerURL - 访问ActiveMQ服务的路径地址. 路径结构为 - 协议名://主机地址:端口号
                 *             此链接基于TCP/IP.
                 */
                factory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.230.4:61616");
                
    //            2. 创建连接
                connection = factory.createConnection();
    //            开启连接
                connection.start();
                
                /**
                 * 创建会话connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
                 * 第一个参数: 事务的提交的方式
                 *         true  : 表示手动提交事务,则第二个参数失效
                 *         false : 不提交事务,则必须指定消息确认机制
                 * 
                 * 第二个参数: 消息确认机制  acknowledgeMode  -消息确认机制,可选值为:
                 *         Session.AUTO_ACKNOLEDGE  -  自动确认消息机制
                 *         Session.CLIENT_ACKNOLEDGE - 客户端确认消息机制
                 *         Session.DUPS_OK_ACKNOLEDGE - 有副本的客户端确认消息机制
                 */
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                
    //            4. 指定目的地,即队列命名,消息消费者 需要通过此命名访问对应的队列
                dest = session.createQueue(destName);
                
    //            5. 创建消息生产者,参数为目的地队列名
                producer = session.createProducer(dest);
                
    //            6. 创建一个文本消息,此消息对象中保存要传递的文本信息
                message = session.createTextMessage(text);
                
    //            7. 发送
                producer.send(message);
                System.out.println("消息发送成功");
                
            } catch (Exception e) {
                e.printStackTrace();
                System.out.println("消息发送失败");
            } finally {
    //            关闭资源
                if(null != producer) {
                    try {
                        producer.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                if(null != session) {
                    try {
                        session.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
                if(null != connection) {
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
            
        }
        
    }
    View Code

    3.2   创建测试类,测试消息是否发送成功

    public class TestMQ {
    
        @Test
        public void testTextProducer(){
            
            TextMessageProducer producer = new TextMessageProducer();
            producer.sendMsgToActiveMQ("嘻嘻嘻嘻嘻","text-mq");
            System.out.println("消息发送成功");
        }
    
    }
    View Code

    3.3  然后就会看见一条消息待处理(等待出队列的消息数量)

    3.4  创建通用消息消费者 ConsumerMQ.java

    public class ConsumerMQ {
    
        public Message recieveMessageFromMQ(String destName) {
    
            ConnectionFactory factory = null;
            Connection connection = null;
            Session session = null;
            Destination dest = null;
            
            MessageConsumer consumer = null;
            
            Message message = null;
            
            try {
                
                factory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.230.4:61616");
                
    //            2. 创建连接
                connection = factory.createConnection();
    //            开启连接
                connection.start();
                
    //            3. 创建会话
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                
    //            4.指定队列地址
                dest = session.createQueue(destName);
                
    //            5.创建消费者
                consumer = session.createConsumer(dest);
                
    //            6.读取消息
                message = consumer.receive();
                
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //关闭资源
                if(null!=consumer){
                    try {
                        consumer.close();
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
                if(null!=session){
                    try {
                        session.close();
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
                if(null!=connection){
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            }
            return message;
        }
        
    }
    View Code

    3.4  消息消费者测试方法

        @Test
        public void testTextConsumer(){
            try {
                ConsumerMQ c = new ConsumerMQ();
                Message message = c.recieveMessageFromMq("text-mq");
            
                    TextMessage msg = (TextMessage) message;
                System.out.println("从队列中获取的消息是:"+msg.getText());
    
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }    
    View Code

    结果:

    Number Of Pending Messages(等待出队列的消息数)-1,

    Messages Enqueued(进队列数) +1

    Messages Dequeued (出队列数) +1

     

    4. 处理对象消息

     4.1 创建pojo,该对象必须实现序列化

    public class Item implements Serializable {
        
        private Long id;
        
        private String title;
        
        private Long price;
    
        public Item() {
            super();
            // TODO Auto-generated constructor stub
        }
    
        public Long getId() {
            return id;
        }
    
        public void setId(Long id) {
            this.id = id;
        }
    
        public String getTitle() {
            return title;
        }
    
        public void setTitle(String title) {
            this.title = title;
        }
    
        public Long getPrice() {
            return price;
        }
    
        public void setPrice(Long price) {
            this.price = price;
        }
        
    }
    View Code

    4.2 创建ObjectMessageProducer.java(对象消息提供者)

    public class ObjectMessageProducer {
    
        public void sendObjectToMQ(Item item, String destName) {
            ConnectionFactory factory = null;
            Connection connection = null;
            Session session = null;
            Destination dest = null;
            
            MessageProducer producer = null;
            
            Message message = null;
            
            try {
                factory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.230.4:61616");
                
    //            2.开启连接
                connection = factory.createConnection();
                
    //            3.创建会话
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                
    //            4.指定队列
                dest = session.createQueue(destName);
                
    //            5.创建发送者
                producer = session.createProducer(dest);
                
    //            6.创建对象消息,对象必须实现序列化
                message = session.createObjectMessage(item);
                
    //            7.发送
                producer.send(message);
                
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //关闭资源
                if(null!=producer){
                    try {
                        producer.close();
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
                if(null!=session){
                    try {
                        session.close();
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
                if(null!=connection){
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            }
            
        }
        
    }
    View Code

    4.3  测试消息发送

        @Test
        public void testObjectMessageProducer(){
            ObjectMessageProducer producer = new ObjectMessageProducer();
            
            Item item = new Item();
            item.setId(1L);
            item.setPrice(10000000L);
            item.setTitle("这是个对象消息");
            
            producer.sendObjectToMQ(item, "obj-mq");
            System.out.println("将对象消息写入消息队列");
        }
    View Code

    4.4  测试消息读取  ConsumerMQ.java 是上面处理文本信息通用的那个

        @Test
        public void testObjConsumser(){
            ConsumerMQ c = new ConsumerMQ();
            Message message = c.recieveMessageFromMq("obj-mq");
            ObjectMessage objMsg = (ObjectMessage) message;
            
            try {
                Item item = (Item) objMsg.getObject();
                System.out.println("读取对象消息:" + item.getId() + item.getTitle());
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    View Code

     

    5.监听器实现

    监听指定的队列,持续获取消息

    创建 MyListener.java ,实现 MessageListener,自定义一个监听器

    public class MyListener implements MessageListener {
    
        @Override
        public void onMessage(Message message) {
    
            if(null != message) {
                ObjectMessage objMsg = (ObjectMessage) message;
                try {
                    
                    Item item = (Item) objMsg.getObject();
                    System.out.println("商品的id:"+item.getId()+",商品的名称:"+item.getTitle()+",商品的价格:"+item.getPrice());
                    
                } catch (JMSException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            
        }
    
    }
    View Code

    当Java程序结束时,监听器无法继续加载

    解决方案:1.将监听器放在web程序中  2. 阻塞线程

    嗯……这里先用方法二

    修改 ConsumerMQ.java

    当使用监听器时,不用调用receive()方法,而是加载监听器

    //            5.创建消费者
                consumer = session.createConsumer(dest);
                
    //            6.读取消息
    //            message = consumer.receive();
                
    //            7.加载监听器
                consumer.setMessageListener(new MyListener());
    
    //            强制阻塞线程,让监听器持续加载
                System.in.read();
    View Code

    测试

    @Test
        public void testObjConsumser(){
            ConsumerMQ c = new ConsumerMQ();
            c.recieveMessageFromMq("obj-mq");
        }
    View Code

    访问ActiveMQ管理界面,会看到 待读取消息一出现就会被读取。

  • 相关阅读:
    删除当前目录下除了system目录的其他文件
    单例设计模式
    系统工程师
    字符串翻转
    教育
    得到b相对于a的路径
    一段处理事务的代码
    搭讪
    win 8 ,vs2011 编程环境下,动软生成器无法连接上 sql server 2008 r2
    从asp网站编程转行到asp.net网站编程的过程
  • 原文地址:https://www.cnblogs.com/islz/p/9554493.html
Copyright © 2011-2022 走看看