zoukankan      html  css  js  c++  java
  • Netty的学习

    看了几天高并发和NIO 今晚终于要开始学习Netty

    http://ifeve.com/netty5-user-guide/

    Netty实现通信的步骤

    1.创建两个NIO线程组,一个专门用于网络事件的处理(接收客户端的连接), 另一个则进行网络通信读写

    2.创建一个ServerBootStarp对象,配置Netty的一些列参数,例如接受传出数据内存的大小等。

    3.创建一个世纪处理数据的类ChannelInitializer,进行初始化的工作,例如设置接收传出数据的字符集·格式·已经实现处理数据的接口

    4.绑定端口,执行同步阻塞方法等待服务端的启动

    TCP粘包拆包

    TCP是一个流的协议,数据没有分界线 如果传了三个包 ABC  DEF  GHI   TCP会根据缓冲区实际情况进行包的划分就容易 解读为 AB  CDEF  GHI

    解决办法:1.消息定长,例如每个报文大小固定为200,如果不够,空位补空格

         2.在包尾加特殊字符进行分割

         3.将消息分为消息头和消息体,在消息头中包含消息总长度的字段,然后进行业务逻辑的处理

    public class Server {
    
        public static void main(String[] args) throws Exception {
            //1 创建线两个程组 
            //一个是用于处理服务器端接收客户端连接的
            //一个是进行网络通信的(网络读写的)
            EventLoopGroup pGroup = new NioEventLoopGroup();
            EventLoopGroup cGroup = new NioEventLoopGroup();
            
            //2 创建辅助工具类,用于服务器通道的一系列配置
            ServerBootstrap b = new ServerBootstrap();
            b.group(pGroup, cGroup)        //绑定俩个线程组
            .channel(NioServerSocketChannel.class)        //指定NIO的模式
            .option(ChannelOption.SO_BACKLOG, 1024)        //设置tcp缓冲区
            .option(ChannelOption.SO_SNDBUF, 32*1024)    //设置发送缓冲大小
            .option(ChannelOption.SO_RCVBUF, 32*1024)    //这是接收缓冲大小
            .option(ChannelOption.SO_KEEPALIVE, true)    //保持连接
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel sc) throws Exception {
                    //3 在这里配置具体数据接收方法的处理
                    sc.pipeline().addLast(new ServerHandler());
                }
            });
            
            //4 进行绑定 
            ChannelFuture cf1 = b.bind(8765).sync();
            //ChannelFuture cf2 = b.bind(8764).sync();
            //5 等待关闭
            cf1.channel().closeFuture().sync();
            //cf2.channel().closeFuture().sync();
            pGroup.shutdownGracefully();
            cGroup.shutdownGracefully();
        }
    }
    public class ServerHandler extends ChannelHandlerAdapter {
    
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("server channel active... ");
        }
    
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg)
                throws Exception {
                ByteBuf buf = (ByteBuf) msg;
                byte[] req = new byte[buf.readableBytes()];
                buf.readBytes(req);
                String body = new String(req, "utf-8");
                System.out.println("Server :" + body );
                String response = "进行返回给客户端的响应:" + body ;
                ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()))
                .addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (future.isSuccess()) {
                            System.out.println("write is success");
                        } else {
                            future.cause().printStackTrace();
                        }
                    }
                })
                .addListener(ChannelFutureListener.CLOSE); // 关闭
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx)
                throws Exception {
            System.out.println("读完了");
            ctx.flush();
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable t)
                throws Exception {
            ctx.close();
        }
    
    }
    public class Client {
    
        public static void main(String[] args) throws Exception{
            
            EventLoopGroup group = new NioEventLoopGroup();
            Bootstrap b = new Bootstrap();
            b.group(group)
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel sc) throws Exception {
                    sc.pipeline().addLast(new ClientHandler());
                }
            });
            
            ChannelFuture cf1 = b.connect("127.0.0.1", 8765).sync();

       //这个对象可以看作是一个异步操
      //作的结果的占位符;它将在未来的某个时刻完成,并提供对其结果的访问

    //ChannelFuture cf2 = b.connect("127.0.0.1", 8764).sync();
            //发送消息
            Thread.sleep(1000);
            cf1.channel().writeAndFlush(Unpooled.copiedBuffer("777".getBytes()));
            cf1.channel().writeAndFlush(Unpooled.copiedBuffer("666".getBytes()));
            //cf2.channel().writeAndFlush(Unpooled.copiedBuffer("888".getBytes()));
            Thread.sleep(2000);
            cf1.channel().writeAndFlush(Unpooled.copiedBuffer("888".getBytes()));
            //cf2.channel().writeAndFlush(Unpooled.copiedBuffer("666".getBytes()));
            
            cf1.channel().closeFuture().sync();
            //cf2.channel().closeFuture().sync();
            group.shutdownGracefully();
            
            
            
        }
    }
    public class ClientHandler extends ChannelHandlerAdapter{
    
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            try {
                ByteBuf buf = (ByteBuf) msg;
                
                byte[] req = new byte[buf.readableBytes()];
                buf.readBytes(req);
                
                String body = new String(req, "utf-8");
                System.out.println("Client :" + body );
                String response = "收到服务器端的返回信息:" + body;
            } finally {
                ReferenceCountUtil.release(msg);
            }
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            ctx.close();
        }
    
    }

     这里面的Bootstarp 相当于配置引导类

    对于 ChannelInitializer 是ChannelHandler  Netty 程序都是基于 ChannelPipeline。ChannelPipeline 和 EventLoop 和EventLoopGroup 密切相关,因为它们三个都和事件处理相关,所以这就是为什么它们处理IO 的工作由 EventLoop 管理的原因

    当注册一个Channel后 Netty 将这个 Channel 绑定到一个 EventLoop,在 Channel 的生 命周期内总是被绑定到一个 EventLoop。在 Netty IO 操作中,你的程序不需要同步,因为

    一个指定通道的所有 IO 始终由同一个线程来执行

    ServerBootstrap 使用 2 个 EventLoopGroup

    一个 ServerBootstrap 可以认为有 2个 channels 组,
    第一组包含一个单例 ServerChannel,代表持有一个绑定了本地端口的socket;
    第二组包含所有的 Channel,代表服务器已接受了的连接

    EventLoopGroup A 唯一的目的就是接受连接然后交给 EventLoopGroup B,

    //1 创建线两个程组
    //一个是用于处理服务器端接收客户端连接的
    //一个是进行网络通信的(网络读写的)

    Netty可以使用两个不同的 Group,因为服务器程序需要接受很多客户端连接的情况下,一个EventLoopGroup 将是程序性能的瓶颈,因为事件循环忙于处理连接请求,没有多余的资源
    和空闲来处理业务逻辑,最后的结果会是很多连接请求超时。若有两 EventLoops, 即使在高负载下,所有的连接也都会被接受,因为 EventLoops 接受连接不会和哪些已经连接了的处理共享资源

    EventLoopGroup 和 EventLoop 是什么关系?EventLoopGroup 可以包含很多个EventLoop,每个 Channel 绑定一个 EventLoop 不会被改变,因为 EventLoopGroup 包含少量的 EventLoop 的 Channels,很多 Channel 会共享同一个 EventLoop。这意味着在一个Channel 保持 EventLoop 繁忙会禁止其他 Channel 绑定到相同的 EventLoop。我们可以理解为 EventLoop 是一个事件循环线程,而 EventLoopGroup 是一个事件循环集合。

     ChannelFuture 

    这个对象可以看作是一个异步操作的结果的占位符;它将在未来的某个时刻完成,并提供对其结果的访问

    每个 Netty 的出站 I/O 操作都将返回一个 ChannelFuture;也就是说,它们都不会阻塞。正如我们前面所提到过的一样,Netty 完全是异步和事件驱动的

    Netty 中发送消息有两种方法:直接写入通道或写入ChannelHandlerContext 对象。这两种方法的主要区别如下:

      直接写入通道导致处理消息从 ChannelPipeline 的尾部开始(ChannelOutboundHandler 出站)

      写入 ChannelHandlerContext 对象导致处理消息从 ChannelPipeline 的下一个handler 开始(ChannelInboundHandler入站)

    对于这些Handler  常 有“ByteToMessageDecoder”、“MessageToByteEncoder”,还有 Google 的协议“ProtobufEncoder”和“ProtobufDecoder”

    我们可以在添加 ChannelHandler 到 ChannelPipeline 中时指定一个 EventExecutorGroup,EventExecutorGroup 会获得一个 EventExecutor,EventExecutor将执行ChannelHandler 的所有方法。EventExecutor 将使用不同的线程来执行和释放EventLoop

    Transport API

    如上图所示,每个 Channel 都会分配一个 ChannelPipeline 和 ChannelConfig

    ChannelConfig 负责设置并存储配置,并允许在运行期间更新它们, ChannelPipeline 容纳了使用的 ChannelHandler 实例

    ChannelHandler 

     传输数据时,将数据从一种格式转换到另一种格式
     异常通知
     Channel 变为有效或无效时获得通知
     Channel 被注册或从 EventLoop 中注销时获得通知
     通知用户特定事件

    Channel 

     eventLoop(),返回分配给 Channel 的 EventLoop
     pipeline(),返回分配给 Channel 的 ChannelPipeline
     isActive(),返回 Channel 是否激活,已激活说明与远程连接对等
     localAddress(),返回已绑定的本地 SocketAddress
     remoteAddress(),返回已绑定的远程 SocketAddress
     write(),写数据到远程客户端,数据通过 ChannelPipeline 传输过去

     不同类型的 ByteBuf

    Heap Buffer(堆缓冲区)

    存储在JVM的堆上,存储在数组中,清除也很快 ByteBuf.array()获取数据   hasArray 检查

    Direct Buffer(直接缓冲区)

    直接缓冲区,在堆之外直接分配内存。直接缓冲区不会占用堆空间容量。直接缓冲区在使用 Socket 传递数据时性能很好,因为若使用间接缓冲区,JVM 会先将数据复制到直接缓冲区再进行传递;缺点是在分配内存空间和释放内存时比堆缓冲区更复杂,而 Netty 使用内存池来解决这样的问题,这也是 Netty 使用内存池的原因之一。直接缓冲区不支持数组访问数据,但是我们可以间接的访问数据数组

      ByteBuf directBuf = Unpooled.directBuffer(16);
      if(!directBuf.hasArray()){
       int len = directBuf.readableBytes();
       byte[] arr = new byte[len];
       directBuf.getBytes(0, arr);
      }

    Composite Buffer(复合缓冲区)

    CompositeByteBuf compBuf = Unpooled.compositeBuffer();
            ByteBuf heapBuf = Unpooled.buffer(8);
            ByteBuf directBuf = Unpooled.directBuffer(16);
            // 添加 ByteBuf 到 CompositeByteBuf
            compBuf.addComponents(heapBuf, directBuf);
            // 删除第一个 ByteBuf
            compBuf.removeComponent(0);
            Iterator<ByteBuf> iter = compBuf.iterator();
            while (iter.hasNext()) {
                System.out.println(iter.next().toString());
            }
            // 使用数组访问数据
            if (!compBuf.hasArray()) {
                int len = compBuf.readableBytes();
                byte[] arr = new byte[len];
                compBuf.getBytes(0, arr);
            }

    ByteBuf 提供两个指针变量支付读和写操作,读操作是使用 readerIndex(),写操作时使用 writerIndex()。这和 JDK 的 ByteBuffer 不同,ByteBuffer 只有一个方法来设置索引所以需要使用 flip()方法来切换读和写模式

    discardReadBytes()可以用来清空 ByteBuf 中已读取的数据, 

    isReadable()  判断 writerIndex > readerIndex;

    writableBytes   capacity() - writerIndex 能否写

    clear() 重设索引    readerIndex = writerIndex = 0;

    调用 duplicate()、slice()、slice(int index, int length)、order(ByteOrder endianness)
    会创建一个现有缓冲区的视图

    ByteBufAllocator 负责分配 ByteBuf 实例  ByteBufAllocator.heapBuffer()

    可以从 Channel 的 alloc()获取,也可以从ChannelHandlerContext 的 alloc()获取

    Unpooled 也能创建ByteBuf实例    Unpooled.compositeBuffer();   Unpooled.buffer(8);

    ChannelHandler

    ChannelPipeline

     ChannelPipeline是ChannelHandler实例的列表,用于处理或截获通道的接收和发送数据。

    addFirst(...),添加ChannelHandler在ChannelPipeline的第一个位置  addBefore(...),在ChannelPipeline中指定的ChannelHandler名称之前添加ChannelHandler

      每个ChannelHandler被添加到ChannelPipeline后,都会创建一个ChannelHandlerContext并与之创建的ChannelHandler关联绑定。ChannelHandlerContext允许ChannelHandler与其他的ChannelHandler实现进行交互。

       如果你想有一些事件流全部通过ChannelPipeline,有两个不同的方法可以做到:
    • 调用Channel的方法
    • 调用ChannelPipeline的方法
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
            @Override
            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                //Event via Channel
                Channel channel = ctx.channel();
                channel.write(Unpooled.copiedBuffer("netty in action", CharsetUtil.UTF_8));
                //Event via ChannelPipeline
                ChannelPipeline pipeline = ctx.pipeline();
                pipeline.write(Unpooled.copiedBuffer("netty in action", CharsetUtil.UTF_8));

        //为了节省开销,不感兴趣的ChannelHandler不让通过 直接跳过前面

        //排除一些ChannelHandler

        ctx.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));

    
            }
        });
    }

    ChannelHandler实例如果带有@Sharable注解则可以被添加到多个ChannelPipeline。也就是说单个ChannelHandler实例可以有多个ChannelHandlerContext,因此可以调用不同ChannelHandlerContext获取同一个ChannelHandler

    Channel的状态在其生命周期中变化

    • handlerAdded,ChannelHandler添加到实际上下文中准备处理事件
    • handlerRemoved,将ChannelHandler从实际上下文中删除,不再处理事件
    • exceptionCaught,处理抛出的异常

    ChannelInboundHandler

    • channelRegistered,ChannelHandlerContext的Channel被注册到EventLoop;
    • channelUnregistered,ChannelHandlerContext的Channel从EventLoop中注销
    • channelActive,ChannelHandlerContext的Channel已激活
    • channelInactive,ChannelHanderContxt的Channel结束生命周期
    • channelRead,从当前Channel的对端读取消息
    • channelReadComplete,消息读取完成后执行
    • userEventTriggered,一个用户事件被处罚
    • channelWritabilityChanged,改变通道的可写状态,可以使用Channel.isWritable()检查
    • exceptionCaught,重写父类ChannelHandler的方法,处理异常

    常用的 SimpleChannelInboundHandler

     ChannelOutboundHandler

            ChannelOutboundHandler用来处理“出站”的数据消息。ChannelOutboundHandler提供了下面一些方法:
    • bind,Channel绑定本地地址
    • connect,Channel连接操作
    • disconnect,Channel断开连接
    • close,关闭Channel
    • deregister,注销Channel
    • read,读取消息,实际是截获ChannelHandlerContext.read()
    • write,写操作,实际是通过ChannelPipeline写消息,Channel.flush()属性到实际通道
    • flush,刷新消息到通道

     ByteToMessageDecoder

    MessageToByteEncoder

     ByteToMessageCodec   用来处理 byte-to-message 和 message-to-byte

    MessageToMessageCodec

    CombinedChannelDuplexHandler 来结合解码器和编码器   组合

  • 相关阅读:
    Linux操作_常用命令操作练习
    Linux编程_Shell脚本练习题
    Linux操作_grep/egrep工具的使用
    Linux中的链接文件_软链接和硬链接
    Linux操作_磁盘管理_增加虚拟磁盘
    Linux命令_磁盘管理_查看磁盘或目录的容量
    Linux命令_用户身份切换
    使用Unity中的Box Collider组件完成游戏场景中的碰撞检测功能
    在Unity场景中更改天空盒的步骤
    Linux命令_用户和用户组管理
  • 原文地址:https://www.cnblogs.com/mxz1994/p/9324635.html
Copyright © 2011-2022 走看看