zoukankan      html  css  js  c++  java
  • Mina

    1.MINA框架简介 
    MINA(Multipurpose(万能的,多种用途的) Infrastructure for Network Applications)是用于开发高性能和高可用性的网络应用程序的基础框架。通过使用MINA框架可以可以省下处理底层I/O和线程并发等复杂工作,开发人员能够把更多的精力投入到业务设计和开发当中。MINA框架的应用比较广泛,应用的开源项目有Apache Directory、AsyncWeb、Apache Qpid、QuickFIX/J、Openfire、SubEthaSTMP、red5等。MINA框架当前稳定版本是1.1.6,最新的2.0版本目前已经发布了M1版本。 
    MINA框架的特点有:基于java NIO类库开发;采用非阻塞方式异步传输;事件驱动;支持批量数据传输;支持TCP、UDP协议;控制反转的设计模式(支持Spring);采用优雅的松耦合架构;可灵活的加载过滤器机制;单元测试更容易实现;可自定义线程的数量,以提高运行于多处理器上的性能;采用回调的方式完成调用,线程的使用更容易。 

    2.MINA框架的常用类 
    类NioSocketAcceptor用于创建服务端监听; 
    类NioSocketConnector用于创建客户端连接; 
    类IoSession用来保存会话属性和发送消息; 
    类IoHandlerAdapter用于定义业务逻辑,常用的方法有: 
    方法 定义 
    sessionCreated() 当会话创建时被触发 
    sessionOpened() 当会话开始时被触发 
    sessionClosed() 当会话关闭时被触发 
    sessionIdle() 当会话空闲时被触发 
    exceptionCaught() 当接口中其他方法抛出异常未被捕获时触发此方法 
    messageRecieved() 当接收到消息后被触发 
    messageSent() 当发送消息后被触发 


    3.服务端应用开发示例 
    下面将以MINA2.0M1版本为基础,通过一个范例来演示一下如何使用MINA框架进行开发。开发环境为jdk6.0,开发工具NetBeans6.0,所需jar包slf4j-api.jar、slf4j-jdk14.jar、MINA-core-2.0.0-M1.jar。 
    首先定义一个业务逻辑处理器TimeServerHandler,继承自IoHandlerAdapter,实现的功能有:当客户端创建会话时会显示客户端设备的IP和端口;当客户端输入quit时结束会话;客户端输入其它内容时则向客户端发送当前时间。代码如下: 

     1 public class TimeServerHandler extends IoHandlerAdapter 
     2 { 
     3 @Override 
     4 public void sessionCreated(IoSession session) { 
     5 //显示客户端的ip和端口 
     6 System.out.println(session.getRemoteAddress().toString()); 
     7 } 
     8 @Override 
     9 public void messageReceived( IoSession session, Object message ) throws Exception 
    10 { 
    11 String str = message.toString(); 
    12 if( str.trim().equalsIgnoreCase("quit") ) { 
    13 session.close();//结束会话 
    14 return; 
    15 } 
    16 Date date = new Date(); 
    17 session.write( date.toString() );//返回当前时间的字符串 
    18 System.out.println("Message written..."); 
    19 } 
    20 } 
    21 再定义一个类MinaTimeServer用来启动服务端: 
    22 public class MinaTimeServer 
    23 { 
    24 private static final int PORT = 9123;//定义监听端口 
    25 public static void main( String[] args ) throws IOException 
    26 { 
    27 IoAcceptor acceptor = new NioSocketAcceptor(); 
    28 acceptor.getFilterChain().addLast( "logger", new LoggingFilter() ); 
    29 acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter( new TextLineCodecFactory( Charset.forName( "UTF-8" ))));//指定编码过滤器 
    30 acceptor.setHandler( new TimeServerHandler() );//指定业务逻辑处理器 
    31 acceptor.setDefaultLocalAddress( new InetSocketAddress(PORT) );//设置端口号 
    32 acceptor.bind();//启动监听 
    33 } 
    34 } 

    4.测试 
    首先运行MinaTimeServer,启动服务端,接着在命令行运行“telnet 127.0.0.1 9123”,来登录,这时会看到服务端输出如下: 
    2008-2-21 16:15:29 org.apache.MINA.filter.logging.LogLevel$4 log 
    /10.64.2.137:4140 IP和端口号 
    信息: CREATED 
    2008-2-21 16:15:29 org.apache.MINA.filter.logging.LogLevel$4 log 
    信息: OPENED 在客户端输入回车,在客户端可以看到服务端返回当前的时间: 
    Thu Feb 21 16:20:14 CST 2008 
    服务端输出: 
    2008-2-21 16:20:14 org.apache.MINA.filter.logging.LogLevel$4 log 
    信息: RECEIVED: HeapBuffer[pos=0 lim=2 cap=2048: 0D 0A] 接收收到回车符 
    Message written... 
    2008-2-21 16:20:14 org.apache.MINA.filter.logging.LogLevel$4 log 
    信息: SENT: HeapBuffer[pos=0 lim=29 cap=30: 54 68 75 20 46 65 62 20 32 31 20 31 36 3A 32 30...] 
    2008-2-21 16:20:14 org.apache.MINA.filter.logging.LogLevel$4 log 
    信息: SENT: HeapBuffer[pos=0 lim=0 cap=0: empty] 5.客户端开发示例 


    首先定义类TimeClientHandler来处理消息接收事件: 

     1 class TimeClientHandler extends IoHandlerAdapter{ 
     2 public TimeClientHandler() { 
     3 } 
     4 @Override 
     5 public void messageReceived(IoSession session, Object message) throws Exception { 
     6 System.out.println(message);//显示接收到的消息 
     7 } 
     8 } 
     9 接着定义MinaTimeClient类用于连接服务端,并向服务端发送消息: 
    10 public class MinaTimeClient { 
    11 public static void main(String[] args) { 
    12 // 创建客户端连接器. 
    13 NioSocketConnector connector = new NioSocketConnector(); 
    14 connector.getFilterChain().addLast( "logger", new LoggingFilter() ); 
    15 connector.getFilterChain().addLast( "codec", new ProtocolCodecFilter( new TextLineCodecFactory( Charset.forName( "UTF-8" )))); //设置编码过滤器 
    16 connector.setConnectTimeout(30); 
    17 connector.setHandler(new TimeClientHandler());//设置事件处理器 
    18 ConnectFuture cf = connector.connect( 
    19 new InetSocketAddress("127.0.0.1", 9123));//建立连接 
    20 cf.awaitUninterruptibly();//等待连接创建完成 
    21 cf.getSession().write("hello");//发送消息 
    22 cf.getSession().write("quit");//发送消息 
    23 cf.getSession().getCloseFuture().awaitUninterruptibly();//等待连接断开 
    24 connector.dispose(); 
    25 } 
    26 } 

    6.总结 
    通过上述示例可以了解到:使用MINA框架来开发的网络应用程序代码结构更清晰;MINA框架完成了底层的线程管理;MINA内置的编码器可以满足大多数用户的需求,省去了开发人员消息编码解码的工作。具称使用MINA开发服务器程序的性能已经逼近使用 C/C++ 语言开发的网络服务。因此,建议在网络应用程序开发过程中尝试使用MINA框架来提高我们的开发效率和应用程序的执行效率。

    二。

    客户端通信过程 
    1.通过SocketConnector同服务器端建立连接 
    2.链接建立之后I/O的读写交给了I/O Processor线程,I/O Processor是多线程的 
    3.通过I/O Processor读取的数据经过IoFilterChain里所有配置的IoFilter,IoFilter进行消息的过滤,格式的转换,在这个层面可以制定一些自定义的协议 
    4.最后IoFilter将数据交给Handler进行业务处理,完成了整个读取的过程 
    5.写入过程也是类似,只是刚好倒过来,通过IoSession.write写出数据,然后Handler进行写入的业务处理,处理完成后交给IoFilterChain,进行消息过滤和协议的转换,最后通过I/O Processor将数据写出到socket通道 
    IoFilterChain作为消息过滤链 
    1.读取的时候是从低级协议到高级协议的过程,一般来说从byte字节逐渐转换成业务对象的过程 
    2.写入的时候一般是从业务对象到字节byte的过程 
    IoSession贯穿整个通信过程的始终 

    整个过程可以用一个图来表现 
     
    消息箭头都是有NioProcessor-N线程发起调用,默认情况下也在NioProcessor-N线程中执行 

    类图 
    http://mina.apache.org/class-diagrams.html#ClassDiagrams-ProtocolDecoderclassdiagram 

    Connector 
    作为连接客户端,SocketConector用来和服务器端建立连接,连接成功,创建IoProcessor Thread(不能超过指定的processorCount),Thread由指定的线程池进行管理,IoProcessor 利用NIO框架对IO进行处理,同时创建IoSession。连接的建立是通过Nio的SocketChannel进行。 

    NioSocketConnector connector = new NioSocketConnector(processorCount); 
    ConnectFuture future = connector.connect(new InetSocketAddress(HOSTNAME, PORT));建立一个I/O通道 

    Acceptor 
    作为服务器端的连接接受者,SocketAcceptor用来监听端口,同客户端建立连接,连接建立之后的I/O操作全部交给IoProcessor进行处理 
    IoAcceptor acceptor = new NioSocketAcceptor(); 
    acceptor.bind( new InetSocketAddress(PORT) ); 
    Protocol 
    利用IoFilter,对消息进行解码和编码,如以下代码通过 MyProtocolEncoder 将java对象转成byte串,通过MyProtocalDecoder 将byte串恢复成java对象 

     1 connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MyProtocalFactory()));
     2 ......
     3 public class MyProtocalFactory implements ProtocolCodecFactory {
     4  ProtocolEncoderAdapter encoder = new MyProtocolEncoder();
     5  ProtocolDecoder decoder = new MyProtocalDecoder() ;
     6  public ProtocolDecoder getDecoder(IoSession session) throws Exception {
     7   return decoder;
     8  }
     9  public ProtocolEncoder getEncoder(IoSession session) throws Exception {
    10   return encoder;
    11  }
    12 }
    13 ......
    14 public class MyProtocalDecoder extends ProtocolDecoderAdapter  {
    15 
    16  public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)
    17    throws Exception {
    18   
    19   int  id  = in.getInt();
    20   int  len = in.getInt();
    21   byte[]  dst = new byte[len];
    22   
    23   in.get(dst);
    24   
    25   String name = new String(dst,"GBK");
    26   
    27   Item item = new Item();
    28   item.setId(id);
    29   item.setName(name);
    30   out.write(item);
    31  }
    32 }
    33 ......
    34 public class MyProtocolEncoder extends ProtocolEncoderAdapter {
    35 
    36  public void encode(IoSession session, Object message,
    37    ProtocolEncoderOutput out) throws Exception {
    38   Item item = (Item)message;
    39   int byteLen = 8 + item.getName().getBytes("GBK").length ;
    40   IoBuffer buf = IoBuffer.allocate(byteLen);
    41   buf.putInt(item.getId());
    42   buf.putInt(item.getName().getBytes("GBK").length);
    43   buf.put(item.getName().getBytes("GBK"));
    44   buf.flip();
    45   out.write(buf);
    46   
    47  }
    48 }

    handler 
    具体处理事件,事件包括:sessionCreated、sessionOpened、sessionClosed、sessionIdle、exceptionCaught、messageReceived、messageSent。 
    connector.setHandler(new MyHandler());MyHandler继承IoHandlerAdapter类或者实现IoHandler接口.事件最终由IoProcessor线程发动调用。 
    Processor 
    I/O处理器、允许多线程读写,开发过程中只需要指定线程数量,Processor通过Nio框架进行I/O的续写操作,Processor包含了Nio的Selector的引用。这点也正是mina的优势,如果直接用Nio编写,则需要自己编写代码来实现类似Processor的功能。正因为I/O Processor是异步处理读写的,所以我们有时候需要识别同一个任务的消息,比如一个任务包括发送消息,接收消息,反馈消息,那么我们需要在制定消息格式的时候,消息头里能包含一个能识别是同一个任务的id。 
    I/O Porcessor线程数的设置 :如果是SocketConnector,则可以在构造方法中指定,如:new SocketConnector(processorCount, Executors.newCachedThreadPool());如果是SocketAcceptor,也是一样的:SocketAcceptor acceptor = new SocketAcceptor(ProcessorCount, Executors.newCachedThreadPool()); 
    processorCount为最大Porcessor线程数,这个值可以通过性能测试进行调优,默认值是cpu核数量+1(Runtime.getRuntime().availableProcessors() + 1)。 
    比较奇怪的是,每个IoProcessor在创建的时候会本地自己和自己建立一个连接? 

    IoSession 
    IoSession是用来保持IoService的上下文,一个IoService在建立Connect之后建立一个IoSession(一个连接一个session),IoSession的生命周期从Connection建立到断开为止 
    IoSession做两件事情: 
    1.通过IoSession可以获取IoService的所有相关配置对象(持有对IoService,Processor池,SocketChannel,SessionConfig和IoService.IoHandler的引用) 
    2.通过IoSession.write 是数据写出的入口 

    关于线程 
    http://mina.apache.org/configuring-thread-model.html 
    ThreadModel 1.x版本的mina还有线程模式选项在2.x之后就没有了 
    1.x版本指定线程模式 
    SocketConnectorConfig cfg = new SocketConnectorConfig(); 
    cfg.setThreadModel(ThreadModel.MANUAL); 

    MINA有3种worker线程 
    Acceptor、Connector、I/O processor 线程 
    Acceptor Thread 
    一般作为服务器端链接的接收线程,实现了接口IoService,线程的数量就是创建SocketAcceptor 的数量 
    Connector Thread 
    一般作为客户端的请求建立链接线程,实现了接口IoService,维持了一个和服务器端Acceptor的一个链接,线程数量就是创建SocketConnector 的数量 

    Mina的SocketAcceptor和SocketConnector均是继承了BaseIoService,是对IoService的两种不同的实现 
    I/O processor Thread 
    作为I/O真正处理的线程,存在于服务器端和客户端,用来处理I/O的读写操作,线程的数量是可以配置的,默认最大数量是CPU个数+1 
    服务器端:在创建SocketAcceptor的时候指定ProcessorCount 
    SocketAcceptor acceptor = new SocketAcceptor(Runtime.getRuntime().availableProcessors() + 1, Executors.newCachedThreadPool()); 
    客户端:在创建SocketConnector 的时候指定ProcessorCount 
    SocketConnector connector = new SocketConnector(Runtime.getRuntime().availableProcessors() + 1, Executors.newCachedThreadPool()); 
    I/O Processor Thread,是依附于IoService,类似上面的例子SocketConnector connector = new SocketConnector(Runtime.getRuntime().availableProcessors() + 1, Executors.newCachedThreadPool());是指SocketConnector这个线程允许CPU+1个I/O Processor Thread 
    NioProcessor虽然是多线程,但是对与一个连接的时候业务处理只会使用一个线程进行处理(Processor线程对于一个客户端连接只使用一个线程NioProcessor-n)如果handler的业务比较耗时,会导致NioProcessor线程堵塞 ,在2个客户端同时连接上来的时候会创建第2个(前提是第1个NioProcessor正在忙),创建的最大数量由Acceptor构造方法的时候指定。如果:一个客户端连接同服务器端有很多通信,并且I/O的开销不大,但是Handler处理的业务时间比较长,那么需要采用独立的线程模式,在FilterChain的最后增加一个ExecutorFitler: 
    acceptor.getFilterChain().addLast("threadPool", new ExecutorFilter(Executors.newCachedThreadPool())); 
    这样可以保证processor和handler的线程是分开的,否则:客户端发送3个消息,而服务器对于每个消息要处理10s左右,那么这3个消息是被串行处理,在处理第一个消息的时候,后面的消息将被堵塞,同样反过来客户端也有同样的问题。 

    客户端Porcessor堵塞测试情况: 
    1.以下代码在建立连接后连续发送了5个消息(item) 
     

    Java代码
     1 ConnectFuture future = connector.connect(new InetSocketAddress(HOSTNAME, PORT));
     2                 future.awaitUninterruptibly();
     3                 session = future.getSession();
     4                 Item item = new Item();
     5                 item.setId(12345);
     6                 item.setName("hi");
     7                 session.write(item);
     8                 session.write(item);
     9                 session.write(item);
    10                 session.write(item);
    11                 session.write(item);

    2.在handle的messageSent方法进行了延时处理,延时3秒 

    1   public void messageSent(IoSession session, Object message) throws Exception {
    2         Thread.sleep(3000);
    3         System.out.println(message);
    4         
    5     }

    3.测试结果 
    5个消息是串行发送,都由同一个IoPorcessor线程处理 

    1   session.write(item);
    2                 session.write(item);
    3                 session.write(item);
    4                 session.write(item);
    5                 session.write(item);

    4.增加ExecutorFilter,ExecutorFilter保证在处理handler的时候是独立线程 
    connector.getFilterChain().addLast("threadPool", new ExecutorFilter(Executors.newCachedThreadPool())); 
    5.测试结果 
    4个session.wirte变成了并行处理,服务器端同时收到了5条消息 

  • 相关阅读:
    省市级联
    innerText与innerHTML的区别
    使用代码提交表单
    Date的使用
    Array(数组)的使用
    [贪心]Rest Stops
    [数论][Miller_Rabin] Goldbach
    [简单思维题]Hoofball
    [数论][容斥原理]Co-prime
    [图论][二分图最大匹配]Courses
  • 原文地址:https://www.cnblogs.com/xuanyuanzhuo-blog/p/3987953.html
Copyright © 2011-2022 走看看