zoukankan      html  css  js  c++  java
  • mina 之初体验

    由于项目需要,用到了 mina 框架进行 tcp 通讯。我是初次接触 mina,于是从 Hello world 开始学习了 mina 。期间遇到了一个奇怪的发送数据的延迟问题,解决的过程是曲折的,但找出的原因却令我“吐血”(没真的吐……)。不管怎样,还是贴出来一下作反面案例,希望初次学习 mina 的时候能够绕过这个地雷。
     
    hello world 演示很简单,分为两部分,server 和 client 。
     
    server 在 8800 端口上起 tcp 侦听,将 client 发送来的消息打印到标准输出(System.out)。只有两个类 HelloServer.java 和 HelloHandler.java,代码如下:

     

    public class HelloServer {

        public static void main(String[] args) {

           IoBuffer.setUseDirectBuffer(false);

           IoBuffer.setAllocator(new SimpleBufferAllocator());

     

           IoAcceptor acceptor = new NioSocketAcceptor();

     

           acceptor.getFilterChain().addLast("logger", new LoggingFilter());

           acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(

                  Charset.forName("UTF-8"))));

          

           acceptor.getSessionConfig().setReadBufferSize(2048);

           acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);

     

           acceptor.setHandler(new HelloHandler());

     

           try {

               acceptor.bind(new InetSocketAddress(8800));

     

               System.out.println("hello server is running!");

           } catch (IOException e) {

               e.printStackTrace();

           }

        }

    }

     

    public class HelloHandler extends IoHandlerAdapter {

     

        @Override

        public void sessionCreated(IoSession session) throws Exception {

           System.out.println("sessiong created ......");

        }

     

        @Override

        public void sessionOpened(IoSession session) throws Exception {

           System.out.println("session opened ......");

        }

     

        @Override

        public void sessionClosed(IoSession session) throws Exception {

           System.out.println("session closed .");

        }

     

        @Override

        public void messageReceived(IoSession session, Object message)

               throws Exception {

           String msgText = message.toString();

           System.out.println(getNow() + "message received!-- msg:"

                  + msgText);

        }

       

        private String getNow(){

           Date now = new Date();

           DateFormat df = new SimpleDateFormat("[yyyy-MM-dd hh:mm:ss] ");

           return df.format(now);

        }

     

        @Override

        public void messageSent(IoSession session, Object message) throws Exception {

           String msgText = message.toString();

           System.out.println("message sent!-- msg:" + msgText);

        }

     

        @Override

        public void exceptionCaught(IoSession session, Throwable cause)

               throws Exception {

           System.out.println("exception occurred!!! -- " + cause.getMessage());

           cause.printStackTrace();

           session.close(false);

        }

    }

     
    client 通过 8800 端口连接 server ,发送两条字符串消息。只有两个类 HelloClient.java 和 HelloSender.java, 代码如下:

    public class HelloClient {

        public static void main(String[] args) {

           NioSocketConnector connector = new NioSocketConnector();

           connector.setConnectTimeoutMillis(1000 * 15);

     

           connector.getFilterChain().addLast("logger", new LoggingFilter());

           connector.getFilterChain().addLast(

                  "codec",

                  new ProtocolCodecFilter(new TextLineCodecFactory(Charset

                         .forName("UTF-8"))));

          

           connector.setHandler(new HelloSender(new String[] {

                  "Hello message 1 !", "Hello 2" }));

          

           try {

               ConnectFuture future = connector.connect(new InetSocketAddress(

                      "127.0.0.1", 8800));

               System.out.println("connect ...");

               future.awaitUninterruptibly();

               System.out.println("connect future awaitUniterruptibly ...");

               IoSession session = future.getSession();

               System.out.println("get session");

               session.getCloseFuture().awaitUninterruptibly();

               System.out.println("session close future awaitUniterruptibly ...");

               connector.dispose();

              

           } catch (Exception e) {

               System.out.println("error !!! --" + e.getMessage());

               e.printStackTrace();

           }

        }

    }

     

    public class HelloSender extends IoHandlerAdapter {

     

        private String[] msgArray;

     

        public HelloSender(String[] msgArray) {

           this.msgArray = msgArray;

        }

       

        @Override

        public void sessionOpened(IoSession session) throws Exception {      

           SendMessage(session);

           session.close(false);

           System.out.println("client handler close session ......");

        }

       

        private void SendMessage(IoSession session)throws Exception{

           for (int i = 0; i < msgArray.length; i++) {

              

               WriteFuture wf = session.write(msgArray[i]);          

               wf.addListener(new IoFutureListener<IoFuture>(){

     

                  public void operationComplete(IoFuture future) {

                      System.out.println(getNow() + "futrue -- write completed!");

                  }

                 

               });

              

               System.out.println(getNow() + "--write msg " + i);

               Thread.sleep(3000);

           }

          

           System.out.println(getNow() + "send completed");

        }

       

        private String getNow(){

           Date now = new Date();

           DateFormat df = new SimpleDateFormat("[yyyy-MM-dd hh:mm:ss] ");

           return df.format(now);

        }

    }

     
    先启动服务端,再启动客户端。
     
    服务端日志输出如下:
    sessiong created ......
    session opened ......
    [2011-07-13 09:59:19] message received!-- msg:Hello message 1 !
    [2011-07-13 09:59:19] message received!-- msg:Hello 2
    session closed .
     

     

    客户端日志输出如下:
    connect ...
    connect future awaitUniterruptibly ...
    get session
    [2011-07-13 09:59:13] --write msg 0
    [2011-07-13 09:59:16] --write msg 1
    [2011-07-13 09:59:19] send completed
    client handler close session ......
    [2011-07-13 09:59:19] futrue -- write completed!
    [2011-07-13 09:59:19] futrue -- write completed!
    session close future awaitUniterruptibly ...
     
    仔细观察日志输出就能发现奇怪的问题:客户端是前后相隔 3 秒往 session 中写入数据,但是却在最后同时地发送出去,而服务端也是同时收到这两条消息的,而不是前后相隔 3 秒。
     
    一、关于 write 和 flush 的猜想
    这也就是说,session 在 write 完成之后并没有立即发送。初步猜测,write 操作时有可能数据只是被放置到缓冲区而已,只有执行 flush 一类的操作才会真正发送出去。
    但是 IoSession 类型并没有找到类似 flush 的方法。通过查看源码及 API ,发现 write 的时候,主要逻辑如下:

            // Now, we can write the message. First, create a future

            WriteFuture writeFuture = new DefaultWriteFuture(this);

            WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress);

     

            // Then, get the chain and inject the WriteRequest into it

            IoFilterChain filterChain = getFilterChain();

            filterChain.fireFilterWrite(writeRequest);

    每一次写入,都产生一个 WriteRequest 实例,并将该实例传递给 IoSession 的过滤链 filterChain 进行处理。
    先说说 IoFilterChain ,其数据结构一个保持 IoFilter 的双向链表。 IoFilterChain 包含两个特殊的 filter : head 和 tail ,用户加入的 filter 都放置于head 和 tail 之间。filterChain.fireFilterWrite(writeRequest) 操作将从 tail 开始往前,逐个地调用 IoFilter 的 filterWrite 方法,最终调用抵达 head 的 filterWrite 方法。想必写入操作就落实在此方法上,进去一探究竟。
     
    处于 head 位置的 IoFilter 为 DefaultIoFilter 的内部私有类 HeadFilter 的实例,其主要逻辑如下:

                s.getWriteRequestQueue().offer(s, writeRequest);

                if (!s.isWriteSuspended()) {

                    s.getProcessor().flush(s);

                }

     
    至此,已证实猜测:writeRequest 被放置到一个队列中,当 IoSession 的 WriteSuspended 状态为 false 时,会直接 flush 。但 flush 操作并不是 IoSession 的方法,而是 IoProcessor 的方法。
    那就在客户端的 HelloSender 的 SendMessage 方法中语句 WriteFuture wf = session.write(msgArray[i]);   之后加上一句

               ((AbstractIoSession) session).getProcessor().flush(session);

    调试运行 client ,但日志输出和之前一样,两次消息最终还是同时被发送出去的,这招不管用。
    跟踪发现 session 的 WriteSuspended 默认就是 false,每次 write 操作都会执行一次 flush,而不需要在 HelloSender 中再次调用 flush 操作。
    既然前面补上 flush 操作是多余的,那原因何在?难道 flush 操作中到调用真实的 Channel 进行 send 之间还有缓存?但这与 flush 的语义并不一致。继续查看源码一探究竟。
    IoProcessor 的 flush 操作的主要逻辑是把 session 加入到一个 flushingSessions  队列中,然后执行 selector.wakeup 线程执行发送。这 flush 并没有诡异之处,造成先后两次写入的消息同时被发送的原因显然不在于此。
     
    二、关于 SocketSessionConfig 的猜想
    在如此简单的演示案例中,既然 HelloSender 中的 write 和 flush 操作似乎都没什么问题,难道问题出在 HelloClient 上?
    HelloClient 的 main 方法的主要逻辑就以下几步:
    1、创建 connector;
    2、设置 filterChain;
    3、设置 IoHandler 为 HelloSender 的实例;
    4、连接服务端,产生会话;
     
    这几步操作都是标准的调用,不应该会有问题。但这里面并未对 IoSessionConfig 进行配置,是否是 IoSessionConfig 的默认配置的引起的呢?
    于是仔细查看了 IoSessionConfig 的所有属性,并没有属性是与此相关的。
    再查看其 IoSessionConfig 派生接口 SocketSessionConfig ,发现有一个 TcpNoDelay 属性。从字面上看,似乎有点关系。
    于是在第2步之后加上语句 

           connector.getSessionConfig().setTcpNoDelay(true);

    运行 HelloClient ……

    日志输出显示的结果还是一如既往。

    NoDelay 是 TCP 层的一个选项,其指示是否将缓存区中的数据合并发送。但在此无论设置为 true 还是 false,都不影响测试的结果。

     

    三、IoHandler 的问题

    在进行了以上两方面的尝试后,将怀疑的目光转向了 HelloSender ,这是本例中的 IoHandler 的实现。

    发送操作是在 HelloSender 的 sessionOpened 方法执行:

        @Override

        public void sessionOpened(IoSession session) throws Exception {      

           SendMessage(session);

           session.close(false);

           System.out.println("client handler close session ......");

        }

     

    当 session 创建后,sessionOpened 便被执行,此时发送数据,并在发送完成后关闭会话的,然后,sessionOpened 方法返回。

    仔细想想,似乎在 SendMessage 和 sessionOpened 方法返回之间有些问题:对 sessionOpened 的调用应该是建立连接和会话后初始化过程的一部分,而在初始化过程尚未返回的时候就对 IoSession 写入数据,这也许不是一个恰当的调用方式。也许正是这样,使得先后两次写入的数据都被保持在队列中,直到会话初始化完成后才被处理。在 sessionOpened 方法中创建一个新线程来执行 SendMessage 操作就能验证这一设想,于是 sessionOpened 方法改为如下: 

        @Override

        public void sessionOpened(IoSession session) throws Exception {

           final IoSession s = session;

           Thread thrd = new Thread(new Runnable() {

              

               public void run() {

                  try {

                      SendMessage(s);

                     

                      Thread.sleep(5000);

                      s.close(false);

                  } catch (Exception e) {

                      System.out.println("Send message error!!!--" + e.getMessage());

                      e.printStackTrace();

                  }

               }

           });

          

           thrd.start();

           System.out.println("client handler close session ......");

        }

     
    再次运行测试,日志输出如下:
    服务端日志:
    hello server is running!
    sessiong created ......
    session opened ......
    [2011-07-14 11:43:01] message received!-- msg:Hello message 1 !
    [2011-07-14 11:43:04] message received!-- msg:Hello 2
    session closed .
     
    客户端日志:
    connect ...
    tcpNoDelay=false
    connect future awaitUniterruptibly ...
    get session
    client handler close session ......
    [2011-07-14 11:43:01] futrue -- write completed!
    [2011-07-14 11:43:01] --write msg 0
    [2011-07-14 11:43:04] --write msg 1
    [2011-07-14 11:43:04] futrue -- write completed!
    [2011-07-14 11:43:07] send completed
    session close future awaitUniterruptibly ...
     
    日志显示先后间隔 3 秒写入的两条消息在服务端也同样是先后间隔 3 秒被接收到的,一切恢复正常。前面的设想得到证实,原因就是:向 IoSession 写入数据的时机不对。
     
    虽然结果是有点雷人,但探索的过程还是收获不少,加深了对 mina 框架的理解。
     
    2011-07-13

    黄海泉
  • 相关阅读:
    小程序获知渠道弹出层设计
    小程序下拉菜单筛选
    align-conten和align-items的区别
    微信小程序文本如何换行
    微信小程序最新获取用户头像昵称的方法
    JQ版本对比
    选择收货地址列表的某一项将数据传到订单页面
    inline、block、inline-block属性的区别
    微博资料
    zookeeper知识点学习
  • 原文地址:https://www.cnblogs.com/haiq/p/2124292.html
Copyright © 2011-2022 走看看