zoukankan      html  css  js  c++  java
  • sofa-rpc 服务端源码流程走读

    sofa-rpc是阿里开源的一款高性能的rpc框架,这篇文章主要是对sofa-rpc provider启动服务流程的一个代码走读,下面是我简单绘制的一个基本的关系流程图

    下面我们根据sofa-rpc代码,对流程进行一个跟踪与走读。我们以BoltServer的为例

        public static void main(String[] args) {
            ApplicationConfig application = new ApplicationConfig().setAppName("test-server");
    
            ServerConfig serverConfig = new ServerConfig()
                .setPort(22000)
                .setDaemon(false);
    
            ProviderConfig<HelloService> providerConfig = new ProviderConfig<HelloService>()
                .setInterfaceId(HelloService.class.getName())
                .setApplication(application)
                .setRef(new HelloServiceImpl())
                .setServer(serverConfig)
                .setRegister(false);
    
            ProviderConfig<EchoService> providerConfig2 = new ProviderConfig<EchoService>()
                .setInterfaceId(EchoService.class.getName())
                .setApplication(application)
                .setRef(new EchoServiceImpl())
                .setServer(serverConfig)
                .setRegister(false);
    
            providerConfig.export();
            providerConfig2.export();
    
            LOGGER.warn("started at pid {}", RpcRuntimeContext.PID);
        }

    可以看到sofa-rpc通过ProviderConfig类对服务提供方Provider进行了配置信息的初始化,同时也提供了export做为服务启动的入口。

        public synchronized void export() {
            if (providerBootstrap == null) {
                providerBootstrap = Bootstraps.from(this);
            }
            providerBootstrap.export();
        }
    根据ProviderConfig中setBootstrap()配置的Bootstrap类型,我们通过Bootstaps.from(this)可以获取到不同的Bootstrap引导服务,分别是DefaultProviderBootstrap与 DubboProviderBootstrap 
        /**
         * 发布一个服务
         *
         * @param providerConfig 服务发布者配置
         * @param <T>            接口类型
         * @return 发布启动类
         */
        public static <T> ProviderBootstrap<T> from(ProviderConfig<T> providerConfig) {
            String bootstrap = providerConfig.getBootstrap();
            if (StringUtils.isEmpty(bootstrap)) {
                // Use default provider bootstrap 无的话就返回默认DefaultProviderBootstrap
                bootstrap = RpcConfigs.getStringValue(RpcOptions.DEFAULT_PROVIDER_BOOTSTRAP);
                providerConfig.setBootstrap(bootstrap);
            }
            ProviderBootstrap providerBootstrap = ExtensionLoaderFactory.getExtensionLoader(ProviderBootstrap.class)
                .getExtension(bootstrap, new Class[] { ProviderConfig.class }, new Object[] { providerConfig });
            return (ProviderBootstrap<T>) providerBootstrap;
        }
    
    

    DefaultProviderBootstrap与 DubboProviderBootstrap 都继承自ProviderBootstrap。

    DefaultProviderBootstrap又被BoltProviderBootstrap、Http2ClearTextProviderBootstrap、RestProviderBootstrap三个类所继承,这其实对应了sofa-rpc中的三种server服务方式。

    我们看下DefaultProviderBootstrap服务启动源码

        @Override
        public void export() {
            if (providerConfig.getDelay() > 0) { // 延迟加载,单位毫秒
                Thread thread = factory.newThread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            Thread.sleep(providerConfig.getDelay());
                        } catch (Throwable ignore) { // NOPMD
                        }
                        doExport();
                    }
                });
                thread.start();
            } else {
                doExport();
            }
        }
    
        private void doExport() {
            if (exported) {
                return;
            }
    
            // 检查参数
            checkParameters();
    
            String appName = providerConfig.getAppName();
    
            //key  is the protocol of server,for concurrent safe
            Map<String, Boolean> hasExportedInCurrent = new ConcurrentHashMap<String, Boolean>();
            // 将处理器注册到server
            List<ServerConfig> serverConfigs = providerConfig.getServer();
            for (ServerConfig serverConfig : serverConfigs) {
                String protocol = serverConfig.getProtocol();
    
                String key = providerConfig.buildKey() + ":" + protocol;
    
                if (LOGGER.isInfoEnabled(appName)) {
                    LOGGER.infoWithApp(appName, "Export provider config : {} with bean id {}", key, providerConfig.getId());
                }
    
                // 注意同一interface,同一uniqleId,不同server情况
                AtomicInteger cnt = EXPORTED_KEYS.get(key); // 计数器
                if (cnt == null) { // 没有发布过
                    cnt = CommonUtils.putToConcurrentMap(EXPORTED_KEYS, key, new AtomicInteger(0));
                }
                int c = cnt.incrementAndGet();
                hasExportedInCurrent.put(serverConfig.getProtocol(), true);
                int maxProxyCount = providerConfig.getRepeatedExportLimit();
                if (maxProxyCount > 0) {
                    if (c > maxProxyCount) {
                        decrementCounter(hasExportedInCurrent);
                        // 超过最大数量,直接抛出异常
                        throw new SofaRpcRuntimeException("Duplicate provider config with key " + key
                            + " has been exported more than " + maxProxyCount + " times!"
                            + " Maybe it's wrong config, please check it."
                            + " Ignore this if you did that on purpose!");
                    } else if (c > 1) {
                        if (LOGGER.isInfoEnabled(appName)) {
                            LOGGER.infoWithApp(appName, "Duplicate provider config with key {} has been exported!"
                                + " Maybe it's wrong config, please check it."
                                + " Ignore this if you did that on purpose!", key);
                        }
                    }
                }
    
            }
    
            try {
                // 构造请求调用器
                providerProxyInvoker = new ProviderProxyInvoker(providerConfig);
                // 初始化注册中心
                if (providerConfig.isRegister()) {
                    List<RegistryConfig> registryConfigs = providerConfig.getRegistry();
                    if (CommonUtils.isNotEmpty(registryConfigs)) {
                        for (RegistryConfig registryConfig : registryConfigs) {
                            RegistryFactory.getRegistry(registryConfig); // 提前初始化Registry
                        }
                    }
                }
                // 将处理器注册到server
                for (ServerConfig serverConfig : serverConfigs) {
                    try {
                        //构建Server
                        Server server = serverConfig.buildIfAbsent();
                        // 注册序列化接口
                        server.registerProcessor(providerConfig, providerProxyInvoker);
                        if (serverConfig.isAutoStart()) {
                            //启动服务
                            server.start();
                        }
    
                    } catch (SofaRpcRuntimeException e) {
                        throw e;
                    } catch (Exception e) {
                        LOGGER.errorWithApp(appName, "Catch exception when register processor to server: "
                            + serverConfig.getId(), e);
                    }
                }
    
                // 注册到注册中心
                providerConfig.setConfigListener(new ProviderAttributeListener());
                register();
            } catch (Exception e) {
                decrementCounter(hasExportedInCurrent);
    
                if (e instanceof SofaRpcRuntimeException) {
                    throw (SofaRpcRuntimeException) e;
                } else {
                    throw new SofaRpcRuntimeException("Build provider proxy error!", e);
                }
            }
    
            // 记录一些缓存数据
            RpcRuntimeContext.cacheProviderConfig(this);
            exported = true;
        }

    代码中通过serverConfig.buildIfAbsent()构建Server服务对象,而在buildIfAbsent()函数中我们可以看到,sever是通过SeverFactory工厂获取到的,在SeverFactory的getSever()中根据SeverConfig的配置获取Sever的具体实例,并执行Init()进行初始化。

        /**
         * 启动服务
         *
         * @return the server
         */
        public synchronized Server buildIfAbsent() {
            if (server != null) {
                return server;
            }
            // 提前检查协议+序列化方式
            // ConfigValueHelper.check(ProtocolType.valueOf(getProtocol()),
            //                SerializationType.valueOf(getSerialization()));
            
            //在sever工厂中拿到sever实例
            server = ServerFactory.getServer(this);
            return server;
        }
        /**
         * 初始化Server实例
         *
         * @param serverConfig 服务端配置
         * @return Server
         */
        public synchronized static Server getServer(ServerConfig serverConfig) {
            try {
                Server server = SERVER_MAP.get(Integer.toString(serverConfig.getPort()));
                if (server == null) {
                    // 算下网卡和端口
                    resolveServerConfig(serverConfig);
    
                    ExtensionClass<Server> ext = ExtensionLoaderFactory.getExtensionLoader(Server.class)
                        .getExtensionClass(serverConfig.getProtocol());
                    if (ext == null) {
                        throw ExceptionUtils.buildRuntime("server.protocol", serverConfig.getProtocol(),
                            "Unsupported protocol of server!");
                    }
                    server = ext.getExtInstance();
                    //服务初始化
                    server.init(serverConfig);
                    SERVER_MAP.put(serverConfig.getPort() + "", server);
                }
                return server;
            } catch (SofaRpcRuntimeException e) {
                throw e;
            } catch (Throwable e) {
                throw new SofaRpcRuntimeException(e.getMessage(), e);
            }
        }

    sofa-rpc提供了三种server类型 BoltServer,RestServer与AbstractHttpServer

    BoltServer中通讯底层通过RemotingServer实现的,RemotingServer是基于阿里sofa-bolt通信框架开发的。

        /**
         * Bolt服务端
         */
        protected RemotingServer       remotingServer;
    
       @Override
        public void start() {
            if (started) {
                return;
            }
            synchronized (this) {
                if (started) {
                    return;
                }
                // 生成阿里基于netty的bolt服务Server对象 
                remotingServer = initRemotingServer();
                try {
                    if (remotingServer.start(serverConfig.getBoundHost())) {
                        if (LOGGER.isInfoEnabled()) {
                            LOGGER.info("Bolt server has been bind to {}:{}", serverConfig.getBoundHost(),
                                serverConfig.getPort());
                        }
                    } else {
                        throw new SofaRpcRuntimeException("Failed to start bolt server, see more detail from bolt log.");
                    }
                    started = true;
    
                    if (EventBus.isEnable(ServerStartedEvent.class)) {
                        EventBus.post(new ServerStartedEvent(serverConfig, bizThreadPool));
                    }
    
                } catch (SofaRpcRuntimeException e) {
                    throw e;
                } catch (Exception e) {
                    throw new SofaRpcRuntimeException("Failed to start bolt server!", e);
                }
            }
        }

    AbstractHttpServer 提供http服务,底层通信通过ServerTransport类实现的

        /**
         * 服务端通讯层
         */
        private ServerTransport         serverTransport;
    
        @Override
        public void init(ServerConfig serverConfig) {
            this.serverConfig = serverConfig;
            this.serverTransportConfig = convertConfig(serverConfig);
            // 启动线程池
            this.bizThreadPool = initThreadPool(serverConfig);
            // 服务端处理器
            this.serverHandler = new HttpServerHandler();
    
            // set default transport config
            this.serverTransportConfig.setContainer(container);
            this.serverTransportConfig.setServerHandler(serverHandler);
        }
    
        @Override
        public void start() {
            if (started) {
                return;
            }
            synchronized (this) {
                if (started) {
                    return;
                }
                try {
                    // 启动线程池
                    this.bizThreadPool = initThreadPool(serverConfig);
                    this.serverHandler.setBizThreadPool(bizThreadPool);
                    //实例化服务,具体代码见
                    serverTransport = ServerTransportFactory.getServerTransport(serverTransportConfig);
                    started = serverTransport.start();
    
                    if (started) {
                        if (EventBus.isEnable(ServerStartedEvent.class)) {
                            EventBus.post(new ServerStartedEvent(serverConfig, bizThreadPool));
                        }
                    }
                } catch (SofaRpcRuntimeException e) {
                    throw e;
                } catch (Exception e) {
                    throw new SofaRpcRuntimeException("Failed to start HTTP/2 server!", e);
                }
            }
        }

    ServerTransport是个抽象类,具体实现为transport包下AbstractHttp2ServerTransport

        /**
         * 构造函数
         *
         * @param transportConfig 服务端配置
         */
        protected AbstractHttp2ServerTransport(ServerTransportConfig transportConfig) {
            super(transportConfig);
        }
    
     @Override
        public boolean start() {
            if (serverBootstrap != null) {
                return true;
            }
            synchronized (this) {
                if (serverBootstrap != null) {
                    return true;
                }
                boolean flag = false;
                SslContext sslCtx = SslContextBuilder.build();
    
                // Configure the server.
                EventLoopGroup bossGroup = NettyHelper.getServerBossEventLoopGroup(transportConfig);
    
                //可以看到然是基于Netty
                HttpServerHandler httpServerHandler = (HttpServerHandler) transportConfig.getServerHandler();
                bizGroup = NettyHelper.getServerBizEventLoopGroup(transportConfig, httpServerHandler.getBizThreadPool());
    
                serverBootstrap = new ServerBootstrap();
    
                serverBootstrap.group(bossGroup, bizGroup)
                    .channel(transportConfig.isUseEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, transportConfig.getBacklog())
                    .option(ChannelOption.SO_REUSEADDR, transportConfig.isReuseAddr())
                    .option(ChannelOption.RCVBUF_ALLOCATOR, NettyHelper.getRecvByteBufAllocator())
                    .option(ChannelOption.ALLOCATOR, NettyHelper.getByteBufAllocator())
                    .childOption(ChannelOption.SO_KEEPALIVE, transportConfig.isKeepAlive())
                    .childOption(ChannelOption.TCP_NODELAY, transportConfig.isTcpNoDelay())
                    .childOption(ChannelOption.SO_RCVBUF, 8192 * 128)
                    .childOption(ChannelOption.SO_SNDBUF, 8192 * 128)
                    .handler(new LoggingHandler(LogLevel.DEBUG))
                    .childOption(ChannelOption.ALLOCATOR, NettyHelper.getByteBufAllocator())
                    .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
                        transportConfig.getBufferMin(), transportConfig.getBufferMax()))
                    .childHandler(new Http2ServerChannelInitializer(bizGroup, sslCtx,
                        httpServerHandler, transportConfig.getPayload()));
    
                // 绑定到全部网卡 或者 指定网卡
                ChannelFuture future = serverBootstrap.bind(
                    new InetSocketAddress(transportConfig.getHost(), transportConfig.getPort()));
                ChannelFuture channelFuture = future.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (future.isSuccess()) {
                            if (LOGGER.isInfoEnabled()) {
                                LOGGER.info("HTTP/2 Server bind to {}:{} success!",
                                    transportConfig.getHost(), transportConfig.getPort());
                            }
                        } else {
                            LOGGER.error("HTTP/2 Server bind to {}:{} failed!",
                                transportConfig.getHost(), transportConfig.getPort());
                            stop();
                        }
                    }
                });
    
                try {
                    channelFuture.await();
                    if (channelFuture.isSuccess()) {
                        flag = Boolean.TRUE;
                    } else {
                        throw new SofaRpcRuntimeException("Server start fail!", future.cause());
                    }
                } catch (InterruptedException e) {
                    LOGGER.error(e.getMessage(), e);
                }
                return flag;
            }
        }

    RestServer 提供Rest服务,底层通信实现具体可见SofaNettyJaxrsServer。

        /**
         * Rest服务端
         */
        protected SofaNettyJaxrsServer httpServer;
    
        @Override
        public void init(ServerConfig serverConfig) {
            this.serverConfig = serverConfig;
            httpServer = buildServer();
        }

    SofaNettyJaxrsServer中服务启动的具体代码

     @Override
        public void start() {
            // CHANGE: 增加线程名字
            boolean daemon = serverConfig.isDaemon();
            boolean isEpoll = serverConfig.isEpoll();
            NamedThreadFactory ioFactory = new NamedThreadFactory("SEV-REST-IO-" + port, daemon);
            NamedThreadFactory bizFactory = new NamedThreadFactory("SEV-REST-BIZ-" + port, daemon);
            eventLoopGroup = isEpoll ? new EpollEventLoopGroup(ioWorkerCount, ioFactory)
                : new NioEventLoopGroup(ioWorkerCount, ioFactory);
            eventExecutor = isEpoll ? new EpollEventLoopGroup(executorThreadCount, bizFactory)
                : new NioEventLoopGroup(executorThreadCount, bizFactory);
            // Configure the server.
            bootstrap = new ServerBootstrap()
                .group(eventLoopGroup)
                .channel(isEpoll ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .childHandler(createChannelInitializer())
                .option(ChannelOption.SO_BACKLOG, backlog)
                .childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isKeepAlive()); // CHANGE: setKeepAlive
    
            for (Map.Entry<ChannelOption, Object> entry : channelOptions.entrySet()) {
                bootstrap.option(entry.getKey(), entry.getValue());
            }
    
            for (Map.Entry<ChannelOption, Object> entry : childChannelOptions.entrySet()) {
                bootstrap.childOption(entry.getKey(), entry.getValue());
            }
    
            final InetSocketAddress socketAddress;
            if (null == hostname || hostname.isEmpty()) {
                socketAddress = new InetSocketAddress(port);
            } else {
                socketAddress = new InetSocketAddress(hostname, port);
            }
    
            bootstrap.bind(socketAddress).syncUninterruptibly();
        }

     OK,以上就是sofa-rpc服务端启动的一个基本的流程,这里关注的只是简单的服务启动流程,没有深入代码功能进行分析,在此基础上,我们可以进一步探究代码的具体实现。

    关注微信公众号,查看更多技术文章。

  • 相关阅读:
    U1. 广度优先搜索(BFS)和 广度优先搜索(DFS)
    C5. Spring 服务的注册与发现(Spring Cloud Eureka)
    S3. Android 消息推送
    S2. Android 常用控件
    S12. Android 检查更新功能实现
    S1. Android 功能大全
    B9 Concurrent 重入锁(ReentrantLock)
    117.dom2事件
    106.事件的传播机制
    105.事件对象及兼容处理
  • 原文地址:https://www.cnblogs.com/dafanjoy/p/9762366.html
Copyright © 2011-2022 走看看