zoukankan      html  css  js  c++  java
  • hadoop-2

    AsyncDispatcher,直接看代码

      @Override
      protected void serviceStart() throws Exception {
        //start all the components
        super.serviceStart();
        eventHandlingThread = new Thread(createThread());
        eventHandlingThread.setName(dispatcherThreadName);
        eventHandlingThread.start();
      }
      Runnable createThread() {
        return new Runnable() {
          @Override
          public void run() {
            while (!stopped && !Thread.currentThread().isInterrupted()) {
              drained = eventQueue.isEmpty();
              // blockNewEvents is only set when dispatcher is draining to stop,
              // adding this check is to avoid the overhead of acquiring the lock
              // and calling notify every time in the normal run of the loop.
              if (blockNewEvents) {
                synchronized (waitForDrained) {
                  if (drained) {
                    waitForDrained.notify();
                  }
                }
              }
              Event event;
              try {
                event = eventQueue.take();
              } catch(InterruptedException ie) {
                if (!stopped) {
                  LOG.warn("AsyncDispatcher thread interrupted", ie);
                }
                return;
              }
              if (event != null) {
                dispatch(event);
              }
            }
          }
        };
      }
      protected void dispatch(Event event) {
        //all events go thru this loop
        if (LOG.isDebugEnabled()) {
          LOG.debug("Dispatching the event " + event.getClass().getName() + "."
              + event.toString());
        }
    
        Class<? extends Enum> type = event.getType().getDeclaringClass();
    
        try{
          EventHandler handler = eventDispatchers.get(type);
          if(handler != null) {
            handler.handle(event);
          } else {
            throw new Exception("No handler for registered for " + type);
          }
        } catch (Throwable t) {
          //TODO Maybe log the state of the queue
          LOG.fatal("Error in dispatcher thread", t);
          // If serviceStop is called, we should exit this thread gracefully.
          if (exitOnDispatchException
              && (ShutdownHookManager.get().isShutdownInProgress()) == false
              && stopped == false) {
            stopped = true;
            Thread shutDownThread = new Thread(createShutDownThread());
            shutDownThread.setName("AsyncDispatcher ShutDown handler");
            shutDownThread.start();
          }
        }
      }

    -------------------------------------------------------------------------------------------------------------------------------------

    一个handler的实现

    public class JobHistoryEventHandler extends AbstractService implements EventHandler<JobHistoryEvent> {
          protected BlockingQueue<JobHistoryEvent> eventQueue = new LinkedBlockingQueue<JobHistoryEvent>();
          protected boolean handleTimelineEvent = false;
          protected AsyncDispatcher atsEventDispatcher = null;
    ...... @Override
    public void handle(JobHistoryEvent event) { try { eventQueue.put(event); // Process it for ATS (if enabled) if (handleTimelineEvent) { atsEventDispatcher.getEventHandler().handle(event); } } } }
  • 相关阅读:
    linux addr2line 定位so库崩溃位置
    转:关于Android机型适配这件小事儿
    转:android studio 改编译区背景色
    转:ios review推送与执行
    k2pdfopt下载页
    转:让kindle更好的支持pdf
    转:各种文本格式转换的网站
    转: iOS崩溃堆栈符号表使用与用途
    转: 腾讯Bugly干货分享:Android应用性能评测调优
    转: git的图文使用教程(巨详细)
  • 原文地址:https://www.cnblogs.com/it-worker365/p/9924562.html
Copyright © 2011-2022 走看看