zoukankan      html  css  js  c++  java
  • Rocketmq 不同的topic要配不同的consumegroup

      使用Rocketmq一定要注意,如果项目中要订阅两个topic,一定要保证consumeGroup是两个不同的。

      这是因为,Consumer会定期发送心跳,默认是30s一次。心跳会像全部broker发送,心跳包内容包括groupname,topicname1。然后broker端会缓存这个信息,以groupname为key

      代码在 ClientManagerProcessor # heartBeat

    public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
            RemotingCommand response = RemotingCommand.createResponseCommand(null);
            HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
            ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
                ctx.channel(),
                heartbeatData.getClientID(),
                request.getLanguage(),
                request.getVersion()
            );
    
            for (ConsumerData
                    data : heartbeatData.getConsumerDataSet()) {
                SubscriptionGroupConfig subscriptionGroupConfig =
                    this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(
                        data.getGroupName());
                boolean isNotifyConsumerIdsChangedEnable = true;
                if (null != subscriptionGroupConfig) {
                    isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
                    int topicSysFlag = 0;
                    if (data.isUnitMode()) {
                        topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
                    }
                    String newTopic = MixAll.getRetryTopic(data.getGroupName());
                    this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                        newTopic,
                        subscriptionGroupConfig.getRetryQueueNums(),
                        PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
                }
    
                boolean changed = this.brokerController.getConsumerManager().registerConsumer(@1
                    data.getGroupName(),
                    clientChannelInfo,
                    data.getConsumeType(),
                    data.getMessageModel(),
                    data.getConsumeFromWhere(),
                    data.getSubscriptionDataSet(),
                    isNotifyConsumerIdsChangedEnable
                );
    
                if (changed) {
                    log.info("registerConsumer info changed {} {}",
                        data.toString(),
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel())
                    );
                }
            }
    
            for (ProducerData data : heartbeatData.getProducerDataSet()) {
                this.brokerController.getProducerManager().registerProducer(data.getGroupName(),
                    clientChannelInfo);
            }
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
            return response;
        }

      代码很长,就不一一分析了。重点是 心跳里带着数据 

      heartbeatData.getConsumerDataSet()。

    public class ConsumerData {
        private String groupName;
        private ConsumeType consumeType;
        private MessageModel messageModel;
        private ConsumeFromWhere consumeFromWhere;
        private Set<SubscriptionData> subscriptionDataSet = new HashSet<SubscriptionData>();
        private boolean unitMode;

      SubscriptionData 是关于topic的相关信息,里面最重要的就是topic

      @1处的代码

    boolean changed = this.brokerController.getConsumerManager().registerConsumer(
                    data.getGroupName(),
                    clientChannelInfo,
                    data.getConsumeType(),
                    data.getMessageModel(),
                    data.getConsumeFromWhere(),
                    data.getSubscriptionDataSet(),
                    isNotifyConsumerIdsChangedEnable
                );
    public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
            ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
            final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
    
            ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
            if (null == consumerGroupInfo) {
                ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
                ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
                consumerGroupInfo = prev != null ? prev : tmp;
            }
    
            boolean r1 =
                consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
                    consumeFromWhere);
            boolean r2 = consumerGroupInfo.updateSubscription(subList);
    public boolean updateSubscription(final Set<SubscriptionData> subList) {
            boolean updated = false;
    
            for (SubscriptionData sub : subList) {
                SubscriptionData old = this.subscriptionTable.get(sub.getTopic());
                if (old == null) {
                    SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);
                    if (null == prev) {
                        updated = true;
                        log.info("subscription changed, add new topic, group: {} {}",
                            this.groupName,
                            sub.toString());
                    }
                } else if (sub.getSubVersion() > old.getSubVersion()) {
                    if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) {
                        log.info("subscription changed, group: {} OLD: {} NEW: {}",
                            this.groupName,
                            old.toString(),
                            sub.toString()
                        );
                    }
    
                    this.subscriptionTable.put(sub.getTopic(), sub);
                }
            }

      从上面的代码看得出来,如果两个不同的topic,consmer客户端不小心配成了相同的consumeGroup,在broker端的缓存里,每次心跳就有可能覆盖之前的订阅信息。导致某一个consume消费不到自己要订阅的topic了

      

  • 相关阅读:
    DS博客作业02--栈和队列
    DS博客作业02--线性表
    c博客06-2019-结构体&文件
    C博客作业05--2019-指针
    C语言博客作业04--数组
    C语言博客作业03--函数
    JAVA作业-.图书馆查书、借书、还书
    JAVA购物车
    5-互评-OO之接口-DAO模式代码阅读及应用
    DS博客作业05--查找
  • 原文地址:https://www.cnblogs.com/juniorMa/p/15137918.html
Copyright © 2011-2022 走看看