zoukankan      html  css  js  c++  java
  • JAVA NIO异步通信框架MINA选型和使用的几个细节(概述入门,UDP, 心跳)

        Apache MINA 2 是一个开发高性能和高可伸缩性网络应用程序的网络应用框架。它提供了一个抽象的事件驱动的异步 API,可以使用 TCP/IP、UDP/IP、串口和虚拟机内部的管道等传输方式。Apache MINA 2 可以作为开发网络应用程序的一个良好基础。

        Apache MINA是非常著名的基于java nio的通信框架,以前都是自己直接使用udp编程,新项目选型中考虑到网络通信可能会用到多种通信方式,因此使用了MINA。

         本文结构:

         (1)客户端和服务器代码 ;虽然是udp的,但是mina的优美的设计使得所有的通信方式能够以统一的形式使用,perfect。当然注意的是,不同的通信方式,背后的机理和有效的变量、状态是有区别的,所以要精通,那还是需要经验积累和学习的。

         (2)超时 和Session的几个实际问题

         (3)心跳 ,纠正几个错误

         既然是使用,废话少说,直接整个可用的例子。当然了,这些代码也不是直接可用的,我们应用的逻辑有点复杂,不会这么简单使用的。

    请参考mina的example包和文档http://mina.apache.org/udp-tutorial.html 。

    版本2.0 RC1

    1.1 服务器端

    1.                 NioDatagramAcceptor acceptor =  new  NioDatagramAcceptor();  
    2.                 acceptor.setHandler(new  MyIoHandlerAdapter()); //你的业务处理,最简单的,可以extends IoHandlerAdapter   
    3.   
    4. DefaultIoFilterChainBuilder chain = acceptor.getFilterChain();  
    5. chain.addLast("keep-alive" ,  new  HachiKeepAliveFilterInMina());  //心跳   
    6. chain.addLast("toMessageTyep" ,  new  MyMessageEn_Decoder());   
    7.               //将传输的数据转换成你的业务数据格式。比如下面的是将数据转换成一行行的文本   
    8.                 //acceptor.getFilterChain().addLast("codec",new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));    
    9.   
    10. chain.addLast("logger" ,  new  LoggingFilter());  
    11. DatagramSessionConfig dcfg = acceptor.getSessionConfig();  
    12. dcfg.setReuseAddress(true );  
    13. acceptor.bind(new  InetSocketAddress(ClusterContext.getHeartBeatPort()));  

    1.2 客户端

    1.               NioDatagramConnector connector =  new  NioDatagramConnector();  
    2. connector.setConnectTimeoutMillis(60000L);  
    3. connector.setConnectTimeoutCheckInterval(10000 );  
    4. connector.setHandler(handler);  
    5.   
    6. DefaultIoFilterChainBuilder chain = connector.getFilterChain();  
    7. chain.addLast("keep-alive" ,  new  HachiKeepAliveFilterInMina()); //心跳   
    8. chain.addLast("toMessageTyep" ,  new  MyMessageEn_Decoder());  
    9. chain.addLast("logger" ,  new  LoggingFilter());  
    10. ConnectFuture connFuture = connector.connect(new  InetSocketAddress( "10.1.1.1" , 8001 ));  
    11. connFuture.awaitUninterruptibly();  
    12. IoSession session = connFuture.getSession();  
    13.                 //发送消息长整型 1000   
    14.               IoBuffer buffer = IoBuffer.allocate(8 );  
    15.               buffer.putLong(1000 );  
    16.               buffer.flip();  
    17.               session.write(buffer);  
    18.                  //关闭连接   
    19.                  session.getCloseFuture().awaitUninterruptibly();  
    20.  connector.dispose();  

    2. 超时的几个经验总结:

        udp session默认是60秒钟超时,此时状态为closing,数据就发不出去了。

    Session的接口是IoSession,udp的最终实现是NioSession。如果交互在60秒内不能处理完成,就需要使用Keep-alive机制,即心跳机制。

    3. 心跳 机制

        在代码中已经使用了心跳机制,是通过mina的filter实现的,mina自身带的心跳机制好处在于,它附加了处理,让心跳消息不会传到业务层,在底层就完成了。

        在上面代码实现中的HachiKeepAliveFilterInMina如下:

    1. public   class  HachiKeepAliveFilterInMina  extends  KeepAliveFilter {  
    2.     private   static   final   int  INTERVAL =  30 ; //in seconds   
    3.     private   static   final   int  TIMEOUT =  10 ;  //in seconds   
    4.       
    5.     public  HachiKeepAliveFilterInMina(KeepAliveMessageFactory messageFactory) {  
    6.         super (messageFactory, IdleStatus.BOTH_IDLE,  new  ExceptionHandler(), INTERVAL, TIMEOUT);  
    7.     }  
    8.       
    9.     public  HachiKeepAliveFilterInMina() {  
    10.         super ( new  KeepAliveMessageFactoryImpl(), IdleStatus.BOTH_IDLE,  new  ExceptionHandler(), INTERVAL, TIMEOUT);  
    11.         this .setForwardEvent( false );  //此消息不会继续传递,不会被业务层看见   
    12.     }  
    13. }  
    14.   
    15. class  ExceptionHandler  implements  KeepAliveRequestTimeoutHandler {     
    16.     public   void  keepAliveRequestTimedOut(KeepAliveFilter filter, IoSession session)  throws  Exception {     
    17.         System.out.println("Connection lost, session will be closed" );     
    18.         session.close(true );   
    19.     }     
    20. }  
    21.   
    22. /**  
    23.  * 继承于KeepAliveMessageFactory,当心跳机制启动的时候,需要该工厂类来判断和定制心跳消息  
    24.  * @author Liu Liu  
    25.  *  
    26.  */   
    27. class  KeepAliveMessageFactoryImpl  implements  KeepAliveMessageFactory {  
    28.     private   static   final   byte  int_req = - 1 ;  
    29.     private   static   final   byte  int_rep = - 2 ;   
    30.     private   static   final  IoBuffer KAMSG_REQ = IoBuffer.wrap( new   byte []{int_req});     
    31.     private   static   final  IoBuffer KAMSG_REP = IoBuffer.wrap( new   byte []{int_rep});    
    32.          
    33.     public  Object getRequest(IoSession session) {     
    34.         return  KAMSG_REQ.duplicate();     
    35.     }     
    36.   
    37.     public  Object getResponse(IoSession session, Object request) {     
    38.         return  KAMSG_REP.duplicate();     
    39.     }     
    40.   
    41.     public   boolean  isRequest(IoSession session, Object message) {    
    42.         if (!(message  instanceof  IoBuffer))  
    43.             return   false ;  
    44.         IoBuffer realMessage = (IoBuffer)message;  
    45.         if (realMessage.limit() !=  1 )  
    46.             return   false ;  
    47.           
    48.         boolean  result = (realMessage.get() == int_req);  
    49.         realMessage.rewind();  
    50.         return  result;  
    51.     }     
    52.   
    53.     public   boolean  isResponse(IoSession session, Object message) {      
    54.         if (!(message  instanceof  IoBuffer))  
    55.             return   false ;  
    56.         IoBuffer realMessage = (IoBuffer)message;  
    57.         if (realMessage.limit() !=  1 )  
    58.             return   false ;  
    59.           
    60.         boolean  result = (realMessage.get() == int_rep);     
    61.         realMessage.rewind();  
    62.         return  result;  
    63.     }     
    64. }  

      有人说:心跳机制的filter只需要服务器端具有即可——这是错误 的,拍着脑袋想一想,看看factory,你就知道了。心跳需要通信两端的实现 。

      另外,版本2.0 RC1中,经过测试,当心跳的时间间隔INTERVAL设置为60s(Session的存活时间)的时候心跳会失效,所以最好需要小于60s的间隔。

  • 相关阅读:
    DGA域名可以是色情网站域名
    使用cloudflare加速你的网站隐藏你的网站IP
    167. Two Sum II
    leetcode 563. Binary Tree Tilt
    python 多线程
    leetcode 404. Sum of Left Leaves
    leetcode 100. Same Tree
    leetcode 383. Ransom Note
    leetcode 122. Best Time to Buy and Sell Stock II
    天津Uber优步司机奖励政策(12月28日到12月29日)
  • 原文地址:https://www.cnblogs.com/hyl8218/p/4654653.html
Copyright © 2011-2022 走看看