zoukankan      html  css  js  c++  java
  • xsocket分析一,启动

    从一个简单的服务器Hander分析Xsocket的启动,首先定义一个简单的EchoHandler继承IDataHandler

    public class EchoHandler implements IDataHandler{
        public boolean onData(INonBlockingConnection nbc) 
                   throws IOException, 
                   BufferUnderflowException, 
                   MaxReadSizeExceededException {
    
             String data = nbc.readStringByDelimiter("
    ");
             nbc.write(data + "
    ");
             return true;
          }
    }

    然后利用这个IDataHandler启动服务器

    public class XsocketTest {
    
        public static void main(String[] args) throws UnknownHostException, IOException {
            // TODO Auto-generated method stub
             IServer srv = new Server(8090, new EchoHandler());
             
            // run it within the current thread. 
            srv.run();  // the call will not return
             
            // ... or start it by using a dedicated thread
            //srv.start(); // returns after the server has been started
        }
    
    }

     这样就实现了一个将客户端发送的字符串回发给客户端的服务器,接口还是很简单的。

    从Server的构造开始看起,Server继承自IServer,类描述是接收新的连接,将INonBlockingConnection的处理代理给 分配给Server的 Handler。Server包含Dispatcher,Dispatcher负责执行IO操作,每个Connection都会被分配给一个Dispatcher。为了处理应用相关的事件如onData、onConnected等,Server的Handler的相应的回调函数会被调用。在Server启动阶段,Handler支持的回调函数会通过反射进行解析,回调函数是通过实现不同的接口如IConnectHandler、IDataHandler等进行标记的。通常一个服务器Handler会实现多个Handler接口。这段描述基本介绍了XSocket的整体结构,主要的概念都讲到了,后面还会具体分析。

    Server重载了多个构造函数,上面那种构造方式调用了

        protected Server(InetSocketAddress address, Map<String, Object> options, IHandler handler, SSLContext sslContext, boolean sslOn, int backlog, int minPoolsize, int maxPoolsize, int taskqueueSize) throws UnknownHostException, IOException {
                    
            defaultWorkerPool = new WorkerPool(minPoolsize, maxPoolsize, taskqueueSize);
            workerpool = defaultWorkerPool;
            
            if (sslContext != null) {
                acceptor = ConnectionUtils.getIoProvider().createAcceptor(new LifeCycleHandler(), address, backlog, options, sslContext, sslOn);
                
            } else {
                acceptor = ConnectionUtils.getIoProvider().createAcceptor(new LifeCycleHandler(), address, backlog, options);
            }
                
            localHostname = acceptor.getLocalAddress().getHostName();
            localPort = acceptor.getLocalPort();
            
            setHandler(handler);
        }

    主要参数是端口,Ihandler,是否支持ssl,工作线程池参数,除了我们赋予的其他都是默认值。首先创建工作线程池WorkPool,WorkerPool继承自ThreadPoolExecutor,WorkPool构造函数需要一个在任务执行之前存放任务的队列,这里是WorkerPoolAwareQueue extends LinkedBlockingQueue<Runnable>,这个WorkerPoolAwareQueue重写了offer函数加入了日志。

    然后是创建Acceptor。这里创建了一个LifeCycleHandler(重要,后面还会讲),实现了IIoAcceptorCallback接口,接口里定义了onCoonected()等方法在Acceptor绑定到socket或者接收到请求时回调。ConnectionUtils提供了一些集合、ByteBuffer操作的便捷方法,其中的静态变量IoProvider定义了Socket IO的各种参数,createAcceptor函数调用

    IoAcceptor acceptor = new IoAcceptor(callback, address, backlog, isReuseAddress);

    IoAcceptor构造函数如下

        public IoAcceptor(IIoAcceptorCallback callback, InetSocketAddress address, int backlog, SSLContext sslContext, boolean sslOn, boolean isReuseAddress) throws IOException {
            this.callback = callback; //这个callback就是刚刚说的LifeCycleHandler
            this.sslContext = sslContext;
            this.sslOn = sslOn;
    
    
            LOG.fine("try to bind server on " + address);
    
            // create a new server socket
            serverChannel = ServerSocketChannel.open();//构造serverChannel
            assert (serverChannel != null);
            
            serverChannel.configureBlocking(true);
            serverChannel.socket().setSoTimeout(0);  // accept method never times out
            serverChannel.socket().setReuseAddress(isReuseAddress);  //在channel关闭但还存活时是否能够重用端口
            
            
            try {
                serverChannel.socket().bind(address, backlog);
                //构建IoSocketDispatcherPool
                dispatcherPool = new IoSocketDispatcherPool("Srv" + getLocalPort(), IoProvider.getServerDispatcherInitialSize());
                
            } catch (BindException be) {
                serverChannel.close();
                LOG.warning("could not bind server to " + address + ". Reason: " + DataConverter.toString(be));
                throw be;
            }
    
        }

     在构造函数中创建了IoSocketDispatcherPool,同时在构建该Pool时实例化了两个IoSocketDispatcher并运行

    for (int i = currentRunning; i < size; i++) {
        IoUnsynchronizedMemoryManager memoryManager = null;
        if (preallocation) {
    //内存管理,采用预分配内存的方法先不管 memoryManager
    = IoUnsynchronizedMemoryManager.createPreallocatedMemoryManager(preallocationSize, bufferMinsize, useDirect); } else {memoryManager = IoUnsynchronizedMemoryManager.createNonPreallocatedMemoryManager(useDirect); } IoSocketDispatcher dispatcher = new IoSocketDispatcher(memoryManager, name + "#" + i); dispatchers.addLast(dispatcher); Thread t = new Thread(dispatcher); t.setDaemon(true); t.start(); for (IIoDispatcherPoolListener listener : listeners) { listener.onDispatcherAdded(dispatcher); } }

     IoSocketDispatcher构造函数调用了selector = Selector.open(); run方法如下

        public void run() {
    
            // set thread name and attach dispatcher id to thread
            Thread.currentThread().setName(name);
            THREADBOUND_ID.set(id);
            
            DIRECT_CALL_COUNTER.set(0);
    
            if (LOG.isLoggable(Level.FINE)) {
                LOG.fine("selector " + name + " listening ...");
            }
    
            int handledTasks = 0;
    
            while(isOpen.get()) {
                try {
                    int eventCount = selector.select(5000); 
                
                    handledTasks = performRegisterHandlerTasks(); //处理注册任务,重要
                    handledTasks += performKeyUpdateTasks(); //处理key更新任务,重要
    
                    if (eventCount > 0) {
                        handleReadWriteKeys(); //处理读写事件,重要
                    }
    
                    handledTasks += performDeregisterHandlerTasks();
                
                    checkForLooping(eventCount + handledTasks, lastTimeWokeUp);
                    
                } catch (Throwable e) {
                    // eat and log exception
                    if (LOG.isLoggable(Level.FINE)) {
                        LOG.fine("[" + Thread.currentThread().getName() + "] exception occured while processing. Reason " + DataConverter.toString(e));
                    }
                }
            }
    
    
            for (IoSocketHandler socketHandler : getRegistered()) {
                socketHandler.onDeregisteredEvent();
            }
    
            try {
                selector.close();
            } catch (Exception e) {
                // eat and log exception
                if (LOG.isLoggable(Level.FINE)) {
                    LOG.fine("error occured by close selector within tearDown " + DataConverter.toString(e));
                }
            }
        }

    再回到最初的main方法,其中调用了srv.run(); run函数最后调用了Acceptor的accept函数

        private void accept() {
    
            while (isOpen.get()) {
                try {
    
                    // blocking accept call
                    SocketChannel channel = serverChannel.accept();
    
                    // create IoSocketHandler
                    IoSocketDispatcher dispatcher = dispatcherPool.nextDispatcher();
                    IoChainableHandler ioHandler = ConnectionUtils.getIoProvider().createIoHandler(false, dispatcher, channel, sslContext, sslOn);
    
                    // notify call back
                    callback.onConnectionAccepted(ioHandler);
                    acceptedConnections++;
    
                } catch (Exception e) {
                    // if acceptor is running (<socket>.close() causes that any
                    // thread currently blocked in accept() will throw a SocketException)
                    if (serverChannel.isOpen()) {
                        LOG.warning("error occured while accepting connection: " + DataConverter.toString(e));
                    }
                }
            }
        }

     重要就完成server的启动,主要启动了一个Acceptor线程和两个Dispatcher线程。

  • 相关阅读:
    北京燃气IC卡充值笔记
    随机分析、随机控制等科目在量化投资、计算金融方向有哪些应用?
    量化交易平台大全
    Doctor of Philosophy in Computational and Mathematical Engineering
    Institute for Computational and Mathematical Engineering
    Requirements for the Master of Science in Computational and Mathematical Engineering
    MSc in Mathematical and Computational Finance
    万字长文:详解多智能体强化学习的基础和应用
    数据处理思想和程序架构: 使用Mbedtls包中的SSL,和服务器进行网络加密通信
    31-STM32+W5500+AIR202/302基本控制篇-功能优化-W5500移植mbedtls库以SSL方式连接MQTT服务器(单向忽略认证)
  • 原文地址:https://www.cnblogs.com/coderway/p/4220108.html
Copyright © 2011-2022 走看看