zoukankan      html  css  js  c++  java
  • Mina源码阅读笔记(四)—Mina的连接IoConnector2

    接着Mina源码阅读笔记(四)—Mina的连接IoConnector1,,我们继续:

    AbstractIoAcceptor:

    001 package org.apache.mina.core.rewrite.service;
    002  
    003 import java.io.IOException;
    004 import java.net.SocketAddress;
    005 import java.util.ArrayList;
    006 import java.util.Collections;
    007 import java.util.HashSet;
    008 import java.util.List;
    009 import java.util.Set;
    010 import java.util.concurrent.Executor;
    011  
    012 public abstract class AbstractIoAcceptor extends AbstractIoService implements
    013         IoAcceptor {
    014  
    015     private final List<SocketAddress> defaultLocalAddresses = new ArrayList<SocketAddress>();
    016  
    017     private final List<SocketAddress> unmodifiableDeffaultLocalAddresses = Collections
    018             .unmodifiableList(defaultLocalAddresses);
    019  
    020     private final Set<SocketAddress> boundAddresses = new HashSet<SocketAddress>();
    021  
    022     private boolean disconnectOnUnbind = true;
    023  
    024     /** 这里不是很明白,为什么要用protected 而 不是private */
    025     protected final Object bindLock = new Object();
    026  
    027     /**
    028      * 注意这个构造方法是一定要写的,否则编译不通过:抽象类继承时候,构造方法都要写,而且必须包含super
    029      *
    030      * @param param
    031      *            sessionConfig
    032      * @param executor
    033      */
    034     protected AbstractIoAcceptor(Object param, Executor executor) {
    035         super(param, executor);
    036         defaultLocalAddresses.add(null);
    037     }
    038  
    039     @Override
    040     public SocketAddress getLocalAddress() {
    041  
    042         Set<SocketAddress> localAddresses = getLocalAddresses();
    043         if (localAddresses.isEmpty()) {
    044             return null;
    045         }
    046         return localAddresses.iterator().next();
    047     }
    048  
    049     @Override
    050     public final Set<SocketAddress> getLocalAddresses() {
    051         Set<SocketAddress> localAddresses = new HashSet<SocketAddress>();
    052         synchronized (boundAddresses) {
    053             localAddresses.addAll(boundAddresses);
    054         }
    055         return localAddresses;
    056     }
    057  
    058     @Override
    059     public void bind(SocketAddress localAddress) throws IOException {
    060         // TODO Auto-generated method stub
    061  
    062     }
    063  
    064     @Override
    065     public void bind(Iterable<? extends SocketAddress> localAddresses)
    066             throws IOException {
    067         // TODO isDisposing()
    068  
    069         if (localAddresses == null) {
    070             throw new IllegalArgumentException("localAddresses");
    071         }
    072  
    073         List<SocketAddress> localAddressesCopy = new ArrayList<SocketAddress>();
    074  
    075         for (SocketAddress a : localAddresses) {
    076             // TODO check address type
    077             localAddressesCopy.add(a);
    078         }
    079  
    080         if (localAddressesCopy.isEmpty()) {
    081             throw new IllegalArgumentException("localAddresses is empty");
    082         }
    083  
    084         boolean active = false;
    085  
    086         synchronized (bindLock) {
    087             synchronized (boundAddresses) {
    088                 if (boundAddresses.isEmpty()) {
    089                     active = true;
    090                 }
    091             }
    092         }
    093         /** implement in abstractIoService */
    094         if (getHandler() == null) {
    095             throw new IllegalArgumentException("handler is not set");
    096         }
    097  
    098         try {
    099             Set<SocketAddress> addresses = bindInternal(localAddressesCopy);
    100  
    101             synchronized (boundAddresses) {
    102                 boundAddresses.addAll(addresses);
    103             }
    104         catch (IOException e) {
    105             throw e;
    106         catch (RuntimeException e) {
    107             throw e;
    108         catch (Throwable e) {
    109             throw new RuntimeException("Filed ti bind");
    110         }
    111          
    112         if(active){
    113             //do sth
    114         }
    115     }
    116  
    117     protected abstract Set<SocketAddress> bindInternal(
    118             List<? extends SocketAddress> localAddress) throws Exception;
    119  
    120     @Override
    121     public void unbind(SocketAddress localAddress) {
    122         // TODO Auto-generated method stub
    123          
    124     }
    125 }
    polling:
    01 package org.apache.mina.core.rewrite.polling;
    02  
    03 import java.net.SocketAddress;
    04 import java.nio.channels.ServerSocketChannel;
    05 import java.util.List;
    06 import java.util.Set;
    07 import java.util.concurrent.Executor;
    08 import java.util.concurrent.Semaphore;
    09 import java.util.concurrent.atomic.AtomicReference;
    10  
    11 import org.apache.mina.core.rewrite.service.AbstractIoAcceptor;
    12  
    13 public abstract class AbstractPollingIoAcceptor extends AbstractIoAcceptor {
    14  
    15     private final Semaphore lock = new Semaphore(1);
    16  
    17     private volatile boolean selectable;
    18  
    19     private AtomicReference<Acceptor> acceptorRef = new AtomicReference<Acceptor>();
    20  
    21     /**
    22      * define the num of sockets that can wait to be accepted.
    23      */
    24     protected int backlog = 50;
    25  
    26     /**
    27      * 一样的,这个构造方法也要写
    28      *
    29      * @param param
    30      * @param executor
    31      */
    32     protected AbstractPollingIoAcceptor(Object param, Executor executor) {
    33         super(param, executor);
    34         // TODO Auto-generated constructor stub
    35     }
    36  
    37     /**
    38      * init the polling system. will be called at construction time
    39      *
    40      * @throws Exception
    41      */
    42     protected abstract void init() throws Exception;
    43  
    44     protected abstract void destory() throws Exception;
    45  
    46     protected abstract int select() throws Exception;
    47     /**这里有点儿变动*/
    48     protected abstract ServerSocketChannel open(SocketAddress localAddress) throws Exception;
    49  
    50     @Override
    51     protected Set<SocketAddress> bindInternal(
    52             List<? extends SocketAddress> localAddress) throws Exception {
    53         // ...
    54         try {
    55             lock.acquire();
    56             Thread.sleep(10);
    57         finally {
    58             lock.release();
    59         }
    60         // ...
    61         return null;
    62     }
    63  
    64     /**
    65      * this class is called by startupAcceptor() method it's a thread accepting
    66      * incoming connections from client
    67      *
    68      * @author ChenHui
    69      *
    70      */
    71     private class Acceptor implements Runnable {
    72         @Override
    73         public void run() {
    74             assert (acceptorRef.get() == this);
    75  
    76             int nHandles = 0;
    77  
    78             lock.release();
    79  
    80             while (selectable) {
    81                 try {
    82                     int selected = select();
    83  
    84                     // nHandles+=registerHandles();
    85  
    86                     if (nHandles == 0) {
    87                         acceptorRef.set(null);
    88                         // ...
    89                     }
    90                 catch (Exception e) {
    91  
    92                 }
    93             }
    94         }
    95     }
    96 }
    好了最后看NioSoeketAcceptor:
    001 package org.apache.mina.rewrite.transport.socket.nio;
    002  
    003 import java.net.InetSocketAddress;
    004 import java.net.ServerSocket;
    005 import java.net.SocketAddress;
    006 import java.nio.channels.SelectionKey;
    007 import java.nio.channels.Selector;
    008 import java.nio.channels.ServerSocketChannel;
    009 import java.util.concurrent.Executor;
    010  
    011 import org.apache.mina.core.rewrite.polling.AbstractPollingIoAcceptor;
    012 import org.apache.mina.rewrite.transport.socket.SocketAcceptor;
    013  
    014 public final class NioSocketAcceptor extends AbstractPollingIoAcceptor
    015         implements SocketAcceptor {
    016  
    017     private volatile Selector selector;
    018  
    019     protected NioSocketAcceptor(Object param, Executor executor) {
    020         super(param, executor);
    021         // TODO Auto-generated constructor stub
    022     }
    023  
    024     @Override
    025     public int getManagedSessionCount() {
    026         // TODO Auto-generated method stub
    027         return 0;
    028     }
    029  
    030     /**
    031      * 这个方法继承自AbstractIoAcceptor
    032      *
    033      * The type NioSocketAcceptor must implement the inherited abstract method
    034      * SocketAcceptor.getLocalAddress() to override
    035      * AbstractIoAcceptor.getLocalAddress()
    036      */
    037     @Override
    038     public InetSocketAddress getLocalAddress() {
    039         // TODO Auto-generated method stub
    040         return null;
    041     }
    042  
    043     @Override
    044     public void setDefaultLocalAddress(InetSocketAddress localAddress) {
    045         // TODO Auto-generated method stub
    046  
    047     }
    048  
    049     @Override
    050     public boolean isReuseAddress() {
    051         // TODO Auto-generated method stub
    052         return false;
    053     }
    054  
    055     @Override
    056     protected void init() throws Exception {
    057         selector = Selector.open();
    058     }
    059  
    060     @Override
    061     protected void destory() throws Exception {
    062         if (selector != null) {
    063             selector.close();
    064         }
    065     }
    066  
    067     @Override
    068     protected int select() throws Exception {
    069         return selector.select();
    070     }
    071  
    072     @Override
    073     protected void dispose0() throws Exception {
    074         // TODO Auto-generated method stub
    075  
    076     }
    077  
    078     protected ServerSocketChannel open(SocketAddress localAddress)
    079             throws Exception {
    080         ServerSocketChannel channel =ServerSocketChannel.open();
    081          
    082         boolean success=false;
    083          
    084         try{
    085             channel.configureBlocking(false);
    086              
    087             ServerSocket socket=channel.socket();
    088              
    089             socket.setReuseAddress(isReuseAddress());
    090              
    091             socket.bind(localAddress);
    092              
    093             channel.register(selector, SelectionKey.OP_ACCEPT);
    094              
    095             success=true;
    096         }finally{
    097             if(!success){
    098                 //close(channel);
    099             }
    100         }
    101         return channel;
    102     }
    103  
    104     @Override
    105     public boolean isActive() {
    106         // TODO Auto-generated method stub
    107         return false;
    108     }
    109  
    110 }
    ------------------------------------------------------

    到此为止将连接部分都写完了,在连接部分还有些零碎的东西,比如handlerpolling,这些都只是稍稍提了一下,具体后面会在介绍其他部分是肯定还会碰上,我还是想把重心放在最主要的部分去写,下一篇应该要写到session了。

  • 相关阅读:
    深度学习(机器学习)tensorflow环境配置及安装
    深度学习(机器学习)tensorflow学习第一课——tensor数据载体中的基本数据类型
    学生选课小项目——麻雀虽小,五脏俱全(Templates)
    素数定理简史
    关于全栈岗位及其成员轮岗问题的思考
    Spring WebFlux 简单业务代码及其Swagger文档
    Spring 5 中函数式webmvc开发中的swagger文档
    创建基于kotlin开发环境的spring项目入门
    折射定律的来历简介
    朴素贝叶斯方法入门
  • 原文地址:https://www.cnblogs.com/wuyida/p/6300943.html
Copyright © 2011-2022 走看看