zoukankan      html  css  js  c++  java
  • ActiveMQ_消费者编码

    public class JmsConsumer {
    
        public static final String ACTIVEMQ_URL = "tcp://192.168.xx.xx:61616";
    
        public static void main(String[] args) throws Exception{
            //创建activemq连接工厂
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            //获得连接
            Connection connection = factory.createConnection();
            //打开连接
            connection.start();
            //创建session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
            //创建目的地对象
            Destination queue = session.createQueue("queue01");
    
            //创建消费者
            MessageConsumer consumer = session.createConsumer(queue);
    
            //消费消息
            while (true){
    
                //接收消息
                TextMessage message = (TextMessage) consumer.receive();
                //判断消息是否为空
                if(null != message){
                    System.out.println(message.getText());
                }else{
                    break;
                }
    
            }
            //关闭资源
            consumer.close();
            session.close();
            connection.close();
        }
    
    }

      强调:consumer的receive有两个,无参receive():表示一直等着,程序挂着那;有参receive(timeout):在规定时间内等待;所以我们的receive它是同步阻塞的

      上面是通过receive来消费消息,现在采用监听器的方式(异步非阻塞):

    public class JmsConsumer2 {
    
        public static final String ACTIVEMQ_URL = "tcp://192.168.xx.xxx:61616";
    
        public static void main(String[] args) throws Exception{
            //创建activemq连接工厂
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            //获得连接
            Connection connection = factory.createConnection();
            //打开连接
            connection.start();
            //创建session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
            //创建目的地对象
            Destination queue = session.createQueue("queue01");
    
            //创建消费者
            MessageConsumer consumer = session.createConsumer(queue);
    
            //监听消息
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    if(null != message && message instanceof TextMessage){
                        TextMessage textMessage = (TextMessage) message;
                        try {
                            System.out.println(textMessage.getText());
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            System.in.read();
            //关闭资源
            consumer.close();
            session.close();
            connection.close();
        }
    
    }
  • 相关阅读:
    后勤数据抽取流程图 Logistic Data Extraction
    WINDOWS两条线路上网的解决办法
    Zabbix Agent for Linux部署
    Java项目的自动更新并构建脚本
    使用Goole搜索引擎
    golang程序性能优化方法----不断更新
    golang性能分析策略
    问题分析:引入新elastic api导致的TIME_WAIT堆积
    数据权限限定办法
    MariaDB集群搭建
  • 原文地址:https://www.cnblogs.com/ibcdwx/p/14048020.html
Copyright © 2011-2022 走看看