zoukankan      html  css  js  c++  java
  • hadoop-2

    AsyncDispatcher,直接看代码

      /**
       * Starts the parent service and then launches the thread that runs the
       * event loop produced by {@link #createThread()}.
       *
       * @throws Exception if the parent service fails to start
       */
      @Override
      protected void serviceStart() throws Exception {
        // Bring up the parent service first.
        super.serviceStart();

        // Build the event loop thread, publish it through the field, then
        // name it and let it run.
        final Thread dispatcherThread = new Thread(createThread());
        eventHandlingThread = dispatcherThread;
        dispatcherThread.setName(dispatcherThreadName);
        dispatcherThread.start();
      }
      /**
       * Builds the event loop that serviceStart() runs on the event handling
       * thread.
       *
       * The loop repeatedly takes one event from eventQueue and passes it to
       * dispatch(Event). It ends when the stopped flag is set, when the running
       * thread has its interrupt flag set, or when the wait for the next event
       * is interrupted.
       *
       * @return the Runnable implementing the event loop
       */
      Runnable createThread() {
        return new Runnable() {
          @Override
          public void run() {
            // Keep looping until the dispatcher is stopped or this thread is
            // interrupted.
            while (!stopped && !Thread.currentThread().isInterrupted()) {
              // Record whether the queue is empty at the start of this pass.
              drained = eventQueue.isEmpty();
              // blockNewEvents is only set when dispatcher is draining to stop,
              // adding this check is to avoid the overhead of acquiring the lock
              // and calling notify every time in the normal run of the loop.
              if (blockNewEvents) {
                synchronized (waitForDrained) {
                  // Wake a thread waiting on waitForDrained once the queue has
                  // been observed empty.
                  if (drained) {
                    waitForDrained.notify();
                  }
                }
              }
              Event event;
              try {
                // Presumably blocks until an event is available (eventQueue
                // looks like a BlockingQueue; its declaration is not shown here).
                event = eventQueue.take();
              } catch(InterruptedException ie) {
                // Only warn when the interrupt was not part of a stop; either
                // way the loop ends here.
                if (!stopped) {
                  LOG.warn("AsyncDispatcher thread interrupted", ie);
                }
                return;
              }
              // Defensive guard: only dispatch a non-null event.
              if (event != null) {
                dispatch(event);
              }
            }
          }
        };
      }
      /**
       * Routes one event to the handler registered for its event type.
       *
       * The handler is looked up in eventDispatchers using the declaring enum
       * class of the event's type. Any Throwable raised while dispatching,
       * including the "no handler" case, is logged at fatal level and is not
       * rethrown. If exitOnDispatchException is set, no JVM shutdown is in
       * progress and the dispatcher has not already been stopped, the
       * dispatcher marks itself stopped and starts a separate thread running
       * the Runnable returned by createShutDownThread().
       *
       * @param event the event to dispatch
       */
      protected void dispatch(Event event) {
        //all events go thru this loop
        if (LOG.isDebugEnabled()) {
          LOG.debug("Dispatching the event " + event.getClass().getName() + "."
              + event.toString());
        }

        // Handlers are keyed by the enum class that declares the event type.
        Class<? extends Enum> type = event.getType().getDeclaringClass();

        try{
          EventHandler handler = eventDispatchers.get(type);
          if(handler != null) {
            handler.handle(event);
          } else {
            // Thrown so a missing handler takes the same failure path below
            // as a handler that throws.
            throw new Exception("No handler registered for " + type);
          }
        } catch (Throwable t) {
          //TODO Maybe log the state of the queue
          LOG.fatal("Error in dispatcher thread", t);
          // If serviceStop is called, we should exit this thread gracefully.
          if (exitOnDispatchException
              && !ShutdownHookManager.get().isShutdownInProgress()
              && !stopped) {
            // Set stopped first so the shutdown thread is started only once.
            stopped = true;
            Thread shutDownThread = new Thread(createShutDownThread());
            shutDownThread.setName("AsyncDispatcher ShutDown handler");
            shutDownThread.start();
          }
        }
      }

    -------------------------------------------------------------------------------------------------------------------------------------

    一个handler的实现

    public class JobHistoryEventHandler extends AbstractService implements EventHandler<JobHistoryEvent> {
          protected BlockingQueue<JobHistoryEvent> eventQueue = new LinkedBlockingQueue<JobHistoryEvent>();
          protected boolean handleTimelineEvent = false;
          protected AsyncDispatcher atsEventDispatcher = null;
    ...... @Override
    public void handle(JobHistoryEvent event) { try { eventQueue.put(event); // Process it for ATS (if enabled) if (handleTimelineEvent) { atsEventDispatcher.getEventHandler().handle(event); } } } }
  • 相关阅读:
    C#读写xml文件
    XSD(XML Schema Definition)用法实例介绍以及C#使用xsd文件验证XML格式
    C#异步批量下载文件
    echarts的markline的使用 y轴预警线
    Bootstrap-table 增删改查
    二维数组 和 稀疏数组的相互转换 及 数据存入文件中
    Bootstrap-table实现动态合并相同行
    echarts 中 参数的详讲
    BootstrapTable的简单使用教程
    遍历List 中 Map 的值
  • 原文地址:https://www.cnblogs.com/it-worker365/p/9924562.html
Copyright © 2011-2022 走看看