JAVA RPC (9): Netty Server-Side Analysis

    Source code: https://gitee.com/a1234567891/koalas-rpc

    An enterprise, production-grade, highly available and scalable RPC framework handling tens of billions of page views per day. In theory its concurrency is bounded only by server bandwidth. The client uses the thrift protocol; the server supports both netty and thrift's TThreadedSelectorServer half-sync/half-async threading model. It offers dynamic capacity scaling, service registration and deregistration, dynamic weighting, availability configuration, page-level traffic statistics, and trace tracking, and it integrates natively with cat for dashboard display, providing individuals and small to mid-sized companies with a reliable RPC solution. The server startup code is pasted below:

        @Override
        public void run() {
            try {
                // Use the native epoll transport when the kernel supports it
                if (Epoll.isAvailable()) {
                    bossGroup = new EpollEventLoopGroup(serverPublisher.bossThreadCount == 0 ? AbstractKoalsServerPublisher.DEFAULT_EVENT_LOOP_THREADS : serverPublisher.bossThreadCount);
                    workerGroup = new EpollEventLoopGroup(serverPublisher.workThreadCount == 0 ? AbstractKoalsServerPublisher.DEFAULT_EVENT_LOOP_THREADS * 2 : serverPublisher.workThreadCount);
                } else {
                    bossGroup = new NioEventLoopGroup(serverPublisher.bossThreadCount == 0 ? AbstractKoalsServerPublisher.DEFAULT_EVENT_LOOP_THREADS : serverPublisher.bossThreadCount);
                    workerGroup = new NioEventLoopGroup(serverPublisher.workThreadCount == 0 ? AbstractKoalsServerPublisher.DEFAULT_EVENT_LOOP_THREADS * 2 : serverPublisher.workThreadCount);
                }
                // Separate business pool so slow service code never blocks the IO event loops
                executorService = KoalasThreadedSelectorWorkerExcutorUtil.getWorkerExecutorWithQueue(
                        serverPublisher.koalasThreadCount == 0 ? AbstractKoalsServerPublisher.DEFAULT_KOALAS_THREADS : serverPublisher.koalasThreadCount,
                        serverPublisher.koalasThreadCount == 0 ? AbstractKoalsServerPublisher.DEFAULT_KOALAS_THREADS : serverPublisher.koalasThreadCount,
                        serverPublisher.workQueue,
                        new KoalasDefaultThreadFactory(serverPublisher.serviceInterface.getName()));

                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                        .channel(workerGroup instanceof EpollEventLoopGroup ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                        .handler(new LoggingHandler(LogLevel.INFO))
                        .childHandler(new NettyServerInitiator(serverPublisher, executorService))
                        .option(ChannelOption.SO_BACKLOG, 1024)
                        .option(ChannelOption.SO_REUSEADDR, true)
                        .option(ChannelOption.SO_KEEPALIVE, true);
                Channel ch = b.bind(serverPublisher.port).sync().channel();

                Runtime.getRuntime().addShutdownHook(new Thread() {
                    @Override
                    public void run() {
                        logger.info("Shutdown by Runtime");
                        if (zookeeperServer != null) {
                            zookeeperServer.destroy();
                        }
                        logger.info("wait for service over 3000ms");
                        try {
                            Thread.sleep(3000);
                        } catch (Exception e) {
                        }
                        if (executorService != null) {
                            executorService.shutdown();
                        }
                        if (bossGroup != null) bossGroup.shutdownGracefully();
                        if (workerGroup != null) workerGroup.shutdownGracefully();
                    }
                });

                if (StringUtils.isNotEmpty(serverPublisher.zkpath)) {
                    ZookServerConfig zookServerConfig = new ZookServerConfig(serverPublisher.zkpath, serverPublisher.serviceInterface.getName(), serverPublisher.env, serverPublisher.port, serverPublisher.weight, "netty");
                    zookeeperServer = new ZookeeperServer(zookServerConfig);
                    zookeeperServer.init();
                }
            } catch (Exception e) {
                logger.error("NettyServer start failed!", e);
                if (bossGroup != null) bossGroup.shutdownGracefully();
                if (workerGroup != null) workerGroup.shutdownGracefully();
            }

            logger.info("netty server init success server={}", serverPublisher);
        }

    The server first starts its event loops: the kernel is probed for epoll support, and if it is available the EpollEventLoopGroup is used; otherwise the server falls back to the NIO multiplexing NioEventLoopGroup. It then declares a user-defined thread pool. Readers unfamiliar with this will surely ask: netty already has its own acceptor threads and IO threads, so why declare an additional custom pool? The reason is that if complex, time-consuming business work ran inside the IO threads, it would slow netty's IO processing and hurt throughput. That is precisely the design idea of the Reactor model: keep business logic from interfering with the accept thread and the IO read/write threads. NettyServerInitiator wires up the handlers that do the actual business processing; the offloading pattern is sketched right below.
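
    A minimal sketch of that offloading pattern, assuming a stand-in doBusinessWork method (an illustration only, not the framework's actual KoalasHandler):

        import io.netty.channel.ChannelHandlerContext;
        import io.netty.channel.ChannelInboundHandlerAdapter;

        import java.util.concurrent.ExecutorService;

        public class OffloadingHandler extends ChannelInboundHandlerAdapter {

            private final ExecutorService businessPool;

            public OffloadingHandler(ExecutorService businessPool) {
                this.businessPool = businessPool;
            }

            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                // Hand the request off immediately so the IO thread can keep reading
                businessPool.execute(() -> {
                    Object response = doBusinessWork(msg); // potentially slow service code
                    // writeAndFlush is safe to call from any thread; netty marshals
                    // the write back onto the channel's event loop
                    ctx.writeAndFlush(response);
                });
            }

            private Object doBusinessWork(Object request) {
                return request; // placeholder for the real service invocation
            }
        }

    If the pool is saturated, execute() throws RejectedExecutionException, a case koalas-rpc handles explicitly further down.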

        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                logger.info("Shutdown by Runtime");
                if (zookeeperServer != null) {
                    zookeeperServer.destroy();
                }
                logger.info("wait for service over 3000ms");
                try {
                    Thread.sleep(3000);
                } catch (Exception e) {
                }
                if (executorService != null) {
                    executorService.shutdown();
                }
                if (bossGroup != null) bossGroup.shutdownGracefully();
                if (workerGroup != null) workerGroup.shutdownGracefully();
            }
        });

    A shutdown hook is registered by hand: when the service is shutting down, it must proactively remove its node registration so clients stop routing to it, before releasing resources. Now let's look at the handler setup.

    package netty.initializer;
    
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.socket.SocketChannel;
    import netty.hanlder.KoalasDecoder;
    import netty.hanlder.KoalasEncoder;
    import netty.hanlder.KoalasHandler;
    import org.apache.thrift.TProcessor;
    import server.config.AbstractKoalsServerPublisher;
    
    import java.util.concurrent.ExecutorService;
    /**
     * Copyright (C) 2018
     * All rights reserved
     * User: yulong.zhang
     * Date: 2018-11-23 11:13:33
     */
    public class NettyServerInitiator extends ChannelInitializer<SocketChannel> {

        private ExecutorService executorService;

        private AbstractKoalsServerPublisher serverPublisher;

        public NettyServerInitiator(AbstractKoalsServerPublisher serverPublisher, ExecutorService executorService) {
            this.serverPublisher = serverPublisher;
            this.executorService = executorService;
        }

        @Override
        protected void initChannel(SocketChannel ch) {
            ch.pipeline().addLast("decoder", new KoalasDecoder());
            ch.pipeline().addLast("encoder", new KoalasEncoder());
            ch.pipeline().addLast("handler", new KoalasHandler(serverPublisher, executorService));
        }

    }
    The decoder is responsible for splitting frames and the encoder for assembling them; the handler holds the real business logic. All business processing runs in the thread pool there, and results are returned to the client asynchronously through the ChannelHandlerContext. This is what genuinely realizes the Reactor pattern. Note that netty's stock decoder could express the same framing, as sketched next.
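
    For context only: a minimal sketch using netty's built-in LengthFieldBasedFrameDecoder, which implements the same 4-byte length-prefix framing (the 16 MB maximum frame size is an assumed limit, not a koalas-rpc setting):

        import io.netty.channel.ChannelInitializer;
        import io.netty.channel.socket.SocketChannel;
        import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

        public class LengthPrefixInitiator extends ChannelInitializer<SocketChannel> {
            @Override
            protected void initChannel(SocketChannel ch) {
                // First 4 bytes hold a big-endian body length; initialBytesToStrip = 0
                // keeps the prefix in the emitted frame, matching KoalasDecoder below.
                ch.pipeline().addLast("frameDecoder",
                        new LengthFieldBasedFrameDecoder(16 * 1024 * 1024, 0, 4, 0, 0));
            }
        }
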
    Now let's look at the framework's own frame-splitting decoder.
    package netty.hanlder;

    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.ByteToMessageDecoder;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.util.List;
    /**
     * Copyright (C) 2018
     * All rights reserved
     * User: yulong.zhang
     * Date: 2018-11-23 11:13:33
     */
    public class KoalasDecoder extends ByteToMessageDecoder {

        private final static Logger logger = LoggerFactory.getLogger(KoalasDecoder.class);

        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {

            try {
                // Not even the 4-byte length prefix has arrived yet
                if (in.readableBytes() < 4) {
                    return;
                }

                in.markReaderIndex();
                byte[] b = new byte[4];
                in.readBytes(b);

                int length = decodeFrameSize(b);

                if (in.readableBytes() < length) {
                    // The body is still incomplete: reset the readerIndex and wait for more bytes
                    in.resetReaderIndex();
                    return;
                }

                // A full frame (prefix + body) is buffered: slice it out and pass it on
                in.resetReaderIndex();
                ByteBuf frame = in.readRetainedSlice(4 + length);
                in.resetReaderIndex();

                in.skipBytes(4 + length);
                out.add(frame);
            } catch (Exception e) {
                logger.error("decode error", e);
            }

        }

        public static final int decodeFrameSize(byte[] buf) {
            return (buf[0] & 255) << 24 | (buf[1] & 255) << 16 | (buf[2] & 255) << 8 | buf[3] & 255;
        }
    }

    The decoder reads a four-byte length prefix to learn the message body's length, then waits until that many bytes are buffered before reading the full frame of byte-stream data. The decodeFrameSize method assembles the four bytes into a big-endian int; the write side performs the inverse, as sketched below.
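
    A minimal sketch of that inverse (encodeFrameSize is a hypothetical helper name for illustration; thrift's framed transport provides an equivalent):

        // Writes a body length into a big-endian 4-byte prefix: the exact
        // inverse of decodeFrameSize. The helper name is ours, for illustration.
        public static byte[] encodeFrameSize(int length) {
            return new byte[]{
                    (byte) (length >> 24 & 255),
                    (byte) (length >> 16 & 255),
                    (byte) (length >> 8 & 255),
                    (byte) (length & 255)
            };
        }
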
    The logic inside KoalasHandler is fairly involved, so we will look only at the core parts. First it parses the byte stream with thrift to obtain the transports:

        ByteArrayInputStream inputStream = new ByteArrayInputStream(b);
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();

        TIOStreamTransport tioStreamTransportInput = new TIOStreamTransport(inputStream);
        TIOStreamTransport tioStreamTransportOutput = new TIOStreamTransport(outputStream);

        TKoalasFramedTransport inTransport = new TKoalasFramedTransport(tioStreamTransportInput, 2048000);
        inTransport.setReadMaxLength_(maxLength);
        TKoalasFramedTransport outTransport = new TKoalasFramedTransport(tioStreamTransportOutput, 2048000, ifUserProtocol);

    Finally, the request is handed to the thread pool for execution, freeing the current IO thread for the next task.

        try {
            executorService.execute(new NettyRunable(ctx, in, out, outputStream, localTprocessor, b, privateKey, publicKey, className, methodName, koalasTrace, cat));
        } catch (RejectedExecutionException e) {
            logger.error(e.getMessage() + ErrorType.THREAD + ",className:" + className, e);
            handlerException(b, ctx, e, ErrorType.THREAD, privateKey, publicKey, thriftNative);
        }
    The RejectedExecutionException path returns an exception to the client when the thread pool cannot keep up. Because the server's business-processing capacity is finite, this is deliberate server-side protection against avalanche failures: when the server throws RejectedExecutionException in large numbers, a single machine can no longer satisfy the request volume, and you need to scale out horizontally and load-balance across more machines. A bounded pool such as the sketch below is what produces this rejection.
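
    For illustration, a bounded ThreadPoolExecutor with the default AbortPolicy rejects overflow work in exactly this way (the pool size and queue length here are assumptions, not koalas-rpc defaults):

        import java.util.concurrent.ArrayBlockingQueue;
        import java.util.concurrent.ThreadPoolExecutor;
        import java.util.concurrent.TimeUnit;

        public class BoundedPools {
            // Once all 64 threads are busy and 1024 tasks are queued, execute()
            // throws RejectedExecutionException instead of letting work pile up;
            // the server converts that into an error response to the client.
            static final ThreadPoolExecutor BUSINESS_POOL = new ThreadPoolExecutor(
                    64, 64,                               // fixed pool size (assumed)
                    0L, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<>(1024),       // bounded queue (assumed)
                    new ThreadPoolExecutor.AbortPolicy()  // reject rather than block
            );
        }
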
    The user's actual business logic runs inside the Runnable; let's see what it really does.
        try {
            tprocessor.process(in, out);
            ctx.writeAndFlush(outputStream);
            if (transaction != null && cat)
                transaction.setStatus(Transaction.SUCCESS);
        } catch (Exception e) {
            if (transaction != null && cat)
                transaction.setStatus(e);
            logger.error(e.getMessage() + ErrorType.APPLICATION + ",className:" + className, e);
            handlerException(this.b, ctx, e, ErrorType.APPLICATION, privateKey, publicKey, thriftNative);
        }

    The thrift processor's process() call executes the business logic, and the result is returned to the client via ctx.writeAndFlush(outputStream). The catch block handles any exception by sending an error result back to the client. With that, the netty server implementation is complete. The thrift server-side parsing will be covered in the next post; it contains many details that readers are best served by following along in the source code. If you have questions, you are welcome to join QQ group 825199617 to discuss, along with more source-level discussion of spring, spring mvc, AOP, the JDK, and more!
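
    As a rough illustration of that call (a sketch under assumptions: TBinaryProtocol is the stock thrift choice here, whereas koalas-rpc wraps its own framed transports as shown above), driving a thrift processor for one request looks like this:

        import org.apache.thrift.TException;
        import org.apache.thrift.TProcessor;
        import org.apache.thrift.protocol.TBinaryProtocol;
        import org.apache.thrift.protocol.TProtocol;
        import org.apache.thrift.transport.TTransport;

        public class ThriftCallSketch {
            // process() reads one request from `in`, invokes the matching method on
            // the service handler behind the processor, and writes the response (or
            // a TApplicationException) to `out`.
            public static void serveOneCall(TProcessor processor, TTransport inTransport, TTransport outTransport) throws TException {
                TProtocol in = new TBinaryProtocol(inTransport);
                TProtocol out = new TBinaryProtocol(outTransport);
                processor.process(in, out);
            }
        }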


Original post: https://www.cnblogs.com/zyl2016/p/10840087.html