zoukankan      html  css  js  c++  java
  • xsocket分析一,启动

    从一个简单的服务器Hander分析Xsocket的启动,首先定义一个简单的EchoHandler继承IDataHandler

    public class EchoHandler implements IDataHandler{
        public boolean onData(INonBlockingConnection nbc) 
                   throws IOException, 
                   BufferUnderflowException, 
                   MaxReadSizeExceededException {
    
             String data = nbc.readStringByDelimiter("
    ");
             nbc.write(data + "
    ");
             return true;
          }
    }

    然后利用这个IDataHandler启动服务器

    public class XsocketTest {
    
        public static void main(String[] args) throws UnknownHostException, IOException {
            // TODO Auto-generated method stub
             IServer srv = new Server(8090, new EchoHandler());
             
            // run it within the current thread. 
            srv.run();  // the call will not return
             
            // ... or start it by using a dedicated thread
            //srv.start(); // returns after the server has been started
        }
    
    }

     这样就实现了一个将客户端发送的字符串回发给客户端的服务器,接口还是很简单的。

    从Server的构造开始看起,Server继承自IServer,类描述是接收新的连接,将INonBlockingConnection的处理代理给 分配给Server的 Handler。Server包含Dispatcher,Dispatcher负责执行IO操作,每个Connection都会被分配给一个Dispatcher。为了处理应用相关的事件如onData、onConnected等,Server的Handler的相应的回调函数会被调用。在Server启动阶段,Handler支持的回调函数会通过反射进行解析,回调函数是通过实现不同的接口如IConnectHandler、IDataHandler等进行标记的。通常一个服务器Handler会实现多个Handler接口。这段描述基本介绍了XSocket的整体结构,主要的概念都讲到了,后面还会具体分析。

    Server重载了多个构造函数,上面那种构造方式调用了

        protected Server(InetSocketAddress address, Map<String, Object> options, IHandler handler, SSLContext sslContext, boolean sslOn, int backlog, int minPoolsize, int maxPoolsize, int taskqueueSize) throws UnknownHostException, IOException {
                    
            defaultWorkerPool = new WorkerPool(minPoolsize, maxPoolsize, taskqueueSize);
            workerpool = defaultWorkerPool;
            
            if (sslContext != null) {
                acceptor = ConnectionUtils.getIoProvider().createAcceptor(new LifeCycleHandler(), address, backlog, options, sslContext, sslOn);
                
            } else {
                acceptor = ConnectionUtils.getIoProvider().createAcceptor(new LifeCycleHandler(), address, backlog, options);
            }
                
            localHostname = acceptor.getLocalAddress().getHostName();
            localPort = acceptor.getLocalPort();
            
            setHandler(handler);
        }

    主要参数是端口,Ihandler,是否支持ssl,工作线程池参数,除了我们赋予的其他都是默认值。首先创建工作线程池WorkPool,WorkerPool继承自ThreadPoolExecutor,WorkPool构造函数需要一个在任务执行之前存放任务的队列,这里是WorkerPoolAwareQueue extends LinkedBlockingQueue<Runnable>,这个WorkerPoolAwareQueue重写了offer函数加入了日志。

    然后是创建Acceptor。这里创建了一个LifeCycleHandler(重要,后面还会讲),实现了IIoAcceptorCallback接口,接口里定义了onCoonected()等方法在Acceptor绑定到socket或者接收到请求时回调。ConnectionUtils提供了一些集合、ByteBuffer操作的便捷方法,其中的静态变量IoProvider定义了Socket IO的各种参数,createAcceptor函数调用

    IoAcceptor acceptor = new IoAcceptor(callback, address, backlog, isReuseAddress);

    IoAcceptor构造函数如下

        public IoAcceptor(IIoAcceptorCallback callback, InetSocketAddress address, int backlog, SSLContext sslContext, boolean sslOn, boolean isReuseAddress) throws IOException {
            this.callback = callback; //这个callback就是刚刚说的LifeCycleHandler
            this.sslContext = sslContext;
            this.sslOn = sslOn;
    
    
            LOG.fine("try to bind server on " + address);
    
            // create a new server socket
            serverChannel = ServerSocketChannel.open();//构造serverChannel
            assert (serverChannel != null);
            
            serverChannel.configureBlocking(true);
            serverChannel.socket().setSoTimeout(0);  // accept method never times out
            serverChannel.socket().setReuseAddress(isReuseAddress);  //在channel关闭但还存活时是否能够重用端口
            
            
            try {
                serverChannel.socket().bind(address, backlog);
                //构建IoSocketDispatcherPool
                dispatcherPool = new IoSocketDispatcherPool("Srv" + getLocalPort(), IoProvider.getServerDispatcherInitialSize());
                
            } catch (BindException be) {
                serverChannel.close();
                LOG.warning("could not bind server to " + address + ". Reason: " + DataConverter.toString(be));
                throw be;
            }
    
        }

     在构造函数中创建了IoSocketDispatcherPool,同时在构建该Pool时实例化了两个IoSocketDispatcher并运行

    for (int i = currentRunning; i < size; i++) {
        IoUnsynchronizedMemoryManager memoryManager = null;
        if (preallocation) {
    //内存管理,采用预分配内存的方法先不管 memoryManager
    = IoUnsynchronizedMemoryManager.createPreallocatedMemoryManager(preallocationSize, bufferMinsize, useDirect); } else {memoryManager = IoUnsynchronizedMemoryManager.createNonPreallocatedMemoryManager(useDirect); } IoSocketDispatcher dispatcher = new IoSocketDispatcher(memoryManager, name + "#" + i); dispatchers.addLast(dispatcher); Thread t = new Thread(dispatcher); t.setDaemon(true); t.start(); for (IIoDispatcherPoolListener listener : listeners) { listener.onDispatcherAdded(dispatcher); } }

     IoSocketDispatcher构造函数调用了selector = Selector.open(); run方法如下

        public void run() {
    
            // set thread name and attach dispatcher id to thread
            Thread.currentThread().setName(name);
            THREADBOUND_ID.set(id);
            
            DIRECT_CALL_COUNTER.set(0);
    
            if (LOG.isLoggable(Level.FINE)) {
                LOG.fine("selector " + name + " listening ...");
            }
    
            int handledTasks = 0;
    
            while(isOpen.get()) {
                try {
                    int eventCount = selector.select(5000); 
                
                    handledTasks = performRegisterHandlerTasks(); //处理注册任务,重要
                    handledTasks += performKeyUpdateTasks(); //处理key更新任务,重要
    
                    if (eventCount > 0) {
                        handleReadWriteKeys(); //处理读写事件,重要
                    }
    
                    handledTasks += performDeregisterHandlerTasks();
                
                    checkForLooping(eventCount + handledTasks, lastTimeWokeUp);
                    
                } catch (Throwable e) {
                    // eat and log exception
                    if (LOG.isLoggable(Level.FINE)) {
                        LOG.fine("[" + Thread.currentThread().getName() + "] exception occured while processing. Reason " + DataConverter.toString(e));
                    }
                }
            }
    
    
            for (IoSocketHandler socketHandler : getRegistered()) {
                socketHandler.onDeregisteredEvent();
            }
    
            try {
                selector.close();
            } catch (Exception e) {
                // eat and log exception
                if (LOG.isLoggable(Level.FINE)) {
                    LOG.fine("error occured by close selector within tearDown " + DataConverter.toString(e));
                }
            }
        }

    再回到最初的main方法,其中调用了srv.run(); run函数最后调用了Acceptor的accept函数

        private void accept() {
    
            while (isOpen.get()) {
                try {
    
                    // blocking accept call
                    SocketChannel channel = serverChannel.accept();
    
                    // create IoSocketHandler
                    IoSocketDispatcher dispatcher = dispatcherPool.nextDispatcher();
                    IoChainableHandler ioHandler = ConnectionUtils.getIoProvider().createIoHandler(false, dispatcher, channel, sslContext, sslOn);
    
                    // notify call back
                    callback.onConnectionAccepted(ioHandler);
                    acceptedConnections++;
    
                } catch (Exception e) {
                    // if acceptor is running (<socket>.close() causes that any
                    // thread currently blocked in accept() will throw a SocketException)
                    if (serverChannel.isOpen()) {
                        LOG.warning("error occured while accepting connection: " + DataConverter.toString(e));
                    }
                }
            }
        }

     重要就完成server的启动,主要启动了一个Acceptor线程和两个Dispatcher线程。

  • 相关阅读:
    mfc cef<转>
    js人形时钟
    opencv给图片添加文字水印<转>
    strcore.cpp(156) 内存泄漏
    WebAssembly相关
    mingw 搭建Emscripten 环境
    mingw 设置python 设置git环境变量
    android平台yuv缩放相关<转>
    多媒体基础知识之PCM数据《 转》
    iOS 5 故事板进阶(2)
  • 原文地址:https://www.cnblogs.com/coderway/p/4220108.html
Copyright © 2011-2022 走看看