zoukankan      html  css  js  c++  java
  • rocketmq如何新增topic

    新增topic是需要客户端直接通知broker完成的:

    通过createAndUpdateTopicConfig方法 发送给broker以后,在AdminBrokerProcessor里面负责处理这个类型消息:

       private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx,
            RemotingCommand request) throws RemotingCommandException {
            final RemotingCommand response = RemotingCommand.createResponseCommand(null);
            final CreateTopicRequestHeader requestHeader =
                (CreateTopicRequestHeader) request.decodeCommandCustomHeader(CreateTopicRequestHeader.class);
            log.info("updateAndCreateTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
    
            if (requestHeader.getTopic().equals(this.brokerController.getBrokerConfig().getBrokerClusterName())) {
                String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";
                log.warn(errorMsg);
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark(errorMsg);
                return response;
            }
    
            try {
                response.setCode(ResponseCode.SUCCESS);
                response.setOpaque(request.getOpaque());
                response.markResponseType();
                response.setRemark(null);
                ctx.writeAndFlush(response);
            } catch (Exception e) {
                log.error("Failed to produce a proper response", e);
            }
    
            TopicConfig topicConfig = new TopicConfig(requestHeader.getTopic());
            topicConfig.setReadQueueNums(requestHeader.getReadQueueNums());
            topicConfig.setWriteQueueNums(requestHeader.getWriteQueueNums());
            topicConfig.setTopicFilterType(requestHeader.getTopicFilterTypeEnum());
            topicConfig.setPerm(requestHeader.getPerm());
            topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == null ? 0 : requestHeader.getTopicSysFlag());
    
            this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
    
            this.brokerController.registerIncrementBrokerData(topicConfig,this.brokerController.getTopicConfigManager().getDataVersion());
    
            return null;
    

     

    在updateTopicConfig方法:

        public void updateTopicConfig(final TopicConfig topicConfig) {
            TopicConfig old = this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
            if (old != null) {
                log.info("update topic config, old:[{}] new:[{}]", old, topicConfig);
            } else {
                log.info("create new topic [{}]", topicConfig);
            }
    
            this.dataVersion.nextVersion();
    
            this.persist();
        }
    

      TopicConfig是复写了equal方法的,所以可以明确知道一个topci是否真的需要改变,如果需要,那么更改版本、持久化。

      在后面的registerIncrementBrokerData方法中还要进一步注册到broker里面。

      注册中心是不会持久化topic的,都是broker负责持久化,启动的时候注册到nameserver,除此之外还有定时任务定期注册到注册中心。

  • 相关阅读:
    android之StrictMode介绍
    m3u8介绍
    Spring笔记3
    android之常用命令(未完待续)
    JAVA核心技术
    Struts2笔记2
    android之lint警告This Handler class should be static or leaks might occur
    [Algorithm]01分数规划——Update:2012年7月27日
    asp生成html静态
    图片放大缩小
  • 原文地址:https://www.cnblogs.com/notlate/p/12007040.html
Copyright © 2011-2022 走看看