zoukankan      html  css  js  c++  java
  • Hadoop服务库与事件库的使用及其工作流程

    Hadoop服务库与事件库的使用及其工作流程

     

    Hadoop服务库:

    YARN采用了基于服务的对象管理模型,主要特点有:

    • 被服务化的对象分4个状态:NOTINITED,INITED,STARTED,STOPED
    • 任何服务状态变化都可以触发另外一些动作
    • 可通过组合方式对任意服务进行组合,统一管理

    具体类请参见 org.apache.hadoop.service包下.核心接口是Service,抽象实现是AbstractService

        YARN中,ResourceManager和NodeManager属于组合服务,内部包含多个单一和组合服务.以实现对内部多种服务的统一管理.

     

    Hadoop事件库:

    YARN采用事件驱动并发模型, 把各种逻辑抽象成事件,进入事件队列,然后由中央异步调度器负责传递给相应的事件调度器处理,或者调度器之间再传递,直至完成任务.

    具体参见org.apache.hadoop.yarn.event.主要类和接口是:Event, AsyncDispatcher,EventHandler

     

     

    按照惯例, 先给出一个Demo,然后顺着Demo研究代码实现.

    示例我是直接抄<hadoop技术内幕>:

    例子涉及如下几个模块:

    1. Task

    2. TaskType

    3. Job

    4. JobType

    5. Dispatcher

    package com.demo1;

     

    import org.apache.hadoop.conf.Configuration;

    import org.apache.hadoop.service.CompositeService;

    import org.apache.hadoop.service.Service;

    import org.apache.hadoop.yarn.event.AsyncDispatcher;

    import org.apache.hadoop.yarn.event.Dispatcher;

    import org.apache.hadoop.yarn.event.EventHandler;

     

    /**

    * Created by yang on 2014/8/25.

    */

    public class SimpleService extends CompositeService {

     

    private Dispatcher dispatcher;

    private String jobID;

    private int taskNum;

    private String[] taskIDs;

     

    public SimpleService(String name, String jobID, int taskNum) {

    super(name);

    this.jobID = jobID;

    this.taskNum = taskNum;

    this.taskIDs = new String[taskNum];

     

    for (int i = 0; i < taskNum; i++) {

    taskIDs[i] = new String(jobID + "_task_" + i);

    }

    }

     

    public Dispatcher getDispatcher() {

    return dispatcher;

    }

     

    public void serviceInit(Configuration conf) throws Exception {

    dispatcher = new AsyncDispatcher();

    dispatcher.register(JobEventType.class, new JobEventDIspatcher());

    dispatcher.register(TaskEventType.class, new TaskEventDIspatcher());

    addService((Service)dispatcher);

    super.serviceInit(conf);

    }

     

    private class JobEventDIspatcher implements EventHandler<JobEvent> {

     

    @Override

    public void handle(JobEvent jobEvent) {

    if (jobEvent.getType() == JobEventType.JOB_KILL) {

    System.out.println("JOB KILL EVENT");

    for (int i = 0; i < taskNum; i++) {

    dispatcher.getEventHandler().handle(new TaskEvent(taskIDs[i], TaskEventType.T_KILL));

    }

    } else if (jobEvent.getType() == JobEventType.JOB_INIT) {

    System.out.println("JOB INIT EVENT");

    for (int i = 0; i < taskNum; i++) {

    dispatcher.getEventHandler().handle(new TaskEvent(taskIDs[i], TaskEventType.T_SCHEDULE));

    }

    }

    }

    }

     

    private class TaskEventDIspatcher implements EventHandler<TaskEvent> {

     

    @Override

    public void handle(TaskEvent taskEvent) {

    if (taskEvent.getType() == TaskEventType.T_KILL) {

    System.out.println("TASK KILL EVENT" + taskEvent.getTaskID());

    } else if (taskEvent.getType() == TaskEventType.T_SCHEDULE) {

    System.out.println("TASK INIT EVENT" + taskEvent.getTaskID());

    }

    }

    }

    }

     

    1. 测试程序

    package com.demo1;

     

    import org.apache.hadoop.conf.Configuration;

    import org.apache.hadoop.yarn.conf.YarnConfiguration;

     

    /**

    * Created by yang on 2014/8/25.

    */

    public class Test {

    public static void main(String[] args) throws Exception {

    String jobID="job_1";

    SimpleService ss = new SimpleService("test",jobID,5);

    YarnConfiguration config = new YarnConfiguration(new Configuration());

     

    ss.serviceInit(config);

    ss.init(config);

    ss.start();

     

    ss.getDispatcher().getEventHandler().handle(new JobEvent(jobID,JobEventType.JOB_KILL));

    ss.getDispatcher().getEventHandler().handle(new JobEvent(jobID,JobEventType.JOB_KILL));

    }

    }

     

     

    不出意外的话,运行结果应该类似:

    14/08/25 16:02:20 INFO event.AsyncDispatcher: Registering class com.yws.demo1.JobEventType for class com.yws.demo1.SimpleService$JobEventDIspatcher

    14/08/25 16:02:42 INFO event.AsyncDispatcher: Registering class com.yws.demo1.TaskEventType for class com.yws.demo1.SimpleService$TaskEventDIspatcher

    14/08/25 16:02:54 INFO event.AsyncDispatcher: Registering class com.yws.demo1.JobEventType for class com.yws.demo1.SimpleService$JobEventDIspatcher

    14/08/25 16:03:03 INFO event.AsyncDispatcher: Registering class com.yws.demo1.TaskEventType for class com.yws.demo1.SimpleService$TaskEventDIspatcher

    JOB KILL EVENT

    JOB KILL EVENT

    TASK KILL EVENTjob_1_task_0

    TASK KILL EVENTjob_1_task_1

    TASK KILL EVENTjob_1_task_2

    TASK KILL EVENTjob_1_task_3

    TASK KILL EVENTjob_1_task_4

    TASK KILL EVENTjob_1_task_0

    TASK KILL EVENTjob_1_task_1

    TASK KILL EVENTjob_1_task_2

    TASK KILL EVENTjob_1_task_3

    TASK KILL EVENTjob_1_task_4

     

     

    我们开始分析:

    所谓的Task,Job,其实是按业务逻辑划分的, 他们都继承AbstractEvent类.

    SimpleService是一个组合服务,里面放了EventHandler和Dispatcher

     

    从Test开始,看看Service是如何创建的

    构造函数比较简单,就是将一个job拆分成taskNum个Task

    ss.serviceInit(config);做了什么呢:

    创建一个中央事件调度器: AsyncDispatcher(具体实现我们在后文分析)

    并把Job和Task的Event及2者对应的EventHandler注册到调度器中.

    这里就是初始化和启动服务了.最后2行就是模拟2个事件的JOB_KILL事件.

     

    我们进到ss.getDispatcher().getEventHandler(),发现他其实是创建一个GenericEventHandler

     

    这个handler干什么是呢?

    就是把

    塞到BlockingQueue<Event> eventQueue; 中.

    不知道你发现没有, 这个方法仅仅是一个入队操作啊. 那具体调用JobEventDIspatcher.handler是在什么地方呢?

    这时联想到之前不是有个中央调度器嘛, AsyncDispatcher, Line 80行, 他创建了一个线程,并不断的从之前说的EventQueue中不断的取Event,然后执行,这里的执行也就是调用了具体的handler了

    就这样一个基于事件驱动的程序这么完成了.

     

    按照hadoop 早起版本中, 业务逻辑之间是通过函数调用方式实现的,也就是串行的. 现在基于事件驱动后,大大提高了并发性.很值得我们学习.

     

    来张全家福:

     

    HandlerThread就是前文说的那个隐藏线程. EventHandler会产生一些新的Event,然后又重新进入队列.循环.

  • 相关阅读:
    如何将baidu地图中的baidu logo 去掉
    漂亮的圆角文本框 CSS3实现
    jQuery数字加减插件
    腾迅股票数据接口 http/javascript
    JS复制内容到剪贴板(兼容FF/Chrome/Safari所有浏览器)
    关于编写性能高效的javascript事件的技术
    想做web开发 就学JavaScript
    git的简单理解及基础操作命令
    《CSS权威指南》基础复习+查漏补缺
    TypeScript Writing .d.ts files(编写声明文件)
  • 原文地址:https://www.cnblogs.com/dycg/p/3935411.html
Copyright © 2011-2022 走看看