zoukankan      html  css  js  c++  java
  • 实现基于netty的web框架,了解一下

    上一篇写了,基于netty实现的rpc的微框架,其中详细介绍netty的原理及组件,这篇就不过多介绍

    这篇实现基于netty的web框架,你说netty强不强,文中有不对的地方,欢迎大牛指正

    先普及几个知识点

    @sharable

    标注一个channel handler可以被多个channel安全地共享。
    ChannelHandlerAdapter还提供了实用方法isSharable()。如果其对应的实现被标注为Sharable,那么这个方法将返回true,
    表示它可以被添加到多个ChannelPipeline中。 因为一个ChannelHandler可以从属于多个ChannelPipeline,所以它也可以绑定到多个ChannelHandlerContext实例。
    用于这种用法的ChannelHandler必须要使用@Sharable注解标注;否则,试图将它添加到多个ChannelPipeline时将会触发异常。
    显而易见,为了安全地被用于多个并发的Channel(即连接),这样的ChannelHandler必须是线程安全的。

    AtomicInteger:这个类的存在是为了满足在高并发的情况下,原生的整形数值自增线程不安全的问题,在Java语言中,++i和i++操作并不是线程安全的,在使用的时候,不可避免的会用到synchronized关键字。而AtomicInteger则通过一种线程安全的加减操作接口。AtomicInteger为什么能够达到多而不乱,处理高并发应付自如呢?

      这是由硬件提供原子操作指令实现的,这里面用到了一种并发技术:CAS。在非激烈竞争的情况下,开销更小,速度更快

    TimeUnit: 

    TimeUnit是Java.util.concurrent包下面的一个类。它提供了两大功能:

    1)提供了可读性更好的线程暂停操作,通常用来替换Thread.sleep(); 

    2)提供了便捷方法用于把时间转换成不同单位,如把秒转换成毫秒;

    TimeUnit.MINUTES.sleep(4);  // sleeping for 4 minutes
    
    Thread.sleep(4*60*1000);

    项目的目录结构

    上代码,分享一些关键的代码,后续的giuhub上的demo的注释很详细

    //Netty 事件回调类
    @Sharable
    public class MessageCollector extends ChannelInboundHandlerAdapter {
        private final static Logger LOG = LoggerFactory.getLogger(MessageCollector.class);
        //业务线程池
        private ThreadPoolExecutor[] executors;
        private RequestDispatch requestDispatch;
        //业务队列最大值
        private int requestsMaxInflight=1000;
    
        public MessageCollector(int workerThreads,RequestDispatch dispatch){
            //给业务线程命名
            ThreadFactory factory =new ThreadFactory() {
                AtomicInteger seq=new AtomicInteger();
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread =new Thread(r);
                    thread.setName("http-"+seq.getAndIncrement());
                    return thread;
                }
            };
            this.executors=new ThreadPoolExecutor[workerThreads];
            for(int i=0;i<workerThreads;i++){
                ArrayBlockingQueue queue=new ArrayBlockingQueue<Runnable>(requestsMaxInflight);
                ////闲置时间超过30秒的线程就自动销毁
                this.executors[i]=new ThreadPoolExecutor(1,1,
                        30, TimeUnit.SECONDS, queue,factory,new CallerRunsPolicy());
            }
    
            this.requestDispatch=dispatch;
        }
    
        public  void  closeGracefully(){
            //优雅一点关闭,先通知,再等待,最后强制关闭
            for (int i=0;i<executors.length;i++){
                ThreadPoolExecutor executor=executors[i];
                try {
                    executor.awaitTermination(10,TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                executor.shutdownNow();
            }
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            //客户端来了一个新的连接
           LOG.info("connection comes {}",ctx.channel().remoteAddress());
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            //客户端走了一个
            LOG.info("connection leaves {}",ctx.channel().remoteAddress());
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            if (msg instanceof FullHttpRequest){
                FullHttpRequest req= (FullHttpRequest) msg;
                CRC32 crc32=new CRC32();
                crc32.update(ctx.hashCode());
                int idx =(int) (crc32.getValue()%executors.length);
                //用业务线程处理消息
                this.executors[idx].execute(() ->{
                    requestDispatch.dispatch(ctx,req);
                });
            }
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            //此处可能因为客户机器突发重启
            //也可能客户端连接时间超时,后面的REadTimeoutHandle抛出异常
            //也可能消息协议错误,序列化异常
            ctx.close();
        }
    }
    HttpServer
    public class HttpServer {
        private final static Logger LOG= LoggerFactory.getLogger(HttpServer.class);
        private String ip;
        private int port;  //端口
        private int ioThreads;  //IO线程数,用于处理套接字读写,由Netty内部管理
        private int workerThreads;  //业务线程数,专门处理http请求,由我们本省框架管理
        private RequestDispatch requestDispatch;//请求配发器对象
    
        public HttpServer() {
        }
    
        public HttpServer(String ip, int port, int ioThreads,
                          int workerThreads, RequestDispatch requestDispatch) {
            this.ip = ip;
            this.port = port;
            this.ioThreads = ioThreads;
            this.workerThreads = workerThreads;
            this.requestDispatch = requestDispatch;
        }
        //用于服务端,使用一个ServerChannel接收客户端的连接,
        // 并创建对应的子Channel
        private ServerBootstrap bootstrap;
        //包含多个EventLoop
        private EventLoopGroup group;
        //代表一个Socket连接
        private Channel serverChannel;
        //
        private MessageCollector collector;
    
        public  void start(){
            bootstrap=new ServerBootstrap();
            group=new NioEventLoopGroup(ioThreads);
            bootstrap.group(group);
            collector=new MessageCollector(workerThreads,requestDispatch);
            bootstrap.channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    ChannelPipeline pipeline=socketChannel.pipeline();
                    //如果客户端60秒没有任何请求,就关闭客户端连接
                    pipeline.addLast(new ReadTimeoutHandler(10));
                    //客户端和服务器简单的编解码器:HttpClientCodec和HttpServerCodec。
                    //ChannelPipelien中有解码器和编码器(或编解码器)后就可以操作不同的HttpObject消息了;但是HTTP请求和响应可以有很多消息数据,
                    // 你需要处理不同的部分,可能也需要聚合这些消息数据
                    pipeline.addLast(new HttpServerCodec());
                    //通过HttpObjectAggregator,Netty可以聚合HTTP消息,
                    // 使用FullHttpResponse和FullHttpRequest到ChannelPipeline中的下一个ChannelHandler,这就消除了断裂消息,保证了消息的完整
                    pipeline.addLast(new HttpObjectAggregator(1 << 30)); // max_size = 1g
                    //允许通过处理ChunkedInput来写大的数据块
                    pipeline.addLast(new ChunkedWriteHandler());
                    //将业务处理器放到最后
                    pipeline.addLast(collector);
                }
            });
        }
    
        public void stop() {
            // 先关闭服务端套件字
            serverChannel.close();
            // 再斩断消息来源,停止io线程池
            group.shutdownGracefully();
            // 最后停止业务线程
            collector.closeGracefully();
        }
    
    }
    RequestDispatcherImpl 是请求派发器,用于将收到的HTTP请求对象扔给响应的RequestHandler进行处理。
    public class RequestDispatcherImpl implements RequestDispatch {
        private final static Logger LOG = LoggerFactory.getLogger(RequestDispatcherImpl.class);
    
        private String contextRoot;
        private Router router;
        private Map<Integer, WebExceptionHandler> exceptionHandlers = new HashMap<>();
        private WebExceptionHandler defaultExceptionHandler = new DefaultExceptionHandler();
    
        private WebTemplateEngine templateEngine = new WebTemplateEngine() {
        };
    
        static class DefaultExceptionHandler implements WebExceptionHandler {
    
            @Override
            public void handle(ApplicationContext ctx, AbortException e) {
                if (e.getStatus().code() == 500) {
                    LOG.error("Internal Server Error", e);
                }
                ctx.error(e.getContent(), e.getStatus().code());
            }
    
        }
    
        public RequestDispatcherImpl(Router router) {
            this("/", router);
        }
    
        public RequestDispatcherImpl(String contextRoot, Router router) {
            this.contextRoot = CurrentUtil.normalize(contextRoot);
            this.router = router;
        }
    
        public RequestDispatcherImpl templateRoot(String templateRoot) {
            this.templateEngine = new FreemarkerEngine(templateRoot);
            return this;
        }
    
        public String root() {
            return contextRoot;
        }
    
        public RequestDispatcherImpl exception(int code, WebExceptionHandler handler) {
            this.exceptionHandlers.put(code, handler);
            return this;
        }
    
        public RequestDispatcherImpl exception(WebExceptionHandler handler) {
            this.defaultExceptionHandler = handler;
            return this;
        }
        @Override
        public void dispatch(ChannelHandlerContext channelCtx, FullHttpRequest req) {
            ApplicationContext ctx = new ApplicationContext(channelCtx, contextRoot, templateEngine);
            try {
                this.handleImpl(ctx, new Request(req));
            } catch (AbortException e) {
                this.handleException(ctx, e);
            } catch (Exception e) {
                this.handleException(ctx, new AbortException(HttpResponseStatus.INTERNAL_SERVER_ERROR, e));
            } finally {
                req.release();
            }
        }
    
        private void handleException(ApplicationContext ctx, AbortException e) {
            WebExceptionHandler handler = this.exceptionHandlers.getOrDefault(e.getStatus().code(), defaultExceptionHandler);
            try {
                handler.handle(ctx, e);
            } catch (Exception ex) {
                this.defaultExceptionHandler.handle(ctx, new AbortException(HttpResponseStatus.INTERNAL_SERVER_ERROR, ex));
            }
        }
    
        private void handleImpl(ApplicationContext ctx, Request req) throws Exception {
            if (req.decoderResult().isFailure()) {
                ctx.abort(400, "http protocol decode failed");
            }
            if (req.relativeUri().contains("./") || req.relativeUri().contains(".\")) {
                ctx.abort(400, "unsecure url not allowed");
            }
            if (!req.relativeUri().startsWith(contextRoot)) {
                throw new AbortException(HttpResponseStatus.NOT_FOUND);
            }
            req.popRootUri(contextRoot);
            router.handle(ctx, req);
        }
    }

    项目github位置

    https://github.com/developerxiaofeng/WebFrameByNetty.git  

  • 相关阅读:
    Netty简单聊天室
    JDK环境变量配置
    EasyUI Tabs
    NIO(五)
    NIO(四)
    银行对公业务和对私业务
    mysql常用操作
    LInux安装MySQL5.7.24详情
    Python3 SMTP发送邮件
    linux下sendmail邮件系统安装详情
  • 原文地址:https://www.cnblogs.com/developerxiaofeng/p/9168501.html
Copyright © 2011-2022 走看看