前言:
上一文分析了Java NIO的实现逻辑, 可以发现NIO相比于BIO来说性能提升了不止一个档次,但是Java的NIO使用还有一个弊端,比如Buffer的API设计太过复杂,想要用好NIO就需要对于NIO的各个组件的所以API使用非常熟练才行,对于新手来说入门门槛比较高。
所以如果有一个框架能够将NIO的API进行封装,让开发者只需要关心具体的IO数据而不需要关心IO数据的发送和接收过程,那么就会大大降低开发NIO程序的入门门槛。而MINA框架就是基于Java NIO实现的一款基于异步事件的NIO框架。
一、MINA简介
MINA是基于Java NIO技术的网络应用程序框架,通过Java NIO通过TCP/UDP等网络协议提供了一套抽象的实践驱动的异步API。MINA提供了网络通信的服务端和客户端,无论是哪段在Min通信框架中的工作流程基本上一致,所以Mina的优点就是将网络通信的操作与业务系统隔离开,业务系统只需要关系数据的发送、接收和具体的业务处理即可。衡量一个开源框架是否成熟,框架对于业务系统代码的侵入性高低是很大一个衡量标准。
二、MINA的组件
NIO的三大核心组件为channel(通道)、Buffer(缓冲区)以及Selector(多路复用选择器),而MINA在NIO的三大核心基础之上又衍生出了很多的组件,分别如下:
2.1、IoBuffer
IO数据缓冲区,相当于NIO的Buffer,而MINA的IoBuffer实际上也就是NIO的Buffer的封装,因为NIO的Buffer只能对基本数据类型进行操作,而不可以对于字符串或自定义对象类型的操作,而IoBuffer可以对于任何对象类型进行操作。另外Buffer的使用是定长的,而IoBuffer的长度是可变的,使用起来伸缩性更好。
2.2、IoSession
客户端和服务端的IO会话,相当于NIO的channel,在MINA中整个IO操作流程中都是建立在IoSession之上,所以IoSession会贯穿整个IO操作流程。IO数据的读写也分别是调用IoSession的write和read方法来进行操作的
2.3、IoService
最底层的IO服务,负责具体的IO操作,分别包括服务端的IoSocketAcceptor和IoSocketChannel,类似于NIO中的Selecotr,分别用于IoSession的创建,以及IO数据具体的发送和接收工作。IOService最大的作用是将底层IO数据的操作进行封装,再通过异步的方式将IO事件上报给上层业务层处理,所以相当于将NIO的同步IO改成了异步IO操作
2.4、IoFilter
IO过滤器,IoService将IO数据封装成IoBuffer上报给上层,但是上层可需要对数据进行统一的编解码解析操作,而这些统一的编解码操作就可以交给IO过滤器来处理,过滤器将IoService提供的IO数据进行编解码、解析、日志打印等功能处理之后再交给业务层。
2.5、IoFilterChain
一组IoFilter链,将多个IoFilter组合成一个IO过滤器链,接收数据就从尾部依次过滤到尾部,发送数据从头部依次过滤到尾部
2.6、IoProcessor
为IO数据处理提供多线程环境,IoProcessor通常会有多个,IoService有IO数据交互时会交给IoProcessor,IoProcessor再来异步处理。每一个IoProcessor内部都有一个Selector,相当于将Java NIO的单个Selector改成了多个Selector
2.7、IoHandler
IO数据的业务处理器,通过IoFilter过滤之后,会将IO数据封装成业务层需要的个数,比如字符串格式,JSON格式,此时将业务层需要的格式的数据交给业务层处理器IoHandler再进行真正的业务逻辑处理,也就是IO数据流的终点
总结MINA的整体IO交互架构如下图示:
三、MINA的工作流程
服务端核心步骤如下:
1、创建IoService的实例IoAcceptor对象,负责监听客户端的连接请求
2、初始化IoAcceptor对象,主要是绑定IoFilter和IoHandler,依次添加多个IoFilter到IoAccetpor上,并且设置IoAcceptor的业务处理器IoHandler对象,最好绑定IoAcceptor的监听端口
3、IoAcceptor接收客户端的连接请求之后,将IO操作分配给IoProcessor,IoProcessor通常是多线程的,之后的IO处理都是由IoProcessor执行
4、IoProcessor依次调用IoFilterChain上的所有IO过滤器的过滤方法
5、IO数据经过所有的IO过滤器处理之后交给业务处理器IoHandler对象进行最终的业务处理
总结MINA的整体工作如下图示:
四、MINA的使用案例
服务端代码案例如下:
1、自定义IoFilter的实现类TimeFilter,由于IoFilter定义的方法较多,而不是每一个IoFilter实现类都需要实现所有的方法,比如消息发送和消息接收方法,正常情况下需要对于接收数据进行解码处理,而需要丢发送数据进行编码处理,所以可以定义不同的过滤器实现不同的IoFilter定义的方法,此时就可以不直接实现IoFilter,而是可以通过继承IoFilter的适配器IoFilterAdater,IoFilterAdater实现了所有的IoFilter接口定义的方法,并且都作了默认的处理,所以如果需要自定义IoFilter实现类,就可以直接继承之IoFilterAdapter,并且再重写需要实现的方法即可,如案例中的TimeFilter仅仅只重写了messageReceived和messageSent两个方法
1 /** 2 * @Auther: Lucky 3 * @Date: 2020/8/13 下午10:31 4 * @Desc: 自定义过滤器实现IoFilter接口 5 */ 6 public class TimeFilter extends IoFilterAdapter { 7 8 @Override 9 public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception { 10 System.out.println("接收消息时过滤,打印当前时间:" + System.currentTimeMillis()); 11 } 12 13 @Override 14 public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { 15 System.out.println("发送消息时过滤,打印当前时间:" + System.currentTimeMillis()); 16 } 17 }
2、自定义业务处理器IoHandler实现类,同样的IoHandler接口也定义了很多的方法,但是并不是所有的业务处理器都需要实现的,同样有一个IoHandler的适配器IoHandlerAdapter类,自定义IoHandler实现类只需要继承之IoHandlerAdapter,重写感兴趣的方法即可
1 public class GameServerHandler extends IoHandlerAdapter { 2 3 public static GameServer gameServer = new GameServer(); 4 5 6 @Override 7 public void messageReceived(IoSession session, Object message) throws Exception { 8 System.out.println("服务器接收到消息:" + message.toString()); 9 GameEvent event = JSON.parseObject(message.toString(), GameEvent.class); 10 if(event.getCmd() == 0){//进入房间 11 gameServer.enter(event.getName(), session); 12 }else if(event.getCmd() == 1){//准备游戏 13 gameServer.ready(event.getName(), session); 14 } 15 } 16 }
3、创建服务器端并开启服务器
1 public class MinaServer { 2 3 public static void main(String[] args) throws IOException { 4 System.out.println("开始启动服务器..."); 5 startServer(8000); 6 } 7 8 public static void startServer(int port) throws IOException { 9 /** 1.创建IoService对象,服务端是IoAcceptor用于接收客户端连接 */ 10 IoAcceptor acceptor = new NioSocketAcceptor(); 11 12 /** 2.给IoService添加过滤器链 */ 13 acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8")))); 14 acceptor.getFilterChain().addLast("log", new LoggingFilter()); 15 acceptor.getFilterChain().addLast("time", new TimeFilter()); 16 17 /** 3.添加业务处理器 */ 18 acceptor.setHandler(new GameServerHandler()); 19 20 /** 绑定监听端口号 */ 21 acceptor.bind(new InetSocketAddress(port)); 22 } 23 }
过程比较清晰,首先是创建IoService的实现类IoAcceptor对象,然后给IoAcceptor添加过滤器,如编解码过滤器,打印日志过滤器以及自定义的过滤器等,然后再给IoAccetor添加业务处理器,最后绑定需要监听的端口即可。
4、客户端代码如下:
1 public class PlayerClient { 2 3 private String name; 4 5 public void load(){ 6 /** 创建 IoService实例 */ 7 NioSocketConnector connector = new NioSocketConnector(); 8 9 /** 添加编解码过滤器*/ 10 connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("utf-8")))); 11 12 /** 设置业务处理器*/ 13 connector.setHandler(new PlayerHandler(name)); 14 15 /** 连接服务器*/ 16 connector.connect(new InetSocketAddress("localhost", 8000)); 17 } 18 19 public PlayerClient(String name){ 20 this.name = name; 21 } 22 }
1 public class PlayerHandler extends IoHandlerAdapter { 2 private String name; 3 public PlayerHandler(String name){ 4 this.name = name; 5 } 6 7 @Override 8 public void sessionOpened(IoSession session) throws Exception { 9 /** 当连接创建成功之后向服务器发送消息*/ 10 GameEvent event = new GameEvent(); 11 event.setCmd(0); 12 event.setName(name); 13 session.write(JSON.toJSON(event).toString()); 14 } 15 16 @Override 17 public void messageReceived(IoSession session, Object message) throws Exception { 18 System.out.println("玩家:" + name + "接收到消息:" + message.toString()); 19 if(message.toString().equals("进入房间成功")){ 20 GameEvent event = new GameEvent(); 21 event.setName(name); 22 event.setCmd(1); 23 session.write(JSON.toJSON(event)); 24 } 25 } 26 }
自定义客户端的IoHandler,重写了sessionOpend方法,作用是和服务器连接成功之后执行该方法,messageReceived方法是接收到服务器发送过来的数据之后执行该方法
5、其他对象
1 public class GameEvent { 2 3 private String name; 4 5 private int cmd; 6 7 public String getName() { 8 return name; 9 } 10 11 public void setName(String name) { 12 this.name = name; 13 } 14 15 public int getCmd() { 16 return cmd; 17 } 18 19 public void setCmd(int cmd) { 20 this.cmd = cmd; 21 } 22 }
1 public class GameServer { 2 3 public static List<PlayerSession> playerList = new ArrayList<>(); 4 public static AtomicInteger readyCount = new AtomicInteger(0); 5 6 public void enter(String name, IoSession session){ 7 System.out.println("玩家:" + name + "进入房间"); 8 if(playerList.size() >=5){ 9 session.write("房间已满,您进入房间失败"); 10 }else { 11 System.out.println("玩家:" + name + "进入房间成功"); 12 PlayerSession player = new PlayerSession(name, session); 13 playerList.add(player); 14 session.write("进入房间成功"); 15 } 16 } 17 18 public void ready(String name, IoSession session){ 19 System.out.println("玩家:" + name + "已准备"); 20 if(readyCount.incrementAndGet() ==5){ 21 for (PlayerSession player: playerList) { 22 player.session.write("游戏开始,欢迎进入王者荣耀"); 23 } 24 } 25 } 26 27 static class PlayerSession{ 28 public PlayerSession(String name,IoSession session){ 29 this.name = name; 30 this.session = session; 31 } 32 private String name; 33 private IoSession session; 34 35 public String getName() { 36 return name; 37 } 38 39 public void setName(String name) { 40 this.name = name; 41 } 42 43 public IoSession getSession() { 44 return session; 45 } 46 47 public void setSession(IoSession session) { 48 this.session = session; 49 } 50 } 51 52 }
这里定义了一个PlayerSession对象,用于封装IoSession对象,给IoSession对象添加一个用户名称
6、模拟多个客户端连接服务器的测试代码如下:
1 public class MinaTest { 2 3 public static void main(String[] args) throws InterruptedException { 4 PlayerClient client1 = new PlayerClient("赵一"); 5 PlayerClient client2 = new PlayerClient("王二"); 6 PlayerClient client3 = new PlayerClient("张三"); 7 PlayerClient client4 = new PlayerClient("李四"); 8 PlayerClient client5 = new PlayerClient("孙五"); 9 PlayerClient client6 = new PlayerClient("小六"); 10 11 client1.load(); 12 Thread.sleep(3000L); 13 14 client2.load(); 15 Thread.sleep(3000L); 16 17 client3.load(); 18 Thread.sleep(3000L); 19 20 client4.load(); 21 Thread.sleep(3000L); 22 23 client5.load(); 24 Thread.sleep(3000L); 25 26 client6.load(); 27 } 28 }
执行服务器和客户端main方法,执行结果如下:
服务端打印结果:
1 开始启动服务器... 2 17:16:39.263 [NioProcessor-2] DEBUG org.apache.mina.filter.codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 1 3 服务器接收到消息:{"name":"赵一","cmd":0} 4 玩家:赵一进入房间 5 玩家:赵一进入房间成功 6 17:16:39.362 [NioProcessor-2] DEBUG org.apache.mina.filter.codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 1 7 服务器接收到消息:{"name":"赵一","cmd":1} 8 玩家:赵一已准备 9 17:16:42.064 [NioProcessor-3] DEBUG org.apache.mina.filter.codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 2 10 服务器接收到消息:{"name":"王二","cmd":0} 11 玩家:王二进入房间 12 玩家:王二进入房间成功 13 17:16:42.065 [NioProcessor-3] DEBUG org.apache.mina.filter.codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 2 14 服务器接收到消息:{"name":"王二","cmd":1} 15 玩家:王二已准备 16 17:16:45.066 [NioProcessor-4] DEBUG org.apache.mina.filter.codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 3 17 服务器接收到消息:{"name":"张三","cmd":0} 18 玩家:张三进入房间 19 玩家:张三进入房间成功 20 17:16:45.067 [NioProcessor-4] DEBUG org.apache.mina.filter.codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 3 21 服务器接收到消息:{"name":"张三","cmd":1} 22 玩家:张三已准备 23 17:16:48.071 [NioProcessor-5] DEBUG org.apache.mina.filter.codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 4 24 服务器接收到消息:{"name":"李四","cmd":0} 25 玩家:李四进入房间 26 玩家:李四进入房间成功 27 17:16:48.073 [NioProcessor-5] DEBUG org.apache.mina.filter.codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 4 28 服务器接收到消息:{"name":"李四","cmd":1} 29 玩家:李四已准备 30 17:16:51.074 [NioProcessor-6] DEBUG org.apache.mina.filter.codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 5 31 服务器接收到消息:{"name":"孙五","cmd":0} 32 玩家:孙五进入房间 33 玩家:孙五进入房间成功 34 17:16:51.076 [NioProcessor-6] DEBUG org.apache.mina.filter.codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 5 35 服务器接收到消息:{"name":"孙五","cmd":1} 36 玩家:孙五已准备 37 17:16:54.078 [NioProcessor-7] DEBUG org.apache.mina.filter.codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 6 38 服务器接收到消息:{"name":"小六","cmd":0} 39 玩家:小六进入房间
客户端打印结果:
1 17:16:39.357 [NioProcessor-2] DEBUG org.apache.mina.filter.codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 1 2 玩家:赵一接收到消息:进入房间成功 3 17:16:42.064 [NioProcessor-16] DEBUG org.apache.mina.filter.codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 2 4 玩家:王二接收到消息:进入房间成功 5 17:16:45.067 [NioProcessor-30] DEBUG org.apache.mina.filter.codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 3 6 玩家:张三接收到消息:进入房间成功 7 17:16:48.072 [NioProcessor-44] DEBUG org.apache.mina.filter.codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 4 8 玩家:李四接收到消息:进入房间成功 9 17:16:51.075 [NioProcessor-58] DEBUG org.apache.mina.filter.codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 5 10 玩家:孙五接收到消息:进入房间成功 11 17:16:51.076 [NioProcessor-2] DEBUG org.apache.mina.filter.codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 1 12 17:16:51.076 [NioProcessor-16] DEBUG org.apache.mina.filter.codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 2 13 玩家:赵一接收到消息:游戏开始,欢迎进入王者荣耀 14 17:16:51.077 [NioProcessor-30] DEBUG org.apache.mina.filter.codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 3 15 玩家:王二接收到消息:游戏开始,欢迎进入王者荣耀 16 17:16:51.077 [NioProcessor-44] DEBUG org.apache.mina.filter.codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 4 17 玩家:张三接收到消息:游戏开始,欢迎进入王者荣耀 18 17:16:51.077 [NioProcessor-58] DEBUG org.apache.mina.filter.codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 5 19 玩家:李四接收到消息:游戏开始,欢迎进入王者荣耀 20 玩家:孙五接收到消息:游戏开始,欢迎进入王者荣耀 21 17:16:54.079 [NioProcessor-72] DEBUG org.apache.mina.filter.codec.ProtocolCodecFilter - Processing a MESSAGE_RECEIVED for session 6 22 玩家:小六接收到消息:房间已满,您进入房间失败
本案例主要模拟了王者荣耀开始游戏的场景,六个客户端分别和游戏服务器进行连接,连接成功就表示进入房间成功,连接成功之后服务器回复客户端进入房间成功,然后客户端再开始准备,准备完成之后服务器就回复正式进入游戏,而本案例模拟了6个客户端,当5个客户端准备了之后就进入了游戏,第六个房间提升进入房间失败。
五、MINA的线程模型
Java NIO的服务端一般线程模型为开启一个线程通过Selecotor监听客户端的连接请求和IO操作,只有有事件发生(连接请求和IO操作)就会立即去处理,这样的线程模型有一个弊端Selector对于客户端连接请求的channel和客户端IO操作的channel是用同一个Selector来监控的,如下图示
这样就会有一个问题,但IO操作比较耗时时就会导致连接请求的channel也会被阻塞,因为Selector是单线程同步处理所有IO事件的,如上图假设客户端A连接成功之后发送IO数据,Selector接收到IO数据之后开始处理,假设处理时间需要10秒,而在10秒之内客户端B通过ServerSocketChannel请求连接服务器,此时Selector还是处于处理IO事件的状态,只要客户端A的IO事件处理完成,才会继续执行select方法处理下一轮的IO事件。整个流程就好比Selector处理IO事件是一批一批的处理,处理完了第一批之后才会处理第二批的IO事件,如果第一批处理比较慢,就会影响到第二批,而第二批中很可能都是耗时比较短的OP_ACCEPT事件。所以为了让服务器在处理OP_READ事件时不会影响OP_ACCEPT事件,MINA在NIO的线程模型之上进行了优化,单独开启一个线程用于处理客户端的连接请求,而对于IO事件再由于IoProcessor线程去处理,而IoProcessor又可以创建多个,这就形成了一个Selector用于负责监听客户的连接请求,然后有多个Selector分别监听多个Channel的IO事件的线程模型,如下图示:
总结:
1、服务端创建IoAcceptor,开启单线程通过Selector用来监听所有客户端请求连接的OP_ACCEPT事件,连接成功之后将客户端封装成IoSession,将IoSession交给IoProcessor线程组
2、IoSession通过取模算法被分配给一个IoProcessor,IoProcessor通过Selector监听注册到此Selecotr上的所有IoSession的IO状态,当有IO事件发生时,进行IO数据的读写并将IO数据先通过IoFilterChain进行过滤,最后交给业务层IoHandler进行业务处理
后结:本文主要介绍来MINA的基本理论和实验案例,下一文将分析MINA的源码实现:NIO之路--MINA框架源码解析