zoukankan      html  css  js  c++  java
  • RocketMQ-Filter

    一、搭建RocketMQ集群

      我搭建的是2-master no slave模式,所以在${rocketmq}/conf/2m-noslave/下的 broker-*.properties 中添加 filterServerNums=1

    二、依次启动namesrv和broker

      在broker-*.properties中配置了filterServerNums=1之后,启动broker时会自动启动filter server

    三、代码部分

      3.1 Producer部分

    package org.hope.lee.filter;
    
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    import com.alibaba.rocketmq.client.producer.SendResult;
    import com.alibaba.rocketmq.common.message.Message;
    
    /**
     * Demo producer: sends 100 messages to topic "TopicFilter7", each tagged
     * "TagA" and carrying a "SequenceId" user property set to the loop index.
     */
    public class FilterProducer {
        /**
         * Starts the producer, sends the test messages and shuts the producer down.
         *
         * @param args unused
         * @throws MQClientException if the producer fails to start
         */
        public static void main(String[] args) throws MQClientException {
            String groupName = "filter_producer";
            DefaultMQProducer producer = new DefaultMQProducer(groupName);
            producer.setNamesrvAddr("xxx.xxx.xx.176:9876;xxx.xx.xx.165:9876");
            producer.start();
            try {
                for (int i = 0; i < 100; i++) {
                    // Encode explicitly as UTF-8: the consumer decodes the body with
                    // "utf-8", so the platform default charset must not be used here.
                    byte[] body = ("Hello MetaQ" + i).getBytes("utf-8");
                    Message msg = new Message("TopicFilter7", // topic
                        "TagA", // tag
                        "OrderID001", // key
                        body); // body
                    msg.putUserProperty("SequenceId", String.valueOf(i));
                    SendResult sendResult = producer.send(msg);
                    System.out.println(sendResult);
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // Always release the producer, whatever happened while sending.
                producer.shutdown();
            }
        }
    }

       3.2 Customer部分

    package org.hope.lee.filter;
    
    import java.io.UnsupportedEncodingException;
    import java.util.List;
    
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.common.MixAll;
    import com.alibaba.rocketmq.common.message.MessageExt;
    
    public class FilterCustomer {
        public static void main(String[] args) throws MQClientException {
            String group_name = "filter_consumer";
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group_name);
            consumer.setNamesrvAddr("xxx.xxx.xx.176:9876;xxx.xxx.xx.165:9876");
            // 使用Java代码,在服务器做消息过滤 
            String filterCode = MixAll.file2String("E:\code-on-oschina\hzjsd1108sohu\RocketMQ-learn\rocketmq-api\src\main\java\org\hope\lee\filter\MessageFilterImpl.java");
            System.out.println(filterCode);
            consumer.subscribe("TopicFilter7", "org.hope.lee.filter.MessageFilterImpl", filterCode);
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                        ConsumeConcurrentlyContext context) {
                    //System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                    try {
                        System.out.println(new String(msgs.get(0).getBody(),"utf-8"));
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
            System.out.println("Consumer Started.");
        }
    }

      3.3 实现MessageFilter接口

        注意:这个类中不能有任何的中文,包括注释中也不能有。否则在Customer启动的时候是找不到这个文件的。

    package org.hope.lee.filter;
    
    import com.alibaba.rocketmq.common.filter.MessageFilter;
    import com.alibaba.rocketmq.common.message.MessageExt;
    
    /**
     * Message filter that matches only messages whose "SequenceId" user
     * property parses to an even integer.
     *
     * Do not put any Chinese characters in this file, comments included.
     */
    public class MessageFilterImpl implements MessageFilter {

        /**
         * Decides whether a message passes the filter.
         *
         * @param msg the message to inspect
         * @return true if the "SequenceId" user property is present and even,
         *         false if the property is absent or odd
         */
        @Override
        public boolean match(MessageExt msg) {
            System.out.println("-------------");
            final String sequenceId = msg.getUserProperty("SequenceId");
            System.out.println("---------" + sequenceId);

            // Messages without the property never match.
            if (sequenceId == null) {
                return false;
            }
            return Integer.parseInt(sequenceId) % 2 == 0;
        }

    }

    四、测试:

      4.1 运行Customer端

      4.2运行Producer端

    https://gitee.com/huayicompany/RocketMQ-learn/tree/master/rocketmq-api

    参考:

    [1] 白贺翔博客,https://www.cnblogs.com/baihexiang/articles/5307073.html

    [2] 考拉哥博客,http://lifestack.cn/archives/371.html

  • 相关阅读:
    强化学习的基本迭代方法
    基于文本描述的事务聚类
    学习强化学习之前需要掌握的3种技能
    其它 华硕 ASAU S4100U 系统安装 win10安装 重装系统 Invalid Partition Table 解决
    数据分析 一些基本的知识
    Python 取样式的内容 合并多个文件的样式 自定义样式
    电商 Python 生成补单公司需要的评论格式3
    SpringBlade 本地图片上传 生成缩略图
    SQL Server 字符串截取
    SpringBlade 本地图片上传
  • 原文地址:https://www.cnblogs.com/happyflyingpig/p/8423356.html
Copyright © 2011-2022 走看看