zoukankan      html  css  js  c++  java
  • Yarn系列(三)——AsyncDispatcher分析

    1. 前言

    基于Hadoop-3.2 。

    1.1. 简介

    AsyncDispatcher既是服务也是事件调度器(Eventhandler),如下AsyncDispatcher的定义:

    public class AsyncDispatcher extends AbstractService implements Dispatcher {...}
    

    作为服务其具备serviceStart()、serviceStop()等能力;作为事件调度器,其可以根据事件获取对应的事件调度器和其他事件调度器可以向AsyncDispatcher注册的功能。

    1.2. 重要的数据结构

    AsyncDispatcher有两个重要的属性,理解了两个属性作用,就基本上可以明白其工作原理。

    private final BlockingQueue<Event> eventQueue;
    ......
    protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;
    

    eventQueue是阻塞对列,用于存放事件;eventDispatchers的key是事件类型,value放对应的事件处理器。所以整体流程大致是:事件被放入到阻塞对列中,事件和对应的事件handler注册到AsyncDispatcher,工作时就是从阻塞对列中取出事件,然后从eventDispatchers取得对应的事件处理器,然后执行handle()方法。

    2. 源码分析

    2.1. AsyncDispatcher服务启动

    当服务启动时,会创建一个线程不断的从阻塞对列中取出事件,然后调用该事件对应的处理方法。

    @Override
    protected void serviceStart() throws Exception {
      //start all the components
      super.serviceStart();
      //创建异步线程用于调度事件
      eventHandlingThread = new Thread(createThread());
      eventHandlingThread.setName(dispatcherThreadName);
      eventHandlingThread.start();
    }
    

    创建的线程就是不断从阻塞对列中取出事件然后执行,如下:

    Runnable createThread() {
      return new Runnable() {
        @Override
        public void run() {
          //服务没有被停止且线程没有被中断
          while (!stopped && !Thread.currentThread().isInterrupted()) {
            drained = eventQueue.isEmpty();
            // blockNewEvents is only set when dispatcher is draining to stop,
            // adding this check is to avoid the overhead of acquiring the lock
            // and calling notify every time in the normal run of the loop.
            //服务停止时将会阻止新事件放入阻塞对列中
            if (blockNewEvents) {
              synchronized (waitForDrained) {
                if (drained) {
                  waitForDrained.notify();
                }
              }
            }
            Event event;
            try {
              //从对列中获取事件
              event = eventQueue.take();
            } catch(InterruptedException ie) {
              if (!stopped) {
                LOG.warn("AsyncDispatcher thread interrupted", ie);
              }
              return;
            }
            if (event != null) {
              //不为空则调度
              dispatch(event);
            }
          }
        }
      };
    }
    

    其调度的实现逻辑就是调用每个事件的Handle方法,如下:

     protected void dispatch(Event event) {
     .......
    
      Class<? extends Enum> type = event.getType().getDeclaringClass();
    
      try{
        //获取对应的事件调度处理器
        EventHandler handler = eventDispatchers.get(type);
        if(handler != null) {
          //执行事件调度handle的handle方法
          handler.handle(event);
        } else {
          throw new Exception("No handler for registered for " + type);
        }
      } catch (Throwable t) {
        .....
      }
    

    2.2. 例子

    RMAppImpl是EventHandle,其handle方法主要是根据接收的事件类型去改变RMAppImpl的状态,同时触发一种行为,其具体过程可以参考Yarn系列(二)

    public void handle(RMAppEvent event) {
    
      this.writeLock.lock();
    
      try {
        ApplicationId appID = event.getApplicationId();
        LOG.debug("Processing event for " + appID + " of type "
            + event.getType());
        final RMAppState oldState = getState();
        try {
          /* keep the master in sync with the state machine */
          //具体转变可以参考Yarn系列(二)
          this.stateMachine.doTransition(event.getType(), event);
        } catch (InvalidStateTransitionException e) {
          LOG.error("App: " + appID
              + " can't handle this event at current state", e);
          onInvalidStateTransition(event.getType(), oldState);
        }
    
       ......
      } finally {
        this.writeLock.unlock();
      }
    }
    
  • 相关阅读:
    泛微云桥e-Bridge 目录遍历,任意文件读取
    (CVE-2020-8209)XenMobile-控制台存在任意文件读取漏洞
    selenium 使用初
    将HTML文件转换为MD文件
    Python对word文档进行操作
    使用java安装jar包出错,提示不是有效的JDK java主目录
    Windows server 2012安装VM tools异常解决办法
    ifconfig 命令,改变主机名,改DNS hosts、关闭selinux firewalld netfilter 、防火墙iptables规则
    iostat iotop 查看硬盘的读写、 free 查看内存的命令 、netstat 命令查看网络、tcpdump 命令
    使用w uptime vmstat top sar nload 等命令查看系统负载
  • 原文地址:https://www.cnblogs.com/love-yh/p/13929319.html
Copyright © 2011-2022 走看看