zoukankan      html  css  js  c++  java
  • Mina(Multipurpose Infrastructure for Network Applications)完全文档 by 康康柳丁

    Mina文档

    Nio:

    NIO是一个基于事件的IO架构,最基本的思想就是:有事件我通知你,你再去做你的事情,没事件时你大可以节约大把时间去做其它任何事情。而且NIO的主线程only one,不像传统的模型,需要N个线程去,也减轻了JVM的工作量,使得JVM处理任务时显得更加高效。

    Mina 综述

    •         Multipurpose Infrastructure for Network Applications

    •         一个基于非阻塞I/O的网络框架。

    •         高可维护性,高可复用性:网络I/O编码,消息的编/解码,业务逻辑互相分离。

    •         与JMX结合。

    •         使用sfj4作为log

    •         支持UDP,支持客户端API。

    •         由Netty2的作者Trustin Lee开始开发的。

    •         相对容易进行单元测试

    Mina总体结构:

     

    IoSession

    •持有连接(服务端或客户端)

    •和每个事件一起通过

    •主要方法

    •write

    •close

    •get/setAttribute

    IoHandler

     

    •类似Servlet

    • filter chain的结尾部分。

    •主要方法:

    •sessionOpened

    •messageReceived

    •sessionClosed

     

     

    IoFilterChain

     

    •一连串的IoFilter,和每个session对应

    •可以为第个 IoConnector/IoAcceptor设置IoFilter模板。

    •动态添加和移除。

    IoFilters

     

    •类似一个 ServletFilter

    •观察事件流。

    •主要方法:

    •sessionOpened

    •messageReceived

    •filterWrite

    •sessionClosed

    IoAcceptor

    •服务端入口

    •接收访问的连接和到IoHandler的触发事件.

    •主要方法:

    •bind

    IoConnector

    •客户端入口

    •向远程服务器发起连接,触发事件到IoHandler

    •主要方法:

    •connect

    IoProcessor

    内部组件

    •为其下的连接执行读写数据的操作。

    •每个连接与一个IoProcessor相关联(多个连接之间共享一个IoProcessor)。

     

    Mina的简单示例:

    1.配置环境:

    (1).edu.jar: 类似java.util 和java.util.concurrent中的包.不用jdk5.0的,好像是因为向jdk1.4的兼容。http://dcl.mathcs.emory.edu/util/backport-util-concurrent

    (2).junit-4.1.jar:在eclipse的插件中有 D:。。eclipse\plugins\org.junit4_4.1.0、junit-4.1.jar

    (3).slf4j-api-1.3.0.jar,slf4j-jdk14-1.3.0.jar:

      mina中用slf4j,应该是commons-logging的替代。http://www.slf4j.org/download.html

    察看mina的QA email:

     http://www.mail-archive.com/mina-dev@directory.apache.org/msg02252.htmlServer.java

     

    package com.zkchen.mina.sample;

     

    import java.net.InetSocketAddress;

     

    import org.apache.mina.common.IoAcceptor;

    import org.apache.mina.filter.LoggingFilter;

    import org.apache.mina.filter.codec.ProtocolCodecFilter;

    import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;

    import org.apache.mina.transport.socket.nio.SocketAcceptor;

    import org.apache.mina.transport.socket.nio.SocketAcceptorConfig;

     

    public class Server   

    {   

        private static final int SERVER_PORT = 8080;   

       

        public Server(){

         

        }

        public static void main( String[] args ) throws Throwable   

       {   

            IoAcceptor acceptor = new SocketAcceptor();   

            SocketAcceptorConfig cfg = new SocketAcceptorConfig();   

            cfg.setReuseAddress( true );   

            cfg.getFilterChain().addLast(   

                        "codec",   

                        new ProtocolCodecFilter( new ObjectSerializationCodecFactory() ) );   

            cfg.getFilterChain().addLast( "logger", new LoggingFilter() );   

            acceptor.bind(   

                    new InetSocketAddress( SERVER_PORT ),   

                    new ServerSessionHandler( ), cfg );   

       

            System.out.println( "The server Listening on port " + SERVER_PORT );   

        }   

    }  

     

    ServerHandler.java

     

    package com.zkchen.mina.sample;

     

    import org.apache.mina.common.ByteBuffer;

    import org.apache.mina.common.IoHandlerAdapter;

    import org.apache.mina.common.IoSession;

     

    public class ServerHandler extends IoHandlerAdapter   

    {   

        public void sessionOpened( IoSession session )   

        {   

        }   

       

        public void messageReceived( IoSession session, Object message )   

        {   

             if (!(message instanceof ByteBuffer)) {

                        return;

                }

                ByteBuffer rb = (ByteBuffer) message;

                // Write the received data back to remote peer

                ByteBuffer wb = ByteBuffer.allocate(rb.remaining());

                wb.put(rb);

                wb.flip();

                session.write("received message: "+wb);

        }   

       

       

        public void exceptionCaught( IoSession session, Throwable cause )   

        {   

            // close the connection on exceptional situation   

            session.close();   

        }   

    }   

       

    3.客户端代码:

    Client.java

     

    package com.zkchen.mina.sample;

     

    import java.net.InetSocketAddress;

     

    import org.apache.mina.common.IoSession;

    import org.apache.mina.filter.LoggingFilter;

    import org.apache.mina.filter.codec.ProtocolCodecFilter;

    import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;

    import org.apache.mina.transport.socket.nio.SocketConnector;

    import org.apache.mina.transport.socket.nio.SocketConnectorConfig;

     

    public class Client   

    {   

        private static final String HOSTNAME = "localhost";   

        private static final int PORT = 8080;   

        private static final int CONNECT_TIMEOUT = 30; // seconds   

       

       

        public static void main( String[] args ) throws Throwable   

        {   

            SocketConnector connector = new SocketConnector();           

            // Configure the service.   

            SocketConnectorConfig cfg = new SocketConnectorConfig();   

            cfg.setConnectTimeout( CONNECT_TIMEOUT );   

              cfg.getFilterChain().addLast(   

                        "codec",   

                        new ProtocolCodecFilter( new ObjectSerializationCodecFactory() ) );   

       

            cfg.getFilterChain().addLast( "logger", new LoggingFilter() );   

            connector.connect(new InetSocketAddress( HOSTNAME, PORT ),    

                            new ClientHandler(), cfg );   

       

        }   

    }   

    ClientHandler.java

     

    package com.zkchen.mina.sample;

     

    import org.apache.mina.common.ByteBuffer;

    import org.apache.mina.common.IoHandlerAdapter;

    import org.apache.mina.common.IoSession;

     

    public class ClientHandler extends IoHandlerAdapter {

     

          public ClientHandler() {

                 super();

          }

     

          public void sessionOpened(IoSession session) {

                 session.write("hello");

          }

     

          public void messageReceived(IoSession session, Object message) {

                 System.out.println("in messageReceived!");

                 if (!(message instanceof ByteBuffer)) {

                         return;

                 }

                 ByteBuffer rb = (ByteBuffer) message;

                 // Write the received data back to remote peer

                 ByteBuffer wb = ByteBuffer.allocate(rb.remaining());

                 wb.put(rb);

                 System.out.println(wb.toString());

                 wb.flip();

                 System.out.println(wb);

          }

     

          public void exceptionCaught(IoSession session, Throwable cause) {

                 session.close();

          }

    }

    编译运行Server.java,编译运行Client.java.。

    Server:

    Client:

     

    线程池的配置:

    默认线程池的设置:

          IoServiceConfig config = acceptor.getDefaultConfig();  

           config .setThreadModel(ThreadModel.MANUAL);

         设置在Mina源代码ExecutorFilter.java中:

    public ExecutorFilter()

        {

            this( new ThreadPoolExecutor(16, 16, 60, TimeUnit.SECONDS, new LinkedBlockingQueue() ) );

        }

    I/O worker threads 的配置:

    •        Acceptor thread :接收connector的连接,向I/O processor thread 的连接进行读写操作,默认有一个线程,不能配置。

    •        Connector thread :连接到远程节点,向I/O processor thread 的连接进行读写操作,默认有一个线程,不能配置.。

    •        I/O processor thread :执行读写操作。默认一个线程,可配置:

    •        IoAcceptor acceptor = new SocketAcceptor(4, Executors.newCachedThreadPool());

       最好等于CPU的数量。

    添加ExecutorFilter 到一个 IoFilterChain:

    如果没有添加ExecutorFilter。IoHandler的业务逻辑实现将会在I/O processor thread中运行。称为单线程模式。在相应要求不高的应用中是可取的。

    典型的网络应用需要一个ExecutorFilter来添加到IoFilterChain,因为业务逻辑从I/Oprocessor threads有不同的资源使用模式,如果一个执行数据库操作的应用没有添加的话。当一个数据库操作发生时整个服务器可以阻塞,尤其是当数据库超载时。IoServer添加ExecutorFilter当IoSession创建时:

    IoAcceptor acceptor = ...;

    DefaultIoFilterChainBuilder filterChainBuilder = acceptor.getDefaultConfig().getFilterChain();

    filterChainBuilder.addLast("threadPool", new ExecutorFilter(Executors.newCachedThreadPool());

    ExecutorFilter没有生命周期。必须自己关闭。

    ExecutorService executor = ...;

     

    IoAcceptor acceptor = ...;

    DefaultIoFilterChainBuilder filterChainBuilder = acceptor.getDefaultConfig().getFilterChain();

    filterChainBuilder.addLast("threadPool", new ExecutorFilter(executor);

     

    // Start the server.

    acceptor.bind(...);

    / Shut down the server.

    acceptor.unbind(...);

    executor.shutdown();

     

    当IoHandler应用有数据库操作时,最后添加ExecutorFilter。建议在一个ProtocolCodecFilter应用后添加ExcutorFilter。因为protocol codec的特征是cpu-bound的。和I/O processor threads 相似。

     

    IoAcceptor acceptor = ...;

    DefaultIoFilterChainBuilder filterChainBuilder = acceptor.getDefaultConfig().getFilterChain();

    // Add CPU-bound job first,

    filterChainBuilder.addLast("codec", new ProtocolCodecFactory(...));

    // and then a thread pool.

    filterChainBuilder.addLast("threadPool", new ExecutorFilter(Executors.newCachedThreadPool());

    完成的server-push

    •        功能: 服务器向客户端发送消息,并实时显示在客户端页面上。

    •        消息:MsgBus运行时,产生Topic,相应的writer和reader。

    •        Mina服务器:当产生新的消息(topic、writer、reader)时。把它发向客户端。

    •        客户端:用flash-socket接收到消息,再用javascript实时添加到页面上。

  • 相关阅读:
    NET 2.0(C#)调用ffmpeg处理视频的方法(转载)
    ffmpeg编解码详细过程(转载)
    使用FFMPEG SDK解码流数据
    FFMPEG解码流程
    web worker,SSE,WebSocket,AJAX 与后端交互的方式
    jQuery基础篇
    Git的基本命令介绍
    项目配置中 提示access denied的问题 解决方案
    http网络协议 学习摘要
    OSI七层模型 学习摘要
  • 原文地址:https://www.cnblogs.com/sunwei2012/p/1686754.html
Copyright © 2011-2022 走看看