zoukankan      html  css  js  c++  java
  • JMS学习四(ActiveMQ消息过滤)

    一、消息的选择器

    不管是在消息发送端设置消息过期时间还是在接收端设置等待时间,都是对不满足的消息有过滤的作用,那消息选择器就是为过滤消息而生的下面来看看消息选择器:

    ActiveMQ提供了一种机制,使用它,消息服务可根据消息选择器中的标准来执行消息过滤。生产者可在消息中放入应用程序特有的属性,而消费者可使用基于这些属性的选择标准来表明对消息是否感兴趣。这就简化了客户端的工作,并避免了向不需要这些消息的消费者传送消息的开销。然而,它也使得处理选择标准的消息服务增加了一些额外开销。 消息选择器是用于MessageConsumer的过滤器,可以用来过滤传入消息的属性和消息头部分(但不过滤消息体),并确定是否将实际消费该消息。消息选择器是一些字符串,它们基于某种语法,而这种语法是SQL-92的子集。可以将消息选择器作为MessageConsumer 创建的一部分。

     消息选择器的用法
          MessageConsumer是一个Session创建的对象,用来从Destination接收消息


          关于消息选择器
          MessageConsumer createConsumer( Destination destination, String messageSelector )
          MessageConsumer createConsumer( Destination destination, String messageSelector, boolean noLocal )

          其中,messageSelector为消息选择器; 
          noLocal标志默认为false,当设置为true时,限制消费者只能接收和自己相同的连接(Connection)所发布的消息,此标志只适用于主题,不适用于队列。

          public final String SELECTOR="JMS_TYPE='MY_TAG1'" ; 
          选择器检查传入消息的JMS_TYPE的属性,并确定这个属性的值是否等于MY_TAG1;
          如果相等,消息报消费;如果不相等,那么消息就会被忽略;

    1、消息生产者:

    package mqtest3;  
      
    import javax.jms.Connection;  
    import javax.jms.ConnectionFactory;  
    import javax.jms.DeliveryMode;  
    import javax.jms.Destination;  
    import javax.jms.JMSException;  
    import javax.jms.MapMessage;  
    import javax.jms.MessageProducer;  
    import javax.jms.Session;  
    import javax.jms.TextMessage;  
      
    import org.apache.activemq.ActiveMQConnectionFactory;  
      
    public class Producer {  
        // 单例模式  
        // 1、连接工厂  
        private ConnectionFactory connectionFactory;  
        // 2、连接对象  
        private Connection connection;  
        // 3、Session对象  
        private Session session;  
        // 4、生产者  
        private MessageProducer messageProducer;  
      
        public Producer() {  
            try {  
                this.connectionFactory = new ActiveMQConnectionFactory("admin",  
                        "admin", "tcp://127.0.0.1:61616");  
                this.connection = connectionFactory.createConnection();  
                this.connection.start();  
                // 设置自动签收模式  
                this.session = this.connection.createSession(false,  
                        Session.AUTO_ACKNOWLEDGE);  
                this.messageProducer = this.session.createProducer(null);  
            } catch (JMSException e) {  
                throw new RuntimeException(e);  
            }  
      
        }  
      
        public Session getSession() {  
            return this.session;  
        }  
      
        public void send1(/* String QueueName, Message message */) {  
            try {  
      
                Destination destination = this.session.createQueue("first");  
                MapMessage msg1 = this.session.createMapMessage();  
                msg1.setString("name", "张三");  
                msg1.setInt("age", 20);  
                // 设置用于消息过滤器的条件  
                msg1.setStringProperty("name", "张三");  
                msg1.setIntProperty("age", 20);  
                msg1.setStringProperty("color", "bule");  
      
                MapMessage msg2 = this.session.createMapMessage();  
                msg2.setString("name", "李四");  
                msg2.setInt("age", 25);  
                // 设置用于消息过滤器的条件  
                msg2.setStringProperty("name", "李四");  
                msg2.setIntProperty("age", 25);  
                msg2.setStringProperty("color", "white");  
      
                MapMessage msg3 = this.session.createMapMessage();  
                msg3.setString("name", "赵六");  
                msg3.setInt("age", 30);  
                // 设置用于消息过滤器的条件  
                msg3.setStringProperty("name", "赵六");  
                msg3.setIntProperty("age", 30);  
                msg3.setStringProperty("color", "black");  
                // 发送消息  
                this.messageProducer.send(destination, msg1,  
                        DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);  
                this.messageProducer.send(destination, msg2,  
                        DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);  
                this.messageProducer.send(destination, msg3,  
                        DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);  
            } catch (JMSException e) {  
                throw new RuntimeException(e);  
            }  
        }  
      
        public void send2() {  
            try {  
                Destination destination = this.session.createQueue("first");  
                TextMessage message = this.session.createTextMessage("我是一个字符串");  
                message.setIntProperty("age", 25);  
                // 发送消息  
                this.messageProducer.send(destination, message,  
                        DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);  
            } catch (JMSException e) {  
                throw new RuntimeException(e);  
            }  
      
        }  
      
        public static void main(String[] args) {  
            Producer producer = new Producer();  
            producer.send1();  
            // producer.send2();  
      
        }  
    }  

    2、消息消费者:

    package mqtest3;  
      
    import javax.jms.Connection;  
    import javax.jms.ConnectionFactory;  
    import javax.jms.Destination;  
    import javax.jms.JMSException;  
    import javax.jms.MapMessage;  
    import javax.jms.Message;  
    import javax.jms.MessageConsumer;  
    import javax.jms.MessageListener;  
    import javax.jms.Session;  
    import javax.jms.TextMessage;  
    import org.apache.activemq.ActiveMQConnectionFactory;  
      
    public class Conmuser {  
        // 单例模式  
        // 1、连接工厂  
        private ConnectionFactory connectionFactory;  
        // 2、连接对象  
        private Connection connection;  
        // 3、Session对象  
        private Session session;  
        // 4、生产者  
        private MessageConsumer messageConsumer;  
        // 5、目的地址  
        private Destination destination;  
        // 消息选择器  
        public final String SELECTOR_1 = "age > 25";  
        public final String SELECTOR_2 = " age > 20 and color='black'";  
      
        public Conmuser() {  
            try {  
                this.connectionFactory = new ActiveMQConnectionFactory("admin",  
                        "admin", "tcp://127.0.0.1:61616");  
                this.connection = connectionFactory.createConnection();  
                this.connection.start();  
                // 设置自动签收模式  
                this.session = this.connection.createSession(false,  
                        Session.AUTO_ACKNOWLEDGE);  
                this.destination = this.session.createQueue("first");  
                // 在构造消费者的时候,指定了 消息选择器  
                // 有选择性的消费消息  
                this.messageConsumer = this.session.createConsumer(destination,  
                        SELECTOR_1);  
            } catch (JMSException e) {  
                throw new RuntimeException(e);  
            }  
        }  
      
        public Session getSession() {  
            return this.session;  
        }  
      
        // 用于监听消息队列的消息  
        class MyLister implements MessageListener {  
      
            @Override  
            public void onMessage(Message message) {  
                try {  
                    if (message instanceof TextMessage) {  
                        TextMessage ret = (TextMessage) message;  
                        System.out.println("results;" + ret.getText());  
                    }  
                    if (message instanceof MapMessage) {  
                        MapMessage ret = (MapMessage) message;  
                        System.out.println(ret.toString());  
                        System.out.println(ret.getString("name"));  
                        System.out.println(ret.getInt("age"));  
                    }  
                } catch (JMSException e) {  
                    throw new RuntimeException(e);  
                }  
            }  
      
        }  
      
        // 用于异步监听消息  
        public void receiver() {  
            try {  
                this.messageConsumer.setMessageListener(new MyLister());  
            } catch (JMSException e) {  
                throw new RuntimeException(e);  
            }  
        }  
      
        public static void main(String[] args) {  
            Conmuser conmuser = new Conmuser();  
            conmuser.receiver();  
      
        }  
    }  

    上面的demo是对MapMessage和TextMessage两种消息的过滤条件的设置和消费,过滤条件的设置使在消息的属性中设置,而消费消息的时候直接是在session创建MessageConsumer时传入的参数即过滤条件(过滤条件的写法和SQL的写法是很像的)

    在写过滤条件的时候要注意设置的是什么类型的条件即: int 、string 如果是int 则加引号而如果是String则要加哦!!!

    需要注意的地方

         注意消息过滤器的过滤条件的设置

    // 设置用于消息过滤器的条件  
    msg3.setStringProperty("name", "赵六");  
    msg3.setIntProperty("age", 30);  
    msg3.setStringProperty("color", "black");  

    消息过滤器的写法(类似于SQL语句的写法)

    // 消息选择器  
    public final String SELECTOR_1 = "age > 20";  
    public final String SELECTOR_2 = " age > 20 and color='bule'";  
  • 相关阅读:
    在Windows QT下使用ZeroMQ
    libpng warning: iCCP: known incorrect sRGB profile告警处理
    qt 程序发布
    Qtcreator 之中文目录
    windows下kafka配置入门 示例
    CentOS-7安装Mysql集群
    zookeeper 集群安装(单点与分布式成功安装)摘录
    Linux下安装Redis2.6.17
    Hadoop集群Hadoop安装配置
    lvs/dr+keepalived应用测试实施文档
  • 原文地址:https://www.cnblogs.com/alter888/p/8975356.html
Copyright © 2011-2022 走看看