zoukankan      html  css  js  c++  java
  • rocketMQ消费者java消费代码

    执行main方法即可启动(如果是spring项目,一般在构造方法调用启动方法接口,记得把类注入到容器即可)

    (启动后 当消息有推送时会自动除发consumeMessage消费事件)

    消费者名称broker-a可随意命名,但是要固定,不然会重新消费该主题所有消息
    package com.chzfsd.controller;
    
    import java.util.List;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
    
    
    public class TestMq {
    
        public static void main(String[] args) {
            rocketMQConsumer();
        }
    
        public static void rocketMQConsumer() {
            try {
                System.out.println("rocketMQConsumer  开始------");
                // 消费目标
                // 声明一个消费者consumer,需要传入一个组
                DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("broker-a");
                // 设置集群的NameServer地址,多个地址之间以分号分隔
                consumer.setNamesrvAddr("127.0.0.1:9876");
                // 设置consumer的消费策略
                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                // 集群模式消费,广播消费不会重试
                consumer.setMessageModel(MessageModel.CLUSTERING);
                // 设置最大重试次数,默认是16次
                consumer.setMaxReconsumeTimes(5);
                // 设置consumer所订阅的Topic和Tag,*代表全部的Tag
                consumer.subscribe("GD_runmode_syncRunmodeRecloseInfo", "*");
                // Listener,主要进行消息的逻辑处理,监听topic,如果有消息就会立即去消费
                consumer.registerMessageListener(new MessageListenerConcurrently() {
                    @Override
                    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                        // 获取第一条消息,进行处理
                        try {
                            if (msgs != null && msgs.size() > 0) {
                                  MessageExt messageExt = msgs.get(0);
                                  String msgBody = new String(messageExt.getBody(), "utf-8");
                                  System.out.println(" 接收消息整体为:" + msgBody);
                            }
                        } catch (Exception e) {
                            System.out.println("消息消费失败,请尝试重试!!!");
                            e.printStackTrace();
                            // 尝试重新消费,直接第三次如果还不成功就放弃消费,进行消息消费失败补偿操作
                            if (msgs.get(0).getReconsumeTimes() == 3) {
                                System.out.println("消息记录日志:" + msgs);
                                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                            } else {
                                // 重试状态码,重试机制可配置
                                // System.out.println("消息消费失败,尝试重试!!!");
                                System.out.println("消息消费失败,请尝试重试!!!");
                                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                            }
                        }
                        System.out.println("消息消费成功!!!");
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                });
                // 调用start()方法启动consumer
                consumer.start();
                System.out.println("消费者启动成功。。。");
                System.out.println("rocketMQConsumer 结束------");
            } catch (Exception e) {
                e.printStackTrace();
                System.out.println("消息消费操作失败--" + e.getMessage());
            }
        }
    }
  • 相关阅读:
    server version for the right syntax to use near 'USING BTREE 数据库文件版本不合导致的错误
    百度网盘,FTP上传异常、上传失败的解决办法
    zencart产品属性dropmenu select只有一个选择项时自动变成radio单选的解决办法
    火车采集小结
    dedecms织梦移站后替换数据库中文件路径命令
    dedecms织梦网站本地迁移到服务器后,后台更新栏目文档提示模板文件不存在,无法解析文档!的解决办法
    Addthis分享插件后url乱码的解决办法
    dedecms织梦做中英文(多语言)网站步骤详解
    递归的参数和返回值
    【图论算法】Dijkstra&BFS
  • 原文地址:https://www.cnblogs.com/rdchen/p/15210585.html
Copyright © 2011-2022 走看看