zoukankan      html  css  js  c++  java
  • Java IO:SocketChannel和Selector在ZooKeeper中应用

    转载请注明出处:jiq•钦's technical Blog 

    假设不了解SocketChannel和Selector。请先阅读我的还有一篇博文:点击打开链接

     

    ZooKeeper的启动从QuorumPeerMain类的main函数開始:

    调用顺序是: Main -> initializeAndRun-> runFromConfig

     

    一、默认的NIOServerCnxnFactory通信方式

    当中runFromConfig主要做了两件事情:

    (1)初始化client与服务端的网络通信处理类ServerCnxnFactory:

    ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();

     

    而createFactory函数的内部实现是:

    if (serverCnxnFactoryName == null) {
     serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();
    }
     
    try {
     return(ServerCnxnFactory) Class.forName(serverCnxnFactoryName).newInstance();
    } catch (Exception e) {
                IOException ioe = new IOException("Couldn't instantiate "
                        + serverCnxnFactoryName);
                ioe.initCause(e);
                throw ioe;
    }
    能够看到默认初始化的ServerCnxnFactory是NIOServerCnxnFactory,这是Java NIO方式的网络通信,另外还有NettyServerCnxnFactory类提供Netty通信方式。

     

    (2)启动QuorumPeer:

    quorumPeer.start(); 

    内部会调用初始化好的ServerCnxnFactory类的start函数:

    cnxnFactory.start();

     

    二、NIOServerCnxnFactory.start()方法

    @Override
       publicvoidstart() {
            stopped = false;
            if (workerPool == null){
                workerPool = new WorkerService(
                    "NIOWorker", numWorkerThreads, false);
            }
           
            //先启动一堆selector线程
            for(SelectorThread thread : selectorThreads) {
                if (thread.getState() == Thread.State.NEW) {
                    thread.start();
                }
            }
           
            //再启动accept线程
            if (acceptThread.getState() == Thread.State.NEW) {
                acceptThread.start();
            }
           
            //最后启动expire线程
            if (expirerThread.getState() ==Thread.State.NEW){
                expirerThread.start();
            }
        }

    三、NIOServerCnxnFactory中几个线程类的关系

    NIOServerCnxnFactory中包括了三个类:

    (1)AbstractSelectThread:SelectorThread类和AcceptThread类的共同父类,维护了一个selector选择器对象

    (2)AcceptThread:管理新的ZooKeeper client连接请求,实际上就是利用父类的选择器selector监听ServerSocketChannel的“SelectionKey.OP_ACCEPT”事件,一旦来新的请求,负责建立好和client的连接SocketChannel,并从SelectorThread线程池分配一个线程。将该SocketChannel连接放入SelectorThread线程维护的acceptedQueue队列中。

    (3)SelectorThread:监听AcceptThread分配的已经建立的SocketChannel连接上发生的数据读写事件,并运行实际数据读写。

    实际上就是利用父类的选择器selector监听在acceptedQueue队列中建立好的连接的数据读写事件。一旦读写事件发生,调用handleIO函数处理读写请求。

     

    以下是读写事件监听线程的线程池selectorThreads的初始化:

    for(inti=0; i<numSelectorThreads;++i) {
                selectorThreads.add(newSelectorThread(i));
           }

    以下是新连接管理线程acceptThread的初始化:

    this.ss =ServerSocketChannel.open();
    ss.socket().setReuseAddress(true);
    LOG.info("binding to port " + addr);
    ss.socket().bind(addr);
    ss.configureBlocking(false);
    acceptThread= newAcceptThread(ss, addr, selectorThreads);

    然后NIOServerCnxnFactory.start()将会启动这些线程。

    AcceptThread的run函数非常easy。就是监听连接SelectionKey.op_accept事件,然后建立SocketChannel连接。并分配一个SelectorThread线程来处理该连接,详细就不详述了。

     

    SelectorThread的run函数核心就是监听到SelectionKey.OP_READ事件后运行handleIO函数,详细怎样和client进行数据交互计划放在其它文章中介绍。

     


  • 相关阅读:
    Entity Framework 批量操作
    Tsak多线程学习记录
    .net webservice 动态更换地址
    .NET EF Core2.0 (DBFirst)使用与配置
    MVC发布IIS后提示未配置默认文档
    未能加载文件或程序集“Microsoft.Web.Infrastructure, Version=1.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35”或它的某一个依赖项。系统找不到指定的文件。
    Redis教程
    使用git将项目上传到github
    Redis集群(一)搭建Cluster模式[超简单]
    Redis 常见5大数据类型结构,附录3个数据类型
  • 原文地址:https://www.cnblogs.com/blfbuaa/p/6962293.html
Copyright © 2011-2022 走看看