zoukankan      html  css  js  c++  java
  • Netty框架

    简介

    netty是一款基于NIO(Nonblocking I/O,非阻塞IO)开发的网络通信框架,提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。对比于BIO(Blocking I/O,阻塞IO),他的并发性能得到了很大提高。
    提供了对TCP、UDP和文件传输的支持,作为一个异步NIO框架,Netty的所有IO操作都是异步非阻塞的,通过Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果。特点:并发高、传输快、封装好。

    核心组件

    一、Channel:

    1.概念

    channel代表的一个连接,每个client请求都会对应到具体的一个channel。在Netty里,Channel是通讯的载体,而ChannelHandler负责Channel中的逻辑处理。它是Netty网络通信的主体,由它负责同对端进行网络通信、注册和数据操作等功能。

    状态主要包括:打开、关闭、连接
    主要的IO操作,读(read)、写(write)、连接(connect)、绑定(bind)。
    所有的IO操作都是异步的,调用诸如read,write方法后,并不保证IO操作完成,但会返回一个凭证,在IO操作成功,取消或失败后会记录在该凭证中。

    2.组件

    Channel对应有几个重要的组件,分别是ChannelHandler、ChannelHandlerContext、ChannelPipeline

    以下是它们的关系图:

    1>ChannelHandler

    是业务逻辑的核心处理类,用于处理Channel对应的事件。

    ChannelHandler接口里面只定义了三个生命周期方法,我们主要实现它的子接口ChannelInboundHandler和ChannelOutboundHandler,为了便利,框架提供了ChannelInboundHandlerAdapter,ChannelOutboundHandlerAdapter和ChannelDuplexHandler这三个适配类,在使用的时候只需要实现你关注的方法即可。

    *以下是ChannelHandler类图:

     

    *ChannelHandler下主要是两个子接口:

          ChannelInboundHandler(入站): 处理输入数据和Channel状态类型改变。

                                          适配器: ChannelInboundHandlerAdapter(适配器设计模式)

                                          常用的: SimpleChannelInboundHandler

        ChannelOutboundHandler(出站): 处理输出数据

                                          适配器: ChannelOutboundHandlerAdapter

    每一个Handler都一定会处理出站或者入站(可能两者都处理),区别在于SimpleChannelInboundHandler会对没有外界引用的资源进行一定的清理,并且入站的消息可以通过泛型来规定。

    这里为什么有设配器模式呢?

          我们在写自定义Handel时候,很少会直接实现上面两个接口,因为接口中有很多默认方法需要实现,所以这里就采用了设配器模式,ChannelInboundHandlerAdapter和

    ChannelInboundHandlerAdapter就是设配器模式的产物,让它去实现上面接口,实现它所有方法。那么你自己写自定义Handel时,只要继承它,就无须重写上面接口的所有方法了。

    *ChannelHandler 生命周期:

    handlerAdded: 当 ChannelHandler 添加到 ChannelPipeline 调用

    handlerRemoved: 当 ChannelHandler 从 ChannelPipeline 移除时调用

    exceptionCaught: 当 ChannelPipeline 执行抛出异常时调用

     

    2>channelHandlerContex

    是维护ChannelHandler的上下文内容(1. channelHandler的一些状态信息的存储;2. channelHandler之间互相通讯的桥梁,如本文第一张图便可知道ChannelHandlerContext的重要性了。(通过channelHandlerContext,我们可用把上一个channelHandler的处理结果传递给下一个channelHandler))。

    每个ChannelHandler通过add方法加入到ChannelPipeline中去的时候,会创建一个对应的ChannelHandlerContext,并且绑定,ChannelPipeline实际维护的是ChannelHandlerContext 的关系。
    每个ChannelHandlerContext之间形成双向链表。 

    3>channelPipeline

    在Channel创建的时候,会同时创建ChannelPipeline。可以理解为ChannelPipeline是ChannelHandler的容器,所有ChannelHandler都会注册到ChannelPipeline中,并按顺序组织起来。channel事件消息在ChannelPipeline中流动和传播,相应的事件能够被ChannelHandler拦截处理、传递、忽略或者终止。Pipeline把channelHandle和Context给连接起来。

    以下是channelPipeline内的关系图:

    由上图可以看出,ChannelPipeline包含两条线路:Upstream和Downstream。Upstream对应上行,接收到的消息、被动的状态改变,都属于Upstream。Downstream则对应下行,发送的消息、主动的状态改变,都属于Downstream。

     1 /**
     2   * 可以看到,DefaultChinnelPipeline 内部使用了两个特殊的Hander 来表示Handel链的头和尾。
     3   */
     4  public DefaultChannelPipeline(AbstractChannel channel) {
     5         if (channel == null) {
     6             throw new NullPointerException("channel");
     7         }
     8         this.channel = channel;
     9  
    10         TailHandler tailHandler = new TailHandler();
    11         tail = new DefaultChannelHandlerContext(this, null, generateName(tailHandler), tailHandler);
    12  
    13         HeadHandler headHandler = new HeadHandler(channel.unsafe());
    14         head = new DefaultChannelHandlerContext(this, null, generateName(headHandler), headHandler);
    15  
    16         head.next = tail;
    17         tail.prev = head;
    18     }

     由上图和代码可以看出,对于DefaultChinnelPipeline它的Handel头部和尾部的Handel是固定的,我们所添加的Handel是添加在这个头和尾之前的Handel。(下面这个图更加清晰)

    3.Channel 生命周期

    1>channelRegistered: channel注册到一个EventLoop。
    2>channelActive: 变为活跃状态(连接到了远程主机),可以接受和发送数据
    3>channelInactive: channel处于非活跃状态,没有连接到远程主机
    4>channelUnregistered: channel已经创建,但是未注册到一个EventLoop里面,也就是没有和Selector绑定

     二、Netty线程模型

    Netty 主要基于主从 Reactors 多线程模型(如下图)做了一定的修改,其中主从 Reactor 多线程模型有多个 Reactor:

    MainReactor 负责客户端的连接请求,并将请求转交给 SubReactor。

    SubReactor 负责相应通道的 IO 读写请求。

    非 IO 请求(具体逻辑处理)的任务则会直接写入队列,等待 worker threads 进行处理。

    这里引用 Doug Lee 大神的 Reactor 介绍:Scalable IO in Java 里面关于主从 Reactor 多线程模型的图:

     

    特别说明的是:虽然 Netty 的线程模型基于主从 Reactor 多线程,借用了 MainReactor 和 SubReactor 的结构。但是实际实现上 SubReactor 和 Worker 线程在同一个线程池中:

    1 EventLoopGroup bossGroup = new NioEventLoopGroup();
    2 EventLoopGroup workerGroup = new NioEventLoopGroup();
    3 ServerBootstrap server = new ServerBootstrap();
    4 server.group(bossGroup, workerGroup)
    5 .channel(NioServerSocketChannel.class)

    上面代码中的 bossGroup 和 workerGroup 是 Bootstrap 构造方法中传入的两个对象,这两个 group 均是线程池:

    bossGroup 线程池则只是在 Bind 某个端口后,获得其中一个线程作为 MainReactor,专门处理端口的 Accept 事件,每个端口对应一个 Boss 线程。

    workerGroup 线程池会被各个 SubReactor 和 Worker 线程充分利用。

    三、基于netty构建服务的基本步骤

    1.定义两个线程组 也叫做事件循环组

    2.定义一个的启动服务类

     1 package cn.haoxiaoyong.netty.netty;
     2 
     3 import io.netty.bootstrap.ServerBootstrap;
     4 import io.netty.channel.ChannelFuture;
     5 import io.netty.channel.EventLoopGroup;
     6 import io.netty.channel.nio.NioEventLoopGroup;
     7 import io.netty.channel.socket.nio.NioServerSocketChannel;
     8 import org.springframework.stereotype.Component;
     9 
    10 import javax.annotation.PostConstruct;
    11 
    12 /**
    13  *  @author chensisi
    14  *  @Date 2020/6/22
    15  *  @Description 1.实现客户端发送请求
    16  */
    17 @Component
    18 public class WebSocketServer {
    19 
    20 
    21     /**
    22      *主线程组,用于接收客户端的链接,但不做任何处理
    23      */
    24     private EventLoopGroup bossGroup;
    25     /**
    26      *定义从线程组,主线程组会把任务转给从线程组进行处理
    27      */
    28     private EventLoopGroup workerGroup;
    29     /**
    30      *服务器
    31      */
    32     private ServerBootstrap server;
    33     /**
    34      *回调
    35      */
    36      private ChannelFuture future;
    37 
    38     /**
    39      * 服务启动类,任务分配自动处理
    40      *
    41      */
    42     @PostConstruct
    43     public void start() {
    44         //启动
    45         //绑定监听端口,调用sync同步阻塞方法等待绑定操作完
    46         // ChannelFuture future = server.bind(port).sync();
    47         try {
    48             future = server.bind(9001);
    49             System.out.println("netty server - 启动成功");
    50
    51             //获取某个客户端所对应的chanel,关闭并设置同步方式
    52             //future.channel().closeFuture().sync();
    53         } catch (Exception e) {
    54             e.printStackTrace();
    55         } finally {
    56             //使用一种优雅的方式进行关闭
    57             bossGroup.shutdownGracefully();
    58             workerGroup.shutdownGracefully();
    59         }
    60     }
    61 
    62     /**
    63      * 构造方法,方便调用
    64      */
    65     public WebSocketServer() {
    66         bossGroup = new NioEventLoopGroup();
    67         workerGroup = new NioEventLoopGroup();
    68         server = new ServerBootstrap();
    69         //需要去针对一个之前的线程模型(上面定义的是主从线程)
    70         server.group(bossGroup,workerGroup)
    71                 //设置NIO的双向通道
    72                 .channel(NioServerSocketChannel.class)
    73                 //子处理器,用于处理workerGroup
    74                 /*设置chanel初始化器
    75                  每一个chanel由多个handler共同组成管道(pipeline)*/
    76 
    77                 .childHandler(new WebSocketInitializer());
    78     }
    79 
    80 }

    2. 通过ChannelPipeline初始化处理器,类似于拦截器Chain,当客户端首次连接后即调用initChannel方法完成初始化动作。

       初始化器,服务端启动后会自动调用这个方法,它是一个回调方法

     1 public class WebSocketInitializer extends ChannelInitializer<SocketChannel> {
     2     /**
     3      *
     4      * @param ch
     5      * @throws Exception
     6      */
     7     @Override
     8     protected void initChannel(SocketChannel ch) throws Exception {
     9         ChannelPipeline pipeline = ch.pipeline();
    10         /*
    11                 用于支持 Http协议
    12         */
    13         //websocket基于 http协议,需要有 http 的编解码器
    14         pipeline.addLast(new HttpServerCodec())
    15                 //对于大数据流的支持
    16                 .addLast(new ChunkedWriteHandler())
    17                 //添加对HTTP请求和响应的聚合器:只要使用Netty进行 http编码都需要使用到
    18                 //对HttpMessage进行聚合,聚合成FullHttpRequest或者FullHttpResponse
    19                 //在 netty 编程总都会使用到Handler
    20                 .addLast(new HttpObjectAggregator(1024*64))
    21                 .addLast(new WebSocketServerProtocolHandler("/ws"))
    22                 //添加Netty空闲超时检查的支持
    23                 //4:读空闲超时,8:写空闲超时,12: 读写空闲超时
    24                 .addLast(new IdleStateHandler(4,8,12))
    25                 .addLast(new HearBeatHandler())
    26                 //添加自定有的 handler
    27                 .addLast(new ChatHandler());
    28     }
    29 

    3.  Client的消息处理类Handler,通常继承SimpleChannelInboundHandler<T>, 该处理器重写channelRead0方法,该方法负责请求接入,读取客户端请求,发送响应给客户端,并且重写了ChannelInboundHandlerAdapter父类的几个方法,分析不同事件方法的调用

     1 public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
     2 
     3     private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-mm-dd hh:MM");
     4 
     5     private static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
     6 
     7     private static ApplicationContext applicationContext;
     8 
     9     /**
    10      * 客户端读取到数据后干什么
    11      *
    12      * 每当从服务端读到客户端写入信息时,将信息转发给其他客户端的 Channel。其中如果你使用的是 Netty 5.x 版本时,需要把 channelRead0() 重命名为messageReceived()
    13      */
    14     @Override
    15     protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
    16         String content = textWebSocketFrame.text();
    17         System.out.println("接收到的数据: " + content);
    18 
    19         Message message = JSON.parseObject(content, Message.class);
    20         ChatRecordService chatRecordService = SpringUtil.getBean(ChatRecordService.class);
    21         switch (message.getType()) {
    22             //处理客户端链接的消息
    23             //表示连接
    24             case 0:
    25                 //建立用户和通道之间的关系
    26                 UserChannelMap.put(message.getChatRecord().getUserid(), channelHandlerContext.channel());
    27                 System.out.println(message.getChatRecord().getUserid() + "与" + channelHandlerContext.channel().id() + "建立了关联");
    28                 UserChannelMap.print();
    29                 break;
    30             //表示发送消息
    31             case 1:
    32                 //将消息保存到数据库
    33                 ChatRecord chatRecord = message.getChatRecord();
    34                 chatRecordService.insert(chatRecord);
    35                 //查看此好友是否在线,如果在线就将消息发送给此好友
    36                 //1.根据好友id,查询此通道是否存在
    37                 Channel channel = UserChannelMap.get(chatRecord.getFriendid());
    38                 if (channel != null) {
    39                     channel.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(message)));
    40                 } else {
    41                     System.out.println("用户" + chatRecord.getFriendid() + "不在线");
    42                 }
    43                 break;
    44             //接收消息
    45             case 2:
    46                 //将消息设置为已读
    47                 chatRecordService.updateChatRecordHasRead(message.getChatRecord().getId());
    48                 break;
    49             //检测心跳
    50             case 3:
    51                 //接收心跳信息
    52                 System.out.println("接收到心跳消息"+JSON.toJSONString(message));
    53         }
    54         /*//将接收的消息发送所有的客户端
    55         for (Channel channel : channels) {
    56             channel.writeAndFlush(new TextWebSocketFrame(sdf.format(new Date()) + ":" + content));
    57         }*/
    58     }
    59 
    60     /**
    61      * 助手类添加
    62      * 每当从服务端收到新的客户端连接时,客户端的 Channel 存入  ChannelGroup列表中,并通知列表中的其他客户端 Channel
    63      * 当有新的客户端连接服务器之后,就会自动调用这个方法
    64      */
    65     @Override
    66     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    67         channels.add(ctx.channel());
    68     }
    69     /**
    70      * 捕获channel异常
    71      * 出现异常是关闭通道
    72      */
    73     @Override
    74     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    75         //根据通道id取消用户和通道的关系
    76         UserChannelMap.removeByChannelId(ctx.channel().id().asLongText());
    77         ctx.channel().close();
    78         System.out.println("出现异常.....关闭通道!");
    79     }
    80     /**
    81      * 助手类移除
    82      * 每当从服务端收到客户端断开时,客户端的 Channel 移除 ChannelGroup 列表中,并通知列表中的其他客户端 Channel
    83      * 当客户端关闭链接时关闭通道
    84      */
    85     @Override
    86     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    87         System.out.println("关闭通道");
    88         UserChannelMap.removeByChannelId(ctx.channel().id().asLongText());
    89         ctx.channel().close();
    90         UserChannelMap.print();
    91     }

     4.心跳机制(基本步骤只要前3步即可)(这个心跳机制还没有细细研究,等研究完了再来补上相关知识点)

     1 public class HearBeatHandler extends ChannelInboundHandlerAdapter {
     2 
     3     /**
     4      * 触发用户事件
     5      */
     6     @Override
     7     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
     8         //evt是否为一个IdleStateEvent类的实例
     9         if (evt instanceof IdleStateEvent) {
    10             IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
    11             if (idleStateEvent.state() == IdleState.READER_IDLE) {
    12                 //读空闲
    13                 //检测到读空闲不做任何的操作
    14                 System.out.println("读空闲事件触发...");
    15             } else if (idleStateEvent.state() == IdleState.WRITER_IDLE) {
    16                 //写空闲
    17                 //检测到写空闲不做任何的操作
    18                 System.out.println("写空闲事件触发...");
    19             } else if (idleStateEvent.state() == IdleState.ALL_IDLE) {
    20                 //读写空闲
    21                 System.out.println("--------------");
    22                 System.out.println("读写空闲事件触发");
    23                 System.out.println("关闭通道资源");
    24                 ctx.channel().close();
    25             }
    26         }
    27     }
    28 }

    以上只是部分代码

    代码学习的git地址:

    后端项目地址: https://github.com/haoxiaoyong1014/chat-software

    前端项目地址: https://github.com/haoxiaoyong1014/chat-view

    netty的优点

    零拷贝

    Netty的传输快其实也是依赖了NIO的这个。我们知道,Java的内存有堆内存、栈内存和字符串常量池等等,其中堆内存是占用内存空间最大的一块,也是Java对象存放的地方,一般我们的数据如果需要从IO读取到堆内存,中间需要经过Socket缓冲区,也就是说一个数据会被拷贝两次才能到达他的的终点,如果数据量大,就会造成不必要的资源浪费。

    Netty针对这种情况,使用了NIO中的另一大特性——零拷贝,当他需要接收数据的时候,他会在堆内存之外开辟一块内存,数据就直接从IO读到了那块内存中去,在netty里面通过ByteBuf可以直接对这些数据进行直接操作,从而加快了传输速度。

     

                   传统数据拷贝

                     零拷贝

    抄录地址:

    1.Netty的使用:Server和Client通信

    2.SpringBoot+Netty开发IM即时通讯系列(一)

    3.(七)分布式通信----Netty实现NIO通信

    4.【Netty】(8)---理解ChannelPipeline

    5. Netty学习笔记之ChannelHandler

    6.这可能是目前最透彻的Netty原理架构解析

    Netty学习笔记之ChannelHandler

    土豆肉丝盖浇饭
  • 相关阅读:
    SqlLite的使用
    asp.net批量上传图片带进度条显示
    对于GridView控件的RowDataBount事件的错误理解
    关于SQL中时间对比
    关于使用触发器时使用@@identity的问题
    关于Treeview控件如何给每个节点加js脚本的方法
    /etc/init.d/functions详解
    如何解决安装DreamWeaver8 时候提示“无法将数值写入键/SOFTWARE/classes/.shtml”
    [请教]关于超大数据量网站的数据搜索和分页的实现方法
    svchost.exe[900]中发生未处理的win32异常
  • 原文地址:https://www.cnblogs.com/chensisi/p/13164402.html
Copyright © 2011-2022 走看看