zoukankan      html  css  js  c++  java
  • RocketMQ(三)——————javaAPI (1.2.3.4 接收方式)

    Consumer 消费者 对以上四种的模式的 消息进行消费:

    //官网示例
        public static void main(String[] args) throws InterruptedException, MQClientException {
    
            // 实例化消费者
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("producerGroup_name");
    
            // 设置NameServer的地址
            consumer.setNamesrvAddr("localhost:9876");
    
            // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
            consumer.subscribe("TopicTest", "*");
            // 注册回调实现类来处理从broker拉取回来的消息
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                    // 标记该消息已经被成功消费
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            // 启动消费者实例
            consumer.start();
            System.out.printf("Consumer Started.%n");
        }
    
        //简单理解
        public static void main(String[] args) throws Exception {
    
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rocketMq1");
    
            //设置nameserver地址
            consumer.setNamesrvAddr("127.0.0.1:9876");
    
            // 每个consumer 关注一个topic
            // topic 关注的消息的地址
            // 过滤器 * 表示不过滤
            consumer.subscribe("myTopic001","*");
            
            //"TAG-B" --  只获取带有此标签的数据
            //consumer.subscribe("myTopic001","TAG-B");
    
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
    
                    for(MessageExt message: list){
                        System.out.println(new String(message.getBody()));
                    }
                    // 默认情况下 这条消息只会被 一个consumer 消费到 点对点
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    
            consumer.setMessageModel(MessageModel.CLUSTERING);
            consumer.start();
    
            System.out.println("消费者  start....");
    
        }
  • 相关阅读:
    生成不带签名(BOM)的UTF8格式的XML
    矢量数据的裁剪及合并
    使用dotNET_Reactor4.7加密后的dll在VS2010中无法打包
    加密后的程序在Win8中无法运行
    修改jpg的图片大小
    shapefile中dbf的数据格式(转载)
    shapefile 输出的地理处理注意事项(转载)
    linxu下面的绝对路径和相对路径
    go中的类型转换成interface之后如何复原
    使用docker部署一个go应用
  • 原文地址:https://www.cnblogs.com/lifan12589/p/14597874.html
Copyright © 2011-2022 走看看