zoukankan      html  css  js  c++  java
  • Yarn系列(三)——AsyncDispatcher分析

    1. 前言

    基于Hadoop-3.2 。

    1.1. 简介

    AsyncDispatcher既是服务也是事件调度器(Eventhandler),如下AsyncDispatcher的定义:

    public class AsyncDispatcher extends AbstractService implements Dispatcher {...}
    

    作为服务其具备serviceStart()、serviceStop()等能力;作为事件调度器,其可以根据事件获取对应的事件调度器和其他事件调度器可以向AsyncDispatcher注册的功能。

    1.2. 重要的数据结构

    AsyncDispatcher有两个重要的属性,理解了两个属性作用,就基本上可以明白其工作原理。

    private final BlockingQueue<Event> eventQueue;
    ......
    protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;
    

    eventQueue是阻塞对列,用于存放事件;eventDispatchers的key是事件类型,value放对应的事件处理器。所以整体流程大致是:事件被放入到阻塞对列中,事件和对应的事件handler注册到AsyncDispatcher,工作时就是从阻塞对列中取出事件,然后从eventDispatchers取得对应的事件处理器,然后执行handle()方法。

    2. 源码分析

    2.1. AsyncDispatcher服务启动

    当服务启动时,会创建一个线程不断的从阻塞对列中取出事件,然后调用该事件对应的处理方法。

    @Override
    protected void serviceStart() throws Exception {
      //start all the components
      super.serviceStart();
      //创建异步线程用于调度事件
      eventHandlingThread = new Thread(createThread());
      eventHandlingThread.setName(dispatcherThreadName);
      eventHandlingThread.start();
    }
    

    创建的线程就是不断从阻塞对列中取出事件然后执行,如下:

    Runnable createThread() {
      return new Runnable() {
        @Override
        public void run() {
          //服务没有被停止且线程没有被中断
          while (!stopped && !Thread.currentThread().isInterrupted()) {
            drained = eventQueue.isEmpty();
            // blockNewEvents is only set when dispatcher is draining to stop,
            // adding this check is to avoid the overhead of acquiring the lock
            // and calling notify every time in the normal run of the loop.
            //服务停止时将会阻止新事件放入阻塞对列中
            if (blockNewEvents) {
              synchronized (waitForDrained) {
                if (drained) {
                  waitForDrained.notify();
                }
              }
            }
            Event event;
            try {
              //从对列中获取事件
              event = eventQueue.take();
            } catch(InterruptedException ie) {
              if (!stopped) {
                LOG.warn("AsyncDispatcher thread interrupted", ie);
              }
              return;
            }
            if (event != null) {
              //不为空则调度
              dispatch(event);
            }
          }
        }
      };
    }
    

    其调度的实现逻辑就是调用每个事件的Handle方法,如下:

     protected void dispatch(Event event) {
     .......
    
      Class<? extends Enum> type = event.getType().getDeclaringClass();
    
      try{
        //获取对应的事件调度处理器
        EventHandler handler = eventDispatchers.get(type);
        if(handler != null) {
          //执行事件调度handle的handle方法
          handler.handle(event);
        } else {
          throw new Exception("No handler for registered for " + type);
        }
      } catch (Throwable t) {
        .....
      }
    

    2.2. 例子

    RMAppImpl是EventHandle,其handle方法主要是根据接收的事件类型去改变RMAppImpl的状态,同时触发一种行为,其具体过程可以参考Yarn系列(二)

    public void handle(RMAppEvent event) {
    
      this.writeLock.lock();
    
      try {
        ApplicationId appID = event.getApplicationId();
        LOG.debug("Processing event for " + appID + " of type "
            + event.getType());
        final RMAppState oldState = getState();
        try {
          /* keep the master in sync with the state machine */
          //具体转变可以参考Yarn系列(二)
          this.stateMachine.doTransition(event.getType(), event);
        } catch (InvalidStateTransitionException e) {
          LOG.error("App: " + appID
              + " can't handle this event at current state", e);
          onInvalidStateTransition(event.getType(), oldState);
        }
    
       ......
      } finally {
        this.writeLock.unlock();
      }
    }
    
  • 相关阅读:
    CCNA 第二章 以太网回顾
    CCNA 第一章 网络互联
    solidworks中 toolbox调用出现未配置的解决方法
    linux之df命令
    linux之du命令
    linux之pid文件
    linux之mysql启动问题
    linux之使用cron,logrotate管理日志文件
    wordpress(一)wordpress环境的搭建
    phpwind8.7升级9.0.1过程(四)20130207升级到20141228
  • 原文地址:https://www.cnblogs.com/love-yh/p/13929319.html
Copyright © 2011-2022 走看看