zoukankan      html  css  js  c++  java
  • RocketMQ之NameServer学习笔记

    org.apache.rocketmq.namesrv.NamesrvController

        NameserController,NameServer的核心控制类。

    1.1 NamesrvConfig 

    NamesrvConfig,主要指定nameserver的相关配置目录属性

    1)kvConfigPath(kvConfig.json)

    2)mqhome/namesrv/namesrv.properties

    3)orderMessageEnable,是否开启顺序消息功能,默认为false

     

    1.2 ScheduledExecutorService scheduledExecutorService

        private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryIm               pl("NSScheduledThread"));

       NameServer 定时任务执行线程池,一个线程,默认定时执行两个任务:

       任务1、每隔10s扫描broker,维护当前存活的Broker信息

       任务2、每隔10s打印KVConfig信息。

     

    1.3 KVConfigManager 

       读取或变更NameServer的配置属性,加载NamesrvConfig中配置的配置文件到内存,此类一个亮点就是使用轻量级的非线程安全容器,再结合读写锁对资源读写进行保护。尽最大程度提高线程的并发度。

     

    1.4 RouteInfoManager 

        NameServer数据的载体,记录Broker,Topic等信息。

    //NameServer 与 Broker 空闲时长,默认2分钟,在2分钟内Nameserver没有收到Broker的心跳包,则关闭该连接。 
    private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;  
    //读写锁,用来保护非线程安全容器HashMap                                       
        private final ReadWriteLock lock = new ReentrantReadWriteLock(); 
    //topicQueueTable,主题与队列关系,记录一个主题的队列分布在哪些Broker上,每个Broker上存在该主题的队列个数                                                     
        private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;  
        //brokerAddrTable,所有Broker信息,使用brokerName当key,BrokerData信息描述每一个broker信息。
        private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    //clusterAddrTable,broker集群信息,每个集群包含哪些Broker                               
        private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;   
    //brokerLiveTable,当前存活的Broker,该信息不是实时的,NameServer每10S扫描一次所有的broker,根据心跳包的时间得知broker的状态,该机制也是导致当一个master Down掉后,消息生产者无法感知,可能继续向Down掉的Master发送消息,导致失败
        private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;                                
        private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;        
    /**
     *  Broker信息;key为brokerName,value为BrokerData
     */
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    
    public class BrokerData implements Comparable<BrokerData> {
        /**
         * Cluster名称
         */
        private String cluster;
        /**
         * broker名称
         */
        private String brokerName;
        /**
         * 0->ip:port
         */
        private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
    }
    • 注册topic信息topicQueueTable
    /**
     * 消息队列路由信息;key为topic,value为QueueData
     */
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    
    public class QueueData implements Comparable<QueueData> {
        /**
         * Broker名称
         */
        private String brokerName;
        /**
         * 读队列个数,默认4个
         */
        private int readQueueNums;
        /**
         * 写队列个数,默认4个
         */
        private int writeQueueNums;
        /**
         * 队列权限
         */
        private int perm;
        /**
         * 配置的,同步复制还是异步复制标记,对应TopicConfig.topicSysFlag
         *
         */
        private int topicSynFlag;
    }
    /**
     * Broker状态信息,NameServer每次收到心跳包会替换该信息,每隔30秒更新一次
     * brokerAddr: ip:port->{}
     */
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    
    class BrokerLiveInfo {
        /**
         * 存储上次收到心跳包的时间,每隔30秒更新一次
         */
        private long lastUpdateTimestamp;
        private DataVersion dataVersion;
        private Channel channel;
        private String haServerAddr;
    }
    • NamesrvStartup.java 启动入口类,NameServer 启动默认端口9876
    nettyServerConfig.setListenPort(9876)
    • 每10秒钟扫描一次,移除失效的broker,同时删除缓存元数据信息
    //初始化NameServer
    boolean initResult = controller.initialize();
    public boolean initialize() {
        //加载KV配置
        this.kvConfigManager.load();
    
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
    
        this.remotingExecutor =
            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
    
        this.registerProcessor();
        //每10秒钟扫描一次,移除失效的broker
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);
        //每隔10秒钟打印一次KV配置信息
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
            @Override
            public void run() {
                NamesrvController.this.kvConfigManager.printAllPeriodically();
            }
        }, 1, 10, TimeUnit.MINUTES);
    
        return true;
    }
    • 失效时间为2分钟,即:Broker在2分钟内未上报心跳会被移除
    /**
     * 失效时间为2分钟,即:Broker在2分钟内未上报心跳会被移除
     */
    public void scanNotActiveBroker() {
        Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, BrokerLiveInfo> next = it.next();
            long last = next.getValue().getLastUpdateTimestamp();
            if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
                RemotingUtil.closeChannel(next.getValue().getChannel());
                it.remove();
                log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
                this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
            }
        }
    }
    private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
    2.DefaultRequestProcessor
    • 用于响应客户端、Broker的请求。主要向NameServer发送心跳包、获取Cluster、Broker、Topic元数据信息。
    • 调用链:
      在NameServer启动时注册,NamesrvController.initialize()->registerProcessor()
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        if (log.isDebugEnabled()) {
            log.debug("receive request, {} {} {}",
                request.getCode(),
                RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                request);
        }
    
        switch (request.getCode()) {
            case RequestCode.PUT_KV_CONFIG://增加NameServer配置信息;由DefaultMQAdminExt使用
                return this.putKVConfig(ctx, request);
            case RequestCode.GET_KV_CONFIG://根据NameSpace和key获取NameServer配置信息;由DefaultMQAdminExt使用
                return this.getKVConfig(ctx, request);
            case RequestCode.DELETE_KV_CONFIG: //据NameSapce和Key删除NameServerr配置信息
                return this.deleteKVConfig(ctx, request);
            case RequestCode.REGISTER_BROKER: //注册Broker信息;由BrokerOuterAPI.registerBroker使用,在BrokerController启动时调用
                Version brokerVersion = MQVersion.value2Version(request.getVersion());
                if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                    return this.registerBrokerWithFilterServer(ctx, request);
                } else {
                    return this.registerBroker(ctx, request);
                }
            case RequestCode.UNREGISTER_BROKER://移除注销broker信息;由BrokerOuterAPI.unregisterBroker使用,在BrokerController.shutdown时调用
                return this.unregisterBroker(ctx, request);
            case RequestCode.GET_ROUTEINTO_BY_TOPIC: //获取Topic路由信息 TopicRouteData
                return this.getRouteInfoByTopic(ctx, request);
            case RequestCode.GET_BROKER_CLUSTER_INFO://获取Cluster及Broker信息
                return this.getBrokerClusterInfo(ctx, request);
            case RequestCode.WIPE_WRITE_PERM_OF_BROKER: //去除该broker上所有topic的写权限
                return this.wipeWritePermOfBroker(ctx, request);
            case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER: //获取所有的Topic列表
                return getAllTopicListFromNameserver(ctx, request);
            case RequestCode.DELETE_TOPIC_IN_NAMESRV: //从nameServer中删除topic
                return deleteTopicInNamesrv(ctx, request);
            case RequestCode.GET_KVLIST_BY_NAMESPACE: //获取配置信息 configTable
                return this.getKVListByNamespace(ctx, request);
            case RequestCode.GET_TOPICS_BY_CLUSTER: //获取该集群下的所有topic list
                return this.getTopicsByCluster(ctx, request);
            case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS: // 此处意思为:系统会将集群名称、broker名称作为默认topic创建。现在获取这类topic
                return this.getSystemTopicListFromNs(ctx, request);
            case RequestCode.GET_UNIT_TOPIC_LIST: //暂无使用
                return this.getUnitTopicList(ctx, request);
            case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST: //暂无使用
                return this.getHasUnitSubTopicList(ctx, request);
            case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST://暂无使用
                return this.getHasUnitSubUnUnitTopicList(ctx, request);
            case RequestCode.UPDATE_NAMESRV_CONFIG: //更新properties请求
                return this.updateConfig(ctx, request);
            case RequestCode.GET_NAMESRV_CONFIG: //获取properties内容
                return this.getConfig(ctx, request);
            default:
                break;
        }
        return null;
    }

    注册broker信息

    • Broker每隔30秒向所有的NameServer上报Topic注册信息
    • Broker调用链
      BrokerController.start()->this.registerBrokerAll()->this.brokerOuterAPI.registerBrokerAll()
    //每隔30秒向所有的NameServer上报Topic注册信息
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
        @Override
        public void run() {
            try {
                BrokerController.this.registerBrokerAll(true, false);
            } catch (Throwable e) {
                log.error("registerBrokerAll Exception", e);
            }
        }
    }, 1000 * 10, 1000 * 30, TimeUnit.MILLISECONDS);
    • 服务端处理主要包括:注册集群信息clusterAddrTable、注册broker信息brokerAddrTable、
      注册topic信息topicQueueTable、broker心跳包brokerLiveTable
    • NameServer处理链
      DefaultRequestProcessor->processRequest->RequestCode.REGISTER_BROKER->this.registerBroker->RouteInfoManager.registerBroker()
    public RegisterBrokerResult registerBroker(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final String haServerAddr,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final List<String> filterServerList,
        final Channel channel) {
        RegisterBrokerResult result = new RegisterBrokerResult();
        try {
            try {
                //加写锁,防止并发修改
                this.lock.writeLock().lockInterruptibly();
                //注册集群信息
                Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
                if (null == brokerNames) {
                    brokerNames = new HashSet<String>();
                    this.clusterAddrTable.put(clusterName, brokerNames);
                }
                brokerNames.add(brokerName);
    
                boolean registerFirst = false;
                //注册broker信息
                BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                if (null == brokerData) {
                    registerFirst = true;
                    brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                    this.brokerAddrTable.put(brokerName, brokerData);
                }
                String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
                registerFirst = registerFirst || (null == oldAddr);
                //Topic配置变化了;Master Broker第一次注册或者Topic dataVersion不相同时更新路由信息
                //有Topic新增时dataVersion会递增
                if (null != topicConfigWrapper //
                    && MixAll.MASTER_ID == brokerId) {
                    if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())//
                        || registerFirst) {
                        ConcurrentMap<String, TopicConfig> tcTable =
                            topicConfigWrapper.getTopicConfigTable();
                        if (tcTable != null) {
                            for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                                this.createAndUpdateQueueData(brokerName, entry.getValue()); //更新topicQueueTable
                            }
                        }
                    }
                }
                //更新broker心跳信息
                BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                    new BrokerLiveInfo(
                        System.currentTimeMillis(),
                        topicConfigWrapper.getDataVersion(),
                        channel,
                        haServerAddr));
                //新broker注册时会有日志输出
                if (null == prevBrokerLiveInfo) {
                    log.info("new broker registerd, {} HAServer: {}", brokerAddr, haServerAddr);
                }
                //更新filterServer信息
                if (filterServerList != null) {
                    if (filterServerList.isEmpty()) {
                        this.filterServerTable.remove(brokerAddr);
                    } else {
                        this.filterServerTable.put(brokerAddr, filterServerList);
                    }
                }
                //Slave设置MasterAddr和HaServerAddr
                if (MixAll.MASTER_ID != brokerId) {
                    String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                    if (masterAddr != null) {
                        BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                        if (brokerLiveInfo != null) {
                            result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                            result.setMasterAddr(masterAddr);
                        }
                    }
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("registerBroker Exception", e);
        }
    
        return result;
    }
     
  • 相关阅读:
    es集群状态
    浅谈GO语言中的面向对象
    jstat命令详解
    jvm g1gc回收器
    解决ES集群状态异常教程(存在UNASSIGNED)
    html5分割上传实现超大文件无插件网页上传工具
    html5分割上传实现超大文件无插件网页上传
    科讯使用的:ckeditor编辑器.复制word图片.一直沾不上去.谁有好的解决办法呢
    编辑器直接word直接上传word里的图片
    请问有支持直接从 word 文档复制图片的 editor 吗
  • 原文地址:https://www.cnblogs.com/laowz/p/10781060.html
Copyright © 2011-2022 走看看