zoukankan      html  css  js  c++  java
  • xsocket源码解读

    关于xsocket可见于 我的另外一篇文章http://www.blogjava.net/freeman1984/archive/2011/04/25/302706.html,或者查看官网http://xsocket.org/
    当然阅读xsocket需要一些线程,nio,niosocket,和java.util.concurrent(锁,线程池等)包的一些知识。要不读起来很费劲,建议先去了解下这些知识。可以在我的文章分类concurrent里面有一些,其他去网上找找。
    本文只读了一个主要的流程,对于一些其他的代码例如:ssl相关,读数据相关没有涉及,看有时间能补上。
    首先xsocket的几个关键的类
    Server: 服务器端初始化线程池创建IoAcceptor
    IoAcceptor:采用while循环接收客户端连接,并创建IoSocketDispatcher和IoChainableHandler
    IoSocketDispatcher:负责注册SelectionKey以及事件的分发,并交给IoChainableHandler处理,通过一个while循环来处理注册的SelectionKey事件。
    IHandler:事件处理,数据的读写等等。
    INonBlockingConnection客户端接口。
    (1)首先看下Server创建:
    构造方法

    protected Server(InetSocketAddress address, Map<String, Object> options, IHandler handler, SSLContext sslContext,
     
    boolean sslOn, int backlog, int minPoolsize, int maxPoolsize, int taskqueueSize) 
    这个方法主要是初始化线程池,构件acceptor
            defaultWorkerPool 
    = new WorkerPool(minPoolsize, maxPoolsize, taskqueueSize);
         workerpool 
    = defaultWorkerPool;
         
      
    if (sslContext != null{//是否使用ssl
       acceptor = ConnectionUtils.getIoProvider().createAcceptor(new LifeCycleHandler(), address, backlog, options, sslContext, sslOn);
       
      }
     else {
       acceptor 
    = ConnectionUtils.getIoProvider().createAcceptor(new LifeCycleHandler(), address, backlog, options);
      }



    其中线程池:使用jdk1.5以后的ThreadPoolExecutor,线程池最小默认2,最大100,QUEUE的大小默认也是100
    线程池最小:MIN_SIZE_WORKER_POOL = Integer.parseInt(System.getProperty("org.xsocket.connection.server.workerpoolMinSize", "2"));
    SIZE_WORKER_POOL = Integer.parseInt(System.getProperty("org.xsocket.connection.server.workerpoolSize", "100"));
    TASK_QUEUE_SIZE = Integer.parseInt(System.getProperty("org.xsocket.connection.server.taskqueuesize", Integer.toString(SIZE_WORKER_POOL)));

    (2)构件acceptor细节,最后server启动的时候会启动acceptor监听客户端

     public IoAcceptor(IIoAcceptorCallback callback, InetSocketAddress address, int backlog, SSLContext sslContext, boolean sslOn, boolean isReuseAddress) throws IOException {
            .
            serverChannel 
    = ServerSocketChannel.open();
            
            serverChannel.configureBlocking(
    true);
            serverChannel.socket().setSoTimeout(
    0);  // accept method never times out
            serverChannel.socket().setReuseAddress(isReuseAddress); 
            .
            serverChannel.socket().bind(address, backlog);
            dispatcherPool 
    = new IoSocketDispatcherPool("Srv" + getLocalPort(), IoProvider.getServerDispatcherInitialSize());
            .
        }


    (3)启动server
    server.start();
    服务的启动用了一个单独的线程,这里面使用到了CountDownLatch可参见另外一篇关于CountDownLatch用法的文章:
    http://www.blogjava.net/freeman1984/archive/2011/07/04/353654.html
    使用CountDownLatch来控制server的启动时间,操作多少时间为启动就,默认是60秒,这里就不讲CountDownLatch的代码了
    整个启动的方法如下,能看懂CountDownLatch的用法基本上就理解了。

    public static void start(IServer server, int timeoutSec) throws SocketTimeoutException {
      
      
      
    // start server within a dedicated thread 
      Thread t = new Thread(server);
      t.setName(
    "xServer");
      t.start();
    //请看下面的run方法分析
      
     }


    看看他的server线程的run方法:

    public void run() {
     
     acceptor.listen();
    //启动前面创建的acceptor开始监听客户端连接
     
    }



    接着查看listen()方法:

    public void listen() throws IOException {
         callback.onConnected();
    //通知server已经启动
         accept();//接受客户端连接
        }

    }



    查看 accept();很明了,使用一个while循环监听客户端连接,并建立可客户端相关的处理类:

    while (isOpen.get()) {

        
    // blocking accept call
        SocketChannel channel = serverChannel.accept();

        
    // create IoSocketHandler
        
    //创建事件分发器
        IoSocketDispatcher dispatcher = dispatcherPool.nextDispatcher();
        
    //创建io处理
        IoChainableHandler ioHandler = ConnectionUtils.getIoProvider().createIoHandler(false, dispatcher, channel, sslContext, sslOn);
        
    // notify call back
        callback.onConnectionAccepted(ioHandler);//很关键的一个地方,会注册SelectionKey.OP_READ,此时客户端发来的消息就可以北服务端获取
    }


    查看callback.onConnectionAccepted(ioHandler);
    此方法会初始化server端的ioHandler,查看初始化的代码:
    dispatcher.register(this, SelectionKey.OP_READ);首先会注册read选择器,
    如果有read事件发生dispatcher就会处理。看看dispatcher的run方法(通过一个循环来不停的处理已经注册的事件)

    while(isOpen.get()) {
        
        
    int eventCount = selector.select(5000); 
       
        handledTasks 
    = performRegisterHandlerTasks();//处理事件
        handledTasks += performKeyUpdateTasks();
        
    if (eventCount > 0{
         handleReadWriteKeys();
    //处理读写,调用我们自己定义的hander来处理onData等事件
        }

        handledTasks 
    += performDeregisterHandlerTasks();
          
      }



      
    (4)客户端
    NonBlockingConnection,例如:
    new NonBlockingConnection("localhost", 8090,new MyHandler() )
    构造方法的主要代码:

     .
     SocketChannel channel 
    = openSocket(localAddress, options);//实际调用:SocketChannel channel = SocketChannel.open();
       
     IoConnector connector 
    = getDefaultConnector();
           
        IIoConnectorCallback callback 
    = new AsyncIoConnectorCallback(remoteAddress, channel, sslContext, isSecured, connectTimeoutMillis);
        connector.connectAsync(channel, remoteAddress, connectTimeoutMillis, callback);
          
    }



    建立连接,生成IoConnector用来管理连接,然后connector开始启动,做一些初始化的工作:

    其中connector.connectAsync(…方法会执行会产生一个RegisterTask任务到IoConnector,这个RegisterTask做的事情如下:
    selectionKey = channel.register(selector, SelectionKey.OP_CONNECT);,也就是注册SelectionKey.OP_CONNECT
    当IoConnector运行会执行这个任务:
    看下他的run方法主要代码:

    while(isOpen.get()) {
               
                    handledTasks 
    = performTaskQueue();//首先运行上一步创建的RegisterTask注册SelectionKey.OP_CONNECT
                    int eventCount = selector.select(1000);//查看SelectionKey.OP_CONNECT事件是否已经准备好
                    if (eventCount > 0{
                        handleConnect();
    //如果准备好就处理连接事件
                    }
     else {
                     checkForLooping(handledTasks);
                    }

             
      }



     handleConnect()的代码:
     

     private void handleConnect() {
            Set
    <SelectionKey> selectedEventKeys = selector.selectedKeys();

            Iterator
    <SelectionKey> it = selectedEventKeys.iterator();
            
    while (it.hasNext()) {
                SelectionKey eventKey 
    = it.next();
                it.remove();

                RegisterTask registerTask 
    = (RegisterTask) eventKey.attachment();

                
    if (eventKey.isValid() && eventKey.isConnectable()) {
                    
                    
    try {
                        
    boolean isConnected = ((SocketChannel) eventKey.channel()).finishConnect();//已经通讯连接
                        if (isConnected) {
                         eventKey.cancel();
                         registerTask.callback.onConnectionEstablished();
    //连接建立好就做下一步工作,注册read事件。
                        }

                    
                }

            }

        }


     接着看registerTask.callback.onConnectionEstablished()
     主要初始化iohander并注册read事件

     private void init(IoChainableHandler ioHandler, IIoHandlerCallback handlerCallback) throws IOException, SocketTimeoutException {
      
    this.ioHandler = ioHandler;
      ioHandler.init(handlerCallback);
    //这个方法里面注册了read
      isConnected.set(true);//这个时候通讯连接才真正建立起来了
     }



    继续看ioHandler.init(handlerCallback)方法:

    public void init(IIoHandlerCallback callbackHandler) throws IOException, SocketTimeoutException {
         
      dispatcher.register(
    this, SelectionKey.OP_READ);//注册SelectionKey.OP_READ时间,可以接受服务端的消息了
    }


    服务端和客户端就可以互相通信了。本文大致讲解了xsocket的代码的流程,其中讲解有误的地方请兄弟们指出,多谢!

    http://www.blogjava.net/freeman1984/archive/2011/10/19/361593.html


  • 相关阅读:
    工作一年感想
    launcher项目踩坑小结(1)
    滕王阁序
    PC端/移动端常见的兼容性问题总结
    Java中逻辑&和短路&&,逻辑|和短路||的区别
    Linux常用指令和系统管理命令总结
    Ajax学习笔记
    js放大镜特效
    《Python for Data Science》笔记之着手于数据
    Python2&3学习中遇到的坑
  • 原文地址:https://www.cnblogs.com/cuker919/p/4878557.html
Copyright © 2011-2022 走看看