zoukankan      html  css  js  c++  java
  • MyCat源码分析系列之——配置信息和启动流程

    更多MyCat源码分析,请戳MyCat源码分析系列


     MyCat配置信息

    除了一些默认的配置参数,大多数的MyCat配置信息是通过读取若干.xml/.properties文件获取的,主要包括:

    1)server.xml:系统和用户相关配置

    2)schema.xml:虚拟库、表、数据节点配置等

    3)rule.xml:分片规则设置

    4)cacheservice.properties:缓存相关设置

    5)dnindex.properties:datahost主从切换配置文件

    6)sequence_conf.properties:本地全局序列号配置文件

    而在代码层面,与配置相关的类主要包括3个:

    1)MycatConfig:最为重要的配置类

    2)ReloadConfig:用于通过管理端口执行mysql> reload @@config或config_all命令,重新载入配置文件

    3)RollbackConfig:用于通过管理端口执行mysql> rollback @@config命令,将配置信息回滚至reload之前的状态

    接下来重点介绍MycatConfig,它最关键的属性如下:

    private volatile SystemConfig system;
    private volatile MycatCluster cluster;
    private volatile MycatCluster _cluster;
    private volatile QuarantineConfig quarantine;
    private volatile QuarantineConfig _quarantine;
    private volatile Map<String, UserConfig> users;
    private volatile Map<String, UserConfig> _users;
    private volatile Map<String, SchemaConfig> schemas;
    private volatile Map<String, SchemaConfig> _schemas;
    private volatile Map<String, PhysicalDBNode> dataNodes;
    private volatile Map<String, PhysicalDBNode> _dataNodes;
    private volatile Map<String, PhysicalDBPool> dataHosts;
    private volatile Map<String, PhysicalDBPool> _dataHosts;
    • SystemConfig:包含了诸多系统相关的配置参数(如端口、编码、线程池大小、BufferPool大小、隔离级别等)
    • MycatCluster:MyCat集群配置信息
    • QuarantineConfig:用户权限隔离(黑白名单)
    • UserConfig:用户配置,包含用户名/密码和允许访问的虚拟库
    • SchemaConfig:虚拟库配置,包括所有下属的表配置(

      TableConfig

      )以及这些表涉及的datanode
    • PhysicalDBNode:datanode相关,对应一个数据库实例中的数据库
    • PhysicalDBPool:datahost相关,里面包含了DataHostConfig配置,包括所有writeHosts和readHosts以及读写分离类型等

    注意到除了SystemConfig,其余属性都还有一个前缀加了_的同名属性,这些属性其实是作为备份的,用于reload/rollback配置文件时的切换。reload和rollback相关的方法如下:

    public void reload(Map<String, UserConfig> users,
                Map<String, SchemaConfig> schemas,
                Map<String, PhysicalDBNode> dataNodes,
                Map<String, PhysicalDBPool> dataHosts, MycatCluster cluster,
                QuarantineConfig quarantine,boolean reloadAll) {
            apply(users, schemas, dataNodes, dataHosts, cluster, quarantine,reloadAll);
            this.reloadTime = TimeUtil.currentTimeMillis();
            this.status = reloadAll?RELOAD_ALL:RELOAD;
    }
    public void rollback(Map<String, UserConfig> users,
                Map<String, SchemaConfig> schemas,
                Map<String, PhysicalDBNode> dataNodes,
                Map<String, PhysicalDBPool> dataHosts, MycatCluster cluster,
                QuarantineConfig quarantine) {
            apply(users, schemas, dataNodes, dataHosts, cluster, quarantine,status==RELOAD_ALL);
            this.rollbackTime = TimeUtil.currentTimeMillis();
            this.status = ROLLBACK;
    }

    在reload的时候注意到有reload和reload_all,区别就在于前者不会重新加载与datahost/datanode相关的更改,而后者会。


     启动流程

    MyCat的启动类为MycatStartup,而主体为MycatServer,其中主要分为两个步骤:

    1)初始化:此过程在MycatServer构造函数时执行,包括配置文件的读取、CacheService和RouteService的创建等

    public MycatServer() {
            this.config = new MycatConfig();
            this.timer = new Timer(NAME + "Timer", true);
            this.sqlRecorder = new SQLRecorder(config.getSystem()
                    .getSqlRecordCount());
            this.isOnline = new AtomicBoolean(true);
            cacheService = new CacheService();
            routerService = new RouteService(cacheService);
            // load datanode active index from properties
            dnIndexProperties = loadDnIndexProps();
            try {
                sqlInterceptor = (SQLInterceptor) Class.forName(
                        config.getSystem().getSqlInterceptor()).newInstance();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
            catletClassLoader = new DynaClassLoader(SystemConfig.getHomePath()
                    + File.separator + "catlet", config.getSystem()
                    .getCatletClassCheckSeconds());
            this.startupTime = TimeUtil.currentTimeMillis();
    }

    2)运行:此过程由startup()方法触发,包括处理器对象创建、bufferpool创建、处理线程池创建、AIOConnector/NIOConnector创建与启动、两个AIOAcceptor/NIOAcceptor创建与启动、后端数据库的初始连接建立、定时器线程池/定时任务创建与启动

    public void startup() throws IOException {
    
            SystemConfig system = config.getSystem();
            int processorCount = system.getProcessors();
    
            // server startup
            LOGGER.info("===============================================");
            LOGGER.info(NAME + " is ready to startup ...");
            String inf = "Startup processors ...,total processors:"
                    + system.getProcessors() + ",aio thread pool size:"
                    + system.getProcessorExecutor()
                    + "    
     each process allocated socket buffer pool "
                    + " bytes ,buffer chunk size:"
                    + system.getProcessorBufferChunk()
                    + "  buffer pool's capacity(buferPool/bufferChunk) is:"
                    + system.getProcessorBufferPool()
                    / system.getProcessorBufferChunk();
            LOGGER.info(inf);
            LOGGER.info("sysconfig params:" + system.toString());
    
            // startup manager
            ManagerConnectionFactory mf = new ManagerConnectionFactory();
            ServerConnectionFactory sf = new ServerConnectionFactory();
            SocketAcceptor manager = null;
            SocketAcceptor server = null;
            aio = (system.getUsingAIO() == 1);
    
            // startup processors
            int threadPoolSize = system.getProcessorExecutor();
            processors = new NIOProcessor[processorCount];
            long processBuferPool = system.getProcessorBufferPool();
            int processBufferChunk = system.getProcessorBufferChunk();
            int socketBufferLocalPercent = system.getProcessorBufferLocalPercent();
            bufferPool = new BufferPool(processBuferPool, processBufferChunk,
                    socketBufferLocalPercent / processorCount);
            businessExecutor = ExecutorUtil.create("BusinessExecutor",
                    threadPoolSize);
            timerExecutor = ExecutorUtil.create("Timer", system.getTimerExecutor());
            listeningExecutorService = MoreExecutors.listeningDecorator(businessExecutor);
    
            for (int i = 0; i < processors.length; i++) {
                processors[i] = new NIOProcessor("Processor" + i, bufferPool,
                        businessExecutor);
            }
    
            if (aio) {
                LOGGER.info("using aio network handler ");
                asyncChannelGroups = new AsynchronousChannelGroup[processorCount];
                // startup connector
                connector = new AIOConnector();
                for (int i = 0; i < processors.length; i++) {
                    asyncChannelGroups[i] = AsynchronousChannelGroup
                            .withFixedThreadPool(processorCount,
                                    new ThreadFactory() {
                                        private int inx = 1;
    
                                        @Override
                                        public Thread newThread(Runnable r) {
                                            Thread th = new Thread(r);
                                            th.setName(BufferPool.LOCAL_BUF_THREAD_PREX
                                                    + "AIO" + (inx++));
                                            LOGGER.info("created new AIO thread "
                                                    + th.getName());
                                            return th;
                                        }
                                    });
    
                }
                manager = new AIOAcceptor(NAME + "Manager", system.getBindIp(),
                        system.getManagerPort(), mf, this.asyncChannelGroups[0]);
    
                // startup server
    
                server = new AIOAcceptor(NAME + "Server", system.getBindIp(),
                        system.getServerPort(), sf, this.asyncChannelGroups[0]);
    
            } else {
                LOGGER.info("using nio network handler ");
                NIOReactorPool reactorPool = new NIOReactorPool(
                        BufferPool.LOCAL_BUF_THREAD_PREX + "NIOREACTOR",
                        processors.length);
                connector = new NIOConnector(BufferPool.LOCAL_BUF_THREAD_PREX
                        + "NIOConnector", reactorPool);
                ((NIOConnector) connector).start();
    
                manager = new NIOAcceptor(BufferPool.LOCAL_BUF_THREAD_PREX + NAME
                        + "Manager", system.getBindIp(), system.getManagerPort(),
                        mf, reactorPool);
    
                server = new NIOAcceptor(BufferPool.LOCAL_BUF_THREAD_PREX + NAME
                        + "Server", system.getBindIp(), system.getServerPort(), sf,
                        reactorPool);
            }
            // manager start
            manager.start();
            LOGGER.info(manager.getName() + " is started and listening on "
                    + manager.getPort());
            server.start();
            // server started
            LOGGER.info(server.getName() + " is started and listening on "
                    + server.getPort());
            LOGGER.info("===============================================");
            // init datahost
            Map<String, PhysicalDBPool> dataHosts = config.getDataHosts();
            LOGGER.info("Initialize dataHost ...");
            for (PhysicalDBPool node : dataHosts.values()) {
                String index = dnIndexProperties.getProperty(node.getHostName(),
                        "0");
                if (!"0".equals(index)) {
                    LOGGER.info("init datahost: " + node.getHostName()
                            + "  to use datasource index:" + index);
                }
                node.init(Integer.valueOf(index));
                node.startHeartbeat();
            }
            long dataNodeIldeCheckPeriod = system.getDataNodeIdleCheckPeriod();
            timer.schedule(updateTime(), 0L, TIME_UPDATE_PERIOD);
            timer.schedule(processorCheck(), 0L, system.getProcessorCheckPeriod());
            timer.schedule(dataNodeConHeartBeatCheck(dataNodeIldeCheckPeriod), 0L,
                    dataNodeIldeCheckPeriod);
            timer.schedule(dataNodeHeartbeat(), 0L,
                    system.getDataNodeHeartbeatPeriod());
            timer.schedule(catletClassClear(), 30000);
    }

    从以上代码中不难看出,connector用于作为客户端与后端MySQL建立连接,而server和manager则作为服务端接受来自前端应用的连接请求,其中server负责常规业务流程(默认端口8066),而manager负责监控与管理(默认端口9066)。

    而datahost的初始化行为由node.init(Integer.valueOf(index));触发,其目的是每个datahost中writehost会连续创建若干初始连接供使用(该数量由schema.xml中datahost标签的minCon属性确定),当后续连接不足时会创建新的连接(最大不超过maxCon)。

    由此,MyCat程序启动完成,等待接受来自应用的连接请求和后续的命令处理。


    为尊重原创成果,如需转载烦请注明本文出处:http://www.cnblogs.com/fernandolee24/p/5193983.html,特此感谢

  • 相关阅读:
    springcloud之zuul
    rabbitmq工作模式(三)Topics通配符模式
    rabbitMQ工作模式(二)路由模式
    rabbitmq工作模式(一)发布订阅模式
    Eureka使用案例
    SpringCloud入门
    微服务
    F查询和Q查询,摘自李文周老师
    django08 orm之增删改查
    django07 字符串替换
  • 原文地址:https://www.cnblogs.com/fernandolee24/p/5193983.html
Copyright © 2011-2022 走看看