zoukankan      html  css  js  c++  java
  • RocketMQ(三)——————javaAPI (5.过滤消息)

    SQL表达式过滤:

    消费者将收到包含TAGA或TAGB消息. 但限制是一条消息只能有一个标签,而这对于复杂的情况可能无效。
    在这种情况下,可以使用SQL表达式筛选出消息.

    配置:

    在`broker.conf `中添加配置

    enablePropertyFilter = true

    启动broker 加载指定配置文件


    ../bin/mqbroker -n 127.0.0.1:9876 -c broker.conf 


    语法:

    RocketMQ只定义了一些基本的语法来支持这个功能。 你也可以很容易地扩展它.

    1. 数字比较, 像 `>`, `>=`, `<`, `<=`, `BETWEEN`, `=`;
    2. 字符比较, 像 `=`, `<>`, `IN`;
    3. `IS NULL` 或者 `IS NOT NULL`;
    4. 逻辑运算`AND`, `OR`, `NOT`;

    常量类型是:

    1. 数字, 像123, 3.1415;
    2. 字符串, 像‘abc’,必须使用单引号;
    3. `NULL`, 特殊常数;
    4. 布尔常量, `TRUE` 或`FALSE`;

     


    1、生产者样例
    发送消息时,你能通过putUserProperty来设置消息的属性

        public static void main(String[] args) throws Exception {
    
            DefaultMQProducer producer = new DefaultMQProducer("rocketMq1");
    
            //设置nameserver地址:
            producer.setNamesrvAddr("127.0.0.1:9876");
            producer.start();
    
    
            for(int i=1;i<=100;i++){
    
                Message message1 = new Message("myTopic001","TAG-B","KEY-B",("rocketMq1 第一次发送 TAG-B"+i).getBytes());
                
                //num属性会作为过滤条件
                message1.putUserProperty("num",String.valueOf(i));
                
                producer.send(message1);
            }
    
    
            producer.shutdown();
            System.out.println("生产者下线!");
    
        }

    2、消费者样例
    用MessageSelector.bySql来使用sql筛选消息

        public static void main(String[] args) throws Exception {
    
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rocketMq1");
    
            //设置nameserver地址:
            consumer.setNamesrvAddr("127.0.0.1:9876");
    
            //每个cconsumer只能关注一个topic
            // tag selector 在一个group中的消费者,都不能随便变,要保持统一
            //根据num过滤
            MessageSelector selector =  MessageSelector.bySql("num >= 18 and num <= 28");
            
            consumer.subscribe("myTopic001",selector);
    
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
    
                    for(MessageExt message: list){
                        System.out.println(new String(message.getBody()));
                    }
    
                    // 默认情况下 这条消息只会被 一个consumer 消费到 点对点
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    
            consumer.setMessageModel(MessageModel.CLUSTERING);
            consumer.start();
    
            System.out.println("消费者  start....");
    
        }
  • 相关阅读:
    函数与导数部分的题型梳理
    构造函数习题1
    破解构造函数问题
    函数的值域
    函数的定义域
    高三数学微课堂
    Redux Todos Example
    Linux下查看Nginx安装目录、版本号信息及当前运行的配置文件
    antd的Tree控件实现点击展开功能
    Redux Counter example
  • 原文地址:https://www.cnblogs.com/lifan12589/p/14597939.html
Copyright © 2011-2022 走看看