本文所有涉及的内容均为2.2.0版本中呈现。
概述:
Job在创建Job并且提交的人的眼中,可以在创建的时候通过配置Job的内容,控制Job的执行,以及查询Job的运行状态。一旦Job提交以后,将不能对其进行配置,否则将会出现IllegalStateException异常。
正常情况下用户通过Job类来创建、描述、提交Job,以及监控Job的处理过程。下面是一个简单的例子:
// Create a new Job Job job = new Job(new Configuration()); job.setJarByClass(MyJob.class); // Specify various job-specific parameters job.setJobName("myjob"); job.setInputPath(new Path("in")); job.setOutputPath(new Path("out")); job.setMapperClass(MyJob.MyMapper.class); job.setReducerClass(MyJob.MyReducer.class); // Submit the job, then poll for progress until the job is complete job.waitForCompletion(true);
基本结构:
Job类在org.apache.hadoop.mapreduce包中,继承了JobContextImpl类以及实现了JobContext接口。
Job定义的静态常量:
private static final Log LOG = LogFactory.getLog(Job.class); @InterfaceStability.Evolving public static enum JobState {DEFINE, RUNNING}; private static final long MAX_JOBSTATUS_AGE = 1000 * 2; public static final String OUTPUT_FILTER = "mapreduce.client.output.filter"; /** Key in mapred-*.xml that sets completionPollInvervalMillis */ public static final String COMPLETION_POLL_INTERVAL_KEY = "mapreduce.client.completion.pollinterval"; /** Default completionPollIntervalMillis is 5000 ms. */ static final int DEFAULT_COMPLETION_POLL_INTERVAL = 5000; /** Key in mapred-*.xml that sets progMonitorPollIntervalMillis */ public static final String PROGRESS_MONITOR_POLL_INTERVAL_KEY = "mapreduce.client.progressmonitor.pollinterval"; /** Default progMonitorPollIntervalMillis is 1000 ms. */ static final int DEFAULT_MONITOR_POLL_INTERVAL = 1000; public static final String USED_GENERIC_PARSER = "mapreduce.client.genericoptionsparser.used"; public static final String SUBMIT_REPLICATION = "mapreduce.client.submit.file.replication"; private static final String TASKLOG_PULL_TIMEOUT_KEY = "mapreduce.client.tasklog.timeout"; private static final int DEFAULT_TASKLOG_TIMEOUT = 60000;
Job定义的私有变量:
private JobState state = JobState.DEFINE; private JobStatus status; private long statustime; private Cluster cluster;
Job类加载的时候就要执行的加载配置文件的方法:
static { ConfigUtil.loadResources(); }
加载的配置文件包括mapred-default.xml、mapred-site.xml、yarn-default.xml、yarn-site.xml。
Job的构造函数:
@Deprecated public Job() throws IOException { this(new Configuration()); } @Deprecated public Job(Configuration conf) throws IOException { this(new JobConf(conf)); } @Deprecated public Job(Configuration conf, String jobName) throws IOException { this(conf); setJobName(jobName); } Job(JobConf conf) throws IOException { super(conf, null); // propagate existing user credentials to job this.credentials.mergeAll(this.ugi.getCredentials()); this.cluster = null; } Job(JobStatus status, JobConf conf) throws IOException { this(conf); setJobID(status.getJobID()); this.status = status; state = JobState.RUNNING; }
可以注意到Hadoop不鼓励通过缺省的构造函数和通过Configuration类来构造Job对象。通过JobConf对象来构建Job是一个不错的选择。
获取Job对象的实例化方法:
除了通过构造函数,Job类中还提供了通过一些静态方法来获取Job的事例对象,看一下具体定义:
/** * Creates a new {@link Job} with no particular {@link Cluster} . * A Cluster will be created with a generic {@link Configuration}. * * @return the {@link Job} , with no connection to a cluster yet. * @throws IOException */ public static Job getInstance() throws IOException { // create with a null Cluster return getInstance(new Configuration()); } /** * Creates a new {@link Job} with no particular {@link Cluster} and a * given {@link Configuration}. * * The <code>Job</code> makes a copy of the <code>Configuration</code> so * that any necessary internal modifications do not reflect on the incoming * parameter. * * A Cluster will be created from the conf parameter only when it's needed. * * @param conf the configuration * @return the {@link Job} , with no connection to a cluster yet. * @throws IOException */ public static Job getInstance(Configuration conf) throws IOException { // create with a null Cluster JobConf jobConf = new JobConf(conf); return new Job(jobConf); } /** * Creates a new {@link Job} with no particular {@link Cluster} and a given jobName. * A Cluster will be created from the conf parameter only when it's needed. * * The <code>Job</code> makes a copy of the <code>Configuration</code> so * that any necessary internal modifications do not reflect on the incoming * parameter. * * @param conf the configuration * @return the {@link Job} , with no connection to a cluster yet. * @throws IOException */ public static Job getInstance(Configuration conf, String jobName) throws IOException { // create with a null Cluster Job result = getInstance(conf); result.setJobName(jobName); return result; } /** * Creates a new {@link Job} with no particular {@link Cluster} and given * {@link Configuration} and {@link JobStatus}. * A Cluster will be created from the conf parameter only when it's needed. * * The <code>Job</code> makes a copy of the <code>Configuration</code> so * that any necessary internal modifications do not reflect on the incoming * parameter. * * @param status job status * @param conf job configuration * @return the {@link Job} , with no connection to a cluster yet. * @throws IOException */ public static Job getInstance(JobStatus status, Configuration conf) throws IOException { return new Job(status, new JobConf(conf)); } /** * Creates a new {@link Job} with no particular {@link Cluster}. * A Cluster will be created from the conf parameter only when it's needed. * * The <code>Job</code> makes a copy of the <code>Configuration</code> so * that any necessary internal modifications do not reflect on the incoming * parameter. * * @param ignored * @return the {@link Job} , with no connection to a cluster yet. * @throws IOException * @deprecated Use {@link #getInstance()} */ @Deprecated public static Job getInstance(Cluster ignored) throws IOException { return getInstance(); } /** * Creates a new {@link Job} with no particular {@link Cluster} and given * {@link Configuration}. * A Cluster will be created from the conf parameter only when it's needed. * * The <code>Job</code> makes a copy of the <code>Configuration</code> so * that any necessary internal modifications do not reflect on the incoming * parameter. * * @param ignored * @param conf job configuration * @return the {@link Job} , with no connection to a cluster yet. * @throws IOException * @deprecated Use {@link #getInstance(Configuration)} */ @Deprecated public static Job getInstance(Cluster ignored, Configuration conf) throws IOException { return getInstance(conf); } /** * Creates a new {@link Job} with no particular {@link Cluster} and given * {@link Configuration} and {@link JobStatus}. * A Cluster will be created from the conf parameter only when it's needed. * * The <code>Job</code> makes a copy of the <code>Configuration</code> so * that any necessary internal modifications do not reflect on the incoming * parameter. * * @param cluster cluster * @param status job status * @param conf job configuration * @return the {@link Job} , with no connection to a cluster yet. * @throws IOException */ @Private public static Job getInstance(Cluster cluster, JobStatus status, Configuration conf) throws IOException { Job job = getInstance(status, conf); job.setCluster(cluster); return job; }
可见通过这种方式获取Job实例的时候会有可能涉及到Cluster。
轮询周期的方法:
/** The interval at which monitorAndPrintJob() prints status */ public static int getProgressPollInterval(Configuration conf) { // Read progress monitor poll interval from config. Default is 1 second. int progMonitorPollIntervalMillis = conf.getInt( PROGRESS_MONITOR_POLL_INTERVAL_KEY, DEFAULT_MONITOR_POLL_INTERVAL); if (progMonitorPollIntervalMillis < 1) { LOG.warn(PROGRESS_MONITOR_POLL_INTERVAL_KEY + " has been set to an invalid value; " + " replacing with " + DEFAULT_MONITOR_POLL_INTERVAL); progMonitorPollIntervalMillis = DEFAULT_MONITOR_POLL_INTERVAL; } return progMonitorPollIntervalMillis; } /** The interval at which waitForCompletion() should check. */ public static int getCompletionPollInterval(Configuration conf) { int completionPollIntervalMillis = conf.getInt( COMPLETION_POLL_INTERVAL_KEY, DEFAULT_COMPLETION_POLL_INTERVAL); if (completionPollIntervalMillis < 1) { LOG.warn(COMPLETION_POLL_INTERVAL_KEY + " has been set to an invalid value; " + "replacing with " + DEFAULT_COMPLETION_POLL_INTERVAL); completionPollIntervalMillis = DEFAULT_COMPLETION_POLL_INTERVAL; } return completionPollIntervalMillis; }
上面两个方法分别为获取并且打印Job的运行状态的周期,以及查看Job是否完成的周期。
需要做异步处理的方法:
synchronized void ensureFreshStatus() throws IOException { if (System.currentTimeMillis() - statustime > MAX_JOBSTATUS_AGE) { updateStatus(); } } /** Some methods need to update status immediately. So, refresh * immediately * @throws IOException */ synchronized void updateStatus() throws IOException { try { this.status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() { @Override public JobStatus run() throws IOException, InterruptedException { return cluster.getClient().getJobStatus(status.getJobID()); } }); } catch (InterruptedException ie) { throw new IOException(ie); } if (this.status == null) { throw new IOException("Job status not available "); } this.statustime = System.currentTimeMillis(); } private synchronized void connect() throws IOException, InterruptedException, ClassNotFoundException { if (cluster == null) { cluster = ugi.doAs(new PrivilegedExceptionAction<Cluster>() { public Cluster run() throws IOException, InterruptedException, ClassNotFoundException { return new Cluster(getConfiguration()); } }); } }
设置配置参数的方法:
/** * Set the number of reduce tasks for the job. * @param tasks the number of reduce tasks * @throws IllegalStateException if the job is submitted */ public void setNumReduceTasks(int tasks) throws IllegalStateException { ensureState(JobState.DEFINE); conf.setNumReduceTasks(tasks); } /** * Set the current working directory for the default file system. * * @param dir the new current working directory. * @throws IllegalStateException if the job is submitted */ public void setWorkingDirectory(Path dir) throws IOException { ensureState(JobState.DEFINE); conf.setWorkingDirectory(dir); } /** * Set the {@link InputFormat} for the job. * @param cls the <code>InputFormat</code> to use * @throws IllegalStateException if the job is submitted */ public void setInputFormatClass(Class<? extends InputFormat> cls ) throws IllegalStateException { ensureState(JobState.DEFINE); conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls, InputFormat.class); } /** * Set the {@link OutputFormat} for the job. * @param cls the <code>OutputFormat</code> to use * @throws IllegalStateException if the job is submitted */ public void setOutputFormatClass(Class<? extends OutputFormat> cls ) throws IllegalStateException { ensureState(JobState.DEFINE); conf.setClass(OUTPUT_FORMAT_CLASS_ATTR, cls, OutputFormat.class); } /** * Set the {@link Mapper} for the job. * @param cls the <code>Mapper</code> to use * @throws IllegalStateException if the job is submitted */ public void setMapperClass(Class<? extends Mapper> cls ) throws IllegalStateException { ensureState(JobState.DEFINE); conf.setClass(MAP_CLASS_ATTR, cls, Mapper.class); } /** * Set the Jar by finding where a given class came from. * @param cls the example class */ public void setJarByClass(Class<?> cls) { ensureState(JobState.DEFINE); conf.setJarByClass(cls); } /** * Set the job jar */ public void setJar(String jar) { ensureState(JobState.DEFINE); conf.setJar(jar); } /** * Set the reported username for this job. * * @param user the username for this job. */ public void setUser(String user) { ensureState(JobState.DEFINE); conf.setUser(user); } /** * Set the combiner class for the job. * @param cls the combiner to use * @throws IllegalStateException if the job is submitted */ public void setCombinerClass(Class<? extends Reducer> cls ) throws IllegalStateException { ensureState(JobState.DEFINE); conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class); } /** * Set the {@link Reducer} for the job. * @param cls the <code>Reducer</code> to use * @throws IllegalStateException if the job is submitted */ public void setReducerClass(Class<? extends Reducer> cls ) throws IllegalStateException { ensureState(JobState.DEFINE); conf.setClass(REDUCE_CLASS_ATTR, cls, Reducer.class); } /** * Set the {@link Partitioner} for the job. * @param cls the <code>Partitioner</code> to use * @throws IllegalStateException if the job is submitted */ public void setPartitionerClass(Class<? extends Partitioner> cls ) throws IllegalStateException { ensureState(JobState.DEFINE); conf.setClass(PARTITIONER_CLASS_ATTR, cls, Partitioner.class); } /** * Set the key class for the map output data. This allows the user to * specify the map output key class to be different than the final output * value class. * * @param theClass the map output key class. * @throws IllegalStateException if the job is submitted */ public void setMapOutputKeyClass(Class<?> theClass ) throws IllegalStateException { ensureState(JobState.DEFINE); conf.setMapOutputKeyClass(theClass); } /** * Set the value class for the map output data. This allows the user to * specify the map output value class to be different than the final output * value class. * * @param theClass the map output value class. * @throws IllegalStateException if the job is submitted */ public void setMapOutputValueClass(Class<?> theClass ) throws IllegalStateException { ensureState(JobState.DEFINE); conf.setMapOutputValueClass(theClass); } /** * Set the key class for the job output data. * * @param theClass the key class for the job output data. * @throws IllegalStateException if the job is submitted */ public void setOutputKeyClass(Class<?> theClass ) throws IllegalStateException { ensureState(JobState.DEFINE); conf.setOutputKeyClass(theClass); } /** * Set the value class for job outputs. * * @param theClass the value class for job outputs. * @throws IllegalStateException if the job is submitted */ public void setOutputValueClass(Class<?> theClass ) throws IllegalStateException { ensureState(JobState.DEFINE); conf.setOutputValueClass(theClass); } /** * Define the comparator that controls how the keys are sorted before they * are passed to the {@link Reducer}. * @param cls the raw comparator * @throws IllegalStateException if the job is submitted */ public void setSortComparatorClass(Class<? extends RawComparator> cls ) throws IllegalStateException { ensureState(JobState.DEFINE); conf.setOutputKeyComparatorClass(cls); } /** * Define the comparator that controls which keys are grouped together * for a single call to * {@link Reducer#reduce(Object, Iterable, * org.apache.hadoop.mapreduce.Reducer.Context)} * @param cls the raw comparator to use * @throws IllegalStateException if the job is submitted */ public void setGroupingComparatorClass(Class<? extends RawComparator> cls ) throws IllegalStateException { ensureState(JobState.DEFINE); conf.setOutputValueGroupingComparator(cls); } /** * Set the user-specified job name. * * @param name the job's new name. * @throws IllegalStateException if the job is submitted */ public void setJobName(String name) throws IllegalStateException { ensureState(JobState.DEFINE); conf.setJobName(name); } /** * Turn speculative execution on or off for this job. * * @param speculativeExecution <code>true</code> if speculative execution * should be turned on, else <code>false</code>. */ public void setSpeculativeExecution(boolean speculativeExecution) { ensureState(JobState.DEFINE); conf.setSpeculativeExecution(speculativeExecution); } /** * Turn speculative execution on or off for this job for map tasks. * * @param speculativeExecution <code>true</code> if speculative execution * should be turned on for map tasks, * else <code>false</code>. */ public void setMapSpeculativeExecution(boolean speculativeExecution) { ensureState(JobState.DEFINE); conf.setMapSpeculativeExecution(speculativeExecution); } /** * Turn speculative execution on or off for this job for reduce tasks. * * @param speculativeExecution <code>true</code> if speculative execution * should be turned on for reduce tasks, * else <code>false</code>. */ public void setReduceSpeculativeExecution(boolean speculativeExecution) { ensureState(JobState.DEFINE); conf.setReduceSpeculativeExecution(speculativeExecution); } /** * Specify whether job-setup and job-cleanup is needed for the job * * @param needed If <code>true</code>, job-setup and job-cleanup will be * considered from {@link OutputCommitter} * else ignored. */ public void setJobSetupCleanupNeeded(boolean needed) { ensureState(JobState.DEFINE); conf.setBoolean(SETUP_CLEANUP_NEEDED, needed); } /** * Set the given set of archives * @param archives The list of archives that need to be localized */ public void setCacheArchives(URI[] archives) { ensureState(JobState.DEFINE); DistributedCache.setCacheArchives(archives, conf); } /** * Set the given set of files * @param files The list of files that need to be localized */ public void setCacheFiles(URI[] files) { ensureState(JobState.DEFINE); DistributedCache.setCacheFiles(files, conf); } /** * Add a archives to be localized * @param uri The uri of the cache to be localized */ public void addCacheArchive(URI uri) { ensureState(JobState.DEFINE); DistributedCache.addCacheArchive(uri, conf); } /** * Add a file to be localized * @param uri The uri of the cache to be localized */ public void addCacheFile(URI uri) { ensureState(JobState.DEFINE); DistributedCache.addCacheFile(uri, conf); } /** * Add an file path to the current set of classpath entries It adds the file * to cache as well. * * Files added with this method will not be unpacked while being added to the * classpath. * To add archives to classpath, use the {@link #addArchiveToClassPath(Path)} * method instead. * * @param file Path of the file to be added */ public void addFileToClassPath(Path file) throws IOException { ensureState(JobState.DEFINE); DistributedCache.addFileToClassPath(file, conf, file.getFileSystem(conf)); } /** * Add an archive path to the current set of classpath entries. It adds the * archive to cache as well. * * Archive files will be unpacked and added to the classpath * when being distributed. * * @param archive Path of the archive to be added */ public void addArchiveToClassPath(Path archive) throws IOException { ensureState(JobState.DEFINE); DistributedCache.addArchiveToClassPath(archive, conf, archive.getFileSystem(conf)); } /** * Originally intended to enable symlinks, but currently symlinks cannot be * disabled. */ @Deprecated public void createSymlink() { ensureState(JobState.DEFINE); DistributedCache.createSymlink(conf); } /** * Expert: Set the number of maximum attempts that will be made to run a * map task. * * @param n the number of attempts per map task. */ public void setMaxMapAttempts(int n) { ensureState(JobState.DEFINE); conf.setMaxMapAttempts(n); } /** * Expert: Set the number of maximum attempts that will be made to run a * reduce task. * * @param n the number of attempts per reduce task. */ public void setMaxReduceAttempts(int n) { ensureState(JobState.DEFINE); conf.setMaxReduceAttempts(n); } /** * Set whether the system should collect profiler information for some of * the tasks in this job? The information is stored in the user log * directory. * @param newValue true means it should be gathered */ public void setProfileEnabled(boolean newValue) { ensureState(JobState.DEFINE); conf.setProfileEnabled(newValue); } /** * Set the profiler configuration arguments. If the string contains a '%s' it * will be replaced with the name of the profiling output file when the task * runs. * * This value is passed to the task child JVM on the command line. * * @param value the configuration string */ public void setProfileParams(String value) { ensureState(JobState.DEFINE); conf.setProfileParams(value); } /** * Set the ranges of maps or reduces to profile. setProfileEnabled(true) * must also be called. * @param newValue a set of integer ranges of the map ids */ public void setProfileTaskRange(boolean isMap, String newValue) { ensureState(JobState.DEFINE); conf.setProfileTaskRange(isMap, newValue); } /** * Sets the flag that will allow the JobTracker to cancel the HDFS delegation * tokens upon job completion. Defaults to true. */ public void setCancelDelegationTokenUponJobCompletion(boolean value) { ensureState(JobState.DEFINE); conf.setBoolean(JOB_CANCEL_DELEGATION_TOKEN, value); }
要非常注意的地方就是在每项配置的时候都需要检查状态,Job只有处于DEFINE状态下的时候才可以对其进行配置。
屏幕输出的方法:
/** * Dump stats to screen. */ @Override public String toString() { ensureState(JobState.RUNNING); String reasonforFailure = " "; int numMaps = 0; int numReduces = 0; try { updateStatus(); if (status.getState().equals(JobStatus.State.FAILED)) reasonforFailure = getTaskFailureEventString(); numMaps = getTaskReports(TaskType.MAP).length; numReduces = getTaskReports(TaskType.REDUCE).length; } catch (IOException e) { } catch (InterruptedException ie) { } StringBuffer sb = new StringBuffer(); sb.append("Job: ").append(status.getJobID()).append(" "); sb.append("Job File: ").append(status.getJobFile()).append(" "); sb.append("Job Tracking URL : ").append(status.getTrackingUrl()); sb.append(" "); sb.append("Uber job : ").append(status.isUber()).append(" "); sb.append("Number of maps: ").append(numMaps).append(" "); sb.append("Number of reduces: ").append(numReduces).append(" "); sb.append("map() completion: "); sb.append(status.getMapProgress()).append(" "); sb.append("reduce() completion: "); sb.append(status.getReduceProgress()).append(" "); sb.append("Job state: "); sb.append(status.getState()).append(" "); sb.append("retired: ").append(status.isRetired()).append(" "); sb.append("reason for failure: ").append(reasonforFailure); return sb.toString(); }
获取任务进程的方法:
/** * Get the <i>progress</i> of the job's map-tasks, as a float between 0.0 * and 1.0. When all map tasks have completed, the function returns 1.0. * * @return the progress of the job's map-tasks. * @throws IOException */ public float mapProgress() throws IOException { ensureState(JobState.RUNNING); ensureFreshStatus(); return status.getMapProgress(); } /** * Get the <i>progress</i> of the job's reduce-tasks, as a float between 0.0 * and 1.0. When all reduce tasks have completed, the function returns 1.0. * * @return the progress of the job's reduce-tasks. * @throws IOException */ public float reduceProgress() throws IOException { ensureState(JobState.RUNNING); ensureFreshStatus(); return status.getReduceProgress(); } /** * Get the <i>progress</i> of the job's cleanup-tasks, as a float between 0.0 * and 1.0. When all cleanup tasks have completed, the function returns 1.0. * * @return the progress of the job's cleanup-tasks. * @throws IOException */ public float cleanupProgress() throws IOException, InterruptedException { ensureState(JobState.RUNNING); ensureFreshStatus(); return status.getCleanupProgress(); } /** * Get the <i>progress</i> of the job's setup-tasks, as a float between 0.0 * and 1.0. When all setup tasks have completed, the function returns 1.0. * * @return the progress of the job's setup-tasks. * @throws IOException */ public float setupProgress() throws IOException { ensureState(JobState.RUNNING); ensureFreshStatus(); return status.getSetupProgress(); }