zoukankan      html  css  js  c++  java
  • JAVA NIO异步通信框架MINA选型和使用的几个细节(概述入门,UDP, 心跳)

        Apache MINA 2 是一个开发高性能和高可伸缩性网络应用程序的网络应用框架。它提供了一个抽象的事件驱动的异步 API,可以使用 TCP/IP、UDP/IP、串口和虚拟机内部的管道等传输方式。Apache MINA 2 可以作为开发网络应用程序的一个良好基础。

        Apache MINA是非常著名的基于java nio的通信框架,以前都是自己直接使用udp编程,新项目选型中考虑到网络通信可能会用到多种通信方式,因此使用了MINA。

         本文结构:

         (1)客户端和服务器代码 ;虽然是udp的,但是mina的优美的设计使得所有的通信方式能够以统一的形式使用,perfect。当然注意的是,不同的通信方式,背后的机理和有效的变量、状态是有区别的,所以要精通,那还是需要经验积累和学习的。

         (2)超时 和Session的几个实际问题

         (3)心跳 ,纠正几个错误

         既然是使用,废话少说,直接整个可用的例子。当然了,这些代码也不是直接可用的,我们应用的逻辑有点复杂,不会这么简单使用的。

    请参考mina的example包和文档http://mina.apache.org/udp-tutorial.html 。

    版本2.0 RC1

    1.1 服务器端

    1.                 NioDatagramAcceptor acceptor =  new  NioDatagramAcceptor();  
    2.                 acceptor.setHandler(new  MyIoHandlerAdapter()); //你的业务处理,最简单的,可以extends IoHandlerAdapter   
    3.   
    4. DefaultIoFilterChainBuilder chain = acceptor.getFilterChain();  
    5. chain.addLast("keep-alive" ,  new  HachiKeepAliveFilterInMina());  //心跳   
    6. chain.addLast("toMessageTyep" ,  new  MyMessageEn_Decoder());   
    7.               //将传输的数据转换成你的业务数据格式。比如下面的是将数据转换成一行行的文本   
    8.                 //acceptor.getFilterChain().addLast("codec",new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));    
    9.   
    10. chain.addLast("logger" ,  new  LoggingFilter());  
    11. DatagramSessionConfig dcfg = acceptor.getSessionConfig();  
    12. dcfg.setReuseAddress(true );  
    13. acceptor.bind(new  InetSocketAddress(ClusterContext.getHeartBeatPort()));  

    1.2 客户端

    1.               NioDatagramConnector connector =  new  NioDatagramConnector();  
    2. connector.setConnectTimeoutMillis(60000L);  
    3. connector.setConnectTimeoutCheckInterval(10000 );  
    4. connector.setHandler(handler);  
    5.   
    6. DefaultIoFilterChainBuilder chain = connector.getFilterChain();  
    7. chain.addLast("keep-alive" ,  new  HachiKeepAliveFilterInMina()); //心跳   
    8. chain.addLast("toMessageTyep" ,  new  MyMessageEn_Decoder());  
    9. chain.addLast("logger" ,  new  LoggingFilter());  
    10. ConnectFuture connFuture = connector.connect(new  InetSocketAddress( "10.1.1.1" , 8001 ));  
    11. connFuture.awaitUninterruptibly();  
    12. IoSession session = connFuture.getSession();  
    13.                 //发送消息长整型 1000   
    14.               IoBuffer buffer = IoBuffer.allocate(8 );  
    15.               buffer.putLong(1000 );  
    16.               buffer.flip();  
    17.               session.write(buffer);  
    18.                  //关闭连接   
    19.                  session.getCloseFuture().awaitUninterruptibly();  
    20.  connector.dispose();  

    2. 超时的几个经验总结:

        udp session默认是60秒钟超时,此时状态为closing,数据就发不出去了。

    Session的接口是IoSession,udp的最终实现是NioSession。如果交互在60秒内不能处理完成,就需要使用Keep-alive机制,即心跳机制。

    3. 心跳 机制

        在代码中已经使用了心跳机制,是通过mina的filter实现的,mina自身带的心跳机制好处在于,它附加了处理,让心跳消息不会传到业务层,在底层就完成了。

        在上面代码实现中的HachiKeepAliveFilterInMina如下:

    1. public   class  HachiKeepAliveFilterInMina  extends  KeepAliveFilter {  
    2.     private   static   final   int  INTERVAL =  30 ; //in seconds   
    3.     private   static   final   int  TIMEOUT =  10 ;  //in seconds   
    4.       
    5.     public  HachiKeepAliveFilterInMina(KeepAliveMessageFactory messageFactory) {  
    6.         super (messageFactory, IdleStatus.BOTH_IDLE,  new  ExceptionHandler(), INTERVAL, TIMEOUT);  
    7.     }  
    8.       
    9.     public  HachiKeepAliveFilterInMina() {  
    10.         super ( new  KeepAliveMessageFactoryImpl(), IdleStatus.BOTH_IDLE,  new  ExceptionHandler(), INTERVAL, TIMEOUT);  
    11.         this .setForwardEvent( false );  //此消息不会继续传递,不会被业务层看见   
    12.     }  
    13. }  
    14.   
    15. class  ExceptionHandler  implements  KeepAliveRequestTimeoutHandler {     
    16.     public   void  keepAliveRequestTimedOut(KeepAliveFilter filter, IoSession session)  throws  Exception {     
    17.         System.out.println("Connection lost, session will be closed" );     
    18.         session.close(true );   
    19.     }     
    20. }  
    21.   
    22. /**  
    23.  * 继承于KeepAliveMessageFactory,当心跳机制启动的时候,需要该工厂类来判断和定制心跳消息  
    24.  * @author Liu Liu  
    25.  *  
    26.  */   
    27. class  KeepAliveMessageFactoryImpl  implements  KeepAliveMessageFactory {  
    28.     private   static   final   byte  int_req = - 1 ;  
    29.     private   static   final   byte  int_rep = - 2 ;   
    30.     private   static   final  IoBuffer KAMSG_REQ = IoBuffer.wrap( new   byte []{int_req});     
    31.     private   static   final  IoBuffer KAMSG_REP = IoBuffer.wrap( new   byte []{int_rep});    
    32.          
    33.     public  Object getRequest(IoSession session) {     
    34.         return  KAMSG_REQ.duplicate();     
    35.     }     
    36.   
    37.     public  Object getResponse(IoSession session, Object request) {     
    38.         return  KAMSG_REP.duplicate();     
    39.     }     
    40.   
    41.     public   boolean  isRequest(IoSession session, Object message) {    
    42.         if (!(message  instanceof  IoBuffer))  
    43.             return   false ;  
    44.         IoBuffer realMessage = (IoBuffer)message;  
    45.         if (realMessage.limit() !=  1 )  
    46.             return   false ;  
    47.           
    48.         boolean  result = (realMessage.get() == int_req);  
    49.         realMessage.rewind();  
    50.         return  result;  
    51.     }     
    52.   
    53.     public   boolean  isResponse(IoSession session, Object message) {      
    54.         if (!(message  instanceof  IoBuffer))  
    55.             return   false ;  
    56.         IoBuffer realMessage = (IoBuffer)message;  
    57.         if (realMessage.limit() !=  1 )  
    58.             return   false ;  
    59.           
    60.         boolean  result = (realMessage.get() == int_rep);     
    61.         realMessage.rewind();  
    62.         return  result;  
    63.     }     
    64. }  

      有人说:心跳机制的filter只需要服务器端具有即可——这是错误 的,拍着脑袋想一想,看看factory,你就知道了。心跳需要通信两端的实现 。

      另外,版本2.0 RC1中,经过测试,当心跳的时间间隔INTERVAL设置为60s(Session的存活时间)的时候心跳会失效,所以最好需要小于60s的间隔。

  • 相关阅读:
    MySQL中去重字段完全相同的数据
    SVN自动更新-win平台
    EF出错:Unable to convert MySQL date/time value to System.DateTime
    微信不支持Object.assign
    记录一下dotnetcore.1.0.0-VS2015Tools.preview2安装不上的问题
    Ajax表单异步上传(包括文件域)
    .NET web开发之WebApi初试水
    遍历数组一次求得数组的平均数、标准差、方差
    记STM32F030多通道ADC DMA读取乱序问题
    RT-Thread入门和模拟器的配置生成
  • 原文地址:https://www.cnblogs.com/hyl8218/p/4654653.html
Copyright © 2011-2022 走看看