zoukankan      html  css  js  c++  java
  • activemq

    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    /**
     * 1.Destination  目的地
     *  Queue 队列
     *    队列中的消息,默认只能由唯一的一个消费者处理,一旦处理消息后消息就被删除
     *
     * 支持存在多个消费者,多个生产者,所以消费者不可能消费到已经被消费的消息
     * 当消费者不存在时,消息会一直保存,直到有消费者消费
     *
     *  Topic 目的地
     *    消息会发送给所有的消费者同时处理,只有在消息可以重复处理的业务场景中可使用
     *
     *
     */
    public class TestProducer {
    
    
        public void sendTextMessage(String mess){
    
            ConnectionFactory connectionFactory = null;
    
            Connection connection = null;
    
            Destination destination = null;
    
            Session session = null;
    
            MessageProducer messageProducer =null;
    
            Message message = null;
    
            try {
    
                connectionFactory = new ActiveMQConnectionFactory();
    
                connection = connectionFactory.createConnection();
                //发送者默认是启动的,消费者默认是不启动的,所有客户端时必须启动
                //如果有特殊配置,配置后再启动
                connection.start();
                //1 第一个参数  是否支持事务,不推荐事务,批量时建议使用  如果true  第二个参数无效(只是提供者)
                // 客户端不支持事务
                //2  消息确认机制
                /**
                 * 1 auto_acknowledge  自动确认消息,消息的消费者处理消息后,自动确认,常用,商业开发不推荐
                 * 2 client_acknowledge 客户端手动确认,消息的消费者处理后,必须手工确认
                 * 3 dups_ok_acknowledge 有副本的客户端手动确认 一个消息可以多次处理,可以降低session的消耗,在
                 *
                 * 可以容忍重复消息时使用(不推荐使用)
    //设置该消息的超时时间
                //messageProducer.setTimeToLive(1000);
                /**
                 * 过期的、处理失败的消息,将会被ActiveMQ置入“ActiveMQ.DLQ”这个队列中。这个队列是由ActiveMQ自动创建的。
                 *
                 * 如果需要查看这些未被处理的消息,可以进入这个队列中查看
                   Destination destination = session.createQueue("ActiveMQ.DLQ");
                 */ *
    */ session = connection.createSession(false,Session.CLIENT_ACKNOWLEDGE); destination = session.createQueue("first-mq"); messageProducer = session.createProducer(destination); message = session.createTextMessage(mess); messageProducer.send(message);
    //session.commit(); //启用事务时记得提交事务,不然消费端接收不到消息 System.out.println(
    "消息已发送"); }catch (Exception e){ e.printStackTrace(); }finally { if (messageProducer!=null){ try { messageProducer.close(); } catch (JMSException e) { e.printStackTrace(); } } if (session!=null){ try { session.close(); } catch (JMSException e) { e.printStackTrace(); } } if (connection!=null){ try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } public static void main(String[] args) { TestProducer t = new TestProducer(); t.sendTextMessage("发送一条消息"); } }
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    /**
     * Created by Administrator on 2019/6/27.
     */
    public class TestConsumer {
    
    
        public void rTextMessage() {
    
            ConnectionFactory connectionFactory = null;
    
            Connection connection = null;
    
            Destination destination = null;
    
            Session session = null;
    
    
            MessageConsumer messageConsumer = null;
    
            Message message = null;
    
            try {
    
                connectionFactory = new ActiveMQConnectionFactory();
    
                connection = connectionFactory.createConnection();
                //发送者默认是启动的,消费者默认是不启动的,所有客户端时必须启动
                //如果有特殊配置,配置后再启动
                connection.start();
                //1 第一个参数  是否支持事务,不推荐事务,批量时建议使用  如果true  第二个参数无效(只是提供者)
                // 客户端不支持事务
                //2  消息确认机制
                /**
                 * 1 auto_acknowledge  自动确认消息,消息的消费者处理消息后,自动确认,常用,商业开发不推荐
                 * 2 client_acknowledge 客户端手动确认,消息的消费者处理后,必须手工确认
                 * 3 dups_ok_acknowledge 有副本的客户端手动确认 一个消息可以多次处理,可以降低session的消耗,在
                 *
                 * 可以容忍重复消息时使用(不推荐使用)
                 *
                 */
                session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                destination = session.createQueue("first-mq");
                //messageProducer = session.createProducer(destination);
                messageConsumer = session.createConsumer(destination);
    
                /**
                 * 主动活动消息,执行一次,拉去一个消息,开发少用
                 * 多个消费者要用监听
                 */
    
                message =messageConsumer.receive();
    
                String mess = ((TextMessage)message).getText();
    
                System.out.println("接收到的消息----------"+mess);
    
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (messageConsumer != null) {
                    try {
                        messageConsumer.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
                if (session != null) {
                    try {
                        session.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
    
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
    
        public static void main(String[] args) {
            TestConsumer t = new TestConsumer();
            t.rTextMessage();
        }
    
    
    }
    /**
         * 监听队列消息
         */
        public void receiveMessageListener() {
            ConnectionFactory connectionFactory = null;
            Connection connection = null;
            Session session = null;
            Destination destination = null;
            MessageConsumer consumer = null;
            try {
                connectionFactory = new ActiveMQConnectionFactory(brokerURL);
                connection = connectionFactory.createConnection();
                connection.start();
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                destination = session.createQueue("first-quere");
                consumer = session.createConsumer(destination);
                consumer.setMessageListener(new MessageListener() {
                    @Override
                    public void onMessage(Message message) {
                        TextMessage textMessage = (TextMessage) message;
                        try {
                            System.out.println("接收消息:"+textMessage.getText());
                            //textMessage.acknowledge();消息确认机制
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                        
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
            }
        }

    1

  • 相关阅读:
    【转】JVM 堆内存设置原理
    【转】Java八种基本数据类型的比较及其相互转化
    8月12日
    并发与竞争
    高通gpio配置输出
    创建一个字符设备的基本流程
    4月2号 字符设备驱动实验
    3.30学习遇到卡死点
    断言函数的用法
    12.02 下午
  • 原文地址:https://www.cnblogs.com/jentary/p/11099827.html
Copyright © 2011-2022 走看看