zoukankan      html  css  js  c++  java
  • NIO之路3---MINA框架详细解析

    前言:

    上一文分析了Java NIO的实现逻辑, 可以发现NIO相比于BIO来说性能提升了不止一个档次,但是Java的NIO使用还有一个弊端,比如Buffer的API设计太过复杂,想要用好NIO就需要对于NIO的各个组件的所以API使用非常熟练才行,对于新手来说入门门槛比较高。

    所以如果有一个框架能够将NIO的API进行封装,让开发者只需要关心具体的IO数据而不需要关心IO数据的发送和接收过程,那么就会大大降低开发NIO程序的入门门槛。而MINA框架就是基于Java NIO实现的一款基于异步事件的NIO框架。

    一、MINA简介

    MINA是基于Java NIO技术的网络应用程序框架,通过Java NIO通过TCP/UDP等网络协议提供了一套抽象的实践驱动的异步API。MINA提供了网络通信的服务端和客户端,无论是哪段在Min通信框架中的工作流程基本上一致,所以Mina的优点就是将网络通信的操作与业务系统隔离开,业务系统只需要关系数据的发送、接收和具体的业务处理即可。衡量一个开源框架是否成熟,框架对于业务系统代码的侵入性高低是很大一个衡量标准。

    二、MINA的组件

    NIO的三大核心组件为channel(通道)、Buffer(缓冲区)以及Selector(多路复用选择器),而MINA在NIO的三大核心基础之上又衍生出了很多的组件,分别如下:

    2.1、IoBuffer

    IO数据缓冲区,相当于NIO的Buffer,而MINA的IoBuffer实际上也就是NIO的Buffer的封装,因为NIO的Buffer只能对基本数据类型进行操作,而不可以对于字符串或自定义对象类型的操作,而IoBuffer可以对于任何对象类型进行操作。另外Buffer的使用是定长的,而IoBuffer的长度是可变的,使用起来伸缩性更好。

    2.2、IoSession

    客户端和服务端的IO会话,相当于NIO的channel,在MINA中整个IO操作流程中都是建立在IoSession之上,所以IoSession会贯穿整个IO操作流程。IO数据的读写也分别是调用IoSession的write和read方法来进行操作的

    2.3、IoService

    最底层的IO服务,负责具体的IO操作,分别包括服务端的IoSocketAcceptor和IoSocketChannel,类似于NIO中的Selecotr,分别用于IoSession的创建,以及IO数据具体的发送和接收工作。IOService最大的作用是将底层IO数据的操作进行封装,再通过异步的方式将IO事件上报给上层业务层处理,所以相当于将NIO的同步IO改成了异步IO操作

    2.4、IoFilter

    IO过滤器,IoService将IO数据封装成IoBuffer上报给上层,但是上层可需要对数据进行统一的编解码解析操作,而这些统一的编解码操作就可以交给IO过滤器来处理,过滤器将IoService提供的IO数据进行编解码、解析、日志打印等功能处理之后再交给业务层。

    2.5、IoFilterChain

    一组IoFilter链,将多个IoFilter组合成一个IO过滤器链,接收数据就从尾部依次过滤到尾部,发送数据从头部依次过滤到尾部

    2.6、IoProcessor

    为IO数据处理提供多线程环境,IoProcessor通常会有多个,IoService有IO数据交互时会交给IoProcessor,IoProcessor再来异步处理。每一个IoProcessor内部都有一个Selector,相当于将Java NIO的单个Selector改成了多个Selector

    2.7、IoHandler

    IO数据的业务处理器,通过IoFilter过滤之后,会将IO数据封装成业务层需要的个数,比如字符串格式,JSON格式,此时将业务层需要的格式的数据交给业务层处理器IoHandler再进行真正的业务逻辑处理,也就是IO数据流的终点

    总结MINA的整体IO交互架构如下图示:

    三、MINA的工作流程

    服务端核心步骤如下:

    1、创建IoService的实例IoAcceptor对象,负责监听客户端的连接请求

    2、初始化IoAcceptor对象,主要是绑定IoFilter和IoHandler,依次添加多个IoFilter到IoAccetpor上,并且设置IoAcceptor的业务处理器IoHandler对象,最好绑定IoAcceptor的监听端口

    3、IoAcceptor接收客户端的连接请求之后,将IO操作分配给IoProcessor,IoProcessor通常是多线程的,之后的IO处理都是由IoProcessor执行

    4、IoProcessor依次调用IoFilterChain上的所有IO过滤器的过滤方法

    5、IO数据经过所有的IO过滤器处理之后交给业务处理器IoHandler对象进行最终的业务处理

     

    总结MINA的整体工作如下图示:

     

    四、MINA的使用案例

    服务端代码案例如下:

    1、自定义IoFilter的实现类TimeFilter,由于IoFilter定义的方法较多,而不是每一个IoFilter实现类都需要实现所有的方法,比如消息发送和消息接收方法,正常情况下需要对于接收数据进行解码处理,而需要丢发送数据进行编码处理,所以可以定义不同的过滤器实现不同的IoFilter定义的方法,此时就可以不直接实现IoFilter,而是可以通过继承IoFilter的适配器IoFilterAdater,IoFilterAdater实现了所有的IoFilter接口定义的方法,并且都作了默认的处理,所以如果需要自定义IoFilter实现类,就可以直接继承之IoFilterAdapter,并且再重写需要实现的方法即可,如案例中的TimeFilter仅仅只重写了messageReceived和messageSent两个方法

     1 /**
     2  * @Auther: Lucky
     3  * @Date: 2020/8/13 下午10:31
     4  * @Desc: 自定义过滤器实现IoFilter接口
     5  */
     6 public class TimeFilter extends IoFilterAdapter {
     7 
     8     @Override
     9     public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {
    10         System.out.println("接收消息时过滤,打印当前时间:" + System.currentTimeMillis());
    11     }
    12 
    13     @Override
    14     public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
    15         System.out.println("发送消息时过滤,打印当前时间:" + System.currentTimeMillis());
    16     }
    17 }

    2、自定义业务处理器IoHandler实现类,同样的IoHandler接口也定义了很多的方法,但是并不是所有的业务处理器都需要实现的,同样有一个IoHandler的适配器IoHandlerAdapter类,自定义IoHandler实现类只需要继承之IoHandlerAdapter,重写感兴趣的方法即可

     1 public class GameServerHandler extends IoHandlerAdapter {
     2 
     3     public static GameServer gameServer = new GameServer();
     4 
     5 
     6     @Override
     7     public void messageReceived(IoSession session, Object message) throws Exception {
     8         System.out.println("服务器接收到消息:" + message.toString());
     9         GameEvent event = JSON.parseObject(message.toString(), GameEvent.class);
    10         if(event.getCmd() == 0){//进入房间
    11             gameServer.enter(event.getName(), session);
    12         }else if(event.getCmd() == 1){//准备游戏
    13             gameServer.ready(event.getName(), session);
    14         }
    15     }
    16 }

    3、创建服务器端并开启服务器

     1 public class MinaServer {
     2 
     3     public static void main(String[] args) throws IOException {
     4         System.out.println("开始启动服务器...");
     5         startServer(8000);
     6     }
     7 
     8     public static void startServer(int port) throws IOException {
     9         /** 1.创建IoService对象,服务端是IoAcceptor用于接收客户端连接 */
    10         IoAcceptor acceptor = new NioSocketAcceptor();
    11 
    12         /** 2.给IoService添加过滤器链 */
    13         acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
    14         acceptor.getFilterChain().addLast("log", new LoggingFilter());
    15         acceptor.getFilterChain().addLast("time", new TimeFilter());
    16 
    17         /** 3.添加业务处理器 */
    18         acceptor.setHandler(new GameServerHandler());
    19 
    20         /** 绑定监听端口号 */
    21         acceptor.bind(new InetSocketAddress(port));
    22     }
    23 }

    过程比较清晰,首先是创建IoService的实现类IoAcceptor对象,然后给IoAcceptor添加过滤器,如编解码过滤器,打印日志过滤器以及自定义的过滤器等,然后再给IoAccetor添加业务处理器,最后绑定需要监听的端口即可。

    4、客户端代码如下:

     1 public class PlayerClient {
     2 
     3     private String name;
     4 
     5     public void load(){
     6         /** 创建 IoService实例 */
     7         NioSocketConnector connector = new NioSocketConnector();
     8 
     9         /** 添加编解码过滤器*/
    10         connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("utf-8"))));
    11 
    12         /** 设置业务处理器*/
    13         connector.setHandler(new PlayerHandler(name));
    14 
    15         /** 连接服务器*/
    16         connector.connect(new InetSocketAddress("localhost", 8000));
    17     }
    18 
    19     public PlayerClient(String name){
    20         this.name = name;
    21     }
    22 }
     1 public class PlayerHandler extends IoHandlerAdapter {
     2     private String name;
     3     public PlayerHandler(String name){
     4         this.name = name;
     5     }
     6 
     7     @Override
     8     public void sessionOpened(IoSession session) throws Exception {
     9         /** 当连接创建成功之后向服务器发送消息*/
    10         GameEvent event = new GameEvent();
    11         event.setCmd(0);
    12         event.setName(name);
    13         session.write(JSON.toJSON(event).toString());
    14     }
    15 
    16     @Override
    17     public void messageReceived(IoSession session, Object message) throws Exception {
    18         System.out.println("玩家:" + name + "接收到消息:" + message.toString());
    19         if(message.toString().equals("进入房间成功")){
    20             GameEvent event = new GameEvent();
    21             event.setName(name);
    22             event.setCmd(1);
    23             session.write(JSON.toJSON(event));
    24         }
    25     }
    26 }

    自定义客户端的IoHandler,重写了sessionOpend方法,作用是和服务器连接成功之后执行该方法,messageReceived方法是接收到服务器发送过来的数据之后执行该方法

    5、其他对象

     1 public class GameEvent {
     2 
     3     private String name;
     4 
     5     private int cmd;
     6 
     7     public String getName() {
     8         return name;
     9     }
    10 
    11     public void setName(String name) {
    12         this.name = name;
    13     }
    14 
    15     public int getCmd() {
    16         return cmd;
    17     }
    18 
    19     public void setCmd(int cmd) {
    20         this.cmd = cmd;
    21     }
    22 }
     1 public class GameServer {
     2 
     3     public static List<PlayerSession> playerList = new ArrayList<>();
     4     public static AtomicInteger readyCount = new AtomicInteger(0);
     5 
     6     public void enter(String name, IoSession session){
     7         System.out.println("玩家:" + name + "进入房间");
     8         if(playerList.size() >=5){
     9             session.write("房间已满,您进入房间失败");
    10         }else {
    11             System.out.println("玩家:" + name + "进入房间成功");
    12             PlayerSession player = new PlayerSession(name, session);
    13             playerList.add(player);
    14             session.write("进入房间成功");
    15         }
    16     }
    17 
    18     public void ready(String name, IoSession session){
    19         System.out.println("玩家:" + name + "已准备");
    20         if(readyCount.incrementAndGet() ==5){
    21             for (PlayerSession player: playerList) {
    22                 player.session.write("游戏开始,欢迎进入王者荣耀");
    23             }
    24         }
    25     }
    26 
    27     static class PlayerSession{
    28         public PlayerSession(String name,IoSession session){
    29             this.name = name;
    30             this.session = session;
    31         }
    32         private String name;
    33         private IoSession session;
    34 
    35         public String getName() {
    36             return name;
    37         }
    38 
    39         public void setName(String name) {
    40             this.name = name;
    41         }
    42 
    43         public IoSession getSession() {
    44             return session;
    45         }
    46 
    47         public void setSession(IoSession session) {
    48             this.session = session;
    49         }
    50     }
    51 
    52 }

    这里定义了一个PlayerSession对象,用于封装IoSession对象,给IoSession对象添加一个用户名称

    6、模拟多个客户端连接服务器的测试代码如下:

     1 public class MinaTest {
     2 
     3     public static void main(String[] args) throws InterruptedException {
     4         PlayerClient client1 = new PlayerClient("赵一");
     5         PlayerClient client2 = new PlayerClient("王二");
     6         PlayerClient client3 = new PlayerClient("张三");
     7         PlayerClient client4 = new PlayerClient("李四");
     8         PlayerClient client5 = new PlayerClient("孙五");
     9         PlayerClient client6 = new PlayerClient("小六");
    10 
    11         client1.load();
    12         Thread.sleep(3000L);
    13 
    14         client2.load();
    15         Thread.sleep(3000L);
    16 
    17         client3.load();
    18         Thread.sleep(3000L);
    19 
    20         client4.load();
    21         Thread.sleep(3000L);
    22 
    23         client5.load();
    24         Thread.sleep(3000L);
    25 
    26         client6.load();
    27     }
    28 }

    执行服务器和客户端main方法,执行结果如下:

    服务端打印结果:

     1 开始启动服务器...
     2 17:16:39.263 [NioProcessor-2] DEBUG org.apache.mina.filter.codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 1
     3 服务器接收到消息:{"name":"赵一","cmd":0}
     4 玩家:赵一进入房间
     5 玩家:赵一进入房间成功
     6 17:16:39.362 [NioProcessor-2] DEBUG org.apache.mina.filter.codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 1
     7 服务器接收到消息:{"name":"赵一","cmd":1}
     8 玩家:赵一已准备
     9 17:16:42.064 [NioProcessor-3] DEBUG org.apache.mina.filter.codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 2
    10 服务器接收到消息:{"name":"王二","cmd":0}
    11 玩家:王二进入房间
    12 玩家:王二进入房间成功
    13 17:16:42.065 [NioProcessor-3] DEBUG org.apache.mina.filter.codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 2
    14 服务器接收到消息:{"name":"王二","cmd":1}
    15 玩家:王二已准备
    16 17:16:45.066 [NioProcessor-4] DEBUG org.apache.mina.filter.codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 3
    17 服务器接收到消息:{"name":"张三","cmd":0}
    18 玩家:张三进入房间
    19 玩家:张三进入房间成功
    20 17:16:45.067 [NioProcessor-4] DEBUG org.apache.mina.filter.codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 3
    21 服务器接收到消息:{"name":"张三","cmd":1}
    22 玩家:张三已准备
    23 17:16:48.071 [NioProcessor-5] DEBUG org.apache.mina.filter.codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 4
    24 服务器接收到消息:{"name":"李四","cmd":0}
    25 玩家:李四进入房间
    26 玩家:李四进入房间成功
    27 17:16:48.073 [NioProcessor-5] DEBUG org.apache.mina.filter.codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 4
    28 服务器接收到消息:{"name":"李四","cmd":1}
    29 玩家:李四已准备
    30 17:16:51.074 [NioProcessor-6] DEBUG org.apache.mina.filter.codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 5
    31 服务器接收到消息:{"name":"孙五","cmd":0}
    32 玩家:孙五进入房间
    33 玩家:孙五进入房间成功
    34 17:16:51.076 [NioProcessor-6] DEBUG org.apache.mina.filter.codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 5
    35 服务器接收到消息:{"name":"孙五","cmd":1}
    36 玩家:孙五已准备
    37 17:16:54.078 [NioProcessor-7] DEBUG org.apache.mina.filter.codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 6
    38 服务器接收到消息:{"name":"小六","cmd":0}
    39 玩家:小六进入房间

    客户端打印结果:

     1 17:16:39.357 [NioProcessor-2] DEBUG org.apache.mina.filter.codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 1
     2 玩家:赵一接收到消息:进入房间成功
     3 17:16:42.064 [NioProcessor-16] DEBUG org.apache.mina.filter.codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 2
     4 玩家:王二接收到消息:进入房间成功
     5 17:16:45.067 [NioProcessor-30] DEBUG org.apache.mina.filter.codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 3
     6 玩家:张三接收到消息:进入房间成功
     7 17:16:48.072 [NioProcessor-44] DEBUG org.apache.mina.filter.codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 4
     8 玩家:李四接收到消息:进入房间成功
     9 17:16:51.075 [NioProcessor-58] DEBUG org.apache.mina.filter.codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 5
    10 玩家:孙五接收到消息:进入房间成功
    11 17:16:51.076 [NioProcessor-2] DEBUG org.apache.mina.filter.codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 1
    12 17:16:51.076 [NioProcessor-16] DEBUG org.apache.mina.filter.codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 2
    13 玩家:赵一接收到消息:游戏开始,欢迎进入王者荣耀
    14 17:16:51.077 [NioProcessor-30] DEBUG org.apache.mina.filter.codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 3
    15 玩家:王二接收到消息:游戏开始,欢迎进入王者荣耀
    16 17:16:51.077 [NioProcessor-44] DEBUG org.apache.mina.filter.codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 4
    17 玩家:张三接收到消息:游戏开始,欢迎进入王者荣耀
    18 17:16:51.077 [NioProcessor-58] DEBUG org.apache.mina.filter.codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 5
    19 玩家:李四接收到消息:游戏开始,欢迎进入王者荣耀
    20 玩家:孙五接收到消息:游戏开始,欢迎进入王者荣耀
    21 17:16:54.079 [NioProcessor-72] DEBUG org.apache.mina.filter.codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 6
    22 玩家:小六接收到消息:房间已满,您进入房间失败

    本案例主要模拟了王者荣耀开始游戏的场景,六个客户端分别和游戏服务器进行连接,连接成功就表示进入房间成功,连接成功之后服务器回复客户端进入房间成功,然后客户端再开始准备,准备完成之后服务器就回复正式进入游戏,而本案例模拟了6个客户端,当5个客户端准备了之后就进入了游戏,第六个房间提升进入房间失败。

    五、MINA的线程模型

    Java NIO的服务端一般线程模型为开启一个线程通过Selecotor监听客户端的连接请求和IO操作,只有有事件发生(连接请求和IO操作)就会立即去处理,这样的线程模型有一个弊端Selector对于客户端连接请求的channel和客户端IO操作的channel是用同一个Selector来监控的,如下图示

    这样就会有一个问题,但IO操作比较耗时时就会导致连接请求的channel也会被阻塞,因为Selector是单线程同步处理所有IO事件的,如上图假设客户端A连接成功之后发送IO数据,Selector接收到IO数据之后开始处理,假设处理时间需要10秒,而在10秒之内客户端B通过ServerSocketChannel请求连接服务器,此时Selector还是处于处理IO事件的状态,只要客户端A的IO事件处理完成,才会继续执行select方法处理下一轮的IO事件。整个流程就好比Selector处理IO事件是一批一批的处理,处理完了第一批之后才会处理第二批的IO事件,如果第一批处理比较慢,就会影响到第二批,而第二批中很可能都是耗时比较短的OP_ACCEPT事件。所以为了让服务器在处理OP_READ事件时不会影响OP_ACCEPT事件,MINA在NIO的线程模型之上进行了优化,单独开启一个线程用于处理客户端的连接请求,而对于IO事件再由于IoProcessor线程去处理,而IoProcessor又可以创建多个,这就形成了一个Selector用于负责监听客户的连接请求,然后有多个Selector分别监听多个Channel的IO事件的线程模型,如下图示:

    总结:

    1、服务端创建IoAcceptor,开启单线程通过Selector用来监听所有客户端请求连接的OP_ACCEPT事件,连接成功之后将客户端封装成IoSession,将IoSession交给IoProcessor线程组

    2、IoSession通过取模算法被分配给一个IoProcessor,IoProcessor通过Selector监听注册到此Selecotr上的所有IoSession的IO状态,当有IO事件发生时,进行IO数据的读写并将IO数据先通过IoFilterChain进行过滤,最后交给业务层IoHandler进行业务处理

      

    后结:本文主要介绍来MINA的基本理论和实验案例,下一文将分析MINA的源码实现:NIO之路--MINA框架源码解析

  • 相关阅读:
    win10安装virtualBox创建CentOS6.5虚拟机
    ES系列二、CentOS7安装ES head6.3.1
    ES系列一、CentOS7安装ES 6.3.1、集成IK分词器
    Python easyGUI 猜数字
    Python easyGUI 登录框 非空验证
    Python easyGUI 文件浏览 显示文件内容
    Python easyGUI 文件对比 覆盖保存
    Python 统计代码量
    什么是一个人真正的魅力?
    Python学习笔记(15)- osos.path 操作文件
  • 原文地址:https://www.cnblogs.com/jackion5/p/11013222.html
Copyright © 2011-2022 走看看