zoukankan      html  css  js  c++  java
  • Mina源码阅读笔记(四)—Mina的连接IoConnector1

    上一篇写的是IoAcceptor是服务器端的接收代码,今天要写的是IoConnector,是客户端的连接器。在昨天,我们还留下一些问题没有解决,这些问题今天同样会产生,但是都要等到讲到session的时候才能逐步揭开。先回顾一下问题:

    l  我们已经在AbstractPollingIoAcceptor中看到了,mina是将连接(命令)和业务(读写)分不同线程处理的,但是我们还没有看到mina是如何实现对这些线程的管理的。

    l  在昨天的最后,我们看到在NioSocketAcceptor中的具体NIO实现,但是我们还没有看到mina是在哪里调用了这些具体操作的。当然这也是mina对连接线程管理的一部分。

    这些问题今天也会出现,因为从名字上就能看出,IoConnector和IoAcceptor的构造相差不大,所以在写connector的分析时,主要会从结构和差异上入手,最后再给出昨天没写完的删减版的Acceptor。先回顾一下客户端连接的代码:

    01 // 创建一个非阻塞的客户端程序
    02         IoConnector connector = new NioSocketConnector();
    03         // 设置链接超时时间
    04         connector.setConnectTimeout(30000);
    05         // 添加过滤器
    06         connector.getFilterChain().addLast(
    07                 "codec",
    08                 new ProtocolCodecFilter(new MessageCodecFactory(
    09                         new InfoMessageDecoder(Charset.forName("utf-8")),
    10                         new InfoMessageEncoder(Charset.forName("utf-8")))));
    11         // 添加业务逻辑处理器类
    12         connector.setHandler(new ClientHandler());
    13         IoSession session = null;
    14         try {
    15             ConnectFuture future = connector.connect(new InetSocketAddress(
    16                     HOST, PORT));// 创建连接
    17             future.awaitUninterruptibly();// 等待连接创建完成
    18             session = future.getSession();// 获得session

    还是先看IoConnector的结构图,图来自mina官网,用XMind绘制的。在写构成之前,我们还是先看一下mina官网对这些connector的介绍:

    As we have to use an IoAcceptor for servers, you have to implement the IoConnector. Again, we have many implementation classes :

    • NioSocketConnector : the non-blocking Socket transport Connector
    • NioDatagramConnector : the non-blocking UDP transport * Connector*
    • AprSocketConnector : the blocking Socket transport * Connector*, based on APR
    • ProxyConnector : a Connector providing proxy support
    • SerialConnector : a Connector for a serial transport
    • VmPipeConnector : the in-VM * Connector*

    其中,NioSocketConnector是我们最常用到的,proxy方式虽然在mina的源码中也花了大篇幅去撰写,但可惜的是很少有相关的文档,所以学习的成本还挺高的。今天我们主要还是按照上图画的两条路来看NioSocketConnector

    和昨天一样,我们还是从左边的路走起,看interface IoConnector,这个接口主要定义了连接的方法以及socket连接时用到的参数。在mina中通过IoFuture来描述、侦听在IoSession上实现的异步IO操作,所以这IoConnector中的connect方法都返回了一个ConnectFuture实例。

    而在SocketConnector接口中的定义中就显得更简单了,它和IoConnector之间也是接口的继承关系,在SocketConnector中就定义了两类方法,一个对远程地址的getset,一个拿到session的配置。这些都容易理解。

    再来看右边,AbstractIoConnector,这个抽象类主要作用就是实现IoConnector里定义的操作,至于他又继承了AbstractIoService,一是为了用到父类(AbstractIoService)的方法,二是为了将那些父类没有实现的方法继续传递下去,让它(AbstractIoConnector)的子类去实现。所以看多了,好多结构也能看明白了,这里我觉得主要要学习的还是接口、抽象类之间的引用关系。

    继续看AbstractIoConnector,这个类主要是实现了connect的逻辑操作(封装了连接前后的一些必要执行步骤和check一些状态),具体的连接操作还是让子类去实现,这个和上篇写的AbstractIoAcceptor一模一样,在AbstractIoAcceptor中,主要也是封装了bind的逻辑操作,真正的bind过程是让子类去实现的简单看下代码:

    01 public final ConnectFuture connect(SocketAddress remoteAddress, SocketAddress localAddress,
    02             IoSessionInitializer<? extends ConnectFuture> sessionInitializer) {
    03         if (isDisposing()) {
    04             throw new IllegalStateException("The connector has been disposed.");
    05         }
    06  
    07         if (remoteAddress == null) {
    08             throw new IllegalArgumentException("remoteAddress");
    09         }
    10  
    11         if (!getTransportMetadata().getAddressType().isAssignableFrom(remoteAddress.getClass())) {
    12             throw new IllegalArgumentException("remoteAddress type: " + remoteAddress.getClass() + " (expected: "
    13                     + getTransportMetadata().getAddressType() + ")");
    14         }
    15  
    16         if (localAddress != null && !getTransportMetadata().getAddressType().isAssignableFrom(localAddress.getClass())) {
    17             throw new IllegalArgumentException("localAddress type: " + localAddress.getClass() + " (expected: "
    18                     + getTransportMetadata().getAddressType() + ")");
    19         }
    20  
    21         if (getHandler() == null) {
    22             if (getSessionConfig().isUseReadOperation()) {
    23                 setHandler(new IoHandler() {
    24                     public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
    25                         // Empty handler
    26                     }
    27  
    28                     public void messageReceived(IoSession session, Object message) throws Exception {
    29                         // Empty handler
    30                     }
    31  
    32                     public void messageSent(IoSession session, Object message) throws Exception {
    33                         // Empty handler
    34                     }
    35  
    36                     public void sessionClosed(IoSession session) throws Exception {
    37                         // Empty handler
    38                     }
    39  
    40                     public void sessionCreated(IoSession session) throws Exception {
    41                         // Empty handler
    42                     }
    43  
    44                     public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
    45                         // Empty handler
    46                     }
    47  
    48                     public void sessionOpened(IoSession session) throws Exception {
    49                         // Empty handler
    50                     }
    51                 });
    52             else {
    53                 throw new IllegalStateException("handler is not set.");
    54             }
    55         }
    56  
    57         return connect0(remoteAddress, localAddress, sessionInitializer);
    58 }
    59  
    60  
    61    protected abstract ConnectFuture connect0(SocketAddress remoteAddress, SocketAddress localAddress,
    62             IoSessionInitializer<? extends ConnectFuture> sessionInitializer);

    Connect0才是最后具体的操作,而这一步操作在这个类中被没有给出实现。具体实现放在了AbstractPollingIoConnector上。和昨天一样,这些设计都是对称的,我们还是看三点:

    l  implementing client transport using a polling strategy

    l  A Executor will be used for running client connection, and an AbstractPollingIoProcessor will be used for processing connected client I/O operations like reading, writing and closing.

    l  All the low level methods for binding, connecting, closing need to be provided by the subclassing implementation

    至于内部的具体实现,那跟acceptor中没什么区别,连使用的工具类都差别不大,这部分就很容易读懂了,只不过一个是bind一个是connect

    最后我们看NioSocketConnector,具体连接的实现类,只有一个成员变量和NioSocketAcceptor一样:

    01 private volatile Selector selector;
    02  
    03    @Override
    04     protected SocketChannel newHandle(SocketAddress localAddress) throws Exception {
    05         SocketChannel ch = SocketChannel.open();
    06  
    07         int receiveBufferSize = (getSessionConfig()).getReceiveBufferSize();
    08         if (receiveBufferSize > 65535) {
    09             ch.socket().setReceiveBufferSize(receiveBufferSize);
    10         }
    11  
    12         if (localAddress != null) {
    13             ch.socket().bind(localAddress);
    14         }
    15         ch.configureBlocking(false);
    16         return ch;
    17     }

    只是需要注意,这里面专门有个内部类来处理selectionkey,将遍历的过程都抽离出来了,这个和我们用NIO的一般写法稍有不同,这样做的好处也是为了复用:

    01 private static class SocketChannelIterator implements Iterator<SocketChannel> {
    02  
    03         private final Iterator<SelectionKey> i;
    04  
    05         private SocketChannelIterator(Collection<SelectionKey> selectedKeys) {
    06             this.i = selectedKeys.iterator();
    07         }
    08  
    09         /**
    10          * {@inheritDoc}
    11          */
    12         public boolean hasNext() {
    13             return i.hasNext();
    14         }
    15  
    16         /**
    17          * {@inheritDoc}
    18          */
    19         public SocketChannel next() {
    20             SelectionKey key = i.next();
    21             return (SocketChannel) key.channel();
    22         }
    23  
    24         /**
    25          * {@inheritDoc}
    26          */
    27         public void remove() {
    28             i.remove();
    29         }
    30     }
    ---------------------------------------------------------

    补一个上篇就应该发的acceptor的阉割版,写这样的东西主要还是为了理清楚结构。我主要是把内容简化了,但是结构都没有变,核心的成员变量也没有少:

    起点IoService:

    01 package org.apache.mina.core.rewrite.service;
    02  
    03 /**
    04  * IO Service --handler/processor/acceptor/connector
    05  *
    06  * @author ChenHui
    07  *
    08  */
    09 public interface IoService {
    10     /** 添加listener */
    11     void addListener(IoServiceListener listener);
    12  
    13     /** 销毁 */
    14     void dispose(boolean awaitTermination);
    15  
    16     /** 设置handler */
    17     IoHandler getHandler();
    18  
    19     void setHandler(IoHandler handler);
    20  
    21     /** 管理session */
    22     int getManagedSessionCount();
    23      
    24     boolean isActive();
    25 }
    左边的路
    01 package org.apache.mina.core.rewrite.service;
    02  
    03 import java.io.IOException;
    04 import java.net.SocketAddress;
    05 import java.util.Set;
    06  
    07 /**
    08  * 注意接口的继承,这里的方法都是新定义的
    09  *  
    10  * Acceptor 主要用于:Accepts incoming connection, communicates with clients, and
    11  * fires events to IoHandler
    12  *
    13  * @author ChenHui
    14  */
    15 public interface IoAcceptor extends IoService {
    16      
    17     SocketAddress getLocalAddress();
    18      
    19     Set<SocketAddress> getLocalAddresses();
    20      
    21     void bind(SocketAddress localAddress) throws IOException;
    22      
    23     void bind(Iterable<? extends SocketAddress> localAddresses) throws IOException;
    24      
    25     void unbind(SocketAddress localAddress);
    26      
    27      
    28     /**没有写到IoSession 所以暂时不用*/
    29     //IoSession newSession(SocketAddress remoteAddress,SocketAddress localAddress);
    30 }
    SocketAcceptor:
    01 package org.apache.mina.rewrite.transport.socket;
    02  
    03 import java.net.InetSocketAddress;
    04  
    05 import org.apache.mina.core.rewrite.service.IoAcceptor;
    06  
    07 public interface SocketAcceptor extends IoAcceptor {
    08  
    09     InetSocketAddress getLocalAddress();
    10  
    11     void setDefaultLocalAddress(InetSocketAddress localAddress);
    12  
    13     public boolean isReuseAddress();
    14      
    15     // ...
    16  
    17     // SocketSessionConfig getSessionConfig();
    18 }
    再看右边的
    001 package org.apache.mina.core.rewrite.service;
    002  
    003 import java.util.concurrent.Executor;
    004 import java.util.concurrent.ExecutorService;
    005 import java.util.concurrent.Executors;
    006 import java.util.concurrent.TimeUnit;
    007 import java.util.concurrent.atomic.AtomicInteger;
    008  
    009 public abstract class AbstractIoService implements IoService {
    010  
    011     private static final AtomicInteger id = new AtomicInteger();
    012  
    013     private final String threadName;
    014  
    015     private final Executor executor;
    016  
    017     private final boolean createdExecutor;
    018  
    019     private IoHandler handler;
    020  
    021     // 用于安全的关闭
    022     protected final Object disposalLock = new Object();
    023  
    024     private volatile boolean disposing;
    025  
    026     private volatile boolean disposed;
    027  
    028     /**
    029      *
    030      * @param param
    031      *            sessionConfig IoSessionConfig
    032      * @param executor
    033      *            used for handling execution of IO event. can be null
    034      */
    035     protected AbstractIoService(Object param, Executor executor) {
    036  
    037         // TODO listener & session config
    038  
    039         if (executor == null) {
    040             this.executor = Executors.newCachedThreadPool();
    041             createdExecutor = true;
    042         else {
    043             this.executor = executor;
    044             createdExecutor = false;
    045         }
    046  
    047         threadName = getClass().getSimpleName() + "-" + id.incrementAndGet();
    048     }
    049      
    050     @Override
    051     public void addListener(IoServiceListener listener) {
    052         // TODO add listener
    053     }
    054  
    055     /**注意这个不是override来的*/
    056     protected final void ececuteWorker(Runnable worker, String suffix){
    057          
    058         String actualThreadName=threadName;
    059         if(suffix!=null){
    060             actualThreadName=actualThreadName+"-"+suffix;
    061         }
    062         executor.execute(worker);
    063     }
    064      
    065     @Override
    066     public void dispose(boolean awaitTermination) {
    067         if (disposed) {
    068             return;
    069         }
    070  
    071         synchronized (disposalLock) {
    072             if (!disposing) {
    073                 disposing = true;
    074                 try {
    075                     /** 真正的关闭方法TODO */
    076                     dispose0();
    077                 catch (Exception e) {
    078                     e.printStackTrace();
    079                 }
    080             }
    081         }
    082  
    083         if (createdExecutor) {
    084             ExecutorService e = (ExecutorService) executor;
    085             e.shutdown();
    086  
    087             if (awaitTermination) {
    088                 try {
    089  
    090                     e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
    091  
    092                 catch (InterruptedException e1) {
    093                     // 注意异常时的中断处理
    094                     Thread.currentThread().interrupt();
    095                 }
    096             }
    097         }
    098         disposed = true;
    099     }
    100  
    101     protected abstract void dispose0() throws Exception;
    102  
    103     @Override
    104     public IoHandler getHandler() {
    105         return this.handler;
    106     }
    107  
    108     @Override
    109     public void setHandler(IoHandler handler) {
    110         if (handler == null) {
    111             throw new IllegalArgumentException("handler cannot be null");
    112         }
    113         // TODO isActive: when service is active, cannot be set handler
    114         if(isActive()){
    115             throw new IllegalStateException("when service is active, cannot be set handler");
    116         }
    117          
    118         this.handler = handler;
    119     }
    120  
    121 }

  • 相关阅读:
    我的Ajax之旅(一):不能加载'AjaxControlToolkit'。。。拒绝访问
    网络编程(一):用C#下载网络文件的2种方法
    Locks
    Threads
    C语言中函数名和struct名可以重名!
    使用#include <pthread.h>
    APUE Chapter 7 (2)main函数的参数
    Creating a shared and static library with the gnu compiler [gcc][转]
    Signals
    APUE Chapter 7(3) – Memory layout of a C program
  • 原文地址:https://www.cnblogs.com/wuyida/p/6300944.html
Copyright © 2011-2022 走看看