zoukankan      html  css  js  c++  java
  • RocketMQ系列(六)批量发送与过滤

    今天我们再来看看RocketMQ的另外两个小功能,消息的批量发送和过滤。这两个小功能提升了我们使用RocketMQ的效率。

    批量发送

    以前我们发送消息的时候,都是一个一个的发送,这样效率比较低下。能不能一次发送多个消息呢?当然是可以的,RocketMQ为我们提供了这样的功能。但是它也有一些使用的条件:

    • 同一批发送的消息的Topic必须相同;
    • 同一批消息的waitStoreMsgOK 必须相同;
    • 批量发送的消息不支持延迟,就是上一节说的延迟消息;
    • 同一批次的消息,大小不能超过1MiB;

    好了,只要我们满足上面的这些限制,就可以使用批量发送了,我们来看看发送端的代码吧,

    @Test
    public void producerBatch() throws Exception {
    
        List<Message> messages = new ArrayList<>();
        for (int i = 0;i<3;i++) {
            MessageExt message = new MessageExt();
            message.setTopic("cluster-topic");
            message.setKeys("key-"+i);
            message.setBody(("this is batchMQ,my NO is "+i+"---"+new Date()).getBytes());
            messages.add(message);
        }
        SendResult sendResult = defaultMQProducer.send(messages);
        System.out.println("sendResult:" + sendResult.getSendStatus().toString());
    }
    
    • 其实批量发送很简单,我们只是把消息放到一个List当中,然后统一的调用send方法发送就可以了。

    再来看看消费端的代码,

    @Bean(initMethod = "start",destroyMethod = "shutdown")
    public DefaultMQPushConsumer pushConsumer()  {
        try {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DefaultMQPushConsumer");
            consumer.setNamesrvAddr("192.168.73.130:9876;192.168.73.131:9876;192.168.73.132:9876;");
            consumer.subscribe("cluster-topic", "*");
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    System.out.println("msgs.size():"+msgs.size());
                    if (msgs != null && msgs.size() > 0) {
                        for (MessageExt msg : msgs) {
                            System.out.println(new String(msg.getBody()));
                        }
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            return consumer;
        }catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
    
    • 消费端的代码没有任何的变化,正常的接收消息就可以了,我们只是打印出了msgs.size(),看看一次接收一个消息,还是一次可以批量的接收多个消息。

    我们启动项目,批量发送一下,看看效果吧,

    发送端的日志如下:

    sendResult:SEND_OK
    

    发送成功,看来我们批量发送的3个消息都进入到了队列中,再看看消费端,是一次消费一个,还是一次消费3个,如下:

    msgs.size():1
    this is batchMQ,my NO is 0---Mon Jun 15 09:31:04 CST 2020
    msgs.size():1
    this is batchMQ,my NO is 1---Mon Jun 15 09:31:04 CST 2020
    msgs.size():1
    this is batchMQ,my NO is 2---Mon Jun 15 09:31:04 CST 2020
    

    看样子是一次只消费了一个消息,那么能不能一次消费3个消息呢?当然是可以的,不过要进行特殊的设置,

    consumer.setConsumeMessageBatchMaxSize(5);
    

    在消费端,我们设置批量消费消息的数量是5,这个值默认是1。我们再看看消费端的日志,

    msgs.size():3
    this is batchMQ,my NO is 0---Mon Jun 15 09:35:47 CST 2020
    this is batchMQ,my NO is 1---Mon Jun 15 09:35:47 CST 2020
    this is batchMQ,my NO is 2---Mon Jun 15 09:35:47 CST 2020
    

    这次一次消费了3个消息,如果消息比较多的话,最大一次能消费5个。这就是RocketMQ的批量发送和批量消费。

    消息过滤

    其实我们在大多数情况下,使用tag标签就能够很好的实现消息过滤。虽然tag标签咱们并没有过多的介绍,其实也很好理解,就是一个子Topic的概念,咱们在构建消息message的时候,message.setTags("xxx")。然后在消费的时候,订阅Topic的时候,也可以指定订阅的tag,

    consumer.subscribe("cluster-topic", "*");
    

    看到那个"*"了吗?它就是订阅的tag,"*"代表全部的tag,如果您想订阅其中的一个或几个,可以使用这种方式"tagA || tagB || tagC",这是订阅了cluster-topic下的3个tag,其他的tag是不会被消费的。

    这里我们所说的消息过滤比tag要高级很多,是可以支持sql的,怎么样?高级吧。比如:我们订阅"a > 5 and b = 'abc'"的消息,如下图:

    但是,RocketMQ毕竟不是数据库,它只能支持一些基础的SQL语句,并不是所有的SQL都支持,

    • 数字型的支持,>, >=, <, <=, BETWEEN, =

    • 字符串支持,=, <>, IN

    • IS NULL或者IS NOT NULL

    • 逻辑判断,ANDORNOT

    字段的类型也只是简单的几种,

    • 数字型,支持123,543.123,整型、浮点都可以;
    • 字符串,必须使用单引号''括起来;
    • 空值,NULL;
    • 布尔型,TRUE或者FALSE;

    并且对消费者的类型也有一定的限制,只能使用push consumer才可以进行消息过滤。好了,说了这么多了,我们看看怎么使用吧,消费端和生产端都要进行相应的改造,先看看生产端吧,

    @Test
    public void producerBatch() throws Exception {
    
        List<Message> messages = new ArrayList<>();
        for (int i = 0;i<3;i++) {
            MessageExt message = new MessageExt();
            message.setTopic("cluster-topic");
            message.setKeys("key-"+i);
            message.setBody(("this is batchMQ,my NO is "+i+"---"+new Date()).getBytes());
    
            int a = i+4;
            message.putUserProperty("a",String.valueOf(a));
    
            messages.add(message);
        }
        SendResult sendResult = defaultMQProducer.send(messages);
        System.out.println("sendResult:" + sendResult.getSendStatus().toString());
    }
    

    我们在之前批量发送的基础上进行了修改,定义了a的值,等于i+4,这样循环3次,a的值就是4,5,6。然后调用message.putUserProperty("a",String.valueOf(a))注意,在使用消息过滤的时候,这些附加的条件属性都是通过putUserProperty方法进行设置。这里,我们设置了a的值。再看看消费端,

    consumer.subscribe("cluster-topic", MessageSelector.bySql("a > 5"));
    

    消费端,整体上没有变化,只是在订阅的方法中,使用MessageSelector.bySql("a > 5"),进行了条件的过滤。有的小伙伴可能会有疑问,我既想用sql过滤又想用tag过滤怎么办?当然也是可以,我们可以使用MessageSelector.bySql("a > 5").byTag("xx),byTag和bySql不分前后,怎么样,很强大吧。我们运行一下程序,看看效果吧。

    我们启动一下服务,报错了,怎么回事?错误信息如下:

    The broker does not support consumer to filter message by SQL92
    

    队列不支持过滤消息,我们查询了RocketMQ源码中的BrokerConfig类,这个类就是对broker的一些设置,其中发现了这两个属性,

    // whether do filter when retry.
    private boolean filterSupportRetry = false;
    private boolean enablePropertyFilter = false;
    
    • filterSupportRetry是在重试的时候,是否支持filter;
    • enablePropertyFilter,这个就是是否支持过滤消息的属性;

    我们把这两个属性在broker的配置文件改为true吧,如下:

    filterSupportRetry=true
    enablePropertyFilter=true
    

    然后,再重新部署一下我们两主两从的集群环境。环境部署完以后,我们再重启应用,没有报错。在生产端发送一下消息看看吧,

    sendResult:SEND_OK
    

    生产端发送消息没有问题,说明3个消息都发送成功了。再看看消费端的日志,

    msgs.size():1
    this is batchMQ,my NO is 2---Mon Jun 15 10:59:37 CST 2020
    

    只消费了一个消息,并且这个消息中i的值是2,那么a的值就是2+4=6,它是>5的,满足SQL的条件,所以被消费掉了。这完全符合我们的预期。

    总结

    今天的两个小功能还是比较有意思的,但里边也有需要注意的地方,

    • 消息的批量发送,只要我们满足它的条件,然后使用List发送就可以了;批量消费,默认的消费个数是1,我们可以调整它的值,这样就可以一次消费多个消息了;
    • 过滤消息中,最大的坑就是队列的配置里,需要设置enablePropertyFilter=true,否则消费端在启动的时候报不支持SQL的错误;

    我们在使用的时候,多加留意就可以了,有问题,评论区留言吧~

  • 相关阅读:
    HTTP Status 401
    Spring Data Elasticsearch 应用
    Elasticsearch基础概念
    Windows上使用Linux命令
    在服务器搭建git仓库
    SSH
    Linux/Windows 配置config 使用ssh连接
    在阿里云CentOS服务器上安装Python3.7并设置为默认Python
    Linux下Python3源码安装
    Vue
  • 原文地址:https://www.cnblogs.com/boboooo/p/13129755.html
Copyright © 2011-2022 走看看