zoukankan      html  css  js  c++  java
  • ActiveMQ(5.10.0)

    Sending a JMS message

    public class MyMessageProducer {
        ...
    
        // 创建连接工厂实例
        ConnectionFactory connFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD, 
                "tcp://localhost:61616");
    
        Connection conn = null;
        try {
            // 取得连接对象实例
            conn = connFactory.createConnection();
            // 启动连接
            conn.start();
            // 创建会话对象实例
            Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
            // 创建消息目的地
            Destination destination = session.createQueue("hello_queue");
            // 创建消息生产者
            MessageProducer msgProducer = session.createProducer(destination);
            // 创建消息对象实例
            Message textMsg = session.createTextMessage("This is a test message.");
            // 发送消息
            msgProducer.send(textMsg);
            // 提交会话
            session.commit();
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            // 关闭连接
            if (conn != null) {
                try {
                    conn.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
        
        ...
    }

    Receiving a JMS message synchronously

    public class MySynMessageConsumer {
        ...
    
        // 创建连接工厂实例
        ConnectionFactory connFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD, 
                "tcp://localhost:61616");
    
        Connection conn = null;
        try {
            // 取得连接对象实例
            conn = connFactory.createConnection();
            // 启动连接,当调用此方法后才能接收到消息
            conn.start();
            // 创建会话对象实例
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 创建消息目的地
            Destination destination = session.createQueue("hello_queue");
            // 创建消息消费者
            MessageConsumer msgConsumer = session.createConsumer(destination);
            
            // 接收消息
            TextMessage textMsg = (TextMessage) msgConsumer.receive(10 * 1000);
            if (textMsg != null) {
                System.out.println(textMsg.getText());
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            if (conn != null) {
                try {
                    conn.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    
        ...
    }

    Receiving a JMS message asynchronously

    public class MyAsynMessageConsumer {
        ...
    
        // 创建连接工厂实例
        ConnectionFactory connFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD, 
                "tcp://localhost:61616");
    
        Connection conn = null;
        try {
            // 取得连接对象实例
            conn = connFactory.createConnection();
            // 启动连接,当调用此方法后才能接收到消息
            conn.start();
            // 创建会话对象实例
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 创建消息目的地
            Destination destination = session.createQueue("hello_queue");
            // 创建消息消费者
            MessageConsumer msgConsumer = session.createConsumer(destination);
            // 注册消息监听器
            msgConsumer.setMessageListener(new MessageListener() {                
                @Override
                public void onMessage(Message msg) {
                    TextMessage textMsg = (TextMessage) msg;
                    try {
                        System.out.println(textMsg.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            Thread.sleep(60 * 1000); 
            
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 关闭连接
            if (conn != null) {
                try {
                    conn.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    
        ...
    }

    Others

    • 对于 Pub/Sub 模型,使用 session.createTopic 方法创建 Destination。
    • MessageConsumer 同步消费消息时,receive() 方法会阻塞线程直到接收到下一条消息;receive(long timeout) 方法在指定的时间内阻塞线程直到接收到下一条消息,如果超时,则返回 null 值;receiveNoWait() 方法立刻接收下一条消息,如果消息源中没有消息,则返回 null 值。
  • 相关阅读:
    上篇用到的matcher函数
    lambdaj学习
    Redis高级应用——2
    Redis入门
    从gitee 下载代码到本地
    CSS中对图片(background)的一些设置心得总结
    nodejs 安装Ionic 和cordova
    Spring MVC内容协商机制详解
    基于Servlet3.0的编程式SpringMVC实例详解
    基于Junit的Spring集成测试方法
  • 原文地址:https://www.cnblogs.com/huey/p/4682454.html
Copyright © 2011-2022 走看看