Original page: http://www.cs.bgu.ac.il/~dsp112/Data_Processing,Chain,Join
Data Processing Examples
Based on Hadoop in Action, Chuck Lam, 2010, sections 4.1-4.3
Data: NBER data sets of patents
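The citation file is assumed to contain one comma-separated <citing patent>,<cited patent> pair per line, which is the format the programs below parse. For illustration only (made-up values, not actual NBER records), two such lines could be:
- 3858245,3755824
- 3858245,3541687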
Tasks
Citation list for each patent
- import java.io.IOException;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.Mapper.Context;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
- public class InvertAndGroupCitation {
- // The program gets the patent citation file (http://www.nber.org/patents/),
- // which is composed of <citing patent> <cited patent> records,
- // and generates, for each patent, the list of patents that cite it: <patent> [<citing patent>...]
- //
- // A modification of Chuck Lam's example (Hadoop In action, 2010, p. 68)
- public static class MapClass extends Mapper<Text, Text, Text, Text> {
- // The map method inverts the given key and the value: citing-cited --> cited-citing
- public void map(Text citing, Text cited, Context context) throws IOException, InterruptedException {
- context.write(cited, citing);
- }
- }
- public static class ReduceClass extends Reducer<Text,Text,Text,Text> {
- public void reduce(Text cited, Iterable<Text> citings, Context context) throws IOException, InterruptedException {
- // The reduce method gets the citing list per cited key, and formats the citing patents as a comma-separated list
- StringBuilder sb = new StringBuilder();
- int i=0;
- for (Text citing : citings) {
- if (i>0)
- sb.append(", ");
- sb.append(citing.toString());
- i++; // count the emitted citings so the separator is added from the second one on
- }
- context.write(cited, new Text(sb.toString()));
- }
- }
- public static void main(String[] args) throws Exception {
- Configuration conf = new Configuration();
- conf.set("key.value.separator.in.input.line",",");
- Job job = new Job(conf, "InvertAndGroupCitation");
- job.setJarByClass(InvertAndGroupCitation.class);
- job.setMapperClass(MapClass.class);
- job.setReducerClass(ReduceClass.class);
- FileInputFormat.addInputPath(job, new Path(args[0]));
- FileOutputFormat.setOutputPath(job, new Path(args[1]));
- job.setInputFormatClass(MyKeyValueTextInputFormat.class);
- job.setOutputFormatClass(TextOutputFormat.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Text.class);
- System.exit(job.waitForCompletion(true) ? 0 : 1);
- }
- }
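The job above, and several of the jobs below, plug in MyKeyValueTextInputFormat, a custom new-API input format that is not listed on this page. A minimal sketch of such a class, assuming it only wraps the old-API KeyValueLineRecordReader (the same wrapping trick used by KeyTaggedValueLineRecordReader further down), could look like this:
- import java.io.IOException;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapred.KeyValueLineRecordReader;
- import org.apache.hadoop.mapreduce.InputSplit;
- import org.apache.hadoop.mapreduce.RecordReader;
- import org.apache.hadoop.mapreduce.TaskAttemptContext;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- public class MyKeyValueTextInputFormat extends FileInputFormat<Text, Text> {
- // Hypothetical sketch: a new-API InputFormat that splits each text line into a key and a value
- // around the separator configured by "key.value.separator.in.input.line" (tab by default)
- public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
- return new RecordReader<Text, Text>() {
- private KeyValueLineRecordReader lineReader;
- private Text key, value;
- public void initialize(InputSplit genericSplit, TaskAttemptContext ctx) throws IOException {
- org.apache.hadoop.mapreduce.lib.input.FileSplit fs = (org.apache.hadoop.mapreduce.lib.input.FileSplit) genericSplit;
- // delegate the actual line parsing to the old-API reader
- lineReader = new KeyValueLineRecordReader(ctx.getConfiguration(),
- new org.apache.hadoop.mapred.FileSplit(fs.getPath(), fs.getStart(), fs.getLength(), fs.getLocations()));
- key = lineReader.createKey();
- value = lineReader.createValue();
- }
- public boolean nextKeyValue() throws IOException {
- return lineReader.next(key, value);
- }
- public Text getCurrentKey() { return key; }
- public Text getCurrentValue() { return value; }
- public float getProgress() throws IOException { return lineReader.getProgress(); }
- public void close() throws IOException { lineReader.close(); }
- };
- }
- }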
Citations count for each patent
- import java.io.IOException;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
- public class InvertAndCountCitation {
- // The program gets the patent citation file (http://www.nber.org/patents/),
- // which is composed of <citing patent> <cited patent> records,
- // and generates for each patent the number of citing patents: <patent> <#citing patents>
- //
- // A modification of Chuck Lam's example (Hadoop In action, 2010, p. 68)
- public static class MapClass extends Mapper<Text, Text, Text, Text> {
- // The map method inverts the given key and the value: citing-cited --> cited-citing
- public void map(Text citing, Text cited, Context context) throws IOException, InterruptedException {
- context.write(cited, citing);
- }
- }
- public static class ReduceClass extends Reducer<Text,Text,Text,IntWritable> {
- public void reduce(Text cited, Iterable<Text> citings, Context context) throws IOException, InterruptedException {
- // The reduce method gets the citing list per cited key, and emits the patent and the number of citings
- int count=0;
- for (Text citing : citings)
- count++;
- context.write(cited, new IntWritable(count));
- }
- }
- public static void main(String[] args) throws Exception {
- Configuration conf = new Configuration();
- conf.set("key.value.separator.in.input.line",",");
- Job job = new Job(conf, "InvertAndCountCitation");
- job.setJarByClass(InvertAndCountCitation.class);
- job.setMapperClass(MapClass.class);
- job.setReducerClass(ReduceClass.class);
- FileInputFormat.addInputPath(job, new Path(args[0]));
- FileOutputFormat.setOutputPath(job, new Path(args[1]));
- job.setInputFormatClass(MyKeyValueTextInputFormat.class);
- job.setOutputFormatClass(TextOutputFormat.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class); // the reducer emits IntWritable counts
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(Text.class); // the mapper still emits Text values
- System.exit(job.waitForCompletion(true) ? 0 : 1);
- }
- }
Citations count histogram
- import java.io.IOException;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
- public class CitationHistogram {
- // The program gets a pair list file of <patent> <number of citing patents>
- // and generates a histogram on the number of citations
- //
- // A modification of Chuck Lam's example (Hadoop In action, 2010, p. 68)
- public static class MapClass extends Mapper<Text, Text, IntWritable, IntWritable> {
- // The map method gets a pair of a patent and its citation count (as text) and emits the citation count
- // with a '1' value, indicating one instance of this count
- private final static IntWritable one = new IntWritable(1);
- private IntWritable citationCount = new IntWritable();
- public void map(Text cited, Text numberOfCitations, Context context) throws IOException, InterruptedException {
- citationCount.set(Integer.parseInt(numberOfCitations.toString()));
- context.write(citationCount, one);
- }
- }
- public static class ReduceClass extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable> {
- public void reduce(IntWritable numberOfCitations, Iterable<IntWritable> counts, Context context) throws IOException, InterruptedException {
- // The reduce method sums all instances of each given count, in order to compose the histogram
- int count = 0;
- for (IntWritable count1 : counts)
- count += count1.get();
- context.write(numberOfCitations, new IntWritable(count));
- }
- }
- public static void main(String[] args) throws Exception {
- Configuration conf = new Configuration();
- Job job = new Job(conf, "CitationHistogram");
- job.setJarByClass(CitationHistogram.class);
- job.setMapperClass(MapClass.class);
- job.setReducerClass(ReduceClass.class);
- FileInputFormat.addInputPath(job, new Path(args[0]));
- FileOutputFormat.setOutputPath(job, new Path(args[1]));
- job.setInputFormatClass(MyKeyValueTextInputFormat.class);
- job.setOutputFormatClass(TextOutputFormat.class);
- job.setOutputKeyClass(IntWritable.class);
- job.setOutputValueClass(IntWritable.class);
- System.exit(job.waitForCompletion(true) ? 0 : 1);
- }
- }
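The histogram job reads the output of the citation-count job, so the two can simply be run one after the other. For illustration only (the jar name and paths below are assumptions, not part of the course material):
- hadoop jar patents.jar InvertAndCountCitation cite75_99.txt citation-counts
- hadoop jar patents.jar CitationHistogram citation-counts citation-histogram
The Task Chain Example below shows how such a dependency can be expressed programmatically.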
Distribution of patent attribute values
- import java.io.IOException;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.DoubleWritable;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
- public class CountByAttribute {
- // The program gets the patent information file (http://www.nber.org/patents/),
- // each record of which contains a patent and its attributes,
- // and generates, for a given attribute, the distribution of its values
- //
- // A modification of Chuck Lam's example (Hadoop In action, 2010, p. 68)
- public static class MapClass extends Mapper<LongWritable, Text, Text, IntWritable> {
- private final static IntWritable one = new IntWritable(1);
- private Text word = new Text();
- public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
- // The map method gets a patent record, extracts the relevant attribute value,
- // and emits the attribute value with '1', indicating one occurrence
- String fields[] = value.toString().split(",", -1); // a negative limit keeps trailing empty fields
- word.set(fields[3]); // year
- //word.set(fields[4]); //country
- context.write(word,one);
- }
- }
- public static class ReduceClass extends Reducer<Text,IntWritable,Text,IntWritable> {
- public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
- // The reduce method gets an attribute value and its counts and emits its total count
- int sum = 0;
- for (IntWritable value : values)
- sum += value.get();
- context.write(key, new IntWritable(sum));
- }
- }
- public static void main(String[] args) throws Exception {
- Configuration conf = new Configuration();
- Job job = new Job(conf, "CountByAttribute");
- job.setJarByClass(CountByAttribute.class);
- job.setMapperClass(MapClass.class);
- job.setReducerClass(ReduceClass.class);
- FileInputFormat.addInputPath(job, new Path(args[0]));
- FileOutputFormat.setOutputPath(job, new Path(args[1]));
- job.setOutputFormatClass(TextOutputFormat.class);
- job.setInputFormatClass(TextInputFormat.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class); // the mapper and the reducer both emit IntWritable values
- System.exit(job.waitForCompletion(true) ? 0 : 1);
- }
- }
Task Chain Example
Pipeline of depending jobs by JobControl
- import java.util.LinkedList;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
- import org.apache.hadoop.mapred.InputFormat;
- import org.apache.hadoop.mapred.JobConf;
- import org.apache.hadoop.mapred.Mapper;
- import org.apache.hadoop.mapred.OutputFormat;
- import org.apache.hadoop.mapred.Reducer;
- import org.apache.hadoop.mapred.jobcontrol.Job;
- import org.apache.hadoop.mapred.jobcontrol.JobControl;
- import org.apache.hadoop.mapred.lib.ChainMapper;
- import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
- import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.io.Text;
- public class CitationPipeline {
- public static void main(String[] args) {
- // Example of applying a pipeline of two mapper/reducer jobs by using a JobControl object
- // with a dependency constraint.
- // The first job counts the number of citations for each patent,
- // and the second job generates the histogram.
- // 0.21.0 compliant! In order to work with 0.20.2, use the JobConf/JobClient mechanism.
- // Note: the code was not checked!!!
- try {
- JobConf jobconf1 = new JobConf(new Configuration());
- Job job1 = new Job(jobconf1);
- jobconf1.setJarByClass(InvertAndCountCitation.class);
- jobconf1.setMapperClass(InvertAndCountCitation.MapClass.class);
- jobconf1.setReducerClass(InvertAndCountCitation.ReduceClass.class);
- jobconf1.setOutputKeyClass(Text.class);
- jobconf1.setOutputValueClass(IntWritable.class);
- FileInputFormat.addInputPath(job1.getJob(), new Path(args[0]));
- FileOutputFormat.setOutputPath(job1.getJob(), new Path("intermediate"));
- JobConf jobconf2 = new JobConf(new Configuration());
- Job job2 = new Job(jobconf2);
- jobconf2.setJarByClass(CitationHistogram.class);
- jobconf2.setMapperClass(CitationHistogram.MapClass.class);
- jobconf2.setReducerClass(CitationHistogram.ReduceClass.class);
- jobconf2.setOutputKeyClass(IntWritable.class);
- jobconf2.setOutputValueClass(IntWritable.class);
- jobconf2.setInputFormat(MyKeyValueTextInputFormat.class);
- jobconf2.setOutputFormat(TextOutputFormat.class);
- FileInputFormat.addInputPath(job2.getJob(), new Path("intermediate"));
- FileOutputFormat.setOutputPath(job2.getJob(), new Path(args[1]));
- job2.addDependingJob(job1);
- JobControl jc = new JobControl("JC");
- jc.addJob(job1);
- jc.addJob(job2);
- jc.run();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
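Calling jc.run() on the main thread keeps the JobControl loop alive even after both jobs complete. A common pattern (a sketch that would replace the last four statements inside the try block above; allFinished() and stop() are standard JobControl methods) is to run the controller in its own thread and poll it:
- JobControl jc = new JobControl("JC");
- jc.addJob(job1);
- jc.addJob(job2);
- Thread controller = new Thread(jc); // JobControl implements Runnable
- controller.start();
- while (!jc.allFinished()) // poll until both jobs are done
- Thread.sleep(5000);
- jc.stop(); // terminate the controller loop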
Chaining of pre-processing and post-processing mappers to a reducer
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
- import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
- import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
- public class SpeechLemmaIndexingChain {
- public static void main(String[] args) throws Exception {
- // Example of directly chaining several mappers (as preprocessors) to a reducer:
- // The first mapper gets a sound file and generates its extracted text
- // The second mapper converts the tokens in the text to their equivalent lexemes
- // The third mapper generates the inverted index
- // The reducer emits the inverted index to the index file
- // 0.21.0 compliant! In order to work with 0.20.2, use the JobConf/JobClient mechanism.
- // Note: the code was not checked!!!
- Configuration conf = new Configuration();
- Job job = new Job(conf, "Job with chained tasks");
- job.setJarByClass(SpeechLemmaIndexingChain.class);
- job.setInputFormatClass(TextInputFormat.class);
- job.setOutputFormatClass(TextOutputFormat.class);
- FileInputFormat.setInputPaths(job, new Path(args[0]));
- FileOutputFormat.setOutputPath(job, new Path(args[1]));
- Configuration mapText2SpeechConf = new Configuration(false);
- ChainMapper.addMapper(job, SpeechRecognitionMapper.class,
- IntWritable.class, Sound.class, IntWritable.class, Text.class,
- mapText2SpeechConf);
- Configuration mapLemmatizerConf = new Configuration(false);
- ChainMapper.addMapper(job, LematizerMapper.class,
- IntWritable.class, Text.class, IntWritable.class, Text.class,
- mapLemmatizerConf);
- Configuration mapIndexerConf = new Configuration(false);
- ChainMapper.addMapper(job, IndexMapper.class,
- IntWritable.class, Text.class, Text.class, IntWritable.class,
- mapIndexerConf);
- Configuration reduceIndexerConf = new Configuration(false);
- ChainReducer.setReducer(job, IndexReducer.class,
- Text.class, IntWritable.class, Text.class, Text.class,
- reduceIndexerConf);
- System.exit(job.waitForCompletion(true) ? 0 : 1);
- }
- }
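The chained classes (SpeechRecognitionMapper, LematizerMapper, IndexMapper, IndexReducer) and the Sound type are placeholders and are not given on this page. For illustration, a skeleton of LematizerMapper matching the signature declared in the addMapper call above might look like this (the lemmatize() helper is hypothetical):
- import java.io.IOException;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Mapper;
- public class LematizerMapper extends Mapper<IntWritable, Text, IntWritable, Text> {
- // Gets a document id and its text, and emits the same id with every token replaced by its lemma
- public void map(IntWritable docId, Text text, Context context) throws IOException, InterruptedException {
- StringBuilder sb = new StringBuilder();
- for (String token : text.toString().split("\\s+")) {
- if (sb.length() > 0)
- sb.append(" ");
- sb.append(lemmatize(token));
- }
- context.write(docId, new Text(sb.toString()));
- }
- private String lemmatize(String token) {
- // hypothetical placeholder: a real implementation would call a lemmatizer / morphological analyzer
- return token.toLowerCase();
- }
- }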
Data join Example
Reducer-side join implementation, based on tagged values
- import java.io.DataInput;
- import java.io.DataOutput;
- import java.io.IOException;
- import org.apache.hadoop.io.Writable;
- import org.apache.hadoop.io.WritableComparable;
- public abstract class TaggedValue<T extends Writable,V extends Writable> implements Writable {
- // An implementation of a tagged value, as a Writable object
- protected T tag;
- protected V value;
- TaggedValue() {
- init();
- }
- TaggedValue(T tag) {
- this.tag = tag;
- this.value = null;
- }
- TaggedValue(T tag,V value) {
- this.tag = tag;
- this.value = value;
- }
- protected abstract void init();
- @Override
- public void readFields(DataInput data) throws IOException {
- tag.readFields(data);
- value.readFields(data);
- }
- @Override
- public void write(DataOutput data) throws IOException {
- tag.write(data);
- value.write(data);
- }
- public String toString() {
- return tag + ":" + value;
- }
- public boolean equals(Object o) {
- TaggedValue<T,V> other = (TaggedValue<T,V>)o;
- return tag.equals(other.tag) && value.equals(other.value);
- }
- public T getTag() { return tag; }
- public V getvalue() { return value; }
- public void setValue(V value) { this.value = value; }
- }
- import org.apache.hadoop.io.Text;
- public class TextTaggedValue extends TaggedValue<Text,Text> {
- public TextTaggedValue() {
- super();
- }
- public TextTaggedValue(Text tag) {
- super(tag);
- }
- public TextTaggedValue(Text tag,Text value) {
- super(tag,value);
- }
- @Override
- protected void init() {
- tag = new Text();
- value = new Text();
- }
- }
- import java.io.IOException;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapred.FileSplit;
- import org.apache.hadoop.mapred.KeyValueLineRecordReader;
- import org.apache.hadoop.mapreduce.InputSplit;
- import org.apache.hadoop.mapreduce.RecordReader;
- import org.apache.hadoop.mapreduce.TaskAttemptContext;
- public class KeyTaggedValueLineRecordReader extends RecordReader<Text, TextTaggedValue> {
- // This record reader parses the input file into pairs of text key, and tagged value text,
- // where the tag is based on the name of the input file
- private KeyValueLineRecordReader lineReader;
- private Text lineKey, lineValue;
- private Text key;
- private Text tag;
- private TextTaggedValue value;
- public static RecordReader<Text, TextTaggedValue> create(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
- RecordReader<Text, TextTaggedValue> ret = new KeyTaggedValueLineRecordReader();
- ret.initialize(split,context);
- return ret;
- }
- private KeyTaggedValueLineRecordReader() throws IOException {
- }
- public void initialize(InputSplit split, TaskAttemptContext context) throws IOException {
- org.apache.hadoop.mapreduce.lib.input.FileSplit fs = (org.apache.hadoop.mapreduce.lib.input.FileSplit)split;
- key = new Text();
- tag = new Text(fs.getPath().getName().split("-")[0]);
- value = new TextTaggedValue(tag);
- lineReader = new KeyValueLineRecordReader(context.getConfiguration(),
- new org.apache.hadoop.mapred.FileSplit(fs.getPath(),fs.getStart(),fs.getLength(),fs.getLocations()));
- lineKey = lineReader.createKey();
- lineValue = lineReader.createValue();
- }
- public synchronized boolean nextKeyValue() throws IOException {
- if (!lineReader.next(lineKey, lineValue)) {
- return false;
- }
- key.set(lineKey);
- value.setValue(new Text(lineValue.toString()));
- return true;
- }
- public TextTaggedValue getCurrentValue() throws IOException, InterruptedException {
- return value;
- }
- public Text getCurrentKey() throws IOException, InterruptedException {
- return key;
- }
- public Text createKey() {
- return new Text("");
- }
- public Text createValue() {
- return new Text("");
- }
- public long getPos() throws IOException {
- return lineReader.getPos();
- }
- public float getProgress() throws IOException {
- return lineReader.getProgress();
- }
- public void close() throws IOException {
- lineReader.close();
- }
- }
- import java.io.IOException;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.InputSplit;
- import org.apache.hadoop.mapreduce.RecordReader;
- import org.apache.hadoop.mapreduce.TaskAttemptContext;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- public class KeyTaggedValueTextInputFormat extends FileInputFormat<Text, TextTaggedValue> {
- // An implementation of an InputFormat of a text key and text tagged value
- public RecordReader<Text, TextTaggedValue> createRecordReader(InputSplit split,TaskAttemptContext context) throws IOException, InterruptedException {
- return KeyTaggedValueLineRecordReader.create(split,context);
- }
- }
- import java.io.IOException;
- import java.util.HashMap;
- import java.util.LinkedList;
- import java.util.List;
- import java.util.Map;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
- public class ReduceSideJoin {
- // This program gets two types of tagged values and produces a join-by-key of their value sets
- public static class MapClass extends Mapper<Text,TextTaggedValue,Text,TextTaggedValue> {
- // The map gets a key and a tagged value (of either of the two types) and emits the key and the value
- public void map(Text key, TextTaggedValue value, Context context) throws IOException, InterruptedException {
- context.write(key, value);
- }
- }
- public static class ReduceClass extends Reducer<Text,TextTaggedValue,Text,Text> {
- public void reduce(Text key, Iterable<TextTaggedValue> taggedValues, Context context) throws IOException, InterruptedException {
- // The reduce gets a key and a set of values of two types (identified by their tags)
- // and generates a cross product of the two types of values
- Map<Text,List<Text>> mapTag2Values = new HashMap<Text,List<Text>>();
- for (TextTaggedValue taggedValue : taggedValues) {
- List<Text> values = mapTag2Values.get(taggedValue.getTag());
- if (values == null) {
- values = new LinkedList<Text>();
- // copy the tag, since Hadoop reuses the taggedValue object across the iteration
- mapTag2Values.put(new Text(taggedValue.getTag()),values);
- }
- // copy the value as well, for the same reason
- values.add(new Text(taggedValue.getvalue()));
- }
- crossProduct(key,mapTag2Values,context);
- }
- protected void crossProduct(Text key,Map<Text,List<Text>> mapTag2Values,Context context) throws IOException, InterruptedException {
- // This specific implementation of the cross product combines the data of the customers and the orders
- // of a given customer id.
- Text customer = mapTag2Values.get(new Text("customers")).get(0);
- for (Text order : mapTag2Values.get(new Text("orders")))
- context.write(key, new Text(customer.toString() + "," + order.toString()));
- }
- }
- public static void main(String[] args) throws Exception {
- Configuration conf = new Configuration();
- Job job = new Job(conf, "DataJoin");
- job.setJarByClass(ReduceSideJoin.class);
- job.setMapperClass(MapClass.class);
- job.setReducerClass(ReduceClass.class);
- FileInputFormat.addInputPath(job, new Path(args[0]));
- FileOutputFormat.setOutputPath(job, new Path(args[1]));
- job.setInputFormatClass(KeyTaggedValueTextInputFormat.class);
- job.setOutputFormatClass(TextOutputFormat.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Text.class);
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(TextTaggedValue.class);
- System.exit(job.waitForCompletion(true) ? 0 : 1);
- }
- }
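The tag of each record comes from the input file name (the prefix before the first '-'), and the key/value split is done by the old-API KeyValueLineRecordReader, whose default separator is a tab. For illustration only (the file names and field contents are assumptions), the join would behave like this:
- customers-1 contains: c001<TAB>Alice,NY
- orders-1 contains: c001<TAB>A23,2015
- joined output: c001<TAB>Alice,NY,A23,2015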
Reducer-side join implementation, based on secondary ordering of tagged keys
- import java.io.DataInput;
- import java.io.DataOutput;
- import java.io.IOException;
- import org.apache.hadoop.io.Writable;
- import org.apache.hadoop.io.WritableComparable;
- public abstract class TaggedKey<T extends WritableComparable,K extends WritableComparable>
- implements WritableComparable {
- // An implementation of a tagged key, as a WritableComparable object
- protected T tag;
- protected K key;
- TaggedKey() {
- init();
- }
- TaggedKey(T tag) {
- this.tag = tag;
- this.key = null;
- }
- TaggedKey(T tag,K key) {
- this.tag = tag;
- this.key = key;
- }
- protected abstract void init();
- @Override
- public void readFields(DataInput data) throws IOException {
- tag.readFields(data);
- key.readFields(data);
- }
- @Override
- public void write(DataOutput data) throws IOException {
- tag.write(data);
- key.write(data);
- }
- @Override
- public int compareTo(Object arg0) {
- TaggedKey<T,K> other = (TaggedKey<T,K>)arg0;
- int i = key.compareTo(other.key);
- if (i==0)
- i=tag.compareTo(other.tag);
- return i;
- }
- public String toString() {
- return tag + ":" + key;
- }
- public boolean equals(Object o) {
- TaggedKey<T,K> other = (TaggedKey<T,K>)o;
- return tag.equals(other.tag) && key.equals(other.key);
- }
- public T getTag() { return tag; }
- public K getKey() { return key; }
- public void setKey(K key) { this.key = key; }
- }
- import org.apache.hadoop.io.Text;
- public class TextTaggedKey extends TaggedKey<Text,Text> {
- public TextTaggedKey() {
- super();
- }
- public TextTaggedKey(Text tag) {
- super(tag);
- }
- public TextTaggedKey(Text tag,Text key) {
- super(tag,key);
- }
- @Override
- public void init() {
- tag = new Text();
- key = new Text();
- }
- }
- import java.io.IOException;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapred.FileSplit;
- import org.apache.hadoop.mapred.KeyValueLineRecordReader;
- import org.apache.hadoop.mapreduce.InputSplit;
- import org.apache.hadoop.mapreduce.RecordReader;
- import org.apache.hadoop.mapreduce.TaskAttemptContext;
- public class TaggedKeyValueLineRecordReader extends RecordReader<TextTaggedKey,Text> {
- // This record reader parses the input file into pairs of text key, and tagged value text,
- // where the tag is based on the name of the input file
- private KeyValueLineRecordReader lineReader;
- private Text lineKey, lineValue;
- private Text tag;
- private TextTaggedKey key;
- private Text value;
- public static RecordReader<TextTaggedKey,Text> create(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
- RecordReader<TextTaggedKey,Text> ret = new TaggedKeyValueLineRecordReader();
- ret.initialize(split,context);
- return ret;
- }
- private TaggedKeyValueLineRecordReader() throws IOException {
- }
- public void initialize(InputSplit split, TaskAttemptContext context) throws IOException {
- org.apache.hadoop.mapreduce.lib.input.FileSplit fs = (org.apache.hadoop.mapreduce.lib.input.FileSplit)split;
- tag = new Text(fs.getPath().getName().split("-")[0]);
- key = new TextTaggedKey(tag);
- value = new Text();
- lineReader = new KeyValueLineRecordReader(context.getConfiguration(),
- new org.apache.hadoop.mapred.FileSplit(fs.getPath(),fs.getStart(),fs.getLength(),fs.getLocations()));
- lineKey = lineReader.createKey();
- lineValue = lineReader.createValue();
- }
- public synchronized boolean nextKeyValue() throws IOException {
- if (!lineReader.next(lineKey, lineValue)) {
- return false;
- }
- key.setKey(new Text(lineKey.toString()));
- value.set(lineValue);
- return true;
- }
- public Text getCurrentValue() throws IOException, InterruptedException {
- return value;
- }
- public TextTaggedKey getCurrentKey() throws IOException, InterruptedException {
- return key;
- }
- public Text createKey() {
- return new Text("");
- }
- public Text createValue() {
- return new Text("");
- }
- public long getPos() throws IOException {
- return lineReader.getPos();
- }
- public float getProgress() throws IOException {
- return lineReader.getProgress();
- }
- public void close() throws IOException {
- lineReader.close();
- }
- }
- import java.io.IOException;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.InputSplit;
- import org.apache.hadoop.mapreduce.RecordReader;
- import org.apache.hadoop.mapreduce.TaskAttemptContext;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- public class TaggedKeyValueTextInputFormat extends FileInputFormat<TextTaggedKey,Text> {
- // An InputFormat implementation for a tagged text key and a text value
- public RecordReader<TextTaggedKey,Text> createRecordReader(InputSplit split,TaskAttemptContext context) throws IOException, InterruptedException {
- return TaggedKeyValueLineRecordReader.create(split,context);
- }
- }
- import java.io.IOException;
- import java.util.HashMap;
- import java.util.LinkedList;
- import java.util.List;
- import java.util.Map;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Partitioner;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
- public class ReduceSideJoinWithSecondaryOrdering {
- // In contrast to ReduceSideJoin, the joined data is written on-the-fly to the context,
- // without aggregating the data in memory, by using a secondary sort on the tagged keys
- public static class MapClass extends Mapper<TextTaggedKey,Text,TextTaggedKey,Text> {
- // The map gets a tagged key and a value and emits the key and the value
- public void map(TextTaggedKey key, Text value, Context context) throws IOException, InterruptedException {
- context.write(key, value);
- }
- }
- public static class ReduceClass extends Reducer<TextTaggedKey,Text,Text,Text> {
- Text currentTag = new Text("");
- Text currentKey = new Text("");
- List<Text> relation1ValuesList = new LinkedList<Text>();
- boolean writeMode = false;
- public void reduce(TextTaggedKey taggedKey, Iterable<Text> values, Context context) throws IOException, InterruptedException {
- // The reduce gets a tagged key and a set of values
- // In case the first data set (sorted by the tagged key) was completely received,
- // any set of the second dataset is written on-the-fly to the context,
- // by applying the cross product method.
- if (!currentKey.equals(taggedKey.getKey())) {
- relation1ValuesList.clear();
- writeMode = false;
- } else
- writeMode = !currentTag.equals(taggedKey.getTag());
- if (writeMode)
- crossProduct(taggedKey.getKey(),values,context);
- else {
- for (Text value : values)
- relation1ValuesList.add(new Text(value.toString()));
- }
- currentTag = new Text (taggedKey.getTag().toString());
- currentKey = new Text (taggedKey.getKey().toString());
- }
- protected void crossProduct(Text key,Iterable<Text> relation2Values ,Context context) throws IOException, InterruptedException {
- // This specific implementation of the cross product combines the data of the customers and the orders
- // of a given customer id.
- StringBuilder sb = new StringBuilder();
- for (Text value1 : relation1ValuesList) {
- if (sb.length() > 0)
- sb.append(",");
- sb.append(value1);
- }
- for (Text relation2Value : relation2Values) {
- if (sb.length() > 0)
- sb.append(",");
- sb.append(relation2Value);
- }
- context.write(key, new Text(sb.toString()));
- }
- }
- public static class PartitionerClass extends Partitioner<TextTaggedKey,Text> {
- // Ensure that tagged keys with the same natural key are directed to the same reducer
- public int getPartition(TextTaggedKey key,Text value, int numPartitions) {
- return Math.abs(key.getKey().hashCode()) % numPartitions;
- }
- }
- public static void main(String[] args) throws Exception {
- Configuration conf = new Configuration();
- Job job = new Job(conf, "DataJoin");
- job.setJarByClass(ReduceSideJoinWithSecondaryOrdering.class);
- job.setMapperClass(MapClass.class);
- job.setReducerClass(ReduceClass.class);
- job.setPartitionerClass(PartitionerClass.class);
- FileInputFormat.addInputPath(job, new Path(args[0]));
- FileOutputFormat.setOutputPath(job, new Path(args[1]));
- job.setInputFormatClass(TaggedKeyValueTextInputFormat.class);
- job.setOutputFormatClass(TextOutputFormat.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Text.class);
- job.setMapOutputKeyClass(TextTaggedKey.class);
- job.setMapOutputValueClass(Text.class);
- System.exit(job.waitForCompletion(true) ? 0 : 1);
- }
- }
Reducer-side join with Hadoop's 0.20.0 DataJoinReducerBase pattern
Based on Hadoop in Action, Chuck Lam, 2010, section 5.2.1
- import java.io.DataInput;
- import java.io.DataOutput;
- import java.io.IOException;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.conf.Configured;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.io.Writable;
- import org.apache.hadoop.mapred.FileInputFormat;
- import org.apache.hadoop.mapred.FileOutputFormat;
- import org.apache.hadoop.mapred.JobClient;
- import org.apache.hadoop.mapred.JobConf;
- import org.apache.hadoop.mapred.TextInputFormat;
- import org.apache.hadoop.mapred.TextOutputFormat;
- import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
- import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
- import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
- import org.apache.hadoop.util.Tool;
- import org.apache.hadoop.util.ToolRunner;
- // A demonstration of Hadoop's 0.20.0 DataJoinReducerBase pattern
- public class ReduceSideJoinByPattern extends Configured implements Tool {
- public static class MapClass extends DataJoinMapperBase {
- protected Text generateInputTag(String inputFile) {
- String datasource = inputFile.split("-")[0];
- return new Text(datasource);
- }
- protected Text generateGroupKey(TaggedMapOutput aRecord) {
- String line = ((Text) aRecord.getData()).toString();
- String[] tokens = line.split(",");
- String groupKey = tokens[0];
- return new Text(groupKey);
- }
- protected TaggedMapOutput generateTaggedMapOutput(Object value) {
- TaggedWritable retv = new TaggedWritable((Text) value);
- retv.setTag(this.inputTag);
- return retv;
- }
- }
- public static class Reduce extends DataJoinReducerBase {
- protected TaggedMapOutput combine(Object[] tags, Object[] values) {
- if (tags.length < 2)
- return null;
- String joinedStr = "";
- for (int i=0; i<values.length; i++) {
- if (i > 0)
- joinedStr += ",";
- TaggedWritable tw = (TaggedWritable) values[i];
- String line = ((Text) tw.getData()).toString();
- String[] tokens = line.split(",", 2);
- joinedStr += tokens[1];
- }
- TaggedWritable retv = new TaggedWritable(new Text(joinedStr));
- retv.setTag((Text) tags[0]);
- return retv;
- }
- }
- public static class TaggedWritable extends TaggedMapOutput {
- private Writable data;
- public TaggedWritable() {
- // Hadoop needs a no-argument constructor in order to deserialize the value
- this.tag = new Text("");
- this.data = new Text();
- }
- public TaggedWritable(Writable data) {
- this.tag = new Text("");
- this.data = data;
- }
- public Writable getData() {
- return data;
- }
- public void write(DataOutput out) throws IOException {
- this.tag.write(out);
- this.data.write(out);
- }
- public void readFields(DataInput in) throws IOException {
- this.tag.readFields(in);
- this.data.readFields(in);
- }
- }
- public int run(String[] args) throws Exception {
- Configuration conf = getConf();
- JobConf job = new JobConf(conf, ReduceSideJoinByPattern.class);
- Path in = new Path(args[0]);
- Path out = new Path(args[1]);
- FileInputFormat.setInputPaths(job, in);
- FileOutputFormat.setOutputPath(job, out);
- job.setJobName("DataJoinByPattern");
- job.setMapperClass(MapClass.class);
- job.setReducerClass(Reduce.class);
- job.setInputFormat(TextInputFormat.class);
- job.setOutputFormat(TextOutputFormat.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(TaggedWritable.class);
- job.set("mapred.textoutputformat.separator", ",");
- JobClient.runJob(job);
- return 0;
- }
- public static void main(String[] args) throws Exception {
- int res = ToolRunner.run(new Configuration(),
- new ReduceSideJoinByPattern(),
- args);
- System.exit(res);
- }
- }
Mapper-side join with DistributedCache
Based on Hadoop in Action, Chuck Lam, 2010, section 5.2.2
- import java.io.BufferedReader;
- import java.io.FileReader;
- import java.io.IOException;
- import java.util.Hashtable;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.conf.Configured;
- import org.apache.hadoop.filecache.DistributedCache;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapred.FileInputFormat;
- import org.apache.hadoop.mapred.FileOutputFormat;
- import org.apache.hadoop.mapred.KeyValueTextInputFormat;
- import org.apache.hadoop.mapred.JobClient;
- import org.apache.hadoop.mapred.JobConf;
- import org.apache.hadoop.mapred.MapReduceBase;
- import org.apache.hadoop.mapred.Mapper;
- import org.apache.hadoop.mapred.OutputCollector;
- import org.apache.hadoop.mapred.Reporter;
- import org.apache.hadoop.mapred.TextOutputFormat;
- import org.apache.hadoop.util.Tool;
- import org.apache.hadoop.util.ToolRunner;
- // A demonstration of Hadoop's DistributedCache tool
- //
- public class MapperSideJoinWithDistributedCache extends Configured implements Tool {
- public static class MapClass extends MapReduceBase implements Mapper<Text, Text, Text, Text> {
- private Hashtable<String, String> joinData = new Hashtable<String, String>();
- @Override
- public void configure(JobConf conf) {
- try {
- Path [] cacheFiles = DistributedCache.getLocalCacheFiles(conf);
- if (cacheFiles != null && cacheFiles.length > 0) {
- String line;
- String[] tokens;
- BufferedReader joinReader = new BufferedReader(new FileReader(cacheFiles[0].toString()));
- try {
- while ((line = joinReader.readLine()) != null) {
- tokens = line.split(",", 2);
- joinData.put(tokens[0], tokens[1]);
- }
- } finally {
- joinReader.close();
- }
- }
- } catch(IOException e) {
- System.err.println("Exception reading DistributedCache: " + e);
- }
- }
- public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
- String joinValue = joinData.get(key.toString()); // the cache table is keyed by String
- if (joinValue != null) {
- output.collect(key,new Text(value.toString() + "," + joinValue));
- }
- }
- }
- public int run(String[] args) throws Exception {
- Configuration conf = getConf();
- JobConf job = new JobConf(conf, MapperSideJoinWithDistributedCache.class);
- DistributedCache.addCacheFile(new Path(args[0]).toUri(), job); // register on the job's conf (JobConf copied conf, so changes to conf would be lost)
- Path in = new Path(args[1]);
- Path out = new Path(args[2]);
- FileInputFormat.setInputPaths(job, in);
- FileOutputFormat.setOutputPath(job, out);
- job.setJobName("DataJoin with DistributedCache");
- job.setMapperClass(MapClass.class);
- job.setNumReduceTasks(0);
- job.setInputFormat(KeyValueTextInputFormat.class);
- job.setOutputFormat(TextOutputFormat.class);
- job.set("key.value.separator.in.input.line", ",");
- JobClient.runJob(job);
- return 0;
- }
- public static void main(String[] args) throws Exception {
- int res = ToolRunner.run(new Configuration(),
- new MapperSideJoinWithDistributedCache(),args);
- System.exit(res);
- }
- }
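For illustration, the job is map-only (no reducers): args[0] is the small relation that is copied to every mapper via the DistributedCache, and args[1] and args[2] are the input and output paths. A hypothetical invocation (the jar and file names are assumptions) would be:
- hadoop jar join.jar MapperSideJoinWithDistributedCache customers.txt orders joined-output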