边数据
边数据(side data)是作业所需的额外的只读数据,以辅助处理主数据集。所面临的挑战在于如何使所有map或reduce任务(这些任务散布在集群内部)都能够方便而高效地使用边数据。
利用Job来配置作业
Configuration类的各种setter方法能够方便地配置作业的任一键值对。如果仅需向任务传递少量元数据则非常有用。用户可以通过Context类的getConfiguration()方法获得配置信息。
一般情况下,基本类型足以应付元数据编码。但对于更复杂的对象,用户要么自己处理序列化工作(这需要实现一个对象与字符串之间的双向转换机制),要么使用Hadoop提供的Stringifier类。DefaultStringifier使用Hadoop的序列化框架来序列化对象。
但是这种机制会加大Hadoop守护进程的内存开销压力,当几百个作业在系统中同时运行时这种现象尤为突出。因此,这种机制并不适合传输多达几千字节的数据量。每次读取配置时,所有项都被读入到内存(即使暂时不用的属性项也不例外)。MR1中,作业配置由jobtracker、tasktracker和子JVM读取,jobtracker和tasktracker均不使用用户的属性,因此这种做法有时既浪费时间,又浪费内存。
代码如下
package com.zhen.mapreduce.sideData.job; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /** * @author FengZhen * @date 2018年9月23日 * 边数据(通过job configuration来设置) */ public class SideData extends Configured implements Tool{ static class SideDataMapper extends Mapper<LongWritable, Text, Text, Text>{ String sideDataValue = ""; @Override protected void setup(Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException { sideDataValue = context.getConfiguration().get("sideData_test_key"); } @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException { context.write(value, new Text(sideDataValue)); } } public int run(String[] args) throws Exception { Configuration configuration = new Configuration(); configuration.set("sideData_test_key", "sideData_test_value"); Job job = Job.getInstance(configuration); job.setJobName("SideData"); job.setJarByClass(getClass()); job.setMapperClass(SideDataMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setNumReduceTasks(0); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) { try { String[] params = new String[] { "hdfs://fz/user/hdfs/MapReduce/data/sideData/job/input", "hdfs://fz/user/hdfs/MapReduce/data/sideData/job/output" }; int exitCode = ToolRunner.run(new SideData(), params); System.exit(exitCode); } catch (Exception e) { e.printStackTrace(); } } }
分布式缓存
与在作业配置中序列化边数据的技术相比,Hadoop的分布式缓存机制更受青睐,它能够在任务运行过程中及时地将文件和存档复制到任务节点以供使用。为了节约网络带宽,在每一个作业中,各个文件通常只需复制到一个节点一次。
1.用法
对于使用GenericOptionsParser的工具来说,用户可以使用-files选项指定待分发的文件,文件内包含以逗号隔开的URL列表。文件可以存放在本地文件系统、HDFS或其他Hadoop可读文件系统之中(如S3).如果尚未指定文件系统,则这些文件被默认是本地的。即使默认文件系统并非本地文件系统,这也是成立的。
用户可以使用-archives选项向自己的任务中复制存档文件(JAR文件、ZIP文件、tar文件和gzipped tar文件),这些文件会被解档到任务节点。-libjars选项会把JAR文件添加到mapper和reducer任务的类路径中。如果作业JAR文件并非包含很多库JAR文件,这点会很有用。
代码如下
package com.zhen.mapreduce.sideData.distributedCache; import java.io.File; import java.io.IOException; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.log4j.Logger; /** * @author FengZhen * @date 2018年9月23日 * 读取本地文件边数据 * 读取不到该文件 * hadoop jar SideData.jar com.zhen.mapreduce.sideData.distributedCache.MaxTemperatureByStationNameUsingDistributedCacheFile -files /usr/local/test/mr/stations-fixed-width.txt */ public class MaxTemperatureByStationNameUsingDistributedCacheFile extends Configured implements Tool{ static Logger logger = Logger.getLogger(MaxTemperatureByStationNameUsingDistributedCacheFile.class); static enum StationFile{STATION_SIZE}; static class StationTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ private NcdcRecordParser parser = new NcdcRecordParser(); @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { parser.parse(value.toString()); if (parser.isValidTemperature()) { context.write(new Text(parser.getStationId()), new IntWritable(parser.getTemperature())); } } } static class MaxTemperatureReducerWithStationLookup extends Reducer<Text, IntWritable, Text, IntWritable>{ private NcdcStationMetadata metadata; @Override protected void setup(Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { metadata = new NcdcStationMetadata(); File file = new File("stations-fixed-width.txt"); metadata.initialize(file); context.getCounter(StationFile.STATION_SIZE).setValue(metadata.getStationMap().size()); } @Override protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { String stationName = metadata.getStationName(key.toString()); int maxValue = Integer.MIN_VALUE; for (IntWritable value : values) { maxValue = Math.max(maxValue, value.get()); } context.write(new Text(stationName), new IntWritable(maxValue)); } } public int run(String[] args) throws Exception { Job job = Job.getInstance(getConf()); job.setJobName("MaxTemperatureByStationNameUsingDistributedCacheFile"); job.setJarByClass(getClass()); job.setMapperClass(StationTemperatureMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setReducerClass(MaxTemperatureReducerWithStationLookup.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) { try { String[] params = new String[] { "hdfs://fz/user/hdfs/MapReduce/data/sideData/distributedCache/input", "hdfs://fz/user/hdfs/MapReduce/data/sideData/distributedCache/output" }; int exitCode = ToolRunner.run(new MaxTemperatureByStationNameUsingDistributedCacheFile(), params); System.exit(exitCode); } catch (Exception e) { e.printStackTrace(); } } }
解析天气数据
package com.zhen.mapreduce.sideData.distributedCache; import java.io.Serializable; /** * @author FengZhen * @date 2018年9月9日 * 解析天气数据 */ public class NcdcRecordParser implements Serializable{ private static final long serialVersionUID = 1L; /** * 气象台ID */ private String stationId; /** * 时间 */ private long timeStamp; /** * 气温 */ private Integer temperature; /** * 解析 * @param value */ public void parse(String value) { String[] values = value.split(","); if (values.length >= 3) { stationId = values[0]; timeStamp = Long.parseLong(values[1]); temperature = Integer.valueOf(values[2]); } } /** * 校验是否合格 * @return */ public boolean isValidTemperature() { return null != temperature; } public String getStationId() { return stationId; } public void setStationId(String stationId) { this.stationId = stationId; } public long getTimeStamp() { return timeStamp; } public void setTimeStamp(long timeStamp) { this.timeStamp = timeStamp; } public Integer getTemperature() { return temperature; } public void setTemperature(Integer temperature) { this.temperature = temperature; } }
解析气象站数据
package com.zhen.mapreduce.sideData.distributedCache; import java.io.BufferedReader; import java.io.File; import java.io.FileReader; import java.util.HashMap; import java.util.Map; /** * @author FengZhen * @date 2018年9月23日 * 解析边数据 */ public class NcdcStationMetadata { /** * 存放气象站ID和name */ private Map<String, String> stationMap = new HashMap<String, String>(); public Map<String, String> getStationMap() { return stationMap; } public void setStationMap(Map<String, String> stationMap) { this.stationMap = stationMap; } /** * 根据ID获取name * @param stationId * @return */ public String getStationName(String stationId) { return stationMap.get(stationId); } /** * 解析 * @param value */ public boolean parse(String value) { String[] values = value.split(","); if (values.length >= 2) { String stationId = values[0]; String stationName = values[1]; if (null == stationMap) { stationMap = new HashMap<String, String>(); } stationMap.put(stationId, stationName); return true; } return false; } /** * 解析气象站数据文件 * @param file */ public void initialize(File file) { BufferedReader reader=null; String temp=null; try{ reader=new BufferedReader(new FileReader(file)); System.out.println("------------------start------------------"); while((temp=reader.readLine())!=null){ System.out.println(temp); parse(temp); } System.out.println("------------------end------------------"); } catch(Exception e){ e.printStackTrace(); } finally{ if(reader!=null){ try{ reader.close(); } catch(Exception e){ e.printStackTrace(); } } } } }
2.工作机制
当用户启动一个作业,Hadoop会把由-files、-archives和-libjars等选项所指定的文件复制到分布式文件系统(一般是HDFS)之中。接着,在任务运行之前,tasktracker(YARN中NodeManager)将文件从分布式文件系统复制到本地磁盘(缓存)使任务能够访问文件。此时,这些文件就被视为本地化了。从任务的角度看,这些文件就已经在那儿了(它并不关心这些文件是否来自HDFS)。此外,由-libjars指定的文件会在任务启动前添加到任务的类路径(classpath)中。
tasktracker(YARN中NodeManager)为缓存中的文件各维护一个计数器来统计这些文件的被使用情况。当任务即将运行时,该任务所使用的所有文件的对应计数器值增1;当任务执行完毕之后,这些计数器值均减1.当相关计数器值为0时,表明该文件没有被任何任务使用,可以从缓存中移除。缓存的容量是有限的(默认10GB),因此需要经常删除无用的文件以腾出空间来装载新文件。缓存大小可以通过属性local.cache.size进行配置,以字节为单位。
尽管该机制并不确保在同一个tasktracker上运行的同一作业的后续任务肯定能在缓存中找到文件,但是成功的概率相当大。原因在于作业的多个任务在调度之后几乎同时开始运行,因此,不会有足够多的其他作业在运行而导致原始任务的文件从缓存中被删除。
文件存放在tasktracker的${mapred.local.dir}/taskTracker/archive目录下。但是应用程序不必知道这一点,因为这些文件同时以符号链接的方式指向任务的工作目录。
3.分布式缓存API
由于可以通过GenericOptionsParser间接使用分布式缓存,大多数应用不需要使用分布式缓存API。然而,一些应用程序需要用到分布式缓存的更高级的特性,这就需要直接使用API了。API包括两部分:将数据放到缓存中的方法,以及从缓存中读取数据的方法。
public void addCacheFile(URI uri) public void addCacheArchive(URI uri) public void setCacheFiles(URI[] files) public void setCacheArchives(URI[] archives) public void addFileToClassPath(Path file) public void addArchiveToClassPath(Path archive)
在缓存中可以存放两类对象:文件(files)和存档(archives)。文件被直接放置在任务节点上,而存档则会被解档之后再将具体文件放置在任务节点上。每种对象类型都包含三种方法:addCacheXXX()、setCacheXXXs()和addXXXToClassPath()。其中,addCacheXXX()方法将文件或存档添加到分布式缓存,setCacheXXXs()方法将一次性向分布式缓存中添加一组文件或存档(之前调用所生成的集合将被替换),addXXXToClassPath()方法将文件或存档添加到MapReduce任务的类路径。
代码如下(有问题,读不到文件)
package com.zhen.mapreduce.sideData.distributedCache; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.net.URI; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.log4j.Logger; /** * @author FengZhen * @date 2018年9月23日 * 读取本地文件边数据 * 有问题,空指针,读不到文件 * hadoop jar SideData.jar com.zhen.mapreduce.sideData.distributedCache.MaxTemperatureByStationNameUsingDistributedCacheFileAPI */ public class MaxTemperatureByStationNameUsingDistributedCacheFileAPI extends Configured implements Tool{ static Logger logger = Logger.getLogger(MaxTemperatureByStationNameUsingDistributedCacheFileAPI.class); static enum StationFile{STATION_SIZE}; static class StationTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ private NcdcRecordParser parser = new NcdcRecordParser(); @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { parser.parse(value.toString()); if (parser.isValidTemperature()) { context.write(new Text(parser.getStationId()), new IntWritable(parser.getTemperature())); } } } static class MaxTemperatureReducerWithStationLookup extends Reducer<Text, IntWritable, Text, IntWritable>{ private NcdcStationMetadata metadata; @Override protected void setup(Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { metadata = new NcdcStationMetadata(); URI[] localPaths = context.getCacheFiles(); if (localPaths.length == 0) { throw new FileNotFoundException("Distributed cache file not found."); } File file = new File(localPaths[0].getPath().toString()); metadata.initialize(file); context.getCounter(StationFile.STATION_SIZE).setValue(metadata.getStationMap().size()); } @Override protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { String stationName = metadata.getStationName(key.toString()); int maxValue = Integer.MIN_VALUE; for (IntWritable value : values) { maxValue = Math.max(maxValue, value.get()); } context.write(new Text(stationName), new IntWritable(maxValue)); } } public int run(String[] args) throws Exception { Job job = Job.getInstance(getConf()); job.setJobName("MaxTemperatureByStationNameUsingDistributedCacheFileAPI"); job.setJarByClass(getClass()); job.setMapperClass(StationTemperatureMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setReducerClass(MaxTemperatureReducerWithStationLookup.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.addCacheFile(new URI("hdfs://fz/user/hdfs/MapReduce/data/sideData/distributedCache/stations-fixed-width.txt")); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) { try { String[] params = new String[] { "hdfs://fz/user/hdfs/MapReduce/data/sideData/distributedCache/input", "hdfs://fz/user/hdfs/MapReduce/data/sideData/distributedCache/output" }; int exitCode = ToolRunner.run(new MaxTemperatureByStationNameUsingDistributedCacheFileAPI(), params); System.exit(exitCode); } catch (Exception e) { e.printStackTrace(); } } }