zoukankan      html  css  js  c++  java
  • RocketMQ入门(消费者)_3

    tag多个 用  || 隔开。

    消费者角色:

    1. 推式(一般建议用推式)

    2. 拉式

    消费模式:

    1. 集群(cluster)                --均衡负载消费

    2. 广播(broadcasting) --发布和订阅者模式

    MQ的消费不会清除broker中的数据,broker数据一直存在队列中,队列offset会一直递增,因此可以通过回查来获取到丢失数据。这个时候我们可以采用pull形式较好。

    push形式,MQ会记录访问的偏移量,即使宕机下次重启也会按照顺序继续消费,不会出现重复消费。

    RocketMQ入门(生产者)_2中已经写过一个推式的代码,接下来就看下拉式。

    /**
     * 普通拉式消费者,代码编写
     * @author DennyZhao
     *
     */
    public class PullConsumer {
        
        /**
         * 暂时以map作为offset入库看待。<queueId, offset>
         */
        private static Map<String, Long> offsetMap = new HashMap<String, Long>();
    
        public static void main(String[] args) throws UnsupportedEncodingException {
            //创建拉式消费者
            DefaultMQPullConsumer pullConsumer = new DefaultMQPullConsumer("pullConsumerGroup");
            pullConsumer.setNamesrvAddr("192.168.68.137:9876;192.168.68.138:9876;");
            try {
                pullConsumer.start();
                Set<MessageQueue> mqSet= pullConsumer.fetchSubscribeMessageQueues("fruit");
                while(true) {
                //循环队列
                for(MessageQueue mq: mqSet) {
                    // 从队列中获取固定偏移值
                    PullResult pullResult = pullConsumer.pullBlockIfNotFound(mq, "*", getOffset(mq), 32);
                    setOffset(mq, pullResult.getNextBeginOffset());
                    switch(pullResult.getPullStatus()) {
                    case FOUND:
                        List<MessageExt> msgFoundList = pullResult.getMsgFoundList();
                        for(MessageExt msg : msgFoundList) {
                            String fruit = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
                            System.out.println(fruit + "   -----fruit");
                        }
                        break;
                    case NO_NEW_MSG:
                        break;
                    case NO_MATCHED_MSG:
                        break;
                    }
                }
                Thread.sleep(2000);
                }
            } catch (MQClientException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (RemotingException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (MQBrokerException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    
        /**
         * set offset
         * @param mq
         * @param nextBeginOffset
         */
        private static void setOffset(MessageQueue mq, long nextBeginOffset) {
            String queueId = mq.getBrokerName() + mq.getTopic() + mq.getQueueId();
            offsetMap.put(queueId, nextBeginOffset);
        }
    
        /**
         * 获取固定偏移值
         * @param mq queueId
         * @return int
         */
        private static long getOffset(MessageQueue mq) {
            String queueId = mq.getBrokerName() + mq.getTopic() + mq.getQueueId();
            Long offset =  offsetMap.get(queueId);
            if(offset == null) {
                offset = 0l;
            }
            System.out.println(offset + "---------------");
            return offset;
        }
    
    }

     使用Schedule拉式:

    /**
     * ScheduleService 進行數據拉取
     * @author DennyZhao
     *
     */
    public class PullScheduleService {
    
        public static void main(String[] args) throws MQClientException {
            MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("scheduleConsumers");
            scheduleService.setMessageModel(MessageModel.CLUSTERING);
            scheduleService.setPullThreadNums(4);
            DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer("pullConsumer");
            defaultMQPullConsumer.setNamesrvAddr("192.168.68.137:9876;192.168.68.138:9876;");
            scheduleService.setDefaultMQPullConsumer(defaultMQPullConsumer);
            scheduleService.registerPullTaskCallback("fruit", new PullTaskCallback() {
                /**
                 * 数据处理
                 */
                @Override
                public void doPullTask(MessageQueue mq, PullTaskContext context) {
                    MQPullConsumer pullConsumer = context.getPullConsumer();
                    try {
                        long offset = pullConsumer.fetchConsumeOffset(mq, false);
                        PullResult pull = pullConsumer.pull(mq, "*", offset, 32);
                        switch(pull.getPullStatus()) {
                        case FOUND:
                            // 结果输出
                            List<MessageExt> msgFoundList = pull.getMsgFoundList();
                            for(MessageExt msg : msgFoundList) {
                                String fruit = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
                                System.out.println("result:   " + fruit);
                            }
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        default:
                            
                        }
                        // 获取下一个循环的offset
                        pullConsumer.updateConsumeOffset(mq, pull.getNextBeginOffset());
                        // 设置下次访问时间
                        context.setPullNextDelayTimeMillis(1000);
                    } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException | UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
                }
            });
            scheduleService.start();
        }
    
    }

     参数说明:

    //push主要参数
    DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("pushConsumerGroup");
    // 从何地开始,默认(CONSUME_FROM_LAST_OFFSET) 
    pushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    pushConsumer.setConsumeThreadMin(2); //最小线程数
    pushConsumer.setConsumeThreadMax(8); //最大线程数
    pushConsumer.setConsumeTimeout(5000); //连接超时
    pushConsumer.setMessageModel(MessageModel.CLUSTERING);//消息模式(集群CLUSTERING和广播BROADCASTING,default:cluster)
    pushConsumer.setConsumeConcurrentlyMaxSpan(1000);//单队列最大消费数1000
    pushConsumer.setConsumeMessageBatchMaxSize(1); //批量消费数1 ,这个就是默认值,因此我们从list中每次只取一个,因为也就只有一个
    pushConsumer.setNamesrvAddr("192.168.68.137:9876;192.168.68.138:9876;");//集群IP
    pushConsumer.setHeartbeatBrokerInterval(2000); //心跳监测
    pushConsumer.setMaxReconsumeTimes(3);//重复消费次数,用于失败后重试
    pushConsumer.queryMessage(topic, key, maxNum, begin, end); //获取消息
    pushConsumer.fetchSubscribeMessageQueues(topic);//订阅topic
    pushConsumer.registerMessageListener(new MessageListenerConcurrently());//及时普通消费型
    pushConsumer.registerMessageListener(new MessageListenerOrderly()); //严格顺序消费型;
            // pull常用参数
    //消息模式(集群CLUSTERING和广播BROADCASTING,default:cluster) pullConsumer.setMessageModel(MessageModel.CLUSTERING); pullConsumer.fetchSubscribeMessageQueues(topic); //订阅主题 pullConsumer.fetchConsumeOffset(mq, false); //获取queue当前offset位置 pullConsumer.pullBlockIfNotFound(mq, subExpression, offset, maxNums);//获取消费内容 pullConsumer.updateConsumeOffset(mq, offset); //更新消费位置 pullConsumer.setConsumerPullTimeoutMillis(5000); //连接超时

     对于push

    pushConsumer.setConsumeMessageBatchMaxSize(1) ;
    默认是1个,因此list中我们get(0).
    如果调整MaxSize,那么中途异常需要 使用 context.setAckIndex(i)然后直接返回SUCCESS。这样就标记到i是成功的(i从0开始)。其它都是未响应的。
    pushConsumer.setMaxReconsumeTimes(3);  
    失败重试次数,当消费失败后,数据会写入 %RETRY%consumerGroup,如果还是消费失败则进入死信队列。%DLQ%consumerGroup












  • 相关阅读:
    js的new操作符深度解析
    vue的v-if和v-show的区别
    gulp的简单打包示例(一)
    vue报错Error in v-on handler: "RangeError: Maximum call stack size exceeded"
    svg图片在vue脚手架vue-cli怎么使用
    charles 抓包 https 证书
    navicat 批量插入 测试数据
    Zookeeper + Guava loading cache 实现分布式缓存
    Zookeeper Curator API 使用
    Zookeeper JAVA API的使用
  • 原文地址:https://www.cnblogs.com/DennyZhao/p/9900297.html
Copyright © 2011-2022 走看看