zoukankan      html  css  js  c++  java
  • MINA2.0原理

    转自:http://blog.csdn.net/liuzhenwen/article/details/5894279

    客户端通信过程 
    1.通过SocketConnector同服务器端建立连接 
    2.链接建立之后I/O的读写交给了I/O Processor线程,I/O Processor是多线程的 
    3.通过I/O Processor读取的数据经过IoFilterChain里所有配置的IoFilter,IoFilter进行消息的过滤,格式的转换,在这个层面可以制定一些自定义的协议 
    4.最后IoFilter将数据交给Handler进行业务处理,完成了整个读取的过程 
    5.写入过程也是类似,只是刚好倒过来,通过IoSession.write写出数据,然后Handler进行写入的业务处理,处理完成后交给IoFilterChain,进行消息过滤和协议的转换,最后通过I/O Processor将数据写出到socket通道 
    IoFilterChain作为消息过滤链 
    1.读取的时候是从低级协议到高级协议的过程,一般来说从byte字节逐渐转换成业务对象的过程 
    2.写入的时候一般是从业务对象到字节byte的过程 
    IoSession贯穿整个通信过程的始终 

    整个过程可以用一个图来表现 

    消息箭头都是有NioProcessor-N线程发起调用,默认情况下也在NioProcessor-N线程中执行 

    类图 
    http://mina.apache.org/class-diagrams.html#ClassDiagrams-ProtocolDecoderclassdiagram 

    Connector 
    作为连接客户端,SocketConector用来和服务器端建立连接,连接成功,创建IoProcessor Thread(不能超过指定的processorCount),Thread由指定的线程池进行管理,IoProcessor 利用NIO框架对IO进行处理,同时创建IoSession。连接的建立是通过Nio的SocketChannel进行。 

    NioSocketConnector connector = new NioSocketConnector(processorCount); 
    ConnectFuture future = connector.connect(new InetSocketAddress(HOSTNAME, PORT));建立一个I/O通道 

    Acceptor 
    作为服务器端的连接接受者,SocketAcceptor用来监听端口,同客户端建立连接,连接建立之后的I/O操作全部交给IoProcessor进行处理 
    IoAcceptor acceptor = new NioSocketAcceptor(); 
    acceptor.bind( new InetSocketAddress(PORT) ); 
    Protocol 
    利用IoFilter,对消息进行解码和编码,如以下代码通过 MyProtocolEncoder 将java对象转成byte串,通过MyProtocalDecoder 将byte串恢复成java对象 

    Java代码
    1. connector.getFilterChain().addLast("codec",  new  ProtocolCodecFilter( new MyProtocalFactory()));  
    2. ......  
    3. public   class  MyProtocalFactory  implements  ProtocolCodecFactory {  
    4.  ProtocolEncoderAdapter encoder = new  MyProtocolEncoder();  
    5.  ProtocolDecoder decoder = new  MyProtocalDecoder() ;  
    6.  public  ProtocolDecoder getDecoder(IoSession session)  throws  Exception {  
    7.   return  decoder;  
    8.  }  
    9.  public  ProtocolEncoder getEncoder(IoSession session)  throws  Exception {  
    10.   return  encoder;  
    11.  }  
    12. }  
    13. ......  
    14. public   class  MyProtocalDecoder  extends  ProtocolDecoderAdapter  {  
    15.   
    16.  public   void  decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)  
    17.    throws  Exception {  
    18.     
    19.   int   id  = in.getInt();  
    20.   int   len = in.getInt();  
    21.   byte []  dst =  new   byte [len];  
    22.     
    23.   in.get(dst);  
    24.     
    25.   String name = new  String(dst,"GBK");  
    26.     
    27.   Item item = new  Item();  
    28.   item.setId(id);  
    29.   item.setName(name);  
    30.   out.write(item);  
    31.  }  
    32. }  
    33. ......  
    34. public   class  MyProtocolEncoder  extends  ProtocolEncoderAdapter {  
    35.   
    36.  public   void  encode(IoSession session, Object message,  
    37.    ProtocolEncoderOutput out) throws  Exception {  
    38.   Item item = (Item)message;  
    39.   int  byteLen =  8  + item.getName().getBytes("GBK").length ;  
    40.   IoBuffer buf = IoBuffer.allocate(byteLen);  
    41.   buf.putInt(item.getId());  
    42.   buf.putInt(item.getName().getBytes("GBK").length);  
    43.   buf.put(item.getName().getBytes("GBK"));  
    44.   buf.flip();  
    45.   out.write(buf);  
    46.     
    47.  }  
    48. }  
    [java] view plaincopy
    1. connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MyProtocalFactory()));  
    2. ......  
    3. public class MyProtocalFactory implements ProtocolCodecFactory {  
    4.  ProtocolEncoderAdapter encoder = new MyProtocolEncoder();  
    5.  ProtocolDecoder decoder = new MyProtocalDecoder() ;  
    6.  public ProtocolDecoder getDecoder(IoSession session) throws Exception {  
    7.   return decoder;  
    8.  }  
    9.  public ProtocolEncoder getEncoder(IoSession session) throws Exception {  
    10.   return encoder;  
    11.  }  
    12. }  
    13. ......  
    14. public class MyProtocalDecoder extends ProtocolDecoderAdapter  {  
    15.   
    16.  public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)  
    17.    throws Exception {  
    18.     
    19.   int  id  = in.getInt();  
    20.   int  len = in.getInt();  
    21.   byte[]  dst = new byte[len];  
    22.     
    23.   in.get(dst);  
    24.     
    25.   String name = new String(dst,"GBK");  
    26.     
    27.   Item item = new Item();  
    28.   item.setId(id);  
    29.   item.setName(name);  
    30.   out.write(item);  
    31.  }  
    32. }  
    33. ......  
    34. public class MyProtocolEncoder extends ProtocolEncoderAdapter {  
    35.   
    36.  public void encode(IoSession session, Object message,  
    37.    ProtocolEncoderOutput out) throws Exception {  
    38.   Item item = (Item)message;  
    39.   int byteLen = 8 + item.getName().getBytes("GBK").length ;  
    40.   IoBuffer buf = IoBuffer.allocate(byteLen);  
    41.   buf.putInt(item.getId());  
    42.   buf.putInt(item.getName().getBytes("GBK").length);  
    43.   buf.put(item.getName().getBytes("GBK"));  
    44.   buf.flip();  
    45.   out.write(buf);  
    46.     
    47.  }  
    48. }  


    handler 
    具体处理事件,事件包括:sessionCreated、sessionOpened、sessionClosed、sessionIdle、exceptionCaught、messageReceived、messageSent。 
    connector.setHandler(new MyHandler());MyHandler继承IoHandlerAdapter类或者实现IoHandler接口.事件最终由IoProcessor线程发动调用。 
    Processor 

    Processor线程主要负责具体的IO操作、filterChain、IoHandler执行。Processor线程的数量默认为cpu数量+1,主要是为了充分利用多核的处理能力。Processor线程的数量可以根据实际情况进行配置。
    多个IoSession会被分配到多个Processor线程中,可以理解为一个Processor线程“服务”一个或多个IoSession对象。值得一提的是,N个IoProcessor形成一个处理池(SimpleIoProcessorPoll),分配Processor的时候根据IoSession的id绝对值模N进行分配。

    具体代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    private IoProcessor<S> getProcessor(S session) {
            IoProcessor<S> processor = (IoProcessor<S>) session.getAttribute(PROCESSOR);
      
            if (processor == null) {
                if (disposed || disposing) {
                    throw new IllegalStateException("A disposed processor cannot be accessed.");
                }
           //取模分配
                processor = pool[Math.abs((int) session.getId()) % pool.length];
      
                if (processor == null) {
                    throw new IllegalStateException("A disposed processor cannot be accessed.");
                }
            //session保定具体的processor线程
                session.setAttributeIfAbsent(PROCESSOR, processor);
            }
      
            return processor;
        }

    在默认情况下filterChain、IoHandler的操作都是有IoProcessor顺序执行(执行完一个再执行下一个),如果IoHandler中的业务处理比较耗时,那么Processor线程将阻塞,后续的请求将得不到处理。即同时处理的请求数最多只有N个(N为Processor线程数量)。在高并发的情况下这种模式有待改进。

    接着上面的类比,前台(acceptor、connector)接待客户(session)分配给业务员(IoProcessor)进行服务, 业务员对客户进行交流(IO操作、编码、解码等)并执行相应的业务服务(IoHandler)。同理业务员每次只能服务一个客户,其他客户只能等待(假设其他业务员都有自己的客户)。

    IoSession 
    IoSession是用来保持IoService的上下文,一个IoService在建立Connect之后建立一个IoSession(一个连接一个session),IoSession的生命周期从Connection建立到断开为止 
    IoSession做两件事情: 
    1.通过IoSession可以获取IoService的所有相关配置对象(持有对IoService,Processor池,SocketChannel,SessionConfig和IoService.IoHandler的引用) 
    2.通过IoSession.write 是数据写出的入口 

    MINA有3种worker线程 
    Acceptor、Connector、I/O processor 线程 
    Acceptor Thread 
    一般作为服务器端链接的接收线程,实现了接口IoService,线程的数量就是创建SocketAcceptor 的数量 
    Connector Thread 
    一般作为客户端的请求建立链接线程,实现了接口IoService,维持了一个和服务器端Acceptor的一个链接,线程数量就是创建SocketConnector 的数量 

    Mina的SocketAcceptor和SocketConnector均是继承了BaseIoService,是对IoService的两种不同的实现 
    I/O processor Thread 
    作为I/O真正处理的线程,存在于服务器端和客户端,用来处理I/O的读写操作,线程的数量是可以配置的,默认最大数量是CPU个数+1 
    服务器端:在创建SocketAcceptor的时候指定ProcessorCount 
    SocketAcceptor acceptor = new SocketAcceptor(Runtime.getRuntime().availableProcessors() + 1, Executors.newCachedThreadPool()); 
    客户端:在创建SocketConnector 的时候指定ProcessorCount 
    SocketConnector connector = new SocketConnector(Runtime.getRuntime().availableProcessors() + 1, Executors.newCachedThreadPool()); 
    I/O Processor Thread,是依附于IoService,类似上面的例子SocketConnector connector = new SocketConnector(Runtime.getRuntime().availableProcessors() + 1, Executors.newCachedThreadPool());是指SocketConnector这个线程允许CPU+1个I/O Processor Thread 
    NioProcessor虽然是多线程,但是对与一个连接的时候业务处理只会使用一个线程进行处理(Processor线程对于一个客户端连接只使用一个线程NioProcessor-n)如果handler的业务比较耗时,会导致NioProcessor线程堵塞 ,在2个客户端同时连接上来的时候会创建第2个(前提是第1个NioProcessor正在忙),创建的最大数量由Acceptor构造方法的时候指定。如果:一个客户端连接同服务器端有很多通信,并且I/O的开销不大,但是Handler处理的业务时间比较长,那么需要采用独立的线程模式,在 FilterChain的最后增加一个ExecutorFitler : 
    acceptor.getFilterChain().addLast(&quot;threadPool&quot;, new ExecutorFilter(Executors.newCachedThreadPool())); 
    这样可以保证processor和handler的线程是分开的,否则:客户端发送3个消息,而服务器对于每个消息要处理10s左右,那么这3个消息是被串行处理,在处理第一个消息的时候,后面的消息将被堵塞,同样反过来客户端也有同样的问题。 

    Mina 线程模型

    接上文,上文提到“若IoHandler中的业务处理比较耗时,将阻塞当前Processor线程”,那么可以通过添加线程池处理这个问题。
    Mina提供了一个很好的扩展点,可以在FilterChain添加线程池,让“下面”的业务处理过程并行起来。如图:

    第一种模式:Mina默认的模式,IoProcessor全程服务。(One by one)

    第二种模式:在FilterChain中加入ThreadPoolFilter,此时的处理流程变为Processor线程读取完数据后,执行IoFilterChain的逻辑。当执行到Thread Pool Filter的时候,该Filter会将后续的处理流程封装到一个Runnable对象中,并交由Filter自身的线程池来执行,而Processor线程则能立即返回来处理下一个IO请求。这样如果后面的IoFilter或IoHandler中有阻塞操作,只会引起Filter线程池里的线程阻塞,而不会阻塞住Processor线程,从而提高了服务器的处理能力。Mina提供了Thread Pool Filter的一个实现:ExecutorFilter。(本段内容为转载)

    第三种模式:在合适的环节添加多个线程池,这种适合于FilterChain、IoHandler过程中存在多个计算密集型的任务。一般不需要使用,加大了代码复杂程度。

    接着上面的类比,如果公司业务发展很好,客户增多,那么一个业务员(Processor)将很难及时服务到众多客户。于是公司决定业务员只负责与客户交流,具体的业务操作交由专门的业务服务团队(ThreadPool)。大家意会一下即可!



    客户端Porcessor堵塞测试情况: 
    1.以下代码在建立连接后连续发送了5个消息(item) 
    Java代码
    1. ConnectFuture future = connector.connect( new  InetSocketAddress(HOSTNAME, PORT));  
    2.                 future.awaitUninterruptibly();  
    3.                 session = future.getSession();  
    4.                 Item item = new  Item();  
    5.                 item.setId(12345 );  
    6.                 item.setName(&quot;hi&quot;);  
    7.                 session.write(item);  
    8.                 session.write(item);  
    9.                 session.write(item);  
    10.                 session.write(item);  
    11.                 session.write(item);  
    [java] view plaincopy
    1. ConnectFuture future = connector.connect(new InetSocketAddress(HOSTNAME, PORT));  
    2.                 future.awaitUninterruptibly();  
    3.                 session = future.getSession();  
    4.                 Item item = new Item();  
    5.                 item.setId(12345);  
    6.                 item.setName(&quot;hi&quot;);  
    7.                 session.write(item);  
    8.                 session.write(item);  
    9.                 session.write(item);  
    10.                 session.write(item);  
    11.                 session.write(item);  




    2.在handle的messageSent方法进行了延时处理,延时3秒 

     
    Java代码
    1. public   void  messageSent(IoSession session, Object message)  throws  Exception {  
    2.       Thread.sleep(3000 );  
    3.       System.out.println(message);  
    4.         
    5.   }  
    [java] view plaincopy
    1. public void messageSent(IoSession session, Object message) throws Exception {  
    2.       Thread.sleep(3000);  
    3.       System.out.println(message);  
    4.         
    5.   }  



    3.测试结果 
    5个消息是串行发送,都由同一个IoPorcessor线程处理

                 
    Java代码
    1. session.write(item);  
    2.               session.write(item);  
    3.               session.write(item);  
    4.               session.write(item);  
    5.               session.write(item);  
    [java] view plaincopy
    1. session.write(item);  
    2.               session.write(item);  
    3.               session.write(item);  
    4.               session.write(item);  
    5.               session.write(item);  


    服务器端每隔3秒收到一个消息。因为调用是由IoProcessor触发,而一个connector只会使用一个IoProcessor线程 

    4.增加ExecutorFilter,ExecutorFilter保证在处理handler的时候是独立线程 
    connector.getFilterChain().addLast(&quot;threadPool&quot;, new ExecutorFilter(Executors.newCachedThreadPool())); 
    5.测试结果 
    4个session.wirte变成了并行处理,服务器端同时收到了5条消息


  • 相关阅读:
    (单例)使用同步基元变量来检测程序是否已运行
    使用委托解决方法的跨线程调用问题
    Rtmp/Hls直播、点播服务器部署与配置
    关于C#调用广州医保HG_Interface.dll调用的一些总结(外部组件异常)
    redhat7.3配置163 yum源
    模块化InnoSetup依赖项安装
    [迷宫中的算法实践]迷宫生成算法——递归分割算法
    [新手学Java]使用beanUtils控制javabean
    【HTML5】Canvas绘图详解-1
    【Swift 】- 闭包
  • 原文地址:https://www.cnblogs.com/marcotan/p/4256875.html
Copyright © 2011-2022 走看看