zoukankan      html  css  js  c++  java
  • RocketMQ(三)——————javaAPI (5.过滤消息)

    SQL表达式过滤:

    消费者将收到包含TAGA或TAGB消息. 但限制是一条消息只能有一个标签,而这对于复杂的情况可能无效。
    在这种情况下,可以使用SQL表达式筛选出消息.

    配置:

    在`broker.conf `中添加配置

    enablePropertyFilter = true

    启动broker 加载指定配置文件


    ../bin/mqbroker -n 127.0.0.1:9876 -c broker.conf 


    语法:

    RocketMQ只定义了一些基本的语法来支持这个功能。 你也可以很容易地扩展它.

    1. 数字比较, 像 `>`, `>=`, `<`, `<=`, `BETWEEN`, `=`;
    2. 字符比较, 像 `=`, `<>`, `IN`;
    3. `IS NULL` 或者 `IS NOT NULL`;
    4. 逻辑运算`AND`, `OR`, `NOT`;

    常量类型是:

    1. 数字, 像123, 3.1415;
    2. 字符串, 像‘abc’,必须使用单引号;
    3. `NULL`, 特殊常数;
    4. 布尔常量, `TRUE` 或`FALSE`;

     


    1、生产者样例
    发送消息时,你能通过putUserProperty来设置消息的属性

        public static void main(String[] args) throws Exception {
    
            DefaultMQProducer producer = new DefaultMQProducer("rocketMq1");
    
            //设置nameserver地址:
            producer.setNamesrvAddr("127.0.0.1:9876");
            producer.start();
    
    
            for(int i=1;i<=100;i++){
    
                Message message1 = new Message("myTopic001","TAG-B","KEY-B",("rocketMq1 第一次发送 TAG-B"+i).getBytes());
                
                //num属性会作为过滤条件
                message1.putUserProperty("num",String.valueOf(i));
                
                producer.send(message1);
            }
    
    
            producer.shutdown();
            System.out.println("生产者下线!");
    
        }

    2、消费者样例
    用MessageSelector.bySql来使用sql筛选消息

        public static void main(String[] args) throws Exception {
    
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rocketMq1");
    
            //设置nameserver地址:
            consumer.setNamesrvAddr("127.0.0.1:9876");
    
            //每个cconsumer只能关注一个topic
            // tag selector 在一个group中的消费者,都不能随便变,要保持统一
            //根据num过滤
            MessageSelector selector =  MessageSelector.bySql("num >= 18 and num <= 28");
            
            consumer.subscribe("myTopic001",selector);
    
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
    
                    for(MessageExt message: list){
                        System.out.println(new String(message.getBody()));
                    }
    
                    // 默认情况下 这条消息只会被 一个consumer 消费到 点对点
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    
            consumer.setMessageModel(MessageModel.CLUSTERING);
            consumer.start();
    
            System.out.println("消费者  start....");
    
        }
  • 相关阅读:
    算法----(1)冒泡排序
    淘宝爬虫
    爬虫_豆瓣电影top250 (正则表达式)
    爬虫_猫眼电影top100(正则表达式)
    Android 简单调用摄像头
    Android 简单天气预报
    思维模型
    This view is not constrained, it only has designtime positions, so it will jump to (0,0) unless you
    Android studio preview界面无法预览,报错render problem
    Android studio 3.1.2报错,no target device found
  • 原文地址:https://www.cnblogs.com/lifan12589/p/14597939.html
Copyright © 2011-2022 走看看