zoukankan      html  css  js  c++  java
  • 数据库路由中间件MyCat

    此文已由作者张镐薪授权网易云社区发布。

    欢迎访问网易云社区,了解更多网易技术产品运营经验。


    NodeList ruleNodes = e.getElementsByTagName("rule");
                int length = ruleNodes.getLength();
                if (length > 1) {
                    throw new ConfigException("only one rule can defined :"
                            + name);
                }
                //目前只处理第一个,未来可能有多列复合逻辑需求
                //RuleConfig是保存着rule与function对应关系的对象
                RuleConfig rule = loadRule((Element) ruleNodes.item(0));
                String funName = rule.getFunctionName();
                //判断function是否存在,获取function
                AbstractPartitionAlgorithm func = functions.get(funName);
                if (func == null) {
                    throw new ConfigException("can't find function of name :"
                            + funName);
                }
                rule.setRuleAlgorithm(func);
                //保存到tableRules
                tableRules.put(name, new TableRuleConfig(name, rule));
            }
        }
    }

    这样,所有的tableRule和function就加载完毕。保存在一个变量中,就是tableRules:XMLRuleLoader.java:

    private final Map<String, TableRuleConfig> tableRules;


    4.2 schema.xml:

    public XMLSchemaLoader(String schemaFile, String ruleFile) {    //先读取rule.xml
        XMLRuleLoader ruleLoader = new XMLRuleLoader(ruleFile);    //将tableRules拿出,用于这里加载Schema做rule有效判断,以及之后的分片路由计算
        this.tableRules = ruleLoader.getTableRules();    //释放ruleLoader
        ruleLoader = null;    this.dataHosts = new HashMap<String, DataHostConfig>();    this.dataNodes = new HashMap<String, DataNodeConfig>();    this.schemas = new HashMap<String, SchemaConfig>();    //读取加载schema配置
        this.load(DEFAULT_DTD, schemaFile == null ? DEFAULT_XML : schemaFile);
    }private void load(String dtdFile, String xmlFile) {
        InputStream dtd = null;
        InputStream xml = null;    try {
            dtd = XMLSchemaLoader.class.getResourceAsStream(dtdFile);
            xml = XMLSchemaLoader.class.getResourceAsStream(xmlFile);
            Element root = ConfigUtil.getDocument(dtd, xml).getDocumentElement();        //先加载所有的DataHost
            loadDataHosts(root);        //再加载所有的DataNode
            loadDataNodes(root);        //最后加载所有的Schema
            loadSchemas(root);
        } catch (ConfigException e) {        throw e;
        } catch (Exception e) {        throw new ConfigException(e);
        } finally {        if (dtd != null) {            try {
                    dtd.close();
                } catch (IOException e) {
                }
            }        if (xml != null) {            try {
                    xml.close();
                } catch (IOException e) {
                }
            }
        }
    }

    先看下DataHostConfig这个类的结构: 这里写图片描述 XMLSchemaLoader.java:

    private void loadDataHosts(Element root) {
        NodeList list = root.getElementsByTagName("dataHost");    for (int i = 0, n = list.getLength(); i < n; ++i) {
    
            Element element = (Element) list.item(i);
            String name = element.getAttribute("name");        //判断是否重复
            if (dataHosts.containsKey(name)) {            throw new ConfigException("dataHost name " + name + "duplicated!");
            }        //读取最大连接数
            int maxCon = Integer.valueOf(element.getAttribute("maxCon"));        //读取最小连接数
            int minCon = Integer.valueOf(element.getAttribute("minCon"));        /**
             * 读取负载均衡配置
             * 1. balance="0", 不开启分离机制,所有读操作都发送到当前可用的 writeHost 上。
             * 2. balance="1",全部的 readHost 和 stand by writeHost 参不 select 的负载均衡
             * 3. balance="2",所有读操作都随机的在 writeHost、readhost 上分发。
             * 4. balance="3",所有读请求随机的分发到 wiriterHost 对应的 readhost 执行,writerHost 不负担读压力
             */
            int balance = Integer.valueOf(element.getAttribute("balance"));        /**
             * 读取切换类型
             * -1 表示不自动切换
             * 1 默认值,自动切换
             * 2 基于MySQL主从同步的状态决定是否切换
             * 心跳询句为 show slave status
             * 3 基于 MySQL galary cluster 的切换机制
             */
            String switchTypeStr = element.getAttribute("switchType");        int switchType = switchTypeStr.equals("") ? -1 : Integer.valueOf(switchTypeStr);        //读取从延迟界限
            String slaveThresholdStr = element.getAttribute("slaveThreshold");        int slaveThreshold = slaveThresholdStr.equals("") ? -1 : Integer.valueOf(slaveThresholdStr);        //如果 tempReadHostAvailable 设置大于 0 则表示写主机如果挂掉, 临时的读服务依然可用
            String tempReadHostAvailableStr = element.getAttribute("tempReadHostAvailable");
            boolean tempReadHostAvailable = tempReadHostAvailableStr.equals("") ? false : Integer.valueOf(tempReadHostAvailableStr) > 0;        /**
             * 读取 写类型
             * 这里只支持 0 - 所有写操作仅配置的第一个 writeHost
             */
            String writeTypStr = element.getAttribute("writeType");        int writeType = "".equals(writeTypStr) ? PhysicalDBPool.WRITE_ONLYONE_NODE : Integer.valueOf(writeTypStr);
    
    
            String dbDriver = element.getAttribute("dbDriver");
            String dbType = element.getAttribute("dbType");
            String filters = element.getAttribute("filters");
            String logTimeStr = element.getAttribute("logTime");        long logTime = "".equals(logTimeStr) ? PhysicalDBPool.LONG_TIME : Long.valueOf(logTimeStr) ;        //读取心跳语句
            String heartbeatSQL = element.getElementsByTagName("heartbeat").item(0).getTextContent();        //读取 初始化sql配置,用于oracle
            NodeList connectionInitSqlList = element.getElementsByTagName("connectionInitSql");
            String initConSQL = null;        if (connectionInitSqlList.getLength() > 0) {
                initConSQL = connectionInitSqlList.item(0).getTextContent();
            }        //读取writeHost
            NodeList writeNodes = element.getElementsByTagName("writeHost");
            DBHostConfig[] writeDbConfs = new DBHostConfig[writeNodes.getLength()];
            Map<Integer, DBHostConfig[]> readHostsMap = new HashMap<Integer, DBHostConfig[]>(2);        for (int w = 0; w < writeDbConfs.length; w++) {
                Element writeNode = (Element) writeNodes.item(w);
                writeDbConfs[w] = createDBHostConf(name, writeNode, dbType, dbDriver, maxCon, minCon,filters,logTime);
                NodeList readNodes = writeNode.getElementsByTagName("readHost");            //读取对应的每一个readHost
                if (readNodes.getLength() != 0) {
                    DBHostConfig[] readDbConfs = new DBHostConfig[readNodes.getLength()];                for (int r = 0; r < readDbConfs.length; r++) {
                        Element readNode = (Element) readNodes.item(r);
                        readDbConfs[r] = createDBHostConf(name, readNode, dbType, dbDriver, maxCon, minCon,filters, logTime);
                    }
                    readHostsMap.put(w, readDbConfs);
                }
            }
    
            DataHostConfig hostConf = new DataHostConfig(name, dbType, dbDriver, 
                    writeDbConfs, readHostsMap, switchType, slaveThreshold, tempReadHostAvailable);        
    
            hostConf.setMaxCon(maxCon);
            hostConf.setMinCon(minCon);
            hostConf.setBalance(balance);
            hostConf.setWriteType(writeType);
            hostConf.setHearbeatSQL(heartbeatSQL);
            hostConf.setConnectionInitSql(initConSQL);
            hostConf.setFilters(filters);
            hostConf.setLogTime(logTime);
            dataHosts.put(hostConf.getName(), hostConf);
        }
    }

    先读取每个DataHost的通用配置,之后读取每个DataHost对应的writeHost以及每个writeHost对应的readHost。配置好后,保存在:

    private final Map<String, DataHostConfig> dataHosts;

    之后读取载入DataHost: XMLSchemaLoader.java:

    private void loadDataNodes(Element root) {    //读取DataNode分支
        NodeList list = root.getElementsByTagName("dataNode");    for (int i = 0, n = list.getLength(); i < n; i++) {
            Element element = (Element) list.item(i);
            String dnNamePre = element.getAttribute("name");
    
            String databaseStr = element.getAttribute("database");
            String host = element.getAttribute("dataHost");        //字符串不为空
            if (empty(dnNamePre) || empty(databaseStr) || empty(host)) {            throw new ConfigException("dataNode " + dnNamePre + " define error ,attribute can't be empty");
            }        //dnNames(name),databases(database),hostStrings(dataHost)都可以配置多个,以',', '$', '-'区分,但是需要保证database的个数*dataHost的个数=name的个数
            //多个dataHost与多个database如果写在一个标签,则每个dataHost拥有所有database
            //例如:<dataNode name="dn1$0-75" dataHost="localhost$1-10" database="db$0-759" />
            //则为:localhost1拥有dn1$0-75,localhost2也拥有dn1$0-75(对应db$76-151)
            String[] dnNames = io.mycat.util.SplitUtil.split(dnNamePre, ',', '$', '-');
            String[] databases = io.mycat.util.SplitUtil.split(databaseStr, ',', '$', '-');
            String[] hostStrings = io.mycat.util.SplitUtil.split(host, ',', '$', '-');        if (dnNames.length > 1 && dnNames.length != databases.length * hostStrings.length) {            throw new ConfigException("dataNode " + dnNamePre
                                + " define error ,dnNames.length must be=databases.length*hostStrings.length");
            }        if (dnNames.length > 1) {
    
                List<String[]> mhdList = mergerHostDatabase(hostStrings, databases);            for (int k = 0; k < dnNames.length; k++) {
                    String[] hd = mhdList.get(k);
                    String dnName = dnNames[k];
                    String databaseName = hd[1];
                    String hostName = hd[0];
                    createDataNode(dnName, databaseName, hostName);
                }
    
            } else {
                createDataNode(dnNamePre, databaseStr, host);
            }
    
        }
    }private void createDataNode(String dnName, String database, String host) {
    
        DataNodeConfig conf = new DataNodeConfig(dnName, database, host);        
        if (dataNodes.containsKey(conf.getName())) {        throw new ConfigException("dataNode " + conf.getName() + " duplicated!");
        }    if (!dataHosts.containsKey(host)) {        throw new ConfigException("dataNode " + dnName + " reference dataHost:" + host + " not exists!");
        }
        dataNodes.put(conf.getName(), conf);
    }

    生成的是DataNode类,放入:

    private final Map<String, DataNodeConfig> dataNodes;



    免费体验云安全(易盾)内容安全、验证码等服务

    更多网易技术、产品、运营经验分享请点击



    相关文章:
    【推荐】 知物由学 | 未来安全隐患:AI的软肋——故意欺骗神经网络

  • 相关阅读:
    MST(prim)+树形dp-hdu-4756-Install Air Conditioning
    Java常用排序算法+程序员必须掌握的8大排序算法
    高可用可伸缩架构实用经验谈
    MYSQL索引失效的各种情形总结
    MySQL使用索引的场景及真正利用索引的SQL类型
    MySQL数据库索引的4大类型以及相关的索引创建
    JVM调优浅谈
    dubbo作为消费者注册过程分析--????
    webservice 协议
    你应该知道的 RPC 原理
  • 原文地址:https://www.cnblogs.com/zyfd/p/9895037.html
Copyright © 2011-2022 走看看