zoukankan      html  css  js  c++  java
  • ActiveMQ消息选择器Selector

    一、前言

      消息发送到Broker,消费者通过Destination可以订阅消费某个特定的通道内的消息。一些特殊情况下,需要消费者对消息过滤下再进行消费,也就是筛选出某些特定消息。ActiveMQ提供了SQL92表达式语法的自定义消息筛选功能。非常方便快捷的能够开发出具有消息筛选功能的应用。

      ActiveMQ 支持:

    1. 数字表达式: >,>=,<,<=,BETWEEN,=.
    2. 字符表达式:=,<>,IN.
    3. IS NULL 或则 IS NOT NULL.
    4. 逻辑AND, 逻辑OR, 逻辑NOT.

      常数类型:

    1. 数字:3.1415926, 5。
    2. 字符: ‘a’,必须带有单引号。
    3. NULL,特别的常量。
    4. 布尔类型: TRUEFALSE

    二、程序案例

      生产者:

    package com.cfang.prebo.activemq.selector;
    
    import java.util.Scanner;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class Producer {
    
        public static void main(String[] args) {
            ConnectionFactory connectionFactory = null;
            Connection connection = null;
            Session session = null;
            Destination destination = null;
            MessageProducer producer = null;
            Message message = null;
            try {
                Scanner scanner = new Scanner(System.in);
                connectionFactory = new ActiveMQConnectionFactory("tcp://172.31.31.160:61618");
                connection = connectionFactory.createConnection(null, null);
                connection.start();
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                destination = session.createQueue("TP_Q_TEST_SELECTOR00");
                producer = session.createProducer(destination);
                while(true) {
                    String line = scanner.nextLine();
                    if("exit".equals(line)) {
                        break;
                    }
                    message = session.createTextMessage(line);
                    message.setIntProperty("applicationName", line.length());
                    message.setStringProperty("result", "RT");
                    producer.send(message);
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if(producer != null){ // 回收消息发送者
                    try {
                        producer.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
                if(session != null){ // 回收会话对象
                    try {
                        session.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
                if(connection != null){ // 回收连接对象
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
        
    }

      如上,生产者还可以设置更多的条件,ActiveMQ也提供了全基本类型的 setXXXXXProperty方法去设置条件。

      消费者:

    package com.cfang.prebo.activemq.selector;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Session;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class ConsumerA {
    
    	public static void main(String[] args) {
    		ConnectionFactory connectionFactory = null;
    		Connection connection = null;
    		Session session = null;
    		Destination destination = null;
    		MessageConsumer consumer = null;
    		try {
    			connectionFactory = new ActiveMQConnectionFactory("tcp://172.31.31.160:61618");
    			connection = connectionFactory.createConnection(null, null);
    			connection.start();
    			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    			destination = session.createQueue("TP_Q_TEST_SELECTOR00");
    			consumer = session.createConsumer(destination,"applicationName=2 and result='RT'");
    			consumer.setMessageListener(new MessageListener() {
    				public void onMessage(Message message) {
    					System.out.println(message);
    				}
    			});
    		} catch (Exception e) {
    			e.printStackTrace();
    		} finally {
    			
    		}
    	}
    }
    

      如上,消费者就只消费  applicationName = 2 且  result = 'RT' 的消息。

    三、小结

      1、提供了筛选功能,可以减少 destination 的数量。可以用于实现特定机器,特定消息(灰度?)。

      2、如果同时两个消费者的话,一个异常不能消费了,那么消息就会产生积压。对另一个正常的消费者而言,性能会下降,消费时间可能会变长。

  • 相关阅读:
    KVM环境搭建RHCS
    修改virt-manager默认磁盘格式
    前端基础之html
    并发编程
    网络编程
    常用模块
    模块与包
    迭代器、生成器、面向过程编程
    闭包函数和装饰器
    函数
  • 原文地址:https://www.cnblogs.com/eric-fang/p/11433837.html
Copyright © 2011-2022 走看看