zoukankan      html  css  js  c++  java
  • ShardingSphere~7

    ShardingProxy服务器处理框架

    启动:shardingsphere-proxy-bootstrap

    @NoArgsConstructor(access = AccessLevel.PRIVATE)
    public final class Bootstrap {
        public static void main(final String[] args) throws IOException, SQLException {
            BootstrapArguments bootstrapArgs = new BootstrapArguments(args);
            YamlProxyConfiguration yamlConfig = ProxyConfigurationLoader.load(bootstrapArgs.getConfigurationPath());
            createBootstrapInitializer(yamlConfig).init(yamlConfig, bootstrapArgs.getPort());
        }
        private static BootstrapInitializer createBootstrapInitializer(final YamlProxyConfiguration yamlConfig) {
            return null == yamlConfig.getServerConfiguration().getGovernance() ? new StandardBootstrapInitializer() : new GovernanceBootstrapInitializer();
        }
    }

    AbstractBootstrapInitializer 初始化,看到根据配置初始化各种组件,事件监听,扩展任务执行器等,最终启动proxy

        @Override
        public final void init(final YamlProxyConfiguration yamlConfig, final int port) throws SQLException {
            ProxyConfiguration proxyConfig = getProxyConfiguration(yamlConfig);
            MetaDataContexts metaDataContexts = decorateMetaDataContexts(createMetaDataContexts(proxyConfig));
            for (MetaDataAwareEventSubscriber each : ShardingSphereServiceLoader.getSingletonServiceInstances(MetaDataAwareEventSubscriber.class)) {
                each.setMetaDataContexts(metaDataContexts);
                ShardingSphereEventBus.getInstance().register(each);
            }
            String xaTransactionMangerType = metaDataContexts.getProps().getValue(ConfigurationPropertyKey.XA_TRANSACTION_MANAGER_TYPE);
            TransactionContexts transactionContexts = decorateTransactionContexts(createTransactionContexts(metaDataContexts), xaTransactionMangerType);
            ProxyContext.getInstance().init(metaDataContexts, transactionContexts);
            setDatabaseServerInfo();
            initScalingWorker(yamlConfig);
            shardingSphereProxy.start(port);
        }

    proxy服务器启动

    public final class ShardingSphereProxy {
        private EventLoopGroup bossGroup;
        private EventLoopGroup workerGroup;
        @SneakyThrows(InterruptedException.class)
        public void start(final int port) {
            try {
                createEventLoopGroup();
                ServerBootstrap bootstrap = new ServerBootstrap();
                initServerBootstrap(bootstrap);
                ChannelFuture future = bootstrap.bind(port).sync();
                log.info("ShardingSphere-Proxy start success.");
                future.channel().closeFuture().sync();
            } finally {
                workerGroup.shutdownGracefully();
                bossGroup.shutdownGracefully();
                BackendExecutorContext.getInstance().getExecutorEngine().close();
            }
        }
        
        private void createEventLoopGroup() {
            bossGroup = Epoll.isAvailable() ? new EpollEventLoopGroup(1) : new NioEventLoopGroup(1);
            workerGroup = Epoll.isAvailable() ? new EpollEventLoopGroup() : new NioEventLoopGroup();
        }
        
        private void initServerBootstrap(final ServerBootstrap bootstrap) {
            bootstrap.group(bossGroup, workerGroup)
                    .channel(Epoll.isAvailable() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024 * 1024, 16 * 1024 * 1024))
                    .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                    .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ServerHandlerInitializer(FrontDatabaseProtocolTypeFactory.getDatabaseType()));
        }
    }

    在handler中解码请求并处理相关请求

    public final class ServerHandlerInitializer extends ChannelInitializer<SocketChannel> {
        
        private final DatabaseType databaseType;
        
        @Override
        protected void initChannel(final SocketChannel socketChannel) {
            DatabaseProtocolFrontendEngine databaseProtocolFrontendEngine = DatabaseProtocolFrontendEngineFactory.newInstance(databaseType);
            ChannelPipeline pipeline = socketChannel.pipeline();
            pipeline.addLast(new PacketCodec(databaseProtocolFrontendEngine.getCodecEngine()));
            pipeline.addLast(new FrontendChannelInboundHandler(databaseProtocolFrontendEngine));
        }
    }

    处理各种读写事件

    public final class FrontendChannelInboundHandler extends ChannelInboundHandlerAdapter {
        
        private final DatabaseProtocolFrontendEngine databaseProtocolFrontendEngine;
        
        private final BackendConnection backendConnection;
        
        private volatile boolean authenticated;
        
        public FrontendChannelInboundHandler(final DatabaseProtocolFrontendEngine databaseProtocolFrontendEngine) {
            this.databaseProtocolFrontendEngine = databaseProtocolFrontendEngine;
            TransactionType transactionType = TransactionType.valueOf(ProxyContext.getInstance().getMetaDataContexts().getProps().getValue(ConfigurationPropertyKey.PROXY_TRANSACTION_TYPE));
            backendConnection = new BackendConnection(transactionType);
        }
        
        @Override
        public void channelActive(final ChannelHandlerContext context) {
            int connectionId = databaseProtocolFrontendEngine.getAuthenticationEngine().handshake(context);
            ConnectionThreadExecutorGroup.getInstance().register(connectionId);
            backendConnection.setConnectionId(connectionId);
        }
        
        @Override
        public void channelRead(final ChannelHandlerContext context, final Object message) {
            if (!authenticated) {
                authenticated = authenticate(context, (ByteBuf) message);
                return;
            }
            ProxyStateContext.execute(context, message, databaseProtocolFrontendEngine, backendConnection);
        }
        
        private boolean authenticate(final ChannelHandlerContext context, final ByteBuf message) {
            try (PacketPayload payload = databaseProtocolFrontendEngine.getCodecEngine().createPacketPayload(message)) {
                AuthenticationResult authResult = databaseProtocolFrontendEngine.getAuthenticationEngine().authenticate(context, payload);
                if (authResult.isFinished()) {
                    backendConnection.setGrantee(new Grantee(authResult.getUsername(), authResult.getHostname()));
                    backendConnection.setCurrentSchema(authResult.getDatabase());
                }
                return authResult.isFinished();
                // CHECKSTYLE:OFF
            } catch (final Exception ex) {
                // CHECKSTYLE:ON
                log.error("Exception occur: ", ex);
                context.writeAndFlush(databaseProtocolFrontendEngine.getCommandExecuteEngine().getErrorPacket(ex, backendConnection));
                context.close();
            }
            return false;
        }
        
        @Override
        public void channelInactive(final ChannelHandlerContext context) {
            context.fireChannelInactive();
            closeAllResources();
        }
        
        private void closeAllResources() {
            ConnectionThreadExecutorGroup.getInstance().unregisterAndAwaitTermination(backendConnection.getConnectionId());
            PrimaryVisitedManager.clear();
            backendConnection.closeResultSets();
            backendConnection.closeStatements();
            backendConnection.closeConnections(true);
            backendConnection.closeFederateExecutor();
            databaseProtocolFrontendEngine.release(backendConnection);
        }
        
        @Override
        public void channelWritabilityChanged(final ChannelHandlerContext context) {
            if (context.channel().isWritable()) {
                backendConnection.getResourceLock().doNotify();
            }
        }
    }
    View Code

    ProxyStateContext类中对应各种状态执行对应的动作

        static {
            STATES.put(StateType.OK, new OKProxyState());
            STATES.put(StateType.LOCK, new LockProxyState());
            STATES.put(StateType.CIRCUIT_BREAK, new CircuitBreakProxyState());
        }

    进入执行

    public final class OKProxyState implements ProxyState {
        
        @Override
        public void execute(final ChannelHandlerContext context, final Object message, final DatabaseProtocolFrontendEngine databaseProtocolFrontendEngine, final BackendConnection backendConnection) {
            boolean supportHint = ProxyContext.getInstance().getMetaDataContexts().getProps().<Boolean>getValue(ConfigurationPropertyKey.PROXY_HINT_ENABLED);
            boolean isOccupyThreadForPerConnection = databaseProtocolFrontendEngine.getFrontendContext().isOccupyThreadForPerConnection();
            ExecutorService executorService = CommandExecutorSelector.getExecutorService(
                    isOccupyThreadForPerConnection, supportHint, backendConnection.getTransactionStatus().getTransactionType(), backendConnection.getConnectionId());
            executorService.execute(new CommandExecutorTask(databaseProtocolFrontendEngine, backendConnection, context, message));
        }
    }
  • 相关阅读:
    Codeforces 1045C Hyperspace Highways (看题解) 圆方树
    Codeforces 316E3 线段树 + 斐波那切数列 (看题解)
    Codeforces 803G Periodic RMQ Problem 线段树
    Codeforces 420D Cup Trick 平衡树
    Codeforces 295E Yaroslav and Points 线段树
    Codeforces 196E Opening Portals MST (看题解)
    Codeforces 653F Paper task SA
    Codeforces 542A Place Your Ad Here
    python基础 异常与返回
    mongodb 删除
  • 原文地址:https://www.cnblogs.com/it-worker365/p/15000208.html
Copyright © 2011-2022 走看看