zoukankan      html  css  js  c++  java
  • RocketMQ(三)——————javaAPI (1.2.3.4 接收方式)

    Consumer 消费者 对以上四种的模式的 消息进行消费:

    //官网示例
        public static void main(String[] args) throws InterruptedException, MQClientException {
    
            // 实例化消费者
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("producerGroup_name");
    
            // 设置NameServer的地址
            consumer.setNamesrvAddr("localhost:9876");
    
            // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
            consumer.subscribe("TopicTest", "*");
            // 注册回调实现类来处理从broker拉取回来的消息
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                    // 标记该消息已经被成功消费
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            // 启动消费者实例
            consumer.start();
            System.out.printf("Consumer Started.%n");
        }
    
        //简单理解
        public static void main(String[] args) throws Exception {
    
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rocketMq1");
    
            //设置nameserver地址
            consumer.setNamesrvAddr("127.0.0.1:9876");
    
            // 每个consumer 关注一个topic
            // topic 关注的消息的地址
            // 过滤器 * 表示不过滤
            consumer.subscribe("myTopic001","*");
            
            //"TAG-B" --  只获取带有此标签的数据
            //consumer.subscribe("myTopic001","TAG-B");
    
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
    
                    for(MessageExt message: list){
                        System.out.println(new String(message.getBody()));
                    }
                    // 默认情况下 这条消息只会被 一个consumer 消费到 点对点
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    
            consumer.setMessageModel(MessageModel.CLUSTERING);
            consumer.start();
    
            System.out.println("消费者  start....");
    
        }
  • 相关阅读:
    (转) mysql的分区技术 .
    (转) MySQL中索引的限制
    单键索引还是组合索引
    什么是挂载?mount的用处在哪?
    java中的try-catch-finally中的return的执行顺序
    eclipse中spring配置文件的自动提示和namespace的添加
    try catch 语句中有return 的各类情况
    IOC与DI的理解
    对已经存在的没有唯一标识的表添加一个自增的id字段(利用序列sequence)操作过程
    mysql5.7导出数据提示--secure-file-priv选项问题的解决方法
  • 原文地址:https://www.cnblogs.com/lifan12589/p/14597874.html
Copyright © 2011-2022 走看看