zoukankan      html  css  js  c++  java
  • [hadoop源码阅读][6]org.apache.hadoop.ipcipc.server

    1.      nioreactor模式

    1334842640_7902 

    具体的处理方式:

    ·     1.一个线程来处理所有连接(使用一个Selector

    ·     2.一组线程来读取已经建立连接的数据(多个Selector,这里的线程数一般和cpu的核数相当);

    ·     3.一个线程池(这个线程池大小可以根据业务需求进行设置)

    ·     4.一个线程处理所有的连接的数据的写操作(一个Selector

    2.      简明流程图

    e23d9744-85f4-3c22-aafb-33d09a36ab34

     

    3.      RPC Server主要流程

    RPC Server作为服务提供者由两个部分组成:接收Call调用和处理Call调用。

     hadoop_rpc

    接收Call调用负责接收来自RPC Client的调用请求,编码成Call对象后放入到Call队列中。这一过程由Listener线程完成。具体步骤:

    l      Listener线程监视RPC Client发送过来的数据。

    l      当有数据可以接收时,调用ConnectionreadAndProcess方法。

    l      Connection边接收边对数据进行处理,如果接收到一个完整的Call包,则构建一个Call对象PUSHCall队列中,由Handler线程来处理Call队列中的所有Call

     

    处理Call调用负责处理Call队列中的每个调用请求,由Handler线程完成:

    l      Handler线程监听Call队列,如果Call队列非空,按FIFO规则从Call队列取出Call

    l      Call交给RPC.Server处理。

    l      借助JDK提供的Method,完成对目标方法的调用,目标方法由具体的业务逻辑实现。

    l      返回响应。Server.Handler按照异步非阻塞的方式向RPC Client发送响应,如果有未发送出的数据,则交由Server.Responder来完成。

     

    4. server类的结构

    0)抽象类

    这里的Server类是个抽象类,唯一抽象的地方,就是

    public abstract Writable call(Writable param, long receiveTime) throws IOException;

    RPC.server来实现

    1Call

    用以存储客户端发来的请求,这个请求会放入一个BlockQueue中;

    2Listener

    监听类,用以监听客户端发来的请求。同时Listener下面还有一个静态类,Listener.Reader,当监听器监听到用户请求,便用让Reader读取用户请求。

    Listener主要负责Socket的监听以及Connection的建立,同时监控ClientSocket的数据可读事件,通知Connection进行processData,收到完成请求包以后,封装为一个Call对象(包含Connection对象,从网络流中读取的参数信息,调用方法信息),将其放入队列

    3Responder

    响应RPC请求类,请求处理完毕,由Responder发送给请求客户端。

    它不断地检查响应队列中是否有调用信息,如果有的话,就把调用的结果返回给客户端

    4Connection

    连接类,真正的客户端请求读取逻辑在这个类中。

    Connection,代表与Client端的连接,读取客户端的call并放到一个阻塞队列中,Handler负责从这个队列中读取数据并处理

    5Handler

    请求(blockQueueCall)处理类,会循环阻塞读取callQueue中的call对象,并对其进行操作。

    真正做事的实体。它从调用队列中获取调用信息,然后反射调用真正的对象,得到结果,然后再把此次调用放到响应队列(response queue) 

     

    5.      Server的启动,运行

    5.1.Namenodegetserver实例使用

    private void initialize(Configuration conf) throws IOException { this.serviceRpcServer = RPC.getServer(this, dnSocketAddr.getHostName(), dnSocketAddr.getPort(), serviceHandlerCount, false, conf, namesystem.getDelegationTokenSecretManager()); serviceRpcServer.start(); }

     

    5.2.Start服务启动

    public synchronized void start() { responder.start(); listener.start(); handlers = new Handler[handlerCount]; for (int i = 0; i < handlerCount; i++) { handlers[i] = new Handler(i); handlers[i].start(); } }

    responderlistenerhandlers三个对象的线程均阻塞了,前两个阻塞在selector.select()方法上,handler阻塞在callQueue.take()方法,都在等待客户端请求。Responder设置了超时时间,为15分钟。而listener还开启了Reader线程,该线程也阻塞了。

     

    5.3. Listener线程做的工作

    Listener监听到请求,获得所有请求的SelectionKey,执行doAccept(key)方法,该方法将所有的连接对象放入list中,并将connection对象与key绑定,以供reader使用。初始化玩所有的conne对象之后,就可以激活Reader线程了.

    Readerrun方法和Listener基本一致,也是获得所有的SelectionKey,再执行doRead(key)方法。该方法获得key中绑定的connection,并执行conectionreadAndProcess()方法

    简明调用函数过程:

    Listener:: run-> Listener:: doAccept( 激活Reader线程)->>Reader:: doRead->>connection:: readAndProcess->> connection::processOneRpc->>connection:: processData

     
    private void processData(byte[] buf) throws IOException, InterruptedException { DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf)); int id = dis.readInt(); // 尝试读取id Writable param = ReflectionUtils.newInstance(paramClass, conf);//读取参数 param.readFields(dis);//这个就是client传递过来的Invocation,包含了函数名和参数 Call call = new Call(id, param, this); //封装成call callQueue.put(call); // 将call存入callQueue incRpcCount(); // 增加rpc请求的计数 }

     

    5.4. Handler线程做的工作

    Handler线程的run函数

    while (running) { try { final Call call = callQueue.take(); //弹出call,可能会阻塞 //调用ipc.Server类中的call()方法,但该call()方法是抽象方法,具体实现在RPC.Server类中 value = call(call.connection.protocol, call.param, call.timestamp); synchronized (call.connection.responseQueue) { setupResponse(buf, call, (error == null) ? Status.SUCCESS : Status.ERROR, value, errorClass, error); //给客户端响应请求 responder.doRespond(call); } } }

    关于call函数的调用,稍后分析

     

    5.5.Responder线程做的工作

    void doRespond(Call call) throws IOException { synchronized (call.connection.responseQueue) { call.connection.responseQueue.addLast(call);//放到队列里面去 if (call.connection.responseQueue.size() == 1) { processResponse(call.connection.responseQueue, true); } } }

    简明调用结构为:

    Responder::run->>doAsyncWrite->>processResponse

     

    5.6.最后来看server.call函数是怎么执行的

    public Writable call(Class<?> protocol, Writable param, long receivedTime) throws IOException { try { Invocation call = (Invocation) param; Method method = protocol.getMethod(call.getMethodName(), call.getParameterClasses());//获取client端调用的函数 Object value = method.invoke(instance, call.getParameters());//instance即启动服务的对象,也即实现protocol的对象 return new ObjectWritable(method.getReturnType(), value);//将结果序列化 } catch (InvocationTargetException e) { } catch (Throwable e) { } }

    6.      用户可以做的操作

    1. Reader数量

          正常情况下,一个客户端关联一个Reader,如果有很多客户端(clientDataNode),那么就可以相应增加这个配置

          参数:ipc.server.read.threadpool.size,默认是1,需要注意的是,这个配置参数是0.21版本的,不同版本的参数可能不一样

    2. Handler数量

          对于这种做事的线程,不好把握度,到底多少才是合适。

          参数:dfs.namenode.handler.count, 这里是以NameNode举例

    3. 客户端重试次数

          客户端在调用时发生异常,重试是无可厚非。但如果对实时性有要求,那么这里的重试就有考量。Fackbook在做的Realtime分析就有提到RPC的重试是需要修改的

          参数:ipc.client.connect.max.retries,默认是10

    4. tcp no delay

          不建议对它有什么设置。如果我们对整个调用的过程中数据量大小及网络环境不清楚的话,就是设置了也不知道它是否有作用。

          参数:ipc.client.tcpnodelay,默认是false

    7. 时序图

    4

     

    8. 类图

    09a22207-eec0-321c-a990-d8aa248c1609

     

    9.参考

    http://blog.csdn.net/sxf_824/article/details/4842153

    http://www.wikieno.com/2012/02/hadoop-ipc-server/

    http://caibinbupt.iteye.com/blog/281281

    http://www.tbdata.org/archives/1413

    http://blog.csdn.net/shirdrn/article/details/4598295

    http://lidejiasw.wordpress.com/2011/05/07/hadoop-rpc%E5%88%86%E6%9E%90/

  • 相关阅读:
    iOS——归档对象的创建,数据写入与读取
    iOS——plist的创建,数据写入与读取
    SQL SERVER 2005快捷键
    图片放大源码
    验证url 地址是否是图片
    JS三大经典变量命名法
    载入锁频
    SQL Server 查询分析器键盘快捷方式
    关于ajax get方式请求 url地址参数怎么变成空了的问题
    SQL计算表的列数
  • 原文地址:https://www.cnblogs.com/xuxm2007/p/2558599.html
Copyright © 2011-2022 走看看