zoukankan      html  css  js  c++  java
  • RocketMQ-Filter

    一、搭建RocketMQ集群

      我搭建的是2-master no slave模式,所以在${rocketmq}/conf/2m-noslave/下的 broker-*.properties 中添加 filterServerNums=1

    二、依次启动namesrv和broker

      在broker-*.properties中配置了filterServerNums=1后,启动broker时会自动启动filter server

    三、代码部分

      3.1 Producer部分

    package org.hope.lee.filter;
    
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    import com.alibaba.rocketmq.client.producer.SendResult;
    import com.alibaba.rocketmq.common.message.Message;
    
    /**
     * Demo producer for the server-side filter example.
     *
     * <p>Sends 100 messages to topic {@code TopicFilter7}. Each message carries a
     * {@code SequenceId} user property holding the loop index, which is the value
     * the server-side filter class inspects.
     */
    public class FilterProducer {
        /**
         * Starts the producer, sends the messages and shuts the producer down.
         *
         * @param args unused
         * @throws MQClientException if the producer cannot be started
         */
        public static void main(String[] args) throws MQClientException {
            String groupName = "filter_producer";
            DefaultMQProducer producer = new DefaultMQProducer(groupName);
            producer.setNamesrvAddr("xxx.xxx.xx.176:9876;xxx.xx.xx.165:9876");
            producer.start();
            try {
                for (int i = 0; i < 100; i++) {
                    // Encode explicitly as UTF-8: the consumer decodes the body as
                    // "utf-8", so do not depend on the platform default charset.
                    byte[] body = ("Hello MetaQ" + i).getBytes("UTF-8");
                    Message msg = new Message("TopicFilter7", // topic
                        "TagA", // tag
                        "OrderID001", // key
                        body);
                    // The filter decides on this property, not on the body.
                    msg.putUserProperty("SequenceId", String.valueOf(i));
                    SendResult sendResult = producer.send(msg);
                    System.out.println(sendResult);
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // Always release the client, whatever escaped the loop.
                producer.shutdown();
            }
        }
    }

       3.2 Customer部分

    package org.hope.lee.filter;
    
    import java.io.UnsupportedEncodingException;
    import java.util.List;
    
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.common.MixAll;
    import com.alibaba.rocketmq.common.message.MessageExt;
    
    public class FilterCustomer {
        public static void main(String[] args) throws MQClientException {
            String group_name = "filter_consumer";
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group_name);
            consumer.setNamesrvAddr("xxx.xxx.xx.176:9876;xxx.xxx.xx.165:9876");
            // 使用Java代码,在服务器做消息过滤 
            String filterCode = MixAll.file2String("E:\code-on-oschina\hzjsd1108sohu\RocketMQ-learn\rocketmq-api\src\main\java\org\hope\lee\filter\MessageFilterImpl.java");
            System.out.println(filterCode);
            consumer.subscribe("TopicFilter7", "org.hope.lee.filter.MessageFilterImpl", filterCode);
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                        ConsumeConcurrentlyContext context) {
                    //System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                    try {
                        System.out.println(new String(msgs.get(0).getBody(),"utf-8"));
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
            System.out.println("Consumer Started.");
        }
    }

      3.3 实现MessageFilter接口

        注意:这个类的源文件中不能出现任何中文,注释里也不行,否则Customer启动时会提示找不到这个文件。

    package org.hope.lee.filter;
    
    import com.alibaba.rocketmq.common.filter.MessageFilter;
    import com.alibaba.rocketmq.common.message.MessageExt;
    
    /**
     * Message filter whose source is handed to the consumer's subscribe call.
     *
     * <p>Accepts a message only when its "SequenceId" user property is an even
     * integer. Keep this file ASCII-only, comments included.
     */
    public class MessageFilterImpl implements MessageFilter {

        /**
         * Decides whether a message passes the filter.
         *
         * @param msg the message to inspect
         * @return true if "SequenceId" is present, numeric and even; false otherwise
         */
        @Override
        public boolean match(MessageExt msg) {
            // NO Chinese
            System.out.println("-------------");
            String property = msg.getUserProperty("SequenceId");
            System.out.println("---------" + property);
            if (property == null) {
                return false;
            }

            int id;
            try {
                id = Integer.parseInt(property);
            } catch (NumberFormatException e) {
                // A non-numeric SequenceId is treated as "no match" instead of
                // letting the exception escape from the filter.
                return false;
            }

            // Alternative rule from the original sample: (id % 3) == 0 && (id > 10)
            return (id % 2) == 0;
        }

    }

    四、测试:

      4.1 运行Customer端

      4.2运行Producer端

    https://gitee.com/huayicompany/RocketMQ-learn/tree/master/rocketmq-api

    参考:

    [1] 白贺翔博客,https://www.cnblogs.com/baihexiang/articles/5307073.html

    [2] 考拉哥博客,http://lifestack.cn/archives/371.html

  • 相关阅读:
    集合的笼统介绍之Collection
    集合的笼统介绍之ArrayList
    final关键字+static关键字+匿名对象
    多态
    练习018:搜索插入位置
    练习017:实现strStr()
    练习016:移除元素
    练习015:删除排序数组中的重复项
    练习014:合并两个有序链表
    用JS实现链表
  • 原文地址:https://www.cnblogs.com/happyflyingpig/p/8423356.html
Copyright © 2011-2022 走看看