zoukankan      html  css  js  c++  java
  • mycat服务启动{管理模块启动过程}

    mycat启动的时候启动了三个模块

    1:NIOConnector(负责链接mysql数据库,连接池以数据库为准不以链接字符串为准),

    1:NIOAcceptor,ManagerConnectionFactory(管理模块,默认端口9066)

    2:NIOAcceptor,ServerConnectionFactory(mysql服务模块,默认端口8066)

    这里介绍下管理模块的启动流程

    顺序图

    NIO和AIO

    mycat分别实现了NIO和AIO,由于linux当前没有真正实现AIO这里主要介绍NIO的流程。

    NIO的Reactor与AIO的Proactor两种模式的场景区别:
    下面是Reactor的做法:
    1. 等待事件响应 (Reactor job)
    2. 分发 “Ready-to-Read” 事件给用户句柄 ( Reactor job)
    3. 读数据 (user handler job)
    4. 处理数据( user handler job)
    下面再来看看真正意义的异步模式Proactor是如何做的:
    1. 等待事件响应 (Proactor job)
    2. 读数据 (Proactor job)
    3. 分发 “Read-Completed” 事件给用户句柄 (Proactor job)
    4. 处理数据(user handler job)

    mycat的NIO实现

    Selector(选择器)是Java NIO中能够检测一到多个NIO通道,并能够知晓通道是否为诸如读写事件做好准备的组件。这样,一个单独的线程可以管理多个channel,从而管理多个网络连接。
    Selector可以监听四种不同类型的事件:
    - Connect
    - Accept
    - Read
    - Write
    这四种事件用SelectionKey的四个常量来表示:
    - SelectionKey.OP_CONNECT
    - SelectionKey.OP_ACCEPT
    - SelectionKey.OP_READ
    - SelectionKey.OP_WRITE
    前面已经说了,NIO采用的Reactor模式:例如汽车是乘客访问的主体(Reactor),乘客上车后,到售票员(acceptor)处登记,之后乘客便可以休息睡觉去了,当到达乘客所要到达的目的地后,售票员将其唤醒即可。

    核心顺序

    mycat管理端的启动流程

    1:new ManagerConnectionFactory extends FrontendConnectionFactory

    2:new NIOReactorPool,new NIOReactor,new RW中new ConcurrentLinkedQueue<AbstractConnection>()而AbstractConnection中new NIOSocketWR

    3:new NIOAcceptor中向反应堆中注册了OP_ACCEPT,该类继承了Thread然后start启动

    accept

    			channel = serverChannel.accept();
    			channel.configureBlocking(false);
    			FrontendConnection c = factory.make(channel);
    			c.setAccepted(true);
    			c.setId(ID_GENERATOR.getId());
    			NIOProcessor processor = (NIOProcessor) MycatServer.getInstance()
    					.nextProcessor();
    			c.setProcessor(processor);
    
    			LOGGER.info("accept");
    
    			NIOReactor reactor = reactorPool.getNextReactor();
    			reactor.postRegister(c);
    

    factory.make(channel):最终构造了ManagerQueryHandler(管理命令解析器)和FrontendAuthenticator(mycat权限解析器)

    reactor.postRegister(c):把当前链接添加到reactor的registerQueue中并唤醒reactor的selector

    read

    在NIOReactor的registerQueue为空的时候run循环空运转,当上一步把accept的链接放到队列的时候则

    			for (;;) {
    
    
    
    				++reactCount;
    				try {
    					selector.select(500L);
    					register(selector);
    					keys = selector.selectedKeys();
    					for (SelectionKey key : keys) {
    						AbstractConnection con = null;
    						try {
    							Object att = key.attachment();
    							if (att != null) {
    								con = (AbstractConnection) att;
    								if (key.isValid() && key.isReadable()) {
    									try {
    										con.asynRead();
    									} catch (IOException e) {
                                            con.close("program err:" + e.toString());
    										continue;
    									} catch (Exception e) {
    										LOGGER.debug("caught err:", e);
    										con.close("program err:" + e.toString());
    										continue;
    									}
    								}
    								if (key.isValid() && key.isWritable()) {
    									con.doNextWriteCheck();
    								}
    							} else {
    								key.cancel();
    							}
                            } catch (CancelledKeyException e) {
                                if (LOGGER.isDebugEnabled()) {
                                    LOGGER.debug(con + " socket key canceled");
                                }
                            } catch (Exception e) {
                                LOGGER.warn(con + " " + e);
                            }
    					}
    				} catch (Exception e) {
    					LOGGER.warn(name, e);
    				} finally {
    					if (keys != null) {
    						keys.clear();
    					}
    
    				}

    register(selector);也即

    ((NIOSocketWR) c.getSocketWR()).register(selector); 注册OP_READ事件
    c.register();即FrontendConnection的register发送握手数据包

    con.asynRead();即NIOSocketWR的asynRead即

    	public void asynRead() throws IOException {
    		LOGGER.info("asynRead");
    		ByteBuffer theBuffer = con.readBuffer;
    		if (theBuffer == null) {
    			theBuffer = con.processor.getBufferPool().allocate();
    			con.readBuffer = theBuffer;
    		}
    		int got = channel.read(theBuffer);
    		con.onReadData(got);
    
    	}
    

    con.onReadData(got);即AbstractConnection的onReadData这里拆包得到完成的数据包后调用

    handler.handle(data);也即FrontendAuthenticator的handle在这里check user;check password;check schema如果失败则将失败信息写入缓冲区,如果成功

    则把AbstractConnection的默认hander从FrontendAuthenticator换成FrontendCommandHandler等待接下来的处理(比如show命令等,

    以上的处理是发生在输入mysql -utest -ptest -h10.97.177.83 -P9066时)

    认证完成后下一次的handler.handle(data)则使用FrontendCommandHandler的handle来处理也即

        public void handle(byte[] data)
        {
            if(source.getLoadDataInfileHandler()!=null&&source.getLoadDataInfileHandler().isStartLoadData())
            {
                MySQLMessage mm = new MySQLMessage(data);
                int  packetLength = mm.readUB3();
                if(packetLength+4==data.length)
                {
                    source.loadDataInfileData(data);
                }
                return;
            }
            switch (data[4])
            {
                case MySQLPacket.COM_INIT_DB:
                    commands.doInitDB();
                    source.initDB(data);
                    break;
                case MySQLPacket.COM_QUERY:
                    commands.doQuery();
                    source.query(data);
                    break;
                case MySQLPacket.COM_PING:
                    commands.doPing();
                    source.ping();
                    break;
                case MySQLPacket.COM_QUIT:
                    commands.doQuit();
                    source.close("quit cmd");
                    break;
                case MySQLPacket.COM_PROCESS_KILL:
                    commands.doKill();
                    source.kill(data);
                    break;
                case MySQLPacket.COM_STMT_PREPARE:
                    commands.doStmtPrepare();
                    source.stmtPrepare(data);
                    break;
                case MySQLPacket.COM_STMT_EXECUTE:
                    commands.doStmtExecute();
                    source.stmtExecute(data);
                    break;
                case MySQLPacket.COM_STMT_CLOSE:
                    commands.doStmtClose();
                    source.stmtClose(data);
                    break;
                case MySQLPacket.COM_HEARTBEAT:
                    commands.doHeartbeat();
                    source.heartbeat(data);
                    break;
                default:
                         commands.doOther();
                         source.writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR,
                                 "Unknown command");
    
            }
        }
    

    source.query(data);即queryHandler.query(sql);这里的queryHandler是ManagerQueryHandler即

        public void query(String sql) {
            ManagerConnection c = this.source;
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(new StringBuilder().append(c).append(sql).toString());
            }
            int rs = ManagerParse.parse(sql);
            switch (rs & 0xff) {
                case ManagerParse.SELECT:
                    SelectHandler.handle(sql, c, rs >>> SHIFT);
                    break;
                case ManagerParse.SET:
                    c.write(c.writeToBuffer(OkPacket.OK, c.allocate()));
                    break;
                case ManagerParse.SHOW:
                    ShowHandler.handle(sql, c, rs >>> SHIFT);
                    break;
                case ManagerParse.SWITCH:
                    SwitchHandler.handler(sql, c, rs >>> SHIFT);
                    break;
                case ManagerParse.KILL_CONN:
                    KillConnection.response(sql, rs >>> SHIFT, c);
                    break;
                case ManagerParse.OFFLINE:
                    Offline.execute(sql, c);
                    break;
                case ManagerParse.ONLINE:
                    Online.execute(sql, c);
                    break;
                case ManagerParse.STOP:
                    StopHandler.handle(sql, c, rs >>> SHIFT);
                    break;
                case ManagerParse.RELOAD:
                    ReloadHandler.handle(sql, c, rs >>> SHIFT);
                    break;
                case ManagerParse.ROLLBACK:
                    RollbackHandler.handle(sql, c, rs >>> SHIFT);
                    break;
                case ManagerParse.CLEAR:
                    ClearHandler.handle(sql, c, rs >>> SHIFT);
                    break;
                case ManagerParse.CONFIGFILE:
                    ConfFileHandler.handle(sql, c);
                    break;
                case ManagerParse.LOGFILE:
                    ShowServerLog.handle(sql, c);
                    break;
                default:
                    c.writeErrMessage(ErrorCode.ER_YES, "Unsupported statement");
            }
        }
    

    总结

    mycat的网络处理逻辑上是通过队列加上后台线程来实现了accept和read的解耦从而实现了高性能,但是代码写的就不敢恭维。

  • 相关阅读:
    解决ListView异步加载数据之后不能点击的问题
    android点击实现图片放大缩小 java技术博客
    关于 数据文件自增长 的一点理解
    RAC 实例不能启动 ORA1589 signalled during ALTER DATABASE OPEN
    Linux 超级用户的权利
    RAC 实例 迁移到 单实例 使用导出导入
    Shell 基本语法
    Linux 开机引导与关机过程
    RAC 实例不能启动 ORA1589 signalled during ALTER DATABASE OPEN
    Oracle RAC + Data Guard 环境搭建
  • 原文地址:https://www.cnblogs.com/tommyli/p/5127550.html
Copyright © 2011-2022 走看看