  MapReduce Relational Algebra Operations

    Common relational algebra operations, including selection, projection, union, intersection, difference, and natural join, can all be parallelized quite easily with the MapReduce framework.

    Relation R
    NAME SEX AGE
    小明 男 25
    小红 女 18
    小张 男 22
    小米 女 23
    小丽 女 21
    小王 男 19
    小美 女 25
    小朱 女 26

    Selection

    Store the data of relation R in the file relationR and upload it to the data directory in HDFS, as shown in Listing 1-1.

    Listing 1-1

    root@lejian:/data# cat relationR
    小明 男 25
    小红 女 18
    小张 男 22
    小米 女 23
    小丽 女 21
    小王 男 19
    小美 女 25
    小朱 女 26
    root@lejian:/data# hadoop fs -put relationR /data
    root@lejian:/data# hadoop fs -ls -R /data
    -rw-r--r--   1 root supergroup        112 2017-01-07 15:03 /data/relationR
    

    To apply a selection condition C to relation R, here selecting the records whose SEX is 女 (female), the Map phase only has to test each input record against the condition and emit the records that satisfy it as (record, null) key-value pairs; the Reduce phase needs no extra work (the job simply uses the default identity reducer). The condition value comes from conf.xml in Listing 1-2, which the driver loads with conf.addResource("conf.xml") and the mapper reads in its setup() method.

    Listing 1-2

    <?xml version="1.0"?>
    <configuration>
    	<property>
    		<name>sex</name>
    		<value>女</value>
    	</property>
    </configuration>
    

    Listing 1-3

    package com.hadoop.mapreduce;
    
    public class Person {
    
    	private String name;
    	private String sex;
    	private int age;
    
    	public Person(String line) {
    		super();
    		String[] lines = line.split(" ");
    		this.name = lines[0];
    		this.sex = lines[1];
    		this.age = Integer.parseInt(lines[2]);
    	}
    
    	public String getName() {
    		return name;
    	}
    
    	public String getSex() {
    		return sex;
    	}
    
    	public int getAge() {
    		return age;
    	}
    
    	public String getVal(String col) {
    		if ("name".equals(col)) {
    			return name;
    		}
    		if ("sex".equals(col)) {
    			return sex;
    		}
    		return age + "";
    	}
    
    	@Override
    	public String toString() {
    		return name + " " + sex + " " + age;
    	}
    
    }
    

    Listing 1-4

    package com.hadoop.mapreduce;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class SelectionMap extends Mapper<LongWritable, Text, Text, NullWritable> {
    
    	private String sex = "";
    	private Text val = new Text();
    
    	protected void setup(Context context) throws java.io.IOException, InterruptedException {
    		Configuration conf = context.getConfiguration();
    		sex = conf.get("sex");
    	};
    
    	protected void map(LongWritable key, Text value, Context context) throws java.io.IOException, InterruptedException {
    		Person person = new Person(value.toString());
    		if (sex.equals(person.getVal("sex"))) {
    			val.set(person.toString());
    			context.write(val, NullWritable.get());
    		}
    	};
    
    }
    

    Listing 1-5

    package com.hadoop.mapreduce;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class Selection {
    
    	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    		if (args == null || args.length != 2) {
    			throw new RuntimeException("Usage: <input path> <output path>");
    		}
    		Configuration conf = new Configuration();
    		conf.addResource("conf.xml");
    		Job job = Job.getInstance(conf);
    		job.setJobName("Selection");
    		// set the job jar so the classes are shipped to the cluster (the later drivers do this too)
    		job.setJarByClass(Selection.class);
    		job.setMapperClass(SelectionMap.class);
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(NullWritable.class);
    		FileInputFormat.addInputPaths(job, args[0]);
    		FileOutputFormat.setOutputPath(job, new Path(args[1]));
    		System.exit(job.waitForCompletion(true) ? 0 : 1);
    	}
    
    }
    

    Run Listing 1-5; the result is shown in Listing 1-6.

    Listing 1-6

    root@lejian:/data# hadoop jar selection.jar com.hadoop.mapreduce.Selection /data /output
    …………
    root@lejian:/data# hadoop fs -ls -R /output
    -rw-r--r--   1 root supergroup          0 2017-01-07 15:05 /output/_SUCCESS
    -rw-r--r--   1 root supergroup         70 2017-01-07 15:05 /output/part-r-00000
    root@lejian:/data# hadoop fs -cat /output/part-r-00000
    小丽 女 21
    小朱 女 26
    小米 女 23
    小红 女 18
    小美 女 25
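
    As an aside, the selection condition could also be supplied on the command line instead of being packaged in conf.xml, by letting the driver go through ToolRunner so that generic options such as -D sex=女 end up in the Configuration. Below is a minimal sketch of such a driver; the class name SelectionTool and the invocation are illustrative and not part of the original code.

    package com.hadoop.mapreduce;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    public class SelectionTool extends Configured implements Tool {
    
    	@Override
    	public int run(String[] args) throws Exception {
    		// getConf() already contains any -D options parsed by ToolRunner
    		Job job = Job.getInstance(getConf());
    		job.setJobName("Selection");
    		job.setJarByClass(SelectionTool.class);
    		job.setMapperClass(SelectionMap.class);
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(NullWritable.class);
    		FileInputFormat.addInputPaths(job, args[0]);
    		FileOutputFormat.setOutputPath(job, new Path(args[1]));
    		return job.waitForCompletion(true) ? 0 : 1;
    	}
    
    	public static void main(String[] args) throws Exception {
    		System.exit(ToolRunner.run(new Configuration(), new SelectionTool(), args));
    	}
    
    }
    
    Such a driver would be invoked roughly as hadoop jar selection.jar com.hadoop.mapreduce.SelectionTool -D sex=女 /data /output (a hypothetical command line, not one of the runs shown here).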
    

    Projection

    For example, to project relation R onto the AGE attribute, the Map phase only needs to emit each record's AGE value together with a NullWritable, and on the Reduce side each key is simply written out. Note that this projection also removes duplicates, because identical AGE values are grouped into a single reduce call.

    Listing 1-7

    package com.hadoop.mapreduce;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class ProjectionMap extends Mapper<LongWritable, Text, IntWritable, NullWritable> {
    
    	private IntWritable age = new IntWritable();
    
    	protected void map(LongWritable key, Text value, Context context) throws java.io.IOException, InterruptedException {
    		Person person = new Person(value.toString());
    		age.set(person.getAge());
    		context.write(age, NullWritable.get());
    	};
    
    }
    

    Listing 1-8

    package com.hadoop.mapreduce;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class ProjectionReduce extends Reducer<IntWritable, NullWritable, IntWritable, NullWritable> {
    
    	protected void reduce(IntWritable key, Iterable<NullWritable> values, Context context) throws java.io.IOException, InterruptedException {
    		context.write(key, NullWritable.get());
    	};
    
    }
    

    Listing 1-9

    package com.hadoop.mapreduce;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class Projection {
    
    	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    		if (args == null || args.length != 2) {
    			throw new RuntimeException("Usage: <input path> <output path>");
    		}
    		Configuration conf = new Configuration();
    		Job job = Job.getInstance(conf);
    		job.setJobName("Projection");
    		// set the job jar so the classes are shipped to the cluster (the later drivers do this too)
    		job.setJarByClass(Projection.class);
    		job.setMapperClass(ProjectionMap.class);
    		job.setReducerClass(ProjectionReduce.class);
    		job.setOutputKeyClass(IntWritable.class);
    		job.setOutputValueClass(NullWritable.class);
    		FileInputFormat.addInputPaths(job, args[0]);
    		FileOutputFormat.setOutputPath(job, new Path(args[1]));
    		System.exit(job.waitForCompletion(true) ? 0 : 1);
    	}
    
    }
    

    Run Listing 1-9; the result is shown in Listing 1-10.

    Listing 1-10

    root@lejian:/data# hadoop jar projection.jar com.hadoop.mapreduce.Projection /data /output
    …………
    root@lejian:/data# hadoop fs -ls -R /output
    -rw-r--r--   1 root supergroup          0 2017-01-07 15:52 /output/_SUCCESS
    -rw-r--r--   1 root supergroup         21 2017-01-07 15:52 /output/part-r-00000
    root@lejian:/data# hadoop fs -cat /output/part-r-00000
    18
    19
    21
    22
    23
    25
    26
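
    Because ProjectionReduce just writes each distinct key once and its output types match the map output types, it could optionally also be registered as a combiner, so duplicate AGE values are already collapsed on the map side before the shuffle. The extra line below for the Listing 1-9 driver is a suggestion and not part of the original code.

    		// optional: reuse the reducer as a combiner for map-side de-duplication
    		job.setCombinerClass(ProjectionReduce.class);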
    

    Intersection

    If relation A and relation B share the same schema and we want their intersection, then in the Map phase every record r from A and from B is emitted as (r, 1), and in the Reduce phase the counts are summed and a record is output when its count is 2. Sticking with the Person class, Person now serves as the key, and to make identical Person records from relation A and relation B reach the same reduce call, the Person class of Listing 1-3 has to be reworked as shown in Listing 1-11. MapReduce routes each record to a reduce task using the key's hashCode (via the default HashPartitioner) and then sorts and groups keys with the key's compareTo method, so keys that compare as equal are handed to a single reduce call as one group.
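
    For reference, the default partitioning step is essentially the following; this sketch mirrors Hadoop's built-in HashPartitioner (the class name HashPartitionerSketch is only illustrative) and shows why Person needs a deterministic hashCode: records with equal hash values land on the same reduce task, and compareTo then decides which of them are grouped into one reduce() call.

    package com.hadoop.mapreduce;
    
    import org.apache.hadoop.mapreduce.Partitioner;
    
    // essentially what org.apache.hadoop.mapreduce.lib.partition.HashPartitioner does
    public class HashPartitionerSketch<K, V> extends Partitioner<K, V> {
    
    	@Override
    	public int getPartition(K key, V value, int numReduceTasks) {
    		// mask off the sign bit, then spread keys across the reduce tasks
    		return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    	}
    
    }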

    The Person class of Listing 1-3 is therefore modified into Listing 1-11.

    Listing 1-11

    package com.hadoop.mapreduce;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.WritableComparable;
    
    public class Person implements WritableComparable<Person> {
    
    	private String name;
    	private String sex;
    	private int age;
    
    	public Person() {
    		super();
    		// no-argument constructor, required so Hadoop can instantiate the key by reflection
    	}
    
    	public Person(String line) {
    		super();
    		String[] lines = line.split(" ");
    		this.name = lines[0];
    		this.sex = lines[1];
    		this.age = Integer.parseInt(lines[2]);
    	}
    
    	public String getName() {
    		return name;
    	}
    
    	public String getSex() {
    		return sex;
    	}
    
    	public int getAge() {
    		return age;
    	}
    
    	public String getVal(String col) {
    		if ("name".equals(col)) {
    			return name;
    		}
    		if ("sex".equals(col)) {
    			return sex;
    		}
    		return age + "";
    	}
    
    	@Override
    	public String toString() {
    		return name + " " + sex + " " + age;
    	}
    
    	@Override
    	public int hashCode() {
    		int res = 20;
    		res = name.hashCode() + 10 * res;
    		res = sex.hashCode() + 10 * res;
    		res = age + 10 * res;
    		return res;
    	}
    
    	@Override
    	public void write(DataOutput out) throws IOException {
    		out.writeUTF(name);
    		out.writeUTF(sex);
    		out.writeInt(age);
    	}
    
    	@Override
    	public void readFields(DataInput in) throws IOException {
    		name = in.readUTF();
    		sex = in.readUTF();
    		age = in.readInt();
    	}
    
    	@Override
    	public int compareTo(Person o) {
    		// order records by their hashCode; note that a hash collision would make two distinct records compare as equal
    		if (hashCode() > o.hashCode()) {
    			return 1;
    		}
    		if (hashCode() < o.hashCode()) {
    			return -1;
    		}
    		return 0;
    	}
    
    	public static void main(String[] args) {
    		System.out.println(new Person("Lily female 22").hashCode());
    	}
    
    }
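
    Before submitting a job it is worth checking locally that write() and readFields() are symmetric. The small test class below (PersonRoundTrip is an illustrative name, not part of the original code) serializes a Person the same way the framework does and reads it back.

    package com.hadoop.mapreduce;
    
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    
    public class PersonRoundTrip {
    
    	public static void main(String[] args) throws IOException {
    		Person original = new Person("Lily female 22");
    		// serialize, as the framework does when shuffling keys
    		ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    		original.write(new DataOutputStream(bytes));
    		// deserialize into a fresh instance created via the no-argument constructor
    		Person copy = new Person();
    		copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    		System.out.println(copy);                     // expected: Lily female 22
    		System.out.println(original.compareTo(copy)); // expected: 0
    	}
    
    }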
    

    Upload relation A and relation B to the data directory in HDFS, as shown in Listing 1-12.

    Listing 1-12

    root@lejian:/data# cat relationA 
    Tom male 21
    Amy female 19
    Daivd male 16
    Lily female 22
    Lucy female 20
    John male 19
    Rose female 19
    Jojo female 26
    root@lejian:/data# cat relationB
    Daivd male 16
    Jack male 15
    Lily female 22
    Lucy female 20
    Tom male 25
    root@lejian:/data# hadoop fs -put relation* /data
    root@lejian:/data# hadoop fs -ls -R /data
    -rw-r--r--   1 root supergroup        113 2017-01-07 20:48 /data/relationA
    -rw-r--r--   1 root supergroup         69 2017-01-07 20:48 /data/relationB
    

    Listing 1-13

    package com.hadoop.mapreduce;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class IntersectionMap extends Mapper<LongWritable, Text, Person, IntWritable> {
    
    	private static final IntWritable ONE = new IntWritable(1);
    
    	protected void map(LongWritable key, Text value, Context context) throws java.io.IOException, InterruptedException {
    		Person person = new Person(value.toString());
    		context.write(person, ONE);
    	};
    
    }
    

    Listing 1-14

    package com.hadoop.mapreduce;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class IntersectionReduce extends Reducer<Person, IntWritable, Person, NullWritable> {
    	protected void reduce(Person key, Iterable<IntWritable> values, Context context) throws java.io.IOException, InterruptedException {
    		int count = 0;
    		for (IntWritable val : values) {
    			count += val.get();
    		}
    		if (count == 2) {
    			context.write(key, NullWritable.get());
    		}
    	};
    }
    

    Listing 1-15

    package com.hadoop.mapreduce;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class Intersection {
    
    	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    		if (args == null || args.length != 2) {
    			throw new RuntimeException("Usage: <input path> <output path>");
    		}
    		Configuration conf = new Configuration();
    		Job job = Job.getInstance(conf);
    		job.setJobName("Intersection");
    		job.setJarByClass(Intersection.class);
    		
    		job.setMapperClass(IntersectionMap.class);
    		job.setMapOutputKeyClass(Person.class);
    		job.setMapOutputValueClass(IntWritable.class);
    		
    		job.setReducerClass(IntersectionReduce.class);
    		job.setOutputKeyClass(Person.class);
    		job.setOutputValueClass(NullWritable.class);
    		
    		FileInputFormat.addInputPaths(job, args[0]);
    		FileOutputFormat.setOutputPath(job, new Path(args[1]));
    		System.exit(job.waitForCompletion(true) ? 0 : 1);
    	}
    
    }
    

    Run Listing 1-15; the result is shown in Listing 1-16.

    Listing 1-16

    root@lejian:/data# hadoop jar intersection.jar com.hadoop.mapreduce.Intersection /data /output
    …………
    root@lejian:/data# hadoop fs -ls -R /output
    -rw-r--r--   1 root supergroup          0 2017-01-07 20:30 /output/_SUCCESS
    -rw-r--r--   1 root supergroup         44 2017-01-07 20:30 /output/part-r-00000
    root@lejian:/data# hadoop fs -cat /output/part-r-00000
    Daivd male 16
    Lily female 22
    Lucy female 20
    

    Difference

    To compute the difference A - B, that is, the records that appear in relation A but not in relation B, the Map phase emits a key-value pair (r, A) for every record r in relation A and (r, B) for every record r in relation B. The Reduce phase then inspects each record r together with the relation names collected for it and outputs the record only when those names contain A alone, meaning the record never appeared in B. In the implementation below the name of the relation to subtract is passed to the job as the "remove" parameter, so the same code can subtract either relation.

    First list the contents of relationA and relationB under the data directory in HDFS, as shown in Listing 1-17.

    Listing 1-17

    root@lejian:/data# hadoop fs -ls -R /data
    -rw-r--r--   1 root supergroup        113 2017-01-07 20:48 /data/relationA
    -rw-r--r--   1 root supergroup         69 2017-01-07 20:48 /data/relationB
    root@lejian:/data# hadoop fs -cat /data/relationA
    Tom male 21
    Amy female 19
    Daivd male 16
    Lily female 22
    Lucy female 20
    John male 19
    Rose female 19
    Jojo female 26
    root@lejian:/data# hadoop fs -cat /data/relationB
    Daivd male 16
    Jack male 15
    Lily female 22
    Lucy female 20
    Tom male 25
    

    Listing 1-18

    package com.hadoop.mapreduce;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
    public class DifferenceMap extends Mapper<LongWritable, Text, Person, Text> {
    
    	private Text relationName = new Text();
    
    	protected void setup(Context context) throws java.io.IOException, InterruptedException {
    		FileSplit fileSplit = (FileSplit) context.getInputSplit();
    		relationName.set(fileSplit.getPath().getName());
    	};
    
    	protected void map(LongWritable key, Text value, Context context) throws java.io.IOException, InterruptedException {
    		Person person = new Person(value.toString());
    		context.write(person, relationName);
    	};
    
    }
    

    Listing 1-19

    package com.hadoop.mapreduce;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class DifferenceReduce extends Reducer<Person, Text, Person, NullWritable> {
    
    	private String remove = "";
    
    	protected void setup(Context context) throws java.io.IOException, InterruptedException {
    		Configuration conf = context.getConfiguration();
    		remove = conf.get("remove");
    	};
    
    	protected void reduce(Person key, Iterable<Text> values, Context context) throws java.io.IOException, InterruptedException {
    		for (Text val : values) {
    			if (remove.equals(val.toString())) {
    				return;
    			}
    		}
    		context.write(key, NullWritable.get());
    	};
    
    }
    

    Listing 1-20

    package com.hadoop.mapreduce;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class Difference {
    
    	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    		if (args == null || args.length != 3) {
    			throw new RuntimeException("Usage: <input path> <output path> <relation to subtract>");
    		}
    		Configuration conf = new Configuration();
    		conf.set("remove", args[2]);
    		Job job = Job.getInstance(conf);
    		job.setJobName("Difference");
    		job.setJarByClass(Difference.class);
    
    		job.setMapperClass(DifferenceMap.class);
    		job.setMapOutputKeyClass(Person.class);
    		job.setMapOutputValueClass(Text.class);
    
    		job.setReducerClass(DifferenceReduce.class);
    		job.setOutputKeyClass(Person.class);
    		job.setOutputValueClass(NullWritable.class);
    
    		FileInputFormat.addInputPaths(job, args[0]);
    		FileOutputFormat.setOutputPath(job, new Path(args[1]));
    		System.exit(job.waitForCompletion(true) ? 0 : 1);
    	}
    
    }
    

    Run Listing 1-20; the result is shown in Listing 1-21.

    Listing 1-21

    root@lejian:/data# hadoop jar difference.jar com.hadoop.mapreduce.Difference /data /output relationB
    …………
    root@lejian:/data# hadoop fs -ls -R /output
    -rw-r--r--   1 root supergroup          0 2017-01-08 08:59 /output/_SUCCESS
    -rw-r--r--   1 root supergroup         69 2017-01-08 08:59 /output/part-r-00000
    root@lejian:/data# hadoop fs -cat /output/part-r-00000
    Tom male 21
    Amy female 19
    John male 19
    Jojo female 26
    Rose female 19
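
    Since the relation to subtract comes from the "remove" parameter, the same jar can also compute B - A by passing relationA as the third argument, for example (a hypothetical invocation, not one of the runs shown here):

    root@lejian:/data# hadoop jar difference.jar com.hadoop.mapreduce.Difference /data /output2 relationA

    With the data above this should leave only Jack male 15 and Tom male 25, since the two Tom records differ in age.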
    

    Natural Join

    As shown in Listing 1-22, the first column of the student file is the id, followed by name, sex, and age; the first column of the grade file is the id, followed by subject and score. We want the natural join of student and grade. In the Map phase every record r from student and grade is emitted with its id as the key and the record itself, tagged with the name of its source file, as the value. In the Reduce phase the values collected for one key are combined according to their source (student or grade), essentially taking the Cartesian product of the student record with its grade records, and the joined rows are written out.

    In Listing 1-22, the student and grade files are uploaded to the data directory in HDFS.

    Listing 1-22

    root@lejian:/data# cat student 
    1 Amy female 18
    2 Tom male 19
    3 Sam male 21
    4 John male 19
    5 Lily female 21
    6 Rose female 20
    root@lejian:/data# cat grade 
    1 Math 89
    2 Math 75
    4 English 85
    3 English 95
    5 Math 91
    5 English 88
    6 Math 78
    6 English 99
    2 English 80
    root@lejian:/data# hadoop fs -put student /data
    root@lejian:/data# hadoop fs -put grade /data
    root@lejian:/data# hadoop fs -ls -R /data
    -rw-r--r--   1 root supergroup        105 2017-01-08 09:59 /data/grade
    -rw-r--r--   1 root supergroup         93 2017-01-08 09:59 /data/student
    

    Listing 1-23

    package com.hadoop.mapreduce;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
    public class NaturalJoinMap extends Mapper<LongWritable, Text, IntWritable, Text> {
    
    	private String fileName = "";
    	private Text val = new Text();
    	private IntWritable stuKey = new IntWritable();
    
    	protected void setup(Context context) throws java.io.IOException, InterruptedException {
    		FileSplit fileSplit = (FileSplit) context.getInputSplit();
    		fileName = fileSplit.getPath().getName();
    	};
    
    	protected void map(LongWritable key, Text value, Context context) throws java.io.IOException, InterruptedException {
    		String[] arr = value.toString().split(" ");
    		stuKey.set(Integer.parseInt(arr[0]));
    		val.set(fileName + " " + value.toString());
    		context.write(stuKey, val);
    	};
    
    }
    

    Listing 1-24

    package com.hadoop.mapreduce;
    
    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class NaturalJoinReduce extends Reducer<IntWritable, Text, Text, NullWritable> {
    
    	private Text student = new Text();
    	private Text value = new Text();
    
    	protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws java.io.IOException, InterruptedException {
    		List<String> grades = new ArrayList<String>();
    		for (Text val : values) {
    			if (val.toString().contains("student")) {
    				student.set(studentStr(val.toString()));
    			} else {
    				grades.add(gradeStr(val.toString()));
    			}
    		}
    		for (String grade : grades) {
    			value.set(student.toString() + grade);
    			context.write(value, NullWritable.get());
    		}
    	};
    
    	private String studentStr(String line) {
    		String[] arr = line.split(" ");
    		StringBuilder str = new StringBuilder();
    		for (int i = 1; i < arr.length; i++) {
    			str.append(arr[i] + " ");
    		}
    		return str.toString();
    	}
    
    	private String gradeStr(String line) {
    		String[] arr = line.split(" ");
    		StringBuilder str = new StringBuilder();
    		for (int i = 2; i < arr.length; i++) {
    			str.append(arr[i] + " ");
    		}
    		return str.toString();
    	}
    
    }
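
    To illustrate with the data of Listing 1-22: for key 2 this reducer receives the tagged values student 2 Tom male 19, grade 2 Math 75, and grade 2 English 80 (in no guaranteed order), buffers the grade parts in a list, and emits one joined line per grade record, for example:

    2 Tom male 19 Math 75
    2 Tom male 19 English 80

    Note that the code assumes there is exactly one student record per id and that all grade records for a key fit in memory.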
    

    Listing 1-25

    package com.hadoop.mapreduce;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class NaturalJoin {
    
    	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    		if (args == null || args.length != 2) {
    			throw new RuntimeException("Usage: <input path> <output path>");
    		}
    		Configuration conf = new Configuration();
    		Job job = Job.getInstance(conf);
    		job.setJobName("NaturalJoin");
    		job.setJarByClass(NaturalJoin.class);
    
    		job.setMapperClass(NaturalJoinMap.class);
    		job.setMapOutputKeyClass(IntWritable.class);
    		job.setMapOutputValueClass(Text.class);
    
    		job.setReducerClass(NaturalJoinReduce.class);
    		// the reducer emits Text keys, not IntWritable
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(NullWritable.class);
    
    		FileInputFormat.addInputPaths(job, args[0]);
    		FileOutputFormat.setOutputPath(job, new Path(args[1]));
    		System.exit(job.waitForCompletion(true) ? 0 : 1);
    	}
    
    }
    

    Run Listing 1-25; the result is shown in Listing 1-26.

    Listing 1-26

    root@lejian:/data# hadoop jar naturalJoin.jar com.hadoop.mapreduce.NaturalJoin /data /output
    …………
    root@lejian:/data# hadoop fs -ls -R /output
    -rw-r--r--   1 root supergroup          0 2017-01-08 11:19 /output/_SUCCESS
    -rw-r--r--   1 root supergroup        237 2017-01-08 11:19 /output/part-r-00000
    root@lejian:/data# hadoop fs -cat /output/part-r-00000
    1 Amy female 18 Math 89 
    2 Tom male 19 English 80 
    2 Tom male 19 Math 75 
    3 Sam male 21 English 95 
    4 John male 19 English 85 
    5 Lily female 21 English 88 
    5 Lily female 21 Math 91 
    6 Rose female 20 English 99 
    6 Rose female 20 Math 78 
    
  Original article: https://www.cnblogs.com/baoliyan/p/6259278.html