zoukankan      html  css  js  c++  java
  • Netty 服务端创建过程

    1.首先通过构造函数创建ServerBootstrap 实例,ServerBootStrap是Netty的启动辅助类。用于设置服务端启动相关的参数

    ServerBootstrap bootstrap = new ServerBootstrap();
    

    2.设置并绑定Reactor线程池,也就是创建EventLoopGroup对象,管理相关业务。

     EventLoopGroup bossGroup = new NioEventLoopGroup();
     EventLoopGroup wordGroup = new NioEventLoopGroup();
    
    NioEventLoopGroup实际上就是Reactor线程池,负责调度和执行客户端的接入、网络读写事件的处理、用户自定义任务和定时任务的执行。

    NettyServerBootStrap.Group()源码:
      public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
            super.group(parentGroup);
            if (childGroup == null) {
                throw new NullPointerException("childGroup");
            } else if (this.childGroup != null) {
                throw new IllegalStateException("childGroup set already");
            } else {
                this.childGroup = childGroup;
                return this;
            }
        }
    

    3.设置并且绑定服务端通道(Channel),Netty中也就是NioServerSocketChannel。

    4.在链路创建的时候初始化ChannelPipeline。ChannelPipeline主要负责管理执行相关的网络事件(根据ChannelHandler的执行策略调度ChannelHandler执行),它本质上是一个负责处理网络事件的职责链(用参数配置),负责管理和执行ChannelHandler。

    第3步和第4步通过链式:

    serverBootstrap.group(bossGroup, workGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChildChannelInitializer());

    调度的ChannelPipeline:

    class ChildChannelInitializer extends ChannelInitializer<SocketChannel>{
    
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            ChannelPipeline pipeline = socketChannel.pipeline();
            pipeline.addLast("http-codec", new HttpServerCodec());
            pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
            socketChannel.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
            pipeline.addLast("handler", new WebSocketServerHandler());
        }
    }
    

    5.ChannelPipeline调度ChannelHandler。ChannelHandler(下面的WebSocketServerHandler)是处理业务的关键接口,是Netty提供给用户定制和扩展需要实现的接口。比如下面这里就实现了处理网络请求、发送响应信息等。ChannelHandler提供了很多定制化的处理类供使用:

    系统编解码框架

    通用基于长度的半包解码器

    码流日志打印

    SSL安全认证

    链路空闲监测

    流量整形

    Base64编解码等

    class WebSocketServerHandler extends SimpleChannelInboundHandler<Object>{
    
        private WebSocketServerHandshaker handshaker;
    
    
    
        @Override
        protected void messageReceived(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
            System.out.println("解析请求");
            // 传统http接入
            if (o instanceof FullHttpRequest){
                handleHttpRequest(channelHandlerContext, (FullHttpRequest) o);
            } else if (o instanceof WebSocketFrame){
                handleWebSocketFrame(channelHandlerContext, (WebSocketFrame) o);
            }
    
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
        }
    
        // 解码http请求
        private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req){
    
            // 如果http解码失败 返回http异常
            if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))){
                sentHttpResponse(ctx,req, new DefaultFullHttpResponse(HTTP_1_1, BAD_REQUEST));
                System.out.println("解析异常");
                return;
            }
            String url = "ws://localhost:"+ SocketEnum.PORT.getNum()+"/websocket";
            System.out.println("url:"+url);
            // 握手工厂 构造握手响应返回 本地测试
            WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(url, null, false);
            handshaker = wsFactory.newHandshaker(req);
            if (null == handshaker){
                WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
            }else {
                handshaker.handshake(ctx.channel(), req);
            }
    
        }
    
        // 解码websocket请求
        private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame){
            // 判断是否是关闭链路的指令
            if (frame instanceof CloseWebSocketFrame){
                handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
                return;
            }
            // 判断是否是ping消息
            if (frame instanceof PingWebSocketFrame){
                ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
                return;
            }
            //判断如果不是文本消息则抛出异常
            if (!(frame instanceof TextWebSocketFrame)){
                throw new UnsupportedOperationException(String.format("%s frame types noe supported", frame.getClass().getName()));
            }
    
            String  request = ((TextWebSocketFrame)frame).text();
            ctx.channel().write(
                    new TextWebSocketFrame(request
                    + ", 欢迎使用Netty WebSocket 服务,现在时刻:"
                    + new Date().toString()));
        }
    
    
        //发送响应
        private static void sentHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res){
            // 返回响应给客户端
            if (res.status().code() != 200){
                ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
                res.content().writeBytes(buf);
                buf.release();
                setContentLength(res, res.content().readableBytes());
            }
    
            // 如果是非Keep-Aliv,关闭连接
            ChannelFuture f = ctx.channel().writeAndFlush(res);
            if (!isKeepAlive(req) || res.status().code()!= 200){
                f.addListener(ChannelFutureListener.CLOSE);
            }
        }
    

    6.在这些初始化和处理方式都搭建完毕以后即可启动并且监听端口。

      

    Channel channel = serverBootstrap.bind(port).sync().channel();

    内部最后执行:

    protected void doBind(SocketAddress localAddress) throws Exception{
      javaChannel().socket.bind(localAdress, config.getBacklog());  
    }

    然后Selector会轮询,遇到准备就绪的Channel之后就由Reactor线程NioLoop执行ChannelPipeline的对应方法调度Handler去实现具体业务。  

  • 相关阅读:
    jmeter参数化关联
    电商
    mysql联查
    购物车
    冒泡排序、二分查找、选择排序、斐波那契
    python数据转换/9*9表/for循环
    python三角形
    mysql语句
    Selenium 8
    Selenium 7
  • 原文地址:https://www.cnblogs.com/meijsuger/p/11222931.html
Copyright © 2011-2022 走看看