源码地址:https://gitee.com/a1234567891/koalas-rpc
企业生产级百亿日PV高可用可拓展的RPC框架。理论上并发数量接近服务器带宽,客户端采用thrift协议,服务端支持netty和thrift的TThreadedSelectorServer半同步半异步线程模型,支持动态扩容,服务上下线,权重动态,可用性配置,页面流量统计,支持trace跟踪等,天然接入cat支持数据大盘展示等,持续为个人以及中小型公司提供可靠的RPC框架技术方案,贴上
1 @Override 2 public void run() { 3 try { 4 if (Epoll.isAvailable ()) { 5 bossGroup = new EpollEventLoopGroup (serverPublisher.bossThreadCount==0?AbstractKoalsServerPublisher.DEFAULT_EVENT_LOOP_THREADS:serverPublisher.bossThreadCount); 6 workerGroup = new EpollEventLoopGroup ( serverPublisher.workThreadCount==0? AbstractKoalsServerPublisher.DEFAULT_EVENT_LOOP_THREADS*2:serverPublisher.workThreadCount); 7 } else { 8 bossGroup = new NioEventLoopGroup (serverPublisher.bossThreadCount==0?AbstractKoalsServerPublisher.DEFAULT_EVENT_LOOP_THREADS:serverPublisher.bossThreadCount); 9 workerGroup = new NioEventLoopGroup ( serverPublisher.workThreadCount==0? AbstractKoalsServerPublisher.DEFAULT_EVENT_LOOP_THREADS*2:serverPublisher.workThreadCount ); 10 } 11 executorService = KoalasThreadedSelectorWorkerExcutorUtil.getWorkerExecutorWithQueue (serverPublisher.koalasThreadCount==0?AbstractKoalsServerPublisher.DEFAULT_KOALAS_THREADS:serverPublisher.koalasThreadCount,serverPublisher.koalasThreadCount==0?AbstractKoalsServerPublisher.DEFAULT_KOALAS_THREADS:serverPublisher.koalasThreadCount,serverPublisher.workQueue,new KoalasDefaultThreadFactory (serverPublisher.serviceInterface.getName ())); 12 13 ServerBootstrap b = new ServerBootstrap (); 14 b.group ( bossGroup, workerGroup ).channel ( workerGroup instanceof EpollEventLoopGroup ? EpollServerSocketChannel.class : NioServerSocketChannel.class ) 15 .handler ( new LoggingHandler ( LogLevel.INFO ) ) 16 .childHandler ( new NettyServerInitiator (serverPublisher,executorService)) 17 .option ( ChannelOption.SO_BACKLOG, 1024 ) 18 .option ( ChannelOption.SO_REUSEADDR, true ) 19 .option ( ChannelOption.SO_KEEPALIVE, true ); 20 Channel ch = b.bind ( serverPublisher.port ).sync ().channel (); 21 Runtime.getRuntime().addShutdownHook(new Thread(){ 22 @Override 23 public void run(){ 24 logger.info ( "Shutdown by Runtime" ); 25 if(zookeeperServer != null){ 26 zookeeperServer.destroy (); 27 } 28 logger.info ( "wait for service over 3000ms" ); 29 try { 30 Thread.sleep ( 3000 ); 31 } catch (Exception e) { 32 } 33 if(executorService!=null){ 34 executorService.shutdown (); 35 } 36 if(bossGroup != null) bossGroup.shutdownGracefully (); 37 if(workerGroup != null) workerGroup.shutdownGracefully (); 38 } 39 }); 40 41 if(StringUtils.isNotEmpty ( serverPublisher.zkpath )){ 42 ZookServerConfig zookServerConfig = new ZookServerConfig ( serverPublisher.zkpath,serverPublisher.serviceInterface.getName (),serverPublisher.env,serverPublisher.port,serverPublisher.weight,"netty" ); 43 zookeeperServer = new ZookeeperServer ( zookServerConfig ); 44 zookeeperServer.init (); 45 } 46 } catch ( Exception e){ 47 logger.error ( "NettyServer start faid !",e ); 48 if(bossGroup != null) bossGroup.shutdownGracefully (); 49 if(workerGroup != null) workerGroup.shutdownGracefully (); 50 } 51 52 logger.info("netty server init success server={}",serverPublisher); 53 54 }
首先开启NIO服务,由系统内核来判断是否支持epoll-EpollEventLoopGroup,如果不支持epoll采用IO多路复用的方式EpollEventLoopGroup,然后声明一个用户自定义线程池,这里有不清楚的读者肯定会问,netty本身支持连接线程和IO线程,为什么还要自定义声明自定义线程池,原因是假设在IO线程池中做的业务非常复杂,大量耗时,这样就会阻塞了netty线程的IO处理速度,影响吞吐量,这也就是reactor模型的设计理念,不让业务干扰连接线程和IO读写线程。NettyServerInitiator就是实际处理的业务handle了。
1 Runtime.getRuntime().addShutdownHook(new Thread(){ 2 @Override 3 public void run(){ 4 logger.info ( "Shutdown by Runtime" ); 5 if(zookeeperServer != null){ 6 zookeeperServer.destroy (); 7 } 8 logger.info ( "wait for service over 3000ms" ); 9 try { 10 Thread.sleep ( 3000 ); 11 } catch (Exception e) { 12 } 13 if(executorService!=null){ 14 executorService.shutdown (); 15 } 16 if(bossGroup != null) bossGroup.shutdownGracefully (); 17 if(workerGroup != null) workerGroup.shutdownGracefully (); 18 } 19 });
手动关闭钩子,服务关闭的时候要主动关闭节点信息。下面来看一下hander拦截器
package netty.initializer; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import netty.hanlder.KoalasDecoder; import netty.hanlder.KoalasEncoder; import netty.hanlder.KoalasHandler; import org.apache.thrift.TProcessor; import server.config.AbstractKoalsServerPublisher; import java.util.concurrent.ExecutorService; /** * Copyright (C) 2018 * All rights reserved * User: yulong.zhang * Date:2018年11月23日11:13:33 */ public class NettyServerInitiator extends ChannelInitializer<SocketChannel> { private ExecutorService executorService; private AbstractKoalsServerPublisher serverPublisher; public NettyServerInitiator(AbstractKoalsServerPublisher serverPublisher,ExecutorService executorService){ this.serverPublisher = serverPublisher; this.executorService = executorService; } @Override protected void initChannel(SocketChannel ch) { ch.pipeline ().addLast ( "decoder",new KoalasDecoder () ); ch.pipeline ().addLast ( "encoder",new KoalasEncoder ()); ch.pipeline ().addLast ( "handler",new KoalasHandler (serverPublisher,executorService) ); } }
decode负责拆包。encoder负责装包,handler是真正业务处理的逻辑,所有的业务处理都在这里的线程池中运行,结果通过ChannelHandlerContext 异步的返回给client端,通过这种方式真正的实现了reactor
下面我们看看拆包处理
1 package netty.hanlder; 2 3 import io.netty.buffer.ByteBuf; 4 import io.netty.channel.ChannelHandlerContext; 5 import io.netty.handler.codec.ByteToMessageDecoder; 6 import org.slf4j.Logger; 7 import org.slf4j.LoggerFactory; 8 import server.KoalasServerPublisher; 9 10 import java.util.List; 11 /** 12 * Copyright (C) 2018 13 * All rights reserved 14 * User: yulong.zhang 15 * Date:2018年11月23日11:13:33 16 */ 17 public class KoalasDecoder extends ByteToMessageDecoder { 18 19 private final static Logger logger = LoggerFactory.getLogger ( KoalasDecoder.class ); 20 21 @Override 22 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { 23 24 try { 25 if (in.readableBytes () < 4) { 26 return; 27 } 28 29 in.markReaderIndex (); 30 byte[] b = new byte[4]; 31 in.readBytes ( b ); 32 33 int length = decodeFrameSize ( b ); 34 35 if (in.readableBytes () < length) { 36 //reset the readerIndex 37 in.resetReaderIndex (); 38 return; 39 } 40 41 in.resetReaderIndex (); 42 ByteBuf fream = in.readRetainedSlice ( 4 + length ); 43 in.resetReaderIndex (); 44 45 in.skipBytes ( 4 + length ); 46 out.add ( fream ); 47 } catch (Exception e) { 48 logger.error ( "decode error",e ); 49 } 50 51 } 52 53 public static final int decodeFrameSize(byte[] buf) { 54 return (buf[0] & 255) << 24 | (buf[1] & 255) << 16 | (buf[2] & 255) << 8 | buf[3] & 255; 55 } 56 }
通过读取四个字节的长度来决定消息体长度,然后根据消息体长度来读取所有的字节流数据。decodeFrameSize方法将四个字节流转成int类型。KoalasHandler处理器逻辑比较复杂,我们只看核心的内容,首先通过thrift解析字节流来获取transport
ByteArrayInputStream inputStream = new ByteArrayInputStream ( b ); ByteArrayOutputStream outputStream = new ByteArrayOutputStream ( ); TIOStreamTransport tioStreamTransportInput = new TIOStreamTransport ( inputStream); TIOStreamTransport tioStreamTransportOutput = new TIOStreamTransport ( outputStream); TKoalasFramedTransport inTransport = new TKoalasFramedTransport ( tioStreamTransportInput,2048000 ); inTransport.setReadMaxLength_ ( maxLength ); TKoalasFramedTransport outTransport = new TKoalasFramedTransport ( tioStreamTransportOutput,2048000,ifUserProtocol );
最终扔到线程池里去执行,将当前IO线程释放给下一个任务。
try { executorService.execute ( new NettyRunable ( ctx,in,out,outputStream,localTprocessor,b,privateKey,publicKey,className,methodName,koalasTrace,cat)); } catch (RejectedExecutionException e){ logger.error ( e.getMessage ()+ErrorType.THREAD+",className:" +className,e ); handlerException(b,ctx,e,ErrorType.THREAD,privateKey,publicKey,thriftNative); }
RejectedExecutionException来负责当线程池不够用的时候返回给client端异常,因为server端的业务处理能力有限,所以这里适当的做了一下服务端保护防止雪崩的问题。当发现server端有大量的
RejectedExecutionException抛出,说明单机已经无法满足业务请求了,需要横向拓展机器来进行负载均衡。用户实际的业务执行是在Runable里,我们看看他到底做了什么
1 try { 2 tprocessor.process ( in,out ); 3 ctx.writeAndFlush (outputStream); 4 if(transaction!=null && cat) 5 transaction.setStatus ( Transaction.SUCCESS ); 6 } catch (Exception e) { 7 if(transaction!=null && cat) 8 transaction.setStatus ( e ); 9 logger.error ( e.getMessage () + ErrorType.APPLICATION+",className:"+className,e ); 10 handlerException(this.b,ctx,e,ErrorType.APPLICATION,privateKey,publicKey,thriftNative); 11 }
通过thrift的process来执行业务逻辑,将结果通过ctx.writeAndFlush (outputStream),返回给client端。在catch里处理当出现异常之后返回给client端异常结果。这样netty server的实现就全部结束了,thrift服务端解析相关内容我们下一篇来说,里面当中有很多细节需要读者是跟着源码阅读,如果有问题欢迎加群825199617来交流,更多spring,spring mvc,aop,jdk等源码交流等你来!