zoukankan      html  css  js  c++  java
  • ShardingSphere~7

    ShardingProxy服务器处理框架

    启动:shardingsphere-proxy-bootstrap

    @NoArgsConstructor(access = AccessLevel.PRIVATE)
    public final class Bootstrap {
        public static void main(final String[] args) throws IOException, SQLException {
            BootstrapArguments bootstrapArgs = new BootstrapArguments(args);
            YamlProxyConfiguration yamlConfig = ProxyConfigurationLoader.load(bootstrapArgs.getConfigurationPath());
            createBootstrapInitializer(yamlConfig).init(yamlConfig, bootstrapArgs.getPort());
        }
        private static BootstrapInitializer createBootstrapInitializer(final YamlProxyConfiguration yamlConfig) {
            return null == yamlConfig.getServerConfiguration().getGovernance() ? new StandardBootstrapInitializer() : new GovernanceBootstrapInitializer();
        }
    }

    AbstractBootstrapInitializer 初始化,看到根据配置初始化各种组件,事件监听,扩展任务执行器等,最终启动proxy

        @Override
        public final void init(final YamlProxyConfiguration yamlConfig, final int port) throws SQLException {
            ProxyConfiguration proxyConfig = getProxyConfiguration(yamlConfig);
            MetaDataContexts metaDataContexts = decorateMetaDataContexts(createMetaDataContexts(proxyConfig));
            for (MetaDataAwareEventSubscriber each : ShardingSphereServiceLoader.getSingletonServiceInstances(MetaDataAwareEventSubscriber.class)) {
                each.setMetaDataContexts(metaDataContexts);
                ShardingSphereEventBus.getInstance().register(each);
            }
            String xaTransactionMangerType = metaDataContexts.getProps().getValue(ConfigurationPropertyKey.XA_TRANSACTION_MANAGER_TYPE);
            TransactionContexts transactionContexts = decorateTransactionContexts(createTransactionContexts(metaDataContexts), xaTransactionMangerType);
            ProxyContext.getInstance().init(metaDataContexts, transactionContexts);
            setDatabaseServerInfo();
            initScalingWorker(yamlConfig);
            shardingSphereProxy.start(port);
        }

    proxy服务器启动

    public final class ShardingSphereProxy {
        private EventLoopGroup bossGroup;
        private EventLoopGroup workerGroup;
        @SneakyThrows(InterruptedException.class)
        public void start(final int port) {
            try {
                createEventLoopGroup();
                ServerBootstrap bootstrap = new ServerBootstrap();
                initServerBootstrap(bootstrap);
                ChannelFuture future = bootstrap.bind(port).sync();
                log.info("ShardingSphere-Proxy start success.");
                future.channel().closeFuture().sync();
            } finally {
                workerGroup.shutdownGracefully();
                bossGroup.shutdownGracefully();
                BackendExecutorContext.getInstance().getExecutorEngine().close();
            }
        }
        
        private void createEventLoopGroup() {
            bossGroup = Epoll.isAvailable() ? new EpollEventLoopGroup(1) : new NioEventLoopGroup(1);
            workerGroup = Epoll.isAvailable() ? new EpollEventLoopGroup() : new NioEventLoopGroup();
        }
        
        private void initServerBootstrap(final ServerBootstrap bootstrap) {
            bootstrap.group(bossGroup, workerGroup)
                    .channel(Epoll.isAvailable() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024 * 1024, 16 * 1024 * 1024))
                    .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                    .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ServerHandlerInitializer(FrontDatabaseProtocolTypeFactory.getDatabaseType()));
        }
    }

    在handler中解码请求并处理相关请求

    public final class ServerHandlerInitializer extends ChannelInitializer<SocketChannel> {
        
        private final DatabaseType databaseType;
        
        @Override
        protected void initChannel(final SocketChannel socketChannel) {
            DatabaseProtocolFrontendEngine databaseProtocolFrontendEngine = DatabaseProtocolFrontendEngineFactory.newInstance(databaseType);
            ChannelPipeline pipeline = socketChannel.pipeline();
            pipeline.addLast(new PacketCodec(databaseProtocolFrontendEngine.getCodecEngine()));
            pipeline.addLast(new FrontendChannelInboundHandler(databaseProtocolFrontendEngine));
        }
    }

    处理各种读写事件

    public final class FrontendChannelInboundHandler extends ChannelInboundHandlerAdapter {
        
        private final DatabaseProtocolFrontendEngine databaseProtocolFrontendEngine;
        
        private final BackendConnection backendConnection;
        
        private volatile boolean authenticated;
        
        public FrontendChannelInboundHandler(final DatabaseProtocolFrontendEngine databaseProtocolFrontendEngine) {
            this.databaseProtocolFrontendEngine = databaseProtocolFrontendEngine;
            TransactionType transactionType = TransactionType.valueOf(ProxyContext.getInstance().getMetaDataContexts().getProps().getValue(ConfigurationPropertyKey.PROXY_TRANSACTION_TYPE));
            backendConnection = new BackendConnection(transactionType);
        }
        
        @Override
        public void channelActive(final ChannelHandlerContext context) {
            int connectionId = databaseProtocolFrontendEngine.getAuthenticationEngine().handshake(context);
            ConnectionThreadExecutorGroup.getInstance().register(connectionId);
            backendConnection.setConnectionId(connectionId);
        }
        
        @Override
        public void channelRead(final ChannelHandlerContext context, final Object message) {
            if (!authenticated) {
                authenticated = authenticate(context, (ByteBuf) message);
                return;
            }
            ProxyStateContext.execute(context, message, databaseProtocolFrontendEngine, backendConnection);
        }
        
        private boolean authenticate(final ChannelHandlerContext context, final ByteBuf message) {
            try (PacketPayload payload = databaseProtocolFrontendEngine.getCodecEngine().createPacketPayload(message)) {
                AuthenticationResult authResult = databaseProtocolFrontendEngine.getAuthenticationEngine().authenticate(context, payload);
                if (authResult.isFinished()) {
                    backendConnection.setGrantee(new Grantee(authResult.getUsername(), authResult.getHostname()));
                    backendConnection.setCurrentSchema(authResult.getDatabase());
                }
                return authResult.isFinished();
                // CHECKSTYLE:OFF
            } catch (final Exception ex) {
                // CHECKSTYLE:ON
                log.error("Exception occur: ", ex);
                context.writeAndFlush(databaseProtocolFrontendEngine.getCommandExecuteEngine().getErrorPacket(ex, backendConnection));
                context.close();
            }
            return false;
        }
        
        @Override
        public void channelInactive(final ChannelHandlerContext context) {
            context.fireChannelInactive();
            closeAllResources();
        }
        
        private void closeAllResources() {
            ConnectionThreadExecutorGroup.getInstance().unregisterAndAwaitTermination(backendConnection.getConnectionId());
            PrimaryVisitedManager.clear();
            backendConnection.closeResultSets();
            backendConnection.closeStatements();
            backendConnection.closeConnections(true);
            backendConnection.closeFederateExecutor();
            databaseProtocolFrontendEngine.release(backendConnection);
        }
        
        @Override
        public void channelWritabilityChanged(final ChannelHandlerContext context) {
            if (context.channel().isWritable()) {
                backendConnection.getResourceLock().doNotify();
            }
        }
    }
    View Code

    ProxyStateContext类中对应各种状态执行对应的动作

        static {
            STATES.put(StateType.OK, new OKProxyState());
            STATES.put(StateType.LOCK, new LockProxyState());
            STATES.put(StateType.CIRCUIT_BREAK, new CircuitBreakProxyState());
        }

    进入执行

    public final class OKProxyState implements ProxyState {
        
        @Override
        public void execute(final ChannelHandlerContext context, final Object message, final DatabaseProtocolFrontendEngine databaseProtocolFrontendEngine, final BackendConnection backendConnection) {
            boolean supportHint = ProxyContext.getInstance().getMetaDataContexts().getProps().<Boolean>getValue(ConfigurationPropertyKey.PROXY_HINT_ENABLED);
            boolean isOccupyThreadForPerConnection = databaseProtocolFrontendEngine.getFrontendContext().isOccupyThreadForPerConnection();
            ExecutorService executorService = CommandExecutorSelector.getExecutorService(
                    isOccupyThreadForPerConnection, supportHint, backendConnection.getTransactionStatus().getTransactionType(), backendConnection.getConnectionId());
            executorService.execute(new CommandExecutorTask(databaseProtocolFrontendEngine, backendConnection, context, message));
        }
    }
  • 相关阅读:
    线程的生命周期
    同步与死锁
    同步与死锁
    线程的休眠
    线程的休眠
    RTSP转RTMP、FLV、HLS网页无插件视频直播-LiveNVR功能介绍-音频开启
    使用LiveNVR实现将RTSP转RTMP、FLV、HLS,实现监控摄像头无插件直播
    使用LiveNVR实现安防摄像头RTSP流WEB无插件播放的延迟调优
    Javascript之数据执行原理探究
    Javascript之动画1
  • 原文地址:https://www.cnblogs.com/it-worker365/p/15000208.html
Copyright © 2011-2022 走看看