zoukankan      html  css  js  c++  java
  • 数据库路由中间件MyCat

    此文已由作者张镐薪授权网易云社区发布。

    欢迎访问网易云社区,了解更多网易技术产品运营经验。

    4.配置模块

    每个MyCatServer初始化时,会初始化: MyCatServer.java:

    public static final String NAME = "MyCat";private static final long LOG_WATCH_DELAY = 60000L;private static final long TIME_UPDATE_PERIOD = 20L;private static final MycatServer INSTANCE = new MycatServer();private static final Logger LOGGER = LoggerFactory.getLogger("MycatServer");private final RouteService routerService;private final CacheService cacheService;private Properties dnIndexProperties;//AIO连接群组private AsynchronousChannelGroup[] asyncChannelGroups;private volatile int channelIndex = 0;//全局序列号private final MyCATSequnceProcessor sequnceProcessor = new MyCATSequnceProcessor();private final DynaClassLoader catletClassLoader;private final SQLInterceptor sqlInterceptor;private volatile int nextProcessor;private BufferPool bufferPool;private boolean aio = false;//XA事务全局ID生成private final AtomicLong xaIDInc = new AtomicLong();private MycatServer() {    //读取文件配置
        this.config = new MycatConfig();    //定时线程池,单线程线程池
        scheduler = Executors.newSingleThreadScheduledExecutor();    //SQL记录器
        this.sqlRecorder = new SQLRecorder(config.getSystem()
                .getSqlRecordCount());    /**
         * 是否在线,MyCat manager中有命令控制
         * | offline | Change MyCat status to OFF |
         * | online | Change MyCat status to ON |
         */
        this.isOnline = new AtomicBoolean(true);    //缓存服务初始化
        cacheService = new CacheService();    //路由计算初始化
        routerService = new RouteService(cacheService);    // load datanode active index from properties
        dnIndexProperties = loadDnIndexProps();    try {        //SQL解析器
            sqlInterceptor = (SQLInterceptor) Class.forName(
                    config.getSystem().getSqlInterceptor()).newInstance();
        } catch (Exception e) {        throw new RuntimeException(e);
        }    //catlet加载器
        catletClassLoader = new DynaClassLoader(SystemConfig.getHomePath()
                + File.separator + "catlet", config.getSystem()
                .getCatletClassCheckSeconds());    //记录启动时间
        this.startupTime = TimeUtil.currentTimeMillis();
    }

    第一步是读取文件配置,主要是三个文件:schema.xml,rule.xml和server.xml. 读取后的配置会加载到MyCatConfig中。 MyCatConfig.java:

    public MycatConfig() {//读取schema.xml,rule.xml和server.xmlConfigInitializer confInit = new ConfigInitializer(true);this.system = confInit.getSystem();this.users = confInit.getUsers();this.schemas = confInit.getSchemas();this.dataHosts = confInit.getDataHosts();this.dataNodes = confInit.getDataNodes();for (PhysicalDBPool dbPool : dataHosts.values()) {
        dbPool.setSchemas(getDataNodeSchemasOfDataHost(dbPool.getHostName()));
    }this.quarantine = confInit.getQuarantine();this.cluster = confInit.getCluster();//初始化重加载配置时间this.reloadTime = TimeUtil.currentTimeMillis();this.rollbackTime = -1L;this.status = RELOAD;//配置加载锁this.lock = new ReentrantLock();
    }

    它们都通过ConfigInitializer读取:

    public ConfigInitializer(boolean loadDataHost) {    //读取schema.xml
        SchemaLoader schemaLoader = new XMLSchemaLoader();    //读取server.xml
        XMLConfigLoader configLoader = new XMLConfigLoader(schemaLoader);
        schemaLoader = null;    //加载配置
        this.system = configLoader.getSystemConfig();    this.users = configLoader.getUserConfigs();    this.schemas = configLoader.getSchemaConfigs();    //是否重新加载DataHost和对应的DataNode
        if (loadDataHost) {        this.dataHosts = initDataHosts(configLoader);        this.dataNodes = initDataNodes(configLoader);
        }    //权限管理
        this.quarantine = configLoader.getQuarantineConfig();    this.cluster = initCobarCluster(configLoader);    //不同类型的全局序列处理器的配置加载
        if (system.getSequnceHandlerType() == SystemConfig.SEQUENCEHANDLER_MYSQLDB) {
            IncrSequenceMySQLHandler.getInstance().load();
        }    if (system.getSequnceHandlerType() == SystemConfig.SEQUENCEHANDLER_LOCAL_TIME) {
            IncrSequenceTimeHandler.getInstance().load();
        }    //检查user与schema配置对应以及schema配置不为空
        this.checkConfig();
    }

    4.1 rule.xml

    读取schema之前会先读取rule.xml。 XmlSchemaLoader.java:

    public XMLSchemaLoader(String schemaFile, String ruleFile) {    //先读取rule.xml
        XMLRuleLoader ruleLoader = new XMLRuleLoader(ruleFile);    this.tableRules = ruleLoader.getTableRules();
        ruleLoader = null;    this.dataHosts = new HashMap<String, DataHostConfig>();    this.dataNodes = new HashMap<String, DataNodeConfig>();    this.schemas = new HashMap<String, SchemaConfig>();    //读取加载schema配置
        this.load(DEFAULT_DTD, schemaFile == null ? DEFAULT_XML : schemaFile);
    }public XMLSchemaLoader() {    this(null, null);
    }

    XMLRuleLoader.java:

    public XMLRuleLoader(String ruleFile) {    // this.rules = new HashSet<RuleConfig>();
        //rule名 -> rule
        this.tableRules = new HashMap<String, TableRuleConfig>();    //function名 -> 具体分片算法
        this.functions = new HashMap<String, AbstractPartitionAlgorithm>();    //默认为:/rule.dtd和/rule.xml
        load(DEFAULT_DTD, ruleFile == null ? DEFAULT_XML : ruleFile);
    }public XMLRuleLoader() {    this(null);
    }
    private void load(String dtdFile, String xmlFile) {
        InputStream dtd = null;
        InputStream xml = null;    try {
            dtd = XMLRuleLoader.class.getResourceAsStream(dtdFile);
            xml = XMLRuleLoader.class.getResourceAsStream(xmlFile);        //读取出语意树
            Element root = ConfigUtil.getDocument(dtd, xml)
                    .getDocumentElement();        //加载Function
            loadFunctions(root);        //加载TableRule
            loadTableRules(root);
        } catch (ConfigException e) {        throw e;
        } catch (Exception e) {        throw new ConfigException(e);
        } finally {        if (dtd != null) {            try {
                    dtd.close();
                } catch (IOException e) {
                }
            }        if (xml != null) {            try {
                    xml.close();
                } catch (IOException e) {
                }
            }
        }
    }

    ConfigUtil.java解析语意树:

    public static Document getDocument(final InputStream dtd, InputStream xml) throws ParserConfigurationException,
                SAXException, IOException {
        DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
        factory.setValidating(true);
        factory.setNamespaceAware(false);
        DocumentBuilder builder = factory.newDocumentBuilder();
        builder.setEntityResolver(new EntityResolver() {        @Override
            public InputSource resolveEntity(String publicId, String systemId) {            return new InputSource(dtd);
            }
        });
        builder.setErrorHandler(new ErrorHandler() {        @Override
            public void warning(SAXParseException e) {
            }        @Override
            public void error(SAXParseException e) throws SAXException {            throw e;
            }        @Override
            public void fatalError(SAXParseException e) throws SAXException {            throw e;
            }
        });    return builder.parse(xml);
    }

    加载functions,XmlRuleLoader.java

    private void loadFunctions(Element root) throws ClassNotFoundException,
                InstantiationException, IllegalAccessException,
                InvocationTargetException {
            NodeList list = root.getElementsByTagName("function");        for (int i = 0, n = list.getLength(); i < n; ++i) {
                Node node = list.item(i);            if (node instanceof Element) {
                    Element e = (Element) node;                //获取name标签
                    String name = e.getAttribute("name");                //如果Map已有,则function重复
                    if (functions.containsKey(name)) {                    throw new ConfigException("rule function " + name
                                + " duplicated!");
                    }                //获取class标签
                    String clazz = e.getAttribute("class");                //根据class利用反射新建分片算法
                    AbstractPartitionAlgorithm function = createFunction(name, clazz);
    
                    ParameterMapping.mapping(function, ConfigUtil.loadElements(e));                //每个AbstractPartitionAlgorithm可能会实现init来初始化
                    function.init();                //放入functions map
                    functions.put(name, function);
                }
            }
        }private AbstractPartitionAlgorithm createFunction(String name, String clazz)
            throws ClassNotFoundException, InstantiationException,
            IllegalAccessException, InvocationTargetException {
        Class<?> clz = Class.forName(clazz);    //判断是否继承AbstractPartitionAlgorithm
        if (!AbstractPartitionAlgorithm.class.isAssignableFrom(clz)) {        throw new IllegalArgumentException("rule function must implements "
                    + AbstractPartitionAlgorithm.class.getName() + ", name=" + name);
        }
        return (AbstractPartitionAlgorithm) clz.newInstance();
    }

    加载所有的function的node,每一个node就是一个AbstractPartitionAlgorithm,并放入functions这个map中;

    private final Map<String, TableRuleConfig> tableRules;


    对于每一个node,通过反射新建对应参数的AbstractPartitionAlgorithm。这样,所有的function就加载到了functions这个map中。 同理,加载TableRule,就加上了function是否存在的判断:


    /**
    * tableRule标签结构:
     * <tableRule name="sharding-by-month">
     *     <rule>
     *         <columns>create_date</columns>
     *         <algorithm>partbymonth</algorithm>
     *     </rule>
     * </tableRule>
     * @param root
     * @throws SQLSyntaxErrorException
        */private void loadTableRules(Element root) throws SQLSyntaxErrorException {    //获取每个tableRule标签
        NodeList list = root.getElementsByTagName("tableRule");    for (int i = 0, n = list.getLength(); i < n; ++i) {
            Node node = list.item(i);        if (node instanceof Element) {
                Element e = (Element) node;            //先判断是否重复
                String name = e.getAttribute("name");            if (tableRules.containsKey(name)) {                throw new ConfigException("table rule " + name
                            + " duplicated!");
                }            //获取rule标签


    免费体验云安全(易盾)内容安全、验证码等服务

    更多网易技术、产品、运营经验分享请点击


    相关文章:
    【推荐】 iOS安装包瘦身(上篇)

  • 相关阅读:
    大数据之路Week08_day02 (Flume架构介绍和安装)
    Hive调优
    hive的shell用法(脑子糊涂了,对着脚本第一行是 #!/bin/sh 疯狂执行hive -f 结果报错)
    Week08_day01 (Hive 自定义函数 UDF 一个输入,一个输出(最常用))
    Week08_day01 (Hive开窗函数 row_number()的使用 (求出所有薪水前两名的部门))
    Week08_day01 (Hive实现按照指定格式输出每七天的消费平均数)
    Week08_day01 (Hive实现WordCount计数)
    SQL中 count(*)和count(1)的对比,区别
    大数据之路week07--day07 (修改mysql默认编码)
    10进制转换成16进制的函数(自写函数模板)
  • 原文地址:https://www.cnblogs.com/zyfd/p/9895001.html
Copyright © 2011-2022 走看看