上一篇写了,基于netty实现的rpc的微框架,其中详细介绍netty的原理及组件,这篇就不过多介绍
这篇实现基于netty的web框架,你说netty强不强,文中有不对的地方,欢迎大牛指正
先普及几个知识点
@sharable
标注一个channel handler可以被多个channel安全地共享。
ChannelHandlerAdapter还提供了实用方法isSharable()。如果其对应的实现被标注为Sharable,那么这个方法将返回true,
表示它可以被添加到多个ChannelPipeline中。
因为一个ChannelHandler可以从属于多个ChannelPipeline,所以它也可以绑定到多个ChannelHandlerContext实例。
用于这种用法的ChannelHandler必须要使用@Sharable注解标注;否则,试图将它添加到多个ChannelPipeline时将会触发异常。
显而易见,为了安全地被用于多个并发的Channel(即连接),这样的ChannelHandler必须是线程安全的。
AtomicInteger:这个类的存在是为了满足在高并发的情况下,原生的整形数值自增线程不安全的问题,在Java语言中,++i和i++操作并不是线程安全的,在使用的时候,不可避免的会用到synchronized关键字。而AtomicInteger则通过一种线程安全的加减操作接口。AtomicInteger为什么能够达到多而不乱,处理高并发应付自如呢?
这是由硬件提供原子操作指令实现的,这里面用到了一种并发技术:CAS。在非激烈竞争的情况下,开销更小,速度更快
TimeUnit:
TimeUnit是Java.util.concurrent包下面的一个类。它提供了两大功能:
1)提供了可读性更好的线程暂停操作,通常用来替换Thread.sleep();
2)提供了便捷方法用于把时间转换成不同单位,如把秒转换成毫秒;
TimeUnit.MINUTES.sleep(4); // sleeping for 4 minutes Thread.sleep(4*60*1000);
项目的目录结构
上代码,分享一些关键的代码,后续的giuhub上的demo的注释很详细
//Netty 事件回调类 @Sharable public class MessageCollector extends ChannelInboundHandlerAdapter { private final static Logger LOG = LoggerFactory.getLogger(MessageCollector.class); //业务线程池 private ThreadPoolExecutor[] executors; private RequestDispatch requestDispatch; //业务队列最大值 private int requestsMaxInflight=1000; public MessageCollector(int workerThreads,RequestDispatch dispatch){ //给业务线程命名 ThreadFactory factory =new ThreadFactory() { AtomicInteger seq=new AtomicInteger(); @Override public Thread newThread(Runnable r) { Thread thread =new Thread(r); thread.setName("http-"+seq.getAndIncrement()); return thread; } }; this.executors=new ThreadPoolExecutor[workerThreads]; for(int i=0;i<workerThreads;i++){ ArrayBlockingQueue queue=new ArrayBlockingQueue<Runnable>(requestsMaxInflight); ////闲置时间超过30秒的线程就自动销毁 this.executors[i]=new ThreadPoolExecutor(1,1, 30, TimeUnit.SECONDS, queue,factory,new CallerRunsPolicy()); } this.requestDispatch=dispatch; } public void closeGracefully(){ //优雅一点关闭,先通知,再等待,最后强制关闭 for (int i=0;i<executors.length;i++){ ThreadPoolExecutor executor=executors[i]; try { executor.awaitTermination(10,TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } executor.shutdownNow(); } } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //客户端来了一个新的连接 LOG.info("connection comes {}",ctx.channel().remoteAddress()); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { //客户端走了一个 LOG.info("connection leaves {}",ctx.channel().remoteAddress()); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof FullHttpRequest){ FullHttpRequest req= (FullHttpRequest) msg; CRC32 crc32=new CRC32(); crc32.update(ctx.hashCode()); int idx =(int) (crc32.getValue()%executors.length); //用业务线程处理消息 this.executors[idx].execute(() ->{ requestDispatch.dispatch(ctx,req); }); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //此处可能因为客户机器突发重启 //也可能客户端连接时间超时,后面的REadTimeoutHandle抛出异常 //也可能消息协议错误,序列化异常 ctx.close(); } }
HttpServer
public class HttpServer { private final static Logger LOG= LoggerFactory.getLogger(HttpServer.class); private String ip; private int port; //端口 private int ioThreads; //IO线程数,用于处理套接字读写,由Netty内部管理 private int workerThreads; //业务线程数,专门处理http请求,由我们本省框架管理 private RequestDispatch requestDispatch;//请求配发器对象 public HttpServer() { } public HttpServer(String ip, int port, int ioThreads, int workerThreads, RequestDispatch requestDispatch) { this.ip = ip; this.port = port; this.ioThreads = ioThreads; this.workerThreads = workerThreads; this.requestDispatch = requestDispatch; } //用于服务端,使用一个ServerChannel接收客户端的连接, // 并创建对应的子Channel private ServerBootstrap bootstrap; //包含多个EventLoop private EventLoopGroup group; //代表一个Socket连接 private Channel serverChannel; // private MessageCollector collector; public void start(){ bootstrap=new ServerBootstrap(); group=new NioEventLoopGroup(ioThreads); bootstrap.group(group); collector=new MessageCollector(workerThreads,requestDispatch); bootstrap.channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline=socketChannel.pipeline(); //如果客户端60秒没有任何请求,就关闭客户端连接 pipeline.addLast(new ReadTimeoutHandler(10)); //客户端和服务器简单的编解码器:HttpClientCodec和HttpServerCodec。 //ChannelPipelien中有解码器和编码器(或编解码器)后就可以操作不同的HttpObject消息了;但是HTTP请求和响应可以有很多消息数据, // 你需要处理不同的部分,可能也需要聚合这些消息数据 pipeline.addLast(new HttpServerCodec()); //通过HttpObjectAggregator,Netty可以聚合HTTP消息, // 使用FullHttpResponse和FullHttpRequest到ChannelPipeline中的下一个ChannelHandler,这就消除了断裂消息,保证了消息的完整 pipeline.addLast(new HttpObjectAggregator(1 << 30)); // max_size = 1g //允许通过处理ChunkedInput来写大的数据块 pipeline.addLast(new ChunkedWriteHandler()); //将业务处理器放到最后 pipeline.addLast(collector); } }); } public void stop() { // 先关闭服务端套件字 serverChannel.close(); // 再斩断消息来源,停止io线程池 group.shutdownGracefully(); // 最后停止业务线程 collector.closeGracefully(); } }
RequestDispatcherImpl 是请求派发器,用于将收到的HTTP请求对象扔给响应的RequestHandler进行处理。
public class RequestDispatcherImpl implements RequestDispatch { private final static Logger LOG = LoggerFactory.getLogger(RequestDispatcherImpl.class); private String contextRoot; private Router router; private Map<Integer, WebExceptionHandler> exceptionHandlers = new HashMap<>(); private WebExceptionHandler defaultExceptionHandler = new DefaultExceptionHandler(); private WebTemplateEngine templateEngine = new WebTemplateEngine() { }; static class DefaultExceptionHandler implements WebExceptionHandler { @Override public void handle(ApplicationContext ctx, AbortException e) { if (e.getStatus().code() == 500) { LOG.error("Internal Server Error", e); } ctx.error(e.getContent(), e.getStatus().code()); } } public RequestDispatcherImpl(Router router) { this("/", router); } public RequestDispatcherImpl(String contextRoot, Router router) { this.contextRoot = CurrentUtil.normalize(contextRoot); this.router = router; } public RequestDispatcherImpl templateRoot(String templateRoot) { this.templateEngine = new FreemarkerEngine(templateRoot); return this; } public String root() { return contextRoot; } public RequestDispatcherImpl exception(int code, WebExceptionHandler handler) { this.exceptionHandlers.put(code, handler); return this; } public RequestDispatcherImpl exception(WebExceptionHandler handler) { this.defaultExceptionHandler = handler; return this; } @Override public void dispatch(ChannelHandlerContext channelCtx, FullHttpRequest req) { ApplicationContext ctx = new ApplicationContext(channelCtx, contextRoot, templateEngine); try { this.handleImpl(ctx, new Request(req)); } catch (AbortException e) { this.handleException(ctx, e); } catch (Exception e) { this.handleException(ctx, new AbortException(HttpResponseStatus.INTERNAL_SERVER_ERROR, e)); } finally { req.release(); } } private void handleException(ApplicationContext ctx, AbortException e) { WebExceptionHandler handler = this.exceptionHandlers.getOrDefault(e.getStatus().code(), defaultExceptionHandler); try { handler.handle(ctx, e); } catch (Exception ex) { this.defaultExceptionHandler.handle(ctx, new AbortException(HttpResponseStatus.INTERNAL_SERVER_ERROR, ex)); } } private void handleImpl(ApplicationContext ctx, Request req) throws Exception { if (req.decoderResult().isFailure()) { ctx.abort(400, "http protocol decode failed"); } if (req.relativeUri().contains("./") || req.relativeUri().contains(".\")) { ctx.abort(400, "unsecure url not allowed"); } if (!req.relativeUri().startsWith(contextRoot)) { throw new AbortException(HttpResponseStatus.NOT_FOUND); } req.popRootUri(contextRoot); router.handle(ctx, req); } }
项目github位置
https://github.com/developerxiaofeng/WebFrameByNetty.git