zoukankan      html  css  js  c++  java
  • rocketMQ消费者java消费代码

    执行main方法即可启动(如果是spring项目,一般在构造方法调用启动方法接口,记得把类注入到容器即可)

    (启动后 当消息有推送时会自动除发consumeMessage消费事件)

    消费者名称broker-a可随意命名,但是要固定,不然会重新消费该主题所有消息
    package com.chzfsd.controller;
    
    import java.util.List;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
    
    
    public class TestMq {
    
        public static void main(String[] args) {
            rocketMQConsumer();
        }
    
        public static void rocketMQConsumer() {
            try {
                System.out.println("rocketMQConsumer  开始------");
                // 消费目标
                // 声明一个消费者consumer,需要传入一个组
                DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("broker-a");
                // 设置集群的NameServer地址,多个地址之间以分号分隔
                consumer.setNamesrvAddr("127.0.0.1:9876");
                // 设置consumer的消费策略
                consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                // 集群模式消费,广播消费不会重试
                consumer.setMessageModel(MessageModel.CLUSTERING);
                // 设置最大重试次数,默认是16次
                consumer.setMaxReconsumeTimes(5);
                // 设置consumer所订阅的Topic和Tag,*代表全部的Tag
                consumer.subscribe("GD_runmode_syncRunmodeRecloseInfo", "*");
                // Listener,主要进行消息的逻辑处理,监听topic,如果有消息就会立即去消费
                consumer.registerMessageListener(new MessageListenerConcurrently() {
                    @Override
                    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                        // 获取第一条消息,进行处理
                        try {
                            if (msgs != null && msgs.size() > 0) {
                                  MessageExt messageExt = msgs.get(0);
                                  String msgBody = new String(messageExt.getBody(), "utf-8");
                                  System.out.println(" 接收消息整体为:" + msgBody);
                            }
                        } catch (Exception e) {
                            System.out.println("消息消费失败,请尝试重试!!!");
                            e.printStackTrace();
                            // 尝试重新消费,直接第三次如果还不成功就放弃消费,进行消息消费失败补偿操作
                            if (msgs.get(0).getReconsumeTimes() == 3) {
                                System.out.println("消息记录日志:" + msgs);
                                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                            } else {
                                // 重试状态码,重试机制可配置
                                // System.out.println("消息消费失败,尝试重试!!!");
                                System.out.println("消息消费失败,请尝试重试!!!");
                                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                            }
                        }
                        System.out.println("消息消费成功!!!");
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                });
                // 调用start()方法启动consumer
                consumer.start();
                System.out.println("消费者启动成功。。。");
                System.out.println("rocketMQConsumer 结束------");
            } catch (Exception e) {
                e.printStackTrace();
                System.out.println("消息消费操作失败--" + e.getMessage());
            }
        }
    }
  • 相关阅读:
    web.xml文件详解
    SQLSERVER dbo解释
    sqlserver BULK INSERT
    google 基站定位api
    Sqlserver中Select和Set区别
    SQL Server优化50法
    ibatis常用16条SQL
    面向对象 -- 三大特性之继承 补充 抽象类 接口类
    面向对象 -- 三大特性之继承
    面向对象 -- 类的组合
  • 原文地址:https://www.cnblogs.com/rdchen/p/15210585.html
Copyright © 2011-2022 走看看