zoukankan      html  css  js  c++  java
  • Netty的学习

    看了几天高并发和NIO 今晚终于要开始学习Netty

    http://ifeve.com/netty5-user-guide/

    Netty实现通信的步骤

    1.创建两个NIO线程组,一个专门用于网络事件的处理(接收客户端的连接), 另一个则进行网络通信读写

    2.创建一个ServerBootStarp对象,配置Netty的一些列参数,例如接受传出数据内存的大小等。

    3.创建一个世纪处理数据的类ChannelInitializer,进行初始化的工作,例如设置接收传出数据的字符集·格式·已经实现处理数据的接口

    4.绑定端口,执行同步阻塞方法等待服务端的启动

    TCP粘包拆包

    TCP是一个流的协议,数据没有分界线 如果传了三个包 ABC  DEF  GHI   TCP会根据缓冲区实际情况进行包的划分就容易 解读为 AB  CDEF  GHI

    解决办法:1.消息定长,例如每个报文大小固定为200,如果不够,空位补空格

         2.在包尾加特殊字符进行分割

         3.将消息分为消息头和消息体,在消息头中包含消息总长度的字段,然后进行业务逻辑的处理

    public class Server {
    
        public static void main(String[] args) throws Exception {
            //1 创建线两个程组 
            //一个是用于处理服务器端接收客户端连接的
            //一个是进行网络通信的(网络读写的)
            EventLoopGroup pGroup = new NioEventLoopGroup();
            EventLoopGroup cGroup = new NioEventLoopGroup();
            
            //2 创建辅助工具类,用于服务器通道的一系列配置
            ServerBootstrap b = new ServerBootstrap();
            b.group(pGroup, cGroup)        //绑定俩个线程组
            .channel(NioServerSocketChannel.class)        //指定NIO的模式
            .option(ChannelOption.SO_BACKLOG, 1024)        //设置tcp缓冲区
            .option(ChannelOption.SO_SNDBUF, 32*1024)    //设置发送缓冲大小
            .option(ChannelOption.SO_RCVBUF, 32*1024)    //这是接收缓冲大小
            .option(ChannelOption.SO_KEEPALIVE, true)    //保持连接
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel sc) throws Exception {
                    //3 在这里配置具体数据接收方法的处理
                    sc.pipeline().addLast(new ServerHandler());
                }
            });
            
            //4 进行绑定 
            ChannelFuture cf1 = b.bind(8765).sync();
            //ChannelFuture cf2 = b.bind(8764).sync();
            //5 等待关闭
            cf1.channel().closeFuture().sync();
            //cf2.channel().closeFuture().sync();
            pGroup.shutdownGracefully();
            cGroup.shutdownGracefully();
        }
    }
    public class ServerHandler extends ChannelHandlerAdapter {
    
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("server channel active... ");
        }
    
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg)
                throws Exception {
                ByteBuf buf = (ByteBuf) msg;
                byte[] req = new byte[buf.readableBytes()];
                buf.readBytes(req);
                String body = new String(req, "utf-8");
                System.out.println("Server :" + body );
                String response = "进行返回给客户端的响应:" + body ;
                ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()))
                .addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (future.isSuccess()) {
                            System.out.println("write is success");
                        } else {
                            future.cause().printStackTrace();
                        }
                    }
                })
                .addListener(ChannelFutureListener.CLOSE); // 关闭
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx)
                throws Exception {
            System.out.println("读完了");
            ctx.flush();
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable t)
                throws Exception {
            ctx.close();
        }
    
    }
    public class Client {
    
        public static void main(String[] args) throws Exception{
            
            EventLoopGroup group = new NioEventLoopGroup();
            Bootstrap b = new Bootstrap();
            b.group(group)
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel sc) throws Exception {
                    sc.pipeline().addLast(new ClientHandler());
                }
            });
            
            ChannelFuture cf1 = b.connect("127.0.0.1", 8765).sync();

       //这个对象可以看作是一个异步操
      //作的结果的占位符;它将在未来的某个时刻完成,并提供对其结果的访问

    //ChannelFuture cf2 = b.connect("127.0.0.1", 8764).sync();
            //发送消息
            Thread.sleep(1000);
            cf1.channel().writeAndFlush(Unpooled.copiedBuffer("777".getBytes()));
            cf1.channel().writeAndFlush(Unpooled.copiedBuffer("666".getBytes()));
            //cf2.channel().writeAndFlush(Unpooled.copiedBuffer("888".getBytes()));
            Thread.sleep(2000);
            cf1.channel().writeAndFlush(Unpooled.copiedBuffer("888".getBytes()));
            //cf2.channel().writeAndFlush(Unpooled.copiedBuffer("666".getBytes()));
            
            cf1.channel().closeFuture().sync();
            //cf2.channel().closeFuture().sync();
            group.shutdownGracefully();
            
            
            
        }
    }
    public class ClientHandler extends ChannelHandlerAdapter{
    
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            try {
                ByteBuf buf = (ByteBuf) msg;
                
                byte[] req = new byte[buf.readableBytes()];
                buf.readBytes(req);
                
                String body = new String(req, "utf-8");
                System.out.println("Client :" + body );
                String response = "收到服务器端的返回信息:" + body;
            } finally {
                ReferenceCountUtil.release(msg);
            }
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            ctx.close();
        }
    
    }

     这里面的Bootstarp 相当于配置引导类

    对于 ChannelInitializer 是ChannelHandler  Netty 程序都是基于 ChannelPipeline。ChannelPipeline 和 EventLoop 和EventLoopGroup 密切相关,因为它们三个都和事件处理相关,所以这就是为什么它们处理IO 的工作由 EventLoop 管理的原因

    当注册一个Channel后 Netty 将这个 Channel 绑定到一个 EventLoop,在 Channel 的生 命周期内总是被绑定到一个 EventLoop。在 Netty IO 操作中,你的程序不需要同步,因为

    一个指定通道的所有 IO 始终由同一个线程来执行

    ServerBootstrap 使用 2 个 EventLoopGroup

    一个 ServerBootstrap 可以认为有 2个 channels 组,
    第一组包含一个单例 ServerChannel,代表持有一个绑定了本地端口的socket;
    第二组包含所有的 Channel,代表服务器已接受了的连接

    EventLoopGroup A 唯一的目的就是接受连接然后交给 EventLoopGroup B,

    //1 创建线两个程组
    //一个是用于处理服务器端接收客户端连接的
    //一个是进行网络通信的(网络读写的)

    Netty可以使用两个不同的 Group,因为服务器程序需要接受很多客户端连接的情况下,一个EventLoopGroup 将是程序性能的瓶颈,因为事件循环忙于处理连接请求,没有多余的资源
    和空闲来处理业务逻辑,最后的结果会是很多连接请求超时。若有两 EventLoops, 即使在高负载下,所有的连接也都会被接受,因为 EventLoops 接受连接不会和哪些已经连接了的处理共享资源

    EventLoopGroup 和 EventLoop 是什么关系?EventLoopGroup 可以包含很多个EventLoop,每个 Channel 绑定一个 EventLoop 不会被改变,因为 EventLoopGroup 包含少量的 EventLoop 的 Channels,很多 Channel 会共享同一个 EventLoop。这意味着在一个Channel 保持 EventLoop 繁忙会禁止其他 Channel 绑定到相同的 EventLoop。我们可以理解为 EventLoop 是一个事件循环线程,而 EventLoopGroup 是一个事件循环集合。

     ChannelFuture 

    这个对象可以看作是一个异步操作的结果的占位符;它将在未来的某个时刻完成,并提供对其结果的访问

    每个 Netty 的出站 I/O 操作都将返回一个 ChannelFuture;也就是说,它们都不会阻塞。正如我们前面所提到过的一样,Netty 完全是异步和事件驱动的

    Netty 中发送消息有两种方法:直接写入通道或写入ChannelHandlerContext 对象。这两种方法的主要区别如下:

      直接写入通道导致处理消息从 ChannelPipeline 的尾部开始(ChannelOutboundHandler 出站)

      写入 ChannelHandlerContext 对象导致处理消息从 ChannelPipeline 的下一个handler 开始(ChannelInboundHandler入站)

    对于这些Handler  常 有“ByteToMessageDecoder”、“MessageToByteEncoder”,还有 Google 的协议“ProtobufEncoder”和“ProtobufDecoder”

    我们可以在添加 ChannelHandler 到 ChannelPipeline 中时指定一个 EventExecutorGroup,EventExecutorGroup 会获得一个 EventExecutor,EventExecutor将执行ChannelHandler 的所有方法。EventExecutor 将使用不同的线程来执行和释放EventLoop

    Transport API

    如上图所示,每个 Channel 都会分配一个 ChannelPipeline 和 ChannelConfig

    ChannelConfig 负责设置并存储配置,并允许在运行期间更新它们, ChannelPipeline 容纳了使用的 ChannelHandler 实例

    ChannelHandler 

     传输数据时,将数据从一种格式转换到另一种格式
     异常通知
     Channel 变为有效或无效时获得通知
     Channel 被注册或从 EventLoop 中注销时获得通知
     通知用户特定事件

    Channel 

     eventLoop(),返回分配给 Channel 的 EventLoop
     pipeline(),返回分配给 Channel 的 ChannelPipeline
     isActive(),返回 Channel 是否激活,已激活说明与远程连接对等
     localAddress(),返回已绑定的本地 SocketAddress
     remoteAddress(),返回已绑定的远程 SocketAddress
     write(),写数据到远程客户端,数据通过 ChannelPipeline 传输过去

     不同类型的 ByteBuf

    Heap Buffer(堆缓冲区)

    存储在JVM的堆上,存储在数组中,清除也很快 ByteBuf.array()获取数据   hasArray 检查

    Direct Buffer(直接缓冲区)

    直接缓冲区,在堆之外直接分配内存。直接缓冲区不会占用堆空间容量。直接缓冲区在使用 Socket 传递数据时性能很好,因为若使用间接缓冲区,JVM 会先将数据复制到直接缓冲区再进行传递;缺点是在分配内存空间和释放内存时比堆缓冲区更复杂,而 Netty 使用内存池来解决这样的问题,这也是 Netty 使用内存池的原因之一。直接缓冲区不支持数组访问数据,但是我们可以间接的访问数据数组

      ByteBuf directBuf = Unpooled.directBuffer(16);
      if(!directBuf.hasArray()){
       int len = directBuf.readableBytes();
       byte[] arr = new byte[len];
       directBuf.getBytes(0, arr);
      }

    Composite Buffer(复合缓冲区)

    CompositeByteBuf compBuf = Unpooled.compositeBuffer();
            ByteBuf heapBuf = Unpooled.buffer(8);
            ByteBuf directBuf = Unpooled.directBuffer(16);
            // 添加 ByteBuf 到 CompositeByteBuf
            compBuf.addComponents(heapBuf, directBuf);
            // 删除第一个 ByteBuf
            compBuf.removeComponent(0);
            Iterator<ByteBuf> iter = compBuf.iterator();
            while (iter.hasNext()) {
                System.out.println(iter.next().toString());
            }
            // 使用数组访问数据
            if (!compBuf.hasArray()) {
                int len = compBuf.readableBytes();
                byte[] arr = new byte[len];
                compBuf.getBytes(0, arr);
            }

    ByteBuf 提供两个指针变量支付读和写操作,读操作是使用 readerIndex(),写操作时使用 writerIndex()。这和 JDK 的 ByteBuffer 不同,ByteBuffer 只有一个方法来设置索引所以需要使用 flip()方法来切换读和写模式

    discardReadBytes()可以用来清空 ByteBuf 中已读取的数据, 

    isReadable()  判断 writerIndex > readerIndex;

    writableBytes   capacity() - writerIndex 能否写

    clear() 重设索引    readerIndex = writerIndex = 0;

    调用 duplicate()、slice()、slice(int index, int length)、order(ByteOrder endianness)
    会创建一个现有缓冲区的视图

    ByteBufAllocator 负责分配 ByteBuf 实例  ByteBufAllocator.heapBuffer()

    可以从 Channel 的 alloc()获取,也可以从ChannelHandlerContext 的 alloc()获取

    Unpooled 也能创建ByteBuf实例    Unpooled.compositeBuffer();   Unpooled.buffer(8);

    ChannelHandler

    ChannelPipeline

     ChannelPipeline是ChannelHandler实例的列表,用于处理或截获通道的接收和发送数据。

    addFirst(...),添加ChannelHandler在ChannelPipeline的第一个位置  addBefore(...),在ChannelPipeline中指定的ChannelHandler名称之前添加ChannelHandler

      每个ChannelHandler被添加到ChannelPipeline后,都会创建一个ChannelHandlerContext并与之创建的ChannelHandler关联绑定。ChannelHandlerContext允许ChannelHandler与其他的ChannelHandler实现进行交互。

       如果你想有一些事件流全部通过ChannelPipeline,有两个不同的方法可以做到:
    • 调用Channel的方法
    • 调用ChannelPipeline的方法
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
            @Override
            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                //Event via Channel
                Channel channel = ctx.channel();
                channel.write(Unpooled.copiedBuffer("netty in action", CharsetUtil.UTF_8));
                //Event via ChannelPipeline
                ChannelPipeline pipeline = ctx.pipeline();
                pipeline.write(Unpooled.copiedBuffer("netty in action", CharsetUtil.UTF_8));

        //为了节省开销,不感兴趣的ChannelHandler不让通过 直接跳过前面

        //排除一些ChannelHandler

        ctx.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));

    
            }
        });
    }

    ChannelHandler实例如果带有@Sharable注解则可以被添加到多个ChannelPipeline。也就是说单个ChannelHandler实例可以有多个ChannelHandlerContext,因此可以调用不同ChannelHandlerContext获取同一个ChannelHandler

    Channel的状态在其生命周期中变化

    • handlerAdded,ChannelHandler添加到实际上下文中准备处理事件
    • handlerRemoved,将ChannelHandler从实际上下文中删除,不再处理事件
    • exceptionCaught,处理抛出的异常

    ChannelInboundHandler

    • channelRegistered,ChannelHandlerContext的Channel被注册到EventLoop;
    • channelUnregistered,ChannelHandlerContext的Channel从EventLoop中注销
    • channelActive,ChannelHandlerContext的Channel已激活
    • channelInactive,ChannelHanderContxt的Channel结束生命周期
    • channelRead,从当前Channel的对端读取消息
    • channelReadComplete,消息读取完成后执行
    • userEventTriggered,一个用户事件被处罚
    • channelWritabilityChanged,改变通道的可写状态,可以使用Channel.isWritable()检查
    • exceptionCaught,重写父类ChannelHandler的方法,处理异常

    常用的 SimpleChannelInboundHandler

     ChannelOutboundHandler

            ChannelOutboundHandler用来处理“出站”的数据消息。ChannelOutboundHandler提供了下面一些方法:
    • bind,Channel绑定本地地址
    • connect,Channel连接操作
    • disconnect,Channel断开连接
    • close,关闭Channel
    • deregister,注销Channel
    • read,读取消息,实际是截获ChannelHandlerContext.read()
    • write,写操作,实际是通过ChannelPipeline写消息,Channel.flush()属性到实际通道
    • flush,刷新消息到通道

     ByteToMessageDecoder

    MessageToByteEncoder

     ByteToMessageCodec   用来处理 byte-to-message 和 message-to-byte

    MessageToMessageCodec

    CombinedChannelDuplexHandler 来结合解码器和编码器   组合

  • 相关阅读:
    Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.
    DHCP "No subnet declaration for xxx (no IPv4 addresses)" 报错
    Centos安装前端开发常用软件
    kubernetes学习笔记之十:RBAC(二)
    k8s学习笔记之StorageClass+NFS
    k8s学习笔记之ConfigMap和Secret
    k8s笔记之chartmuseum搭建
    K8S集群集成harbor(1.9.3)服务并配置HTTPS
    Docker镜像仓库Harbor1.7.0搭建及配置
    Nginx自建SSL证书部署HTTPS网站
  • 原文地址:https://www.cnblogs.com/mxz1994/p/9324635.html
Copyright © 2011-2022 走看看