zoukankan      html  css  js  c++  java
  • RocketMQ入门(Filter)_5

    RocketMQ中存储的消息对于消费者来说,并不完全都是他们需要的,因此需要对消息进行过滤。

    订阅Topic主题 ,选择Tags都是我们简单的过滤。Topic是大分类,Tags是二级分类。

     

    RocketMQ还有一种过滤机制MessageFilter,是在服务端开启过滤服务器,消费者将指定的java上传后,在服务端过滤。

    这种会在很大程度上影响服务器性能,java类要确保简单安全,不占用过多资源。

    服务器配置:

    要开启FilterServer需要在配置文件(broker)中添加启动filterServer,默认是关闭的。

    filterServerNums=1

    开启的Server数,一个broker可对应多个filterServer,建议一般就1个。

    启动前关闭broker,添加完配置,然后启动broker。

    再启动filterServer

    》nohup sh mqfiltersrv  -c /usr/local/program/rocketmq/conf/2m-2s-sync/broker-a.properties &

    启动完毕后可使用jps来查看。

    程序配置:

    Producer端无需任何变化。

    Consumer端变化:

    先定义一个Filter类,实现MessageFilter接口。

    /**
     * filter
     * attention: don't use the chinese word..
     * @author DennyZhao
     *
     */
    public class FruitMsgFilter implements MessageFilter {
    
        @Override
        public boolean match(MessageExt msg, FilterContext filterContext) {
            String origin = msg.getUserProperty("origin");  //from map
            if("mainland".equals(origin)) {
                return true;
            }
            return false;
        }
    
    }

    ※ 注意:里面不能有中文,判断内容可随意。

                   将Filter文件,放入到resource下,防止被编译,无法发送。

    1. 增加filter java类写成String.发送给broker。

    File file = new File(Thread.currentThread().getContextClassLoader().getResource("FruitMsgFilter.java").getFile());
    String fileStr = MixAll.file2String(file);

     // 订阅下添加此java到当前的package下,此处不能带 .java
     consumer.subscribe("fruit", "com.rocketmq.learn.filter.FruitMsgFilter", fileStr);

     运行 consumer,在console的consumer下可以看到此consumer,点击client可以看到filter信息。

    启动producer测试类:

    /**
     * 水果生產者
     * @author DennyZhao
     *
     */
    public class FruitProducer {
        
        /**
         * 主方法
         * @param args
         * @throws MQClientException 
         * @throws InterruptedException 
         */
        public static void main(String[] args) throws MQClientException, InterruptedException {
            DefaultMQProducer producer = new DefaultMQProducer("fruitProducerGroup");
            producer.setNamesrvAddr("192.168.68.137:9876;192.168.68.138:9876;");
            producer.start();
            String[] fruitArray = {"apple","pear","strawberry","kiwifruit","mango","peach"};
            int count = 0;
            for(String fruitname : fruitArray) {
                Message msg = new Message("fruit", "common", fruitname, fruitname.getBytes());
                count++;
                Thread.sleep(2000);
                // 用于定义产地
                if(count !=4 && count != 5) {
                    msg.putUserProperty("origin", "mainland");
                }else {
                    msg.putUserProperty("origin", "foregin");
                }
                SendResult result = null;
                try {
                    result = producer.send(msg);
                    if(result != null) {
                        switch(result.getSendStatus()) {
                        case SEND_OK:
                            System.out.println("发送成功........");
                            break;
                        default:
                            System.out.println("发送失败.......");
                            // TODO将内容写入到Redis或失败表中,用于后面从新发送
                            break;
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            
            producer.shutdown();
        }
    
    }

     测试结果,只会展示 mainland 的水果。

    ※:一般情况不要开启FilterServer,对mq性能消耗较大。

  • 相关阅读:
    实习的一些感想,感触,心得体会
    一张优惠券引发的血案(redis并发安全问题)
    Java各种对象(PO,BO,VO,DTO,POJO,DAO,Entity,JavaBean,JavaBeans)的区分
    Redis 集群
    Maven Pom文件标签详解
    Google Guava 基本工具
    context:component-scan的使用说明
    logback的简单分析
    轮询和长轮询
    StringUtils中 isNotEmpty 和isNotBlank的区别?
  • 原文地址:https://www.cnblogs.com/DennyZhao/p/9944384.html
Copyright © 2011-2022 走看看