  • sofa-bolt source code reading (1): server startup

    The core class of the Bolt server is RpcServer. On startup it calls the startup method of its parent class, AbstractRemotingServer.

    com.alipay.remoting.AbstractRemotingServer#startup    
        @Override
        public void startup() throws LifeCycleException {
            super.startup();
    
            try {
                doInit();
    
                logger.warn("Prepare to start server on port {} ", port);
                if (doStart()) {
                    logger.warn("Server started on port {}", port);
                } else {
                    logger.warn("Failed starting server on port {}", port);
                    throw new LifeCycleException("Failed starting server on port: " + port);
                }
            } catch (Throwable t) {
                this.shutdown();// do stop to ensure close resources created during doInit()
                throw new IllegalStateException("ERROR: Failed to start the Server!", t);
            }
        }
    

    This method does three main things:

    1. Call the parent class's startup() method to set the state to started

       com.alipay.remoting.AbstractLifeCycle#startup  
          @Override
          public void startup() throws LifeCycleException {
              if (isStarted.compareAndSet(false, true)) {
                  return;
              }
              throw new LifeCycleException("this component has started");
          }
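
       The compareAndSet guard can be tried in isolation. The following is a minimal sketch, assuming a simplified Component class (a hypothetical name, not Bolt's AbstractLifeCycle) and using IllegalStateException in place of LifeCycleException so that it compiles with the JDK alone. The point is only that the first caller flips the flag and every later caller is rejected.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class LifeCycleSketch {
    /** Simplified stand-in for a lifecycle base class; not Bolt's actual code. */
    static class Component {
        private final AtomicBoolean started = new AtomicBoolean(false);

        void startup() {
            // compareAndSet flips false -> true atomically, so only one caller can win.
            if (!started.compareAndSet(false, true)) {
                throw new IllegalStateException("this component has started");
            }
        }

        void shutdown() {
            // The reverse transition is guarded the same way.
            if (!started.compareAndSet(true, false)) {
                throw new IllegalStateException("this component has closed");
            }
        }

        boolean isStarted() {
            return started.get();
        }
    }

    public static void main(String[] args) {
        Component component = new Component();
        component.startup();
        System.out.println("started: " + component.isStarted());
        try {
            component.startup(); // second call must be rejected
        } catch (IllegalStateException e) {
            System.out.println("second startup rejected: " + e.getMessage());
        }
    }
}
```

       Because the check and the update are a single atomic step, two threads racing on startup() cannot both succeed, which a plain boolean field would not guarantee.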
      
    2. Call the implementation class's doInit() to do the actual initialization

      com.alipay.remoting.rpc.RpcServer#doInit

          @Override
          protected void doInit() {
              if (this.addressParser == null) {
                  this.addressParser = new RpcAddressParser();
              }
              if (this.switches().isOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH)) {
                  // in server side, do not care the connection service state, so use null instead of global switch
                  ConnectionSelectStrategy connectionSelectStrategy = new RandomSelectStrategy(null);
                  this.connectionManager = new DefaultServerConnectionManager(connectionSelectStrategy);
                  this.connectionManager.startup();
      
                  this.connectionEventHandler = new RpcConnectionEventHandler(switches());
                  this.connectionEventHandler.setConnectionManager(this.connectionManager);
                  this.connectionEventHandler.setConnectionEventListener(this.connectionEventListener);
              } else {
                  this.connectionEventHandler = new ConnectionEventHandler(switches());
                  this.connectionEventHandler.setConnectionEventListener(this.connectionEventListener);
              }
              initRpcRemoting();
              this.bootstrap = new ServerBootstrap();
              this.bootstrap.group(bossGroup, workerGroup)
                  .channel(NettyEventLoopUtil.getServerSocketChannelClass())
                  .option(ChannelOption.SO_BACKLOG, ConfigManager.tcp_so_backlog())
                  .option(ChannelOption.SO_REUSEADDR, ConfigManager.tcp_so_reuseaddr())
                  .childOption(ChannelOption.TCP_NODELAY, ConfigManager.tcp_nodelay())
                  .childOption(ChannelOption.SO_KEEPALIVE, ConfigManager.tcp_so_keepalive());
      
              // set write buffer water mark
              initWriteBufferWaterMark();
      
              // init byte buf allocator
              if (ConfigManager.netty_buffer_pooled()) {
                  this.bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                      .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
              } else {
                  this.bootstrap.option(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT)
                      .childOption(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);
              }
      
              // enable trigger mode for epoll if need
              NettyEventLoopUtil.enableTriggeredMode(bootstrap);
      
              final boolean idleSwitch = ConfigManager.tcp_idle_switch();
              final int idleTime = ConfigManager.tcp_server_idle();
              final ChannelHandler serverIdleHandler = new ServerIdleHandler();
              final RpcHandler rpcHandler = new RpcHandler(true, this.userProcessors);
              this.bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
      
                  @Override
                  protected void initChannel(SocketChannel channel) {
                      ChannelPipeline pipeline = channel.pipeline();
                      pipeline.addLast("decoder", codec.newDecoder());
                      pipeline.addLast("encoder", codec.newEncoder());
                      if (idleSwitch) {
                          pipeline.addLast("idleStateHandler", new IdleStateHandler(0, 0, idleTime,
                              TimeUnit.MILLISECONDS));
                          pipeline.addLast("serverIdleHandler", serverIdleHandler);
                      }
                      pipeline.addLast("connectionEventHandler", connectionEventHandler);
                      pipeline.addLast("handler", rpcHandler);
                      createConnection(channel);
                  }
      
                  /**
                   * create connection operation<br>
                   * <ul>
                   * <li>If flag manageConnection be true, use {@link DefaultConnectionManager} to add a new connection, meanwhile bind it with the channel.</li>
                   * <li>If flag manageConnection be false, just create a new connection and bind it with the channel.</li>
                   * </ul>
                   */
                  private void createConnection(SocketChannel channel) {
                      Url url = addressParser.parse(RemotingUtil.parseRemoteAddress(channel));
                      if (switches().isOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH)) {
                          connectionManager.add(new Connection(channel, url), url.getUniqueKey());
                      } else {
                          new Connection(channel, url);
                      }
                      channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
                  }
              });
          }
      

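      This code looks complicated, but most of it just configures the Netty server. The configurable Netty options are read from ConfigManager, and the business logic is wired in through childHandler.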

      protected void initChannel(SocketChannel channel) {
                      ChannelPipeline pipeline = channel.pipeline();
                      pipeline.addLast("decoder", codec.newDecoder());
                      pipeline.addLast("encoder", codec.newEncoder());
                      if (idleSwitch) {
                          pipeline.addLast("idleStateHandler", new IdleStateHandler(0, 0, idleTime,
                              TimeUnit.MILLISECONDS));
                          pipeline.addLast("serverIdleHandler", serverIdleHandler);
                      }
                      pipeline.addLast("connectionEventHandler", connectionEventHandler);
                      pipeline.addLast("handler", rpcHandler);
                      createConnection(channel);
                  }
      

      The channel initialization method adds up to six ChannelHandlers (the two idle handlers are added only when idleSwitch is on):

      1. decoder: the decoder

      2. encoder: the encoder

      3. idleStateHandler

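        Netty's built-in idle handler, used to fire IdleStateEvent. Only the all-idle time idleTime is configured here (the default is 90000 ms): if no read or write happens on the channel within idleTime, an IdleStateEvent is fired.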

      4. serverIdleHandler

        Works together with idleStateHandler to handle the IdleStateEvent. When the event fires, the server closes the client connection.

            @Override
            public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception {
                if (evt instanceof IdleStateEvent) {
                    try {
                        logger.warn("Connection idle, close it from server side: {}",
                            RemotingUtil.parseRemoteAddress(ctx.channel()));
                        ctx.close();
                    } catch (Exception e) {
                        logger.warn("Exception caught when closing connection in ServerIdleHandler.", e);
                    }
                } else {
                    super.userEventTriggered(ctx, evt);
                }
            }
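
        The interplay of items 3 and 4 can be modelled without Netty. This is a minimal sketch, assuming a hypothetical IdleTracker class that takes timestamps as plain arguments; it is not how IdleStateHandler is implemented (Netty schedules timer tasks on the event loop), it only illustrates the rule "no read or write for idleTime, then close".

```java
class IdleSketch {
    /** Hypothetical model of one connection's idle bookkeeping; not Netty's IdleStateHandler. */
    static class IdleTracker {
        private final long allIdleMillis;
        private long lastActivityMillis;
        private boolean open = true;

        IdleTracker(long allIdleMillis, long nowMillis) {
            this.allIdleMillis = allIdleMillis;
            this.lastActivityMillis = nowMillis;
        }

        /** Any read or write resets the idle clock. */
        void onReadOrWrite(long nowMillis) {
            lastActivityMillis = nowMillis;
        }

        /** Periodic check: close once nothing was read or written for allIdleMillis. */
        void check(long nowMillis) {
            if (open && nowMillis - lastActivityMillis >= allIdleMillis) {
                open = false; // plays the role of ctx.close() in ServerIdleHandler
            }
        }

        boolean isOpen() {
            return open;
        }
    }

    public static void main(String[] args) {
        IdleTracker tracker = new IdleTracker(90_000, 0);
        tracker.onReadOrWrite(60_000);
        tracker.check(100_000); // only 40 s since the last activity
        System.out.println("open after 40 s idle: " + tracker.isOpen());
        tracker.check(150_000); // 90 s since the last activity
        System.out.println("open after 90 s idle: " + tracker.isOpen());
    }
}
```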
        
      5. connectionEventHandler

        The connection event handler. The event types are defined by the enum ConnectionEventType.

        public enum ConnectionEventType {
            CONNECT, CLOSE, EXCEPTION;
        }
        
        • CONNECT

          The CONNECT event is fired once, in the createConnection() method defined inside RpcServer#doInit.

        • CLOSE

          The CLOSE event is fired when the connection is closed.

          com.alipay.remoting.ConnectionEventHandler
          
          @Override
          public void channelInactive(ChannelHandlerContext ctx) throws Exception {
              String remoteAddress = RemotingUtil.parseRemoteAddress(ctx.channel());
              infoLog("Connection channel inactive: {}", remoteAddress);
              super.channelInactive(ctx);
              Attribute attr = ctx.channel().attr(Connection.CONNECTION);
              if (null != attr) {
                  // add reconnect task
                  if (this.globalSwitch != null
                      && this.globalSwitch.isOn(GlobalSwitch.CONN_RECONNECT_SWITCH)) {
                      Connection conn = (Connection) attr.get();
                      if (reconnectManager != null) {
                          reconnectManager.reconnect(conn.getUrl());
                      }
                  }
                  // trigger close connection event
                  onEvent((Connection) attr.get(), remoteAddress, ConnectionEventType.CLOSE);
              }
          }
          
        • EXCEPTION

          The EXCEPTION event is not fired anywhere in the source code; it is probably reserved for future use.

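        Whatever the event type, the handler ends up calling onEvent, which forwards the event to the ConnectionEventListener; the actual logic is then handled by the user-defined ConnectionEventProcessor.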

        com.alipay.remoting.ConnectionEventHandler#onEvent
            
        private void onEvent(final Connection conn, final String remoteAddress,
                             final ConnectionEventType type) {
            if (this.eventListener != null) {
                this.eventExecutor.onEvent(new Runnable() {
                    @Override
                    public void run() {
                        ConnectionEventHandler.this.eventListener.onEvent(type, remoteAddress, conn);
                    }
                });
            }
        }
        
        
        com.alipay.remoting.ConnectionEventListener#onEvent
        public void onEvent(ConnectionEventType type, String remoteAddress, Connection connection) {
                List<ConnectionEventProcessor> processorList = this.processors.get(type);
                if (processorList != null) {
                    for (ConnectionEventProcessor processor : processorList) {
                        processor.onEvent(remoteAddress, connection);
                    }
                }
            }
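
        The fan-out from listener to processors can be sketched with JDK classes only. The Listener and Processor types below are hypothetical stand-ins that mirror the shape of the two onEvent methods above; unlike ConnectionEventHandler, this sketch dispatches synchronously instead of handing a Runnable to an executor.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

class EventDispatchSketch {
    enum EventType { CONNECT, CLOSE, EXCEPTION }

    /** Hypothetical processor interface; the connection is a plain Object here. */
    interface Processor {
        void onEvent(String remoteAddress, Object connection);
    }

    /** Keeps one processor list per event type and fans each event out to that list. */
    static class Listener {
        private final Map<EventType, List<Processor>> processors = new ConcurrentHashMap<>();

        void addProcessor(EventType type, Processor processor) {
            processors.computeIfAbsent(type, k -> new CopyOnWriteArrayList<>()).add(processor);
        }

        void onEvent(EventType type, String remoteAddress, Object connection) {
            List<Processor> list = processors.get(type);
            if (list != null) {
                for (Processor processor : list) {
                    processor.onEvent(remoteAddress, connection);
                }
            }
        }
    }

    public static void main(String[] args) {
        Listener listener = new Listener();
        listener.addProcessor(EventType.CONNECT,
            (addr, conn) -> System.out.println("connected: " + addr));
        listener.addProcessor(EventType.CLOSE,
            (addr, conn) -> System.out.println("closed: " + addr));

        listener.onEvent(EventType.CONNECT, "127.0.0.1:50000", null);
        listener.onEvent(EventType.CLOSE, "127.0.0.1:50000", null);
        listener.onEvent(EventType.EXCEPTION, "127.0.0.1:50000", null); // no processor, ignored
    }
}
```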
        
      6. handler

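        The actual business handler; the class registered here is RpcHandler. The call flow is as follows:

        [Figure: RpcHandler call flow diagram]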

        Client requests are handed to a UserProcessor. Custom business logic can be registered by calling registerUserProcessor on RpcServer.
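
        The idea of registering a processor and routing requests to it can be illustrated with a small registry. This is a simplified model under the assumption that processors are looked up by the request's class name; Registry, register, dispatch and EchoRequest are hypothetical names for this example and are not Bolt's API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

class ProcessorRegistrySketch {
    /** Hypothetical request type used only for this example. */
    static class EchoRequest {
        final String text;

        EchoRequest(String text) {
            this.text = text;
        }
    }

    /** Maps a request class name to the function that handles requests of that class. */
    static class Registry {
        private final Map<String, Function<Object, Object>> processors = new ConcurrentHashMap<>();

        <T> void register(Class<T> requestType, Function<T, Object> handler) {
            Function<Object, Object> adapted = req -> handler.apply(requestType.cast(req));
            // Refuse a second processor for the same request type.
            if (processors.putIfAbsent(requestType.getName(), adapted) != null) {
                throw new IllegalStateException("Processor already registered for "
                    + requestType.getName());
            }
        }

        Object dispatch(Object request) {
            Function<Object, Object> processor = processors.get(request.getClass().getName());
            if (processor == null) {
                throw new IllegalArgumentException("No processor for "
                    + request.getClass().getName());
            }
            return processor.apply(request);
        }
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        registry.register(EchoRequest.class, req -> "echo: " + req.text);
        System.out.println(registry.dispatch(new EchoRequest("hello")));
    }
}
```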

    3. Call doStart() to start the server

      com.alipay.remoting.rpc.RpcServer#doStart
          
      @Override
      protected boolean doStart() throws InterruptedException {
          this.channelFuture = this.bootstrap.bind(new InetSocketAddress(ip(), port())).sync();
          return this.channelFuture.isSuccess();
      }
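
    Taken together, the three steps form a template method: the base class fixes the order (mark started, initialize, bind) and releases resources if anything fails. The sketch below shows that control flow only, assuming hypothetical AbstractServer and FixedServer classes whose doStart() result is supplied by the caller instead of coming from a real Netty bind.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class StartupTemplateSketch {
    /** Hypothetical base class that fixes the startup order; not Bolt's AbstractRemotingServer. */
    static abstract class AbstractServer {
        private final AtomicBoolean started = new AtomicBoolean(false);
        private boolean resourcesReleased = false;

        final void startup() {
            // Step 1: mark the component as started, rejecting a second call.
            if (!started.compareAndSet(false, true)) {
                throw new IllegalStateException("this component has started");
            }
            try {
                doInit();                 // Step 2: subclass-specific initialization
                if (!doStart()) {         // Step 3: subclass-specific bind
                    throw new IllegalStateException("Failed starting server");
                }
            } catch (Throwable t) {
                shutdown();               // release whatever doInit() created
                throw new IllegalStateException("ERROR: Failed to start the Server!", t);
            }
        }

        void shutdown() {
            started.set(false);
            resourcesReleased = true;
        }

        boolean isStarted() { return started.get(); }

        boolean isResourcesReleased() { return resourcesReleased; }

        protected abstract void doInit();

        protected abstract boolean doStart() throws Exception;
    }

    /** Test double whose bind result is fixed at construction time. */
    static class FixedServer extends AbstractServer {
        private final boolean bindResult;

        FixedServer(boolean bindResult) { this.bindResult = bindResult; }

        @Override
        protected void doInit() { /* nothing to prepare in this sketch */ }

        @Override
        protected boolean doStart() { return bindResult; }
    }

    public static void main(String[] args) {
        FixedServer ok = new FixedServer(true);
        ok.startup();
        System.out.println("started: " + ok.isStarted());

        FixedServer failing = new FixedServer(false);
        try {
            failing.startup();
        } catch (IllegalStateException e) {
            System.out.println("startup failed, resources released: "
                + failing.isResourcesReleased());
        }
    }
}
```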
      
  • Original post: https://www.cnblogs.com/huiyao/p/12396122.html