zoukankan      html  css  js  c++  java
  • 数据库路由中间件MyCat

    此文已由作者张镐薪授权网易云社区发布。

    欢迎访问网易云社区,了解更多网易技术产品运营经验。

    3. 连接模块

    3.5 后端连接

    3.5.2 后端连接获取与维护管理

    还是那之前的流程,

    st=>start: MyCat接受客户端连接并为之建立唯一绑定的Session
    e=>end: 将请求发送给对应连接,处理完之后归还连接
    op1=>operation: MyCat接受客户端的请求,计算路由
    op2=>operation: 根据请求和路由创建合适的handler,这里为SingleNodeHandler
    op3=>operation: 从PhysicalDBNode中获取后端连接
    cond=>condition: 尝试获取连接,连接够用?
    op4=>operation: 尝试异步创建新的连接
    op5=>operation: 通过DelegateResponseHandler将连接与之前的Handler,这里是SingleNodeHandler绑定
    st->op1->op2->op3->condcond(yes)->econd(no)->op4->op5->e

    现在我们到了尝试获取连接的阶段 PhysicalDataSource.java:

    public void getConnection(String schema,boolean autocommit, final ResponseHandler handler,    final Object attachment) throws IOException {//从当前连接map中拿取已建立好的后端连接
        BackendConnection con = this.conMap.tryTakeCon(schema,autocommit);    if (con != null) {//如果不为空,则绑定对应前端请求的handler
            takeCon(con, handler, attachment, schema);        return;
        } else {//如果为空,新建连接
            int activeCons = this.getActiveCount();//当前最大活动连接
            if(activeCons+1>size){//下一个连接大于最大连接数
                LOGGER.error("the max activeConnnections size can not be max than maxconnections");            throw new IOException("the max activeConnnections size can not be max than maxconnections");
            }else{            // create connection
                LOGGER.info("not ilde connection in pool,create new connection for " + this.name
                        + " of schema "+schema);
                createNewConnection(handler, attachment, schema);
            }
    
        }
    
    }
    private void createNewConnection(final ResponseHandler handler,        final Object attachment, final String schema) throws IOException {    //异步创建连接,将连接的handler绑定为DelegateResponseHandler
        MycatServer.getInstance().getBusinessExecutor().execute(new Runnable() {        public void run() {            try {
                    createNewConnection(new DelegateResponseHandler(handler) {                    @Override
                        public void connectionError(Throwable e,
                                BackendConnection conn) {
                            handler.connectionError(e, conn);
                        }                    @Override
                        public void connectionAcquired(BackendConnection conn) {
                            takeCon(conn, handler, attachment, schema);
                        }
                    }, schema);
                } catch (IOException e) {
                    handler.connectionError(e, null);
                }
            }
        });
    }

    异步调用工厂方法创建后端连接,这里为MySQLConnection MySQLDataSource.java:

    @Override
        public void createNewConnection(ResponseHandler handler,String schema) throws IOException {
            factory.make(this, handler,schema);
    }

    根据之前所述,MySQLConnection的工厂方法会先将NIOhandler设置为MySQLConnectionAuthenticator: MySQLConnectionFactory.java:

    public MySQLConnection make(MySQLDataSource pool, ResponseHandler handler,
                String schema) throws IOException {        //DBHost配置
            DBHostConfig dsc = pool.getConfig();        //根据是否为NIO返回SocketChannel或者AIO的AsynchronousSocketChannel
            NetworkChannel channel = openSocketChannel(MycatServer.getInstance()
                    .isAIO());        //新建MySQLConnection
            MySQLConnection c = new MySQLConnection(channel, pool.isReadNode());        //根据配置初始化MySQLConnection
            MycatServer.getInstance().getConfig().setSocketParams(c, false);
            c.setHost(dsc.getIp());
            c.setPort(dsc.getPort());
            c.setUser(dsc.getUser());
            c.setPassword(dsc.getPassword());
            c.setSchema(schema);        //目前实际连接还未建立,handler为MySQL连接认证MySQLConnectionAuthenticator,传入的handler为后端连接处理器ResponseHandler
            c.setHandler(new MySQLConnectionAuthenticator(c, handler));
            c.setPool(pool);
            c.setIdleTimeout(pool.getConfig().getIdleTimeout());        //AIO和NIO连接方式建立实际的MySQL连接
            if (channel instanceof AsynchronousSocketChannel) {
                ((AsynchronousSocketChannel) channel).connect(                    new InetSocketAddress(dsc.getIp(), dsc.getPort()), c,
                        (CompletionHandler) MycatServer.getInstance()
                                .getConnector());
            } else {            //通过NIOConnector建立连接
                ((NIOConnector) MycatServer.getInstance().getConnector())
                        .postConnect(c);
    
            }        return c;
        }

    这里传入的ResponseHandler为DelegateResponseHandler,在连接建立验证之后,会调用: MySQLConnectionAuthenticator.java:

    public void handle(byte[] data) {    //省略                
        //设置ResponseHandler
        if (listener != null) {
                listener.connectionAcquired(source);
        }    //省略}

    DelegateResponseHandler.java:

    private final ResponseHandler target;@Override
       public void connectionAcquired(BackendConnection conn) {   //将后端连接的ResponseHandler设置为target
       target.connectionAcquired(conn);
    }

    这样,原来没获取到连接的ResponseHandler就获得需要的连接,之后进行处理。处理完后,归还到连接池中。

    private void returnCon(BackendConnection c) {    //清空连接的Attachment
        c.setAttachment(null);    //设置为未使用
        c.setBorrowed(false);    //更新上次使用时间,用于清理空闲连接
        c.setLastTime(TimeUtil.currentTimeMillis());    //获取连接池对应的队列
        ConQueue queue = this.conMap.getSchemaConQueue(c.getSchema());    //按照是否Autocommit分类归还连接
        boolean ok = false;    if (c.isAutocommit()) {
            ok = queue.getAutoCommitCons().offer(c);
        } else {
            ok = queue.getManCommitCons().offer(c);
        }    //归还失败,关闭连接,记录
        if (!ok) {
    
            LOGGER.warn("can't return to pool ,so close con " + c);
            c.close("can't return to pool ");
        }
    }

    4.配置模块

    MyCat实例初始化时究竟会有什么操作呢?看下MyCat程序入口: MycatStartup.java:

    public static void main(String[] args) {    //是否启用zk配置,/myid.properties中的loadZk属性决定,默认不启用,从本地xml文件中读取配置
        ZkConfig.instance().initZk();    try {
            String home = SystemConfig.getHomePath();        if (home == null) {
                System.out.println(SystemConfig.SYS_HOME + "  is not set.");
                System.exit(-1);
            }        // init
            MycatServer server = MycatServer.getInstance();
            server.beforeStart();        // startup
            server.startup();
            System.out.println("MyCAT Server startup successfully. see logs in logs/mycat.log");        while (true) {
                Thread.sleep(300 * 1000);
            }
        } catch (Exception e) {
            SimpleDateFormat sdf = new SimpleDateFormat(dateFormat);
            LogLog.error(sdf.format(new Date()) + " startup error", e);
            System.exit(-1);
        }
    }

    从代码中,可以简单的分为三步:

    1. MycatServer.getInstance():获取MyCat实例,其实就是读取配置文件,并验证正确性等

    2. server.beforeStart():获取环境变量,日志配置

    3. server.startup():启动MyCat,启动线程,初始化线程池和连接池等。


    免费体验云安全(易盾)内容安全、验证码等服务

    更多网易技术、产品、运营经验分享请点击




    相关文章:
    【推荐】 SpringBoot入门(二)——起步依赖

  • 相关阅读:
    腾讯Techo开发者大会PPT分享
    构建三维一体立体化监控,看这一篇就够了!
    ACOUG 联合创始人盖国强:万象更新,数据库技术和生态的发展演进
    全局配置项set_global_options 支持的opts
    InitOpts:初始化配置项:
    数据结构第四章树和森林,期末不挂科指南,第7篇
    Netflix:当你按下“播放”的时候发生了什么?
    Netflix:当你按下“播放”的时候发生了什么?
    搭建vsftpd文件服务器并创建虚拟用户
    搭建vsftpd文件服务器并创建虚拟用户
  • 原文地址:https://www.cnblogs.com/zyfd/p/9894917.html
Copyright © 2011-2022 走看看