zoukankan      html  css  js  c++  java
  • JavaWeb之ActiveMQ消息过滤

    前言

    ActiveMQ提供了一种机制,使用它,消息服务可根据消息选择器中的标准来执行消息过滤。生产者可在消息中放入应用程序特有的属性,而消费者可使用基于这些属性的选择标准来表明对消息是否感兴趣。这就简化了客户端的工作,并避免了向不需要这些消息的消费者传送消息的开销。然而,它也使得处理选择标准的消息服务增加了一些额外开销。 

    消息选择器是用于MessageConsumer的过滤器,可以用来过滤传入消息的属性和消息头部分(但不过滤消息体),并确定是否将实际消费该消息。消息选择器是一些字符串,它们基于某种语法,而这种语法是SQL-92的子集。可以将消息选择器作为MessageConsumer 创建的一部分。

    实现对MapMessage和TextMessage两种消息的过滤条件的设置和消费

    Producer

    在消息的属性中设置过滤条件

    package com.tgb.activemqFilter;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.DeliveryMode;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MapMessage;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class Producer {
        // 单例模式
        // 1、连接工厂
        private ConnectionFactory connectionFactory;
        // 2、连接对象
        private Connection connection;
        // 3、Session对象
        private Session session;
        // 4、生产者
        private MessageProducer messageProducer;
        private Destination destination;
    
        public Producer() {
            try {
                this.connectionFactory = new ActiveMQConnectionFactory("admin",
                        "admin", "tcp://127.0.0.1:61616");
                this.connection = connectionFactory.createConnection();
                this.connection.start();
                // 设置自动签收模式
                this.session = this.connection.createSession(false,
                        Session.AUTO_ACKNOWLEDGE);
                this.destination = this.session.createQueue("first");
                this.messageProducer = this.session.createProducer(null);
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
    
        }
    
        public Session getSession() {
            return this.session;
        }
    
        public void send1(/* String QueueName, Message message */) {
            try {
    
                Destination destination = this.session.createQueue("first");
                MapMessage msg1 = this.session.createMapMessage();
                msg1.setString("name", "张三");
                msg1.setInt("age", 20);
                // 设置用于消息过滤器的条件
                msg1.setStringProperty("name", "张三");
                msg1.setIntProperty("age", 20);
                msg1.setStringProperty("color", "bule");
    
                MapMessage msg2 = this.session.createMapMessage();
                msg2.setString("name", "李四");
                msg2.setInt("age", 25);
                // 设置用于消息过滤器的条件
                msg2.setStringProperty("name", "李四");
                msg2.setIntProperty("age", 25);
                msg2.setStringProperty("color", "white");
    
                MapMessage msg3 = this.session.createMapMessage();
                msg3.setString("name", "赵六");
                msg3.setInt("age", 30);
                // 设置用于消息过滤器的条件
                msg3.setStringProperty("name", "赵六");
                msg3.setIntProperty("age", 30);
                msg3.setStringProperty("color", "black");
                // 发送消息
                this.messageProducer.send(destination, msg1,
                        DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);
                this.messageProducer.send(destination, msg2,
                        DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);
                this.messageProducer.send(destination, msg3,
                        DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    
        public void send2() {
            try {
                Destination destination = this.session.createQueue("first");
                TextMessage message = this.session.createTextMessage("我是一个字符串");
                message.setIntProperty("age", 25);
                // 发送消息
                this.messageProducer.send(destination, message,
                        DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
    
        }
    
        public static void main(String[] args) {
            Producer producer = new Producer();
            producer.send1();
            // producer.send2();
    
        }
    }

    Conmuser

    消费消息时,直接在session创建MessageConsumer时,将过滤条件作为参数传入(过滤条件的写法和SQL的写法是很像的)

    package com.tgb.activemqFilter;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MapMessage;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class Conmuser {
        // 单例模式
        // 1、连接工厂
        private ConnectionFactory connectionFactory;
        // 2、连接对象
        private Connection connection;
        // 3、Session对象
        private Session session;
        // 4、生产者
        private MessageConsumer messageConsumer;
        // 5、目的地址
        private Destination destination;
        // 消息选择器
        public final String SELECTOR_1 = "age > 25";
        public final String SELECTOR_2 = " age > 20 and color='black'";
    
        public Conmuser() {
            try {
                this.connectionFactory = new ActiveMQConnectionFactory("admin",
                        "admin", "tcp://127.0.0.1:61616");
                this.connection = connectionFactory.createConnection();
                this.connection.start();
                // 设置自动签收模式
                this.session = this.connection.createSession(false,
                        Session.AUTO_ACKNOWLEDGE);
                this.destination = this.session.createQueue("first");
                // 在构造消费者的时候,指定了 消息选择器
                // 有选择性的消费消息
                this.messageConsumer = this.session.createConsumer(destination,
                        SELECTOR_1);
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    
        public Session getSession() {
            return this.session;
        }
    
        // 用于监听消息队列的消息
        class MyLister implements MessageListener {
    
            @Override
            public void onMessage(Message message) {
                try {
                    if (message instanceof TextMessage) {
                        TextMessage ret = (TextMessage) message;
                        System.out.println("results;" + ret.getText());
                    }
                    if (message instanceof MapMessage) {
                        MapMessage ret = (MapMessage) message;
                        System.out.println(ret.toString());
                        System.out.println(ret.getString("name"));
                        System.out.println(ret.getInt("age"));
                    }
                } catch (JMSException e) {
                    throw new RuntimeException(e);
                }
            }
    
        }
    
        // 用于异步监听消息
        public void receiver() {
            try {
                this.messageConsumer.setMessageListener(new MyLister());
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }
    
        public static void main(String[] args) {
            Conmuser conmuser = new Conmuser();
            conmuser.receiver();
    
        }
    }

    测试

    Messages Enqueued: 张三 20 | 李四 25 | 赵六 30

    消息过滤条件:age>25

  • 相关阅读:
    服务器时间同步
    DataX部署安装
    Mysql ERROR 1205 (HY000): Lock wait timeout exceeded; try restarting transaction 解决方法
    mysql 使用需要注意的问题
    利用mysqldump 将一个表按条件导出数据
    mysql存储引擎分类
    MySQL数据备份之mysqldump使用
    count(1)、count(*)与count(列名)的执行区别
    rsync + sersync 实现实时数据同步
    ipmitool 工具使用
  • 原文地址:https://www.cnblogs.com/YSPXIZHEN/p/6892266.html
Copyright © 2011-2022 走看看