zoukankan      html  css  js  c++  java
  • RocketMQ(三)——————javaAPI (5.过滤消息)

    SQL表达式过滤:

    消费者将收到包含TAGA或TAGB消息. 但限制是一条消息只能有一个标签,而这对于复杂的情况可能无效。
    在这种情况下,可以使用SQL表达式筛选出消息.

    配置:

    在`broker.conf `中添加配置

    enablePropertyFilter = true

    启动broker 加载指定配置文件


    ../bin/mqbroker -n 127.0.0.1:9876 -c broker.conf 


    语法:

    RocketMQ只定义了一些基本的语法来支持这个功能。 你也可以很容易地扩展它.

    1. 数字比较, 像 `>`, `>=`, `<`, `<=`, `BETWEEN`, `=`;
    2. 字符比较, 像 `=`, `<>`, `IN`;
    3. `IS NULL` 或者 `IS NOT NULL`;
    4. 逻辑运算`AND`, `OR`, `NOT`;

    常量类型是:

    1. 数字, 像123, 3.1415;
    2. 字符串, 像‘abc’,必须使用单引号;
    3. `NULL`, 特殊常数;
    4. 布尔常量, `TRUE` 或`FALSE`;

     


    1、生产者样例
    发送消息时,你能通过putUserProperty来设置消息的属性

        public static void main(String[] args) throws Exception {
    
            DefaultMQProducer producer = new DefaultMQProducer("rocketMq1");
    
            //设置nameserver地址:
            producer.setNamesrvAddr("127.0.0.1:9876");
            producer.start();
    
    
            for(int i=1;i<=100;i++){
    
                Message message1 = new Message("myTopic001","TAG-B","KEY-B",("rocketMq1 第一次发送 TAG-B"+i).getBytes());
                
                //num属性会作为过滤条件
                message1.putUserProperty("num",String.valueOf(i));
                
                producer.send(message1);
            }
    
    
            producer.shutdown();
            System.out.println("生产者下线!");
    
        }

    2、消费者样例
    用MessageSelector.bySql来使用sql筛选消息

        public static void main(String[] args) throws Exception {
    
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rocketMq1");
    
            //设置nameserver地址:
            consumer.setNamesrvAddr("127.0.0.1:9876");
    
            //每个cconsumer只能关注一个topic
            // tag selector 在一个group中的消费者,都不能随便变,要保持统一
            //根据num过滤
            MessageSelector selector =  MessageSelector.bySql("num >= 18 and num <= 28");
            
            consumer.subscribe("myTopic001",selector);
    
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
    
                    for(MessageExt message: list){
                        System.out.println(new String(message.getBody()));
                    }
    
                    // 默认情况下 这条消息只会被 一个consumer 消费到 点对点
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    
            consumer.setMessageModel(MessageModel.CLUSTERING);
            consumer.start();
    
            System.out.println("消费者  start....");
    
        }
  • 相关阅读:
    搭建SpringCloud之注册中心Eureka
    学习角色管理模块错误总结---基于SpringMVC框架
    【转】Eclipse 单步调试
    [转]MyBatis的foreach语句详解
    解决pom.xml文件 ---- web.xml is missing and <failOnMissingWebXml> is set to true
    解决Dynamic Web Module 3.0 Requires Java 1.6 or newer
    用maven在eclipse用spring建javaweb工程(一)
    【转载】Eclipse 断点调试
    学习大神笔记之“MyBatis学习总结(三)”
    学习大神笔记之“MyBatis学习总结(二)”
  • 原文地址:https://www.cnblogs.com/lifan12589/p/14597939.html
Copyright © 2011-2022 走看看