zoukankan      html  css  js  c++  java
  • Hadoop Yarn事件处理框架源码分析

    由于想在项目中使用类似yarn的事件处理机制,就看了实现。主要是由Dispatcher.java,EventHandler.java,Service.java这3个类撑起来的。

     在事件处理之前,先注册相应的事件处理handler,收到事件event后,由派发事件的Dispatcher进行派发,默认采用异步事件处理方式将事件放到事件队列(LinkedBlockingQueue)中,消费者会循环从队列中取出事件进行处理。

    要使用事件处理,首先需要创建Dispatcher,示例代码如下:

     dispatcher = new AsyncDispatcher();//创建
      addIfService(dispatcher);// 由于继承AbstractService,可以方便完成服务统一管理,比如初始化和资源释放等操作
      dispatcher.register(EventType.class,new EventHandler());//注册对应的事件处理方法
    

    然后通过AsyncDispatcher调用getEventHandler()返回的EventHandler的处理对应事件,AsyncDispatcher类的getEventHandler()方法如下:

    @Override
      public EventHandler getEventHandler() {
        if (handlerInstance == null) {
          handlerInstance = new GenericEventHandler();//如果没有注册生产事件处理,就走通用事件处理
        }
        return handlerInstance;
      }
    class GenericEventHandler implements EventHandler<Event> {
        public void handle(Event event) {
          if (blockNewEvents) {
            return;
          }
    
    
          /* all this method does is enqueue all the events onto the queue */
          int qSize = eventQueue.size();
          if (qSize !=0 && qSize %1000 == 0) {
            LOG.info("Size of event-queue is " + qSize);
          }
          int remCapacity = eventQueue.remainingCapacity();
          if (remCapacity < 1000) {
            LOG.warn("Very low remaining capacity in the event-queue: "
                + remCapacity);
          }
          try {
            eventQueue.put(event);//放进队列
            drained = false;
          } catch (InterruptedException e) {
            if (!stopped) {
              LOG.warn("AsyncDispatcher thread interrupted", e);
            }
            throw new RuntimeException(e);
          }
        };
      }
    

    上述完成生产,再看消费如下实现的:

    @Override
    protected void serviceStart() throws Exception {
      //start all the components
      super.serviceStart();
      eventHandlingThread = new Thread(createThread()); // 调用创建消费eventQueue队列中事件的线程
      eventHandlingThread.setName("AsyncDispatcher event handler");
      eventHandlingThread.start();
    }
    

    查看createThread()方法,如下所示:

    Runnable createThread() {
        return new Runnable() {
          @Override
          public void run() {
            while (!stopped && !Thread.currentThread().isInterrupted()) {
              drained = eventQueue.isEmpty();
              // blockNewEvents is only set when dispatcher is draining to stop,
              // adding this check is to avoid the overhead of acquiring the lock
              // and calling notify every time in the normal run of the loop.
              if (blockNewEvents) {
                synchronized (waitForDrained) {
                  if (drained) {
                    waitForDrained.notify();
                  }
                }
              }
              Event event;
              try {
                event = eventQueue.take();
              } catch(InterruptedException ie) {
                if (!stopped) {
                  LOG.warn("AsyncDispatcher thread interrupted", ie);
                }
                return;
              }
              if (event != null) {
                dispatch(event);//分发事件
              }
            }
          }
        };
      }
    

    从eventQueue队列中取出Event,然后调用dispatch(event);来处理事件,看dispatch(event)方法,如下所示:

    protected void dispatch(Event event) {
      //all events go thru this loop
      if (LOG.isDebugEnabled()) {
        LOG.debug("Dispatching the event " + event.getClass().getName() + "."
            + event.toString());
      }
    
      Class<? extends Enum> type = event.getType().getDeclaringClass();
    
      try{
        EventHandler handler = eventDispatchers.get(type); //通过event获取事件类型,根据事件类型得到注册的EventHandler
        if(handler != null) {
          handler.handle(event); //EventHandler处理事件event
        } else {
          throw new Exception("No handler for registered for " + type);
        }
      } catch (Throwable t) {
        //TODO Maybe log the state of the queue
        LOG.fatal("Error in dispatcher thread", t);
        // If serviceStop is called, we should exit this thread gracefully.
        if (exitOnDispatchException
            && (ShutdownHookManager.get().isShutdownInProgress()) == false
            && stopped == false) {
          LOG.info("Exiting, bbye..");
          System.exit(-1);
        }
      }
    }
    整个过程使用生产--消费者模型,异步事件处理,整体实现起来还是很简单的!


  • 相关阅读:
    建模时选择SVM还是LR?
    decision_function详解
    HIVE SQL与SQL的区别
    数据不平衡如何处理
    Spring Boot 两步集成 日志收集ELK与分布式系统监控CAT
    Web Application Integrate with Box
    字符编解码的故事(ASCII,ANSI,Unicode,Utf-8区别)
    SharePoint Client Api Search user 几种方法
    C#,反射和直接调用的效率差别
    System.Data.SqlClient.SqlConnectionStringBuilder 链接字符串的问题
  • 原文地址:https://www.cnblogs.com/dailidong/p/7571104.html
Copyright © 2011-2022 走看看