zoukankan      html  css  js  c++  java
  • netty在shardingsphere中的运用

    在shardingsphere中会发现不少netty的代码,到底做了什么,来看下

    ShardingProxy代理服务,上代码,典型的netty启动

        public void start(final int port) {
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                bossGroup = createEventLoopGroup();
                if (bossGroup instanceof EpollEventLoopGroup) {
                    groupsEpoll(bootstrap);
                } else {
                    groupsNio(bootstrap);
                }
                ChannelFuture future = bootstrap.bind(port).sync();
                future.channel().closeFuture().sync();
            } finally {
                workerGroup.shutdownGracefully();
                bossGroup.shutdownGracefully();
                BackendExecutorContext.getInstance().getExecutorEngine().close();
            }
        }

    具体配置了什么

    bootstrap.group(bossGroup, workerGroup)
                    .channel(EpollServerSocketChannel.class)
                    .option(EpollChannelOption.SO_BACKLOG, 128)
                    .option(EpollChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024 * 1024, 16 * 1024 * 1024))
                    .option(EpollChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                    .childOption(EpollChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                    .childOption(EpollChannelOption.TCP_NODELAY, true)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ServerHandlerInitializer());

    handler做了啥呢

        @Override
        protected void initChannel(final SocketChannel socketChannel) {
            DatabaseProtocolFrontendEngine databaseProtocolFrontendEngine = DatabaseProtocolFrontendEngineFactory.newInstance(LogicSchemas.getInstance().getDatabaseType());
            ChannelPipeline pipeline = socketChannel.pipeline();
            pipeline.addLast(new PacketCodec(databaseProtocolFrontendEngine.getCodecEngine()));
            pipeline.addLast(new FrontendChannelInboundHandler(databaseProtocolFrontendEngine));
        }

    见名知意,其实就是启动了一个netty服务器,解析了数据库的交互协议做通信支持

    -----------------------------------------------------

    ShardingScaling中Bootstrap典型的Netty启动

    public static void main(final String[] args) {
            log.info("Init server config");
            initServerConfig();
            log.info("ShardingScaling Startup");
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
                bootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .handler(new LoggingHandler(LogLevel.INFO))
                        .childHandler(new HttpServerInitializer());
                int port = ScalingContext.getInstance().getServerConfiguration().getPort();
                Channel channel = bootstrap.bind(port).sync().channel();
                log.info("ShardingScaling is server on http://127.0.0.1:" + port + '/');
                channel.closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }

    到底做了啥? 作为典型的http服务端接收处理请求

        @Override
        protected void initChannel(final SocketChannel socketChannel) {
            ChannelPipeline channelPipeline = socketChannel.pipeline();
            channelPipeline.addLast(new HttpServerCodec());
            channelPipeline.addLast(new HttpObjectAggregator(65536));
            channelPipeline.addLast(new HttpServerHandler());
        }

    大文章在HttpServerHandler,处理了啥? 作为扩展服务器,响应/shardingscaling/job/*请求

    protected void channelRead0(final ChannelHandlerContext channelHandlerContext, final FullHttpRequest request) {
            String requestPath = request.uri();
            String requestBody = request.content().toString(CharsetUtil.UTF_8);
            HttpMethod method = request.method();
            if (!URL_PATTERN.matcher(requestPath).matches()) {
                response(GSON.toJson(ResponseContentUtil.handleBadRequest("Not support request!")),
                        channelHandlerContext, HttpResponseStatus.BAD_REQUEST);
                return;
            }
            if ("/shardingscaling/job/start".equalsIgnoreCase(requestPath) && method.equals(HttpMethod.POST)) {
                startJob(channelHandlerContext, requestBody);
                return;
            }
            if (requestPath.contains("/shardingscaling/job/progress/") && method.equals(HttpMethod.GET)) {
                getJobProgress(channelHandlerContext, requestPath);
                return;
            }
            if ("/shardingscaling/job/list".equalsIgnoreCase(requestPath) && method.equals(HttpMethod.GET)) {
                listAllJobs(channelHandlerContext);
                return;
            }
            if ("/shardingscaling/job/stop".equalsIgnoreCase(requestPath) && method.equals(HttpMethod.POST)) {
                stopJob(channelHandlerContext, requestBody);
                return;
            }
            response(GSON.toJson(ResponseContentUtil.handleBadRequest("Not support request!")),
                    channelHandlerContext, HttpResponseStatus.BAD_REQUEST);
        }

    还有后续 上面startjob --- SCALING_JOB_CONTROLLER.start(shardingScalingJob);

    又启动了一个线程

    public void run() {
            realtimeDataSyncTask.prepare();
            historyDataSyncTaskGroup.prepare();
            syncTaskControlStatus = SyncTaskControlStatus.MIGRATE_HISTORY_DATA;
            historyDataSyncTaskGroup.start(event -> {
                log.info("history data migrate task {} finished, execute result: {}", event.getTaskId(), event.getEventType().name());
                if (EventType.EXCEPTION_EXIT.equals(event.getEventType())) {
                    stop();
                    dataSourceManager.close();
                    syncTaskControlStatus = SyncTaskControlStatus.MIGRATE_HISTORY_DATA_FAILURE;
                } else {
                    executeRealTimeSyncTask();
                }
            });
        }

    执行SyncTask,实时数据同步

    private void executeRealTimeSyncTask() {
            if (!SyncTaskControlStatus.MIGRATE_HISTORY_DATA.equals(syncTaskControlStatus)) {
                dataSourceManager.close();
                syncTaskControlStatus = SyncTaskControlStatus.STOPPED;
                return;
            }
            realtimeDataSyncTask.start(event -> {
                log.info("realtime data sync task {} finished, execute result: {}", syncTaskId, event.getEventType().name());
                dataSourceManager.close();
                syncTaskControlStatus = EventType.FINISHED.equals(event.getEventType()) ? SyncTaskControlStatus.STOPPED : SyncTaskControlStatus.SYNCHRONIZE_REALTIME_DATA_FAILURE;
            });
            syncTaskControlStatus = SyncTaskControlStatus.SYNCHRONIZE_REALTIME_DATA;
        }

    task能干啥,启动reader

        private void instanceSyncExecutors(final SyncExecutorGroup syncExecutorGroup) {
            reader = ReaderFactory.newInstanceLogReader(syncConfiguration.getReaderConfiguration(), logPositionManager.getCurrentPosition());
            List<Writer> writers = instanceWriters();
            DistributionChannel channel = instanceChannel(writers);
            reader.setChannel(channel);
            for (Writer each : writers) {
                each.setChannel(channel);
            }
            syncExecutorGroup.setChannel(channel);
            syncExecutorGroup.addSyncExecutor(reader);
            syncExecutorGroup.addAllSyncExecutor(writers);
        }

    reader又干了啥

    MySQLBinlogReader

    public void read(final Channel channel) {
            JDBCDataSourceConfiguration jdbcDataSourceConfiguration = (JDBCDataSourceConfiguration) rdbmsConfiguration.getDataSourceConfiguration();
            final JdbcUri uri = new JdbcUri(jdbcDataSourceConfiguration.getJdbcUrl());
            MySQLClient client = new MySQLClient(123456, uri.getHostname(), uri.getPort(), jdbcDataSourceConfiguration.getUsername(), jdbcDataSourceConfiguration.getPassword());
            client.connect();
            client.subscribe(binlogPosition.getFilename(), binlogPosition.getPosition());
            while (isRunning()) {
                AbstractBinlogEvent event = client.poll();
                if (null == event) {
                    sleep();
                    continue;
                }
                handleEvent(channel, uri, event);
            }
            pushRecord(channel, new FinishedRecord(new NopLogPosition()));
        }

    启动了client,订阅了binlog信息,Client里又干了啥,见名知意,处理了跟mysql之间的同步交互协议

        public synchronized void connect() {
            responseCallback = new DefaultPromise<>(eventLoopGroup.next());
            channel = new Bootstrap()
                    .group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(final SocketChannel socketChannel) {
                            socketChannel.pipeline().addLast(new PacketCodec(new MySQLPacketCodecEngine()));
                            socketChannel.pipeline().addLast(new MySQLCommandPacketDecoder());
                            socketChannel.pipeline().addLast(new MySQLNegotiateHandler(username, password, responseCallback));
                            socketChannel.pipeline().addLast(new MySQLCommandResponseHandler());
                        }
                    })
                    .option(ChannelOption.AUTO_READ, true)
                    .connect(host, port).channel();
            serverInfo = waitExpectedResponse(ServerInfo.class);
        }
  • 相关阅读:
    windows查询占用端口的pid以及查询对应的进程名称
    [转]Android学习系列(29)App调试的几个命令实践
    [原]Android中接入微信客户端心得
    Robots.txt使用指南
    SqlHelper中使用事务
    QQ 影音,功能手札
    Access 2007数据库压缩和修复数据库功能
    dhl:PetShop里面的sqlHelper相关操作
    dhl:svn客户端学习TortoiseSVN的基本使用方法
    从 if else 到设计模式的转变
  • 原文地址:https://www.cnblogs.com/it-worker365/p/14977744.html
Copyright © 2011-2022 走看看