一、对二次排序案例的部分理解
1. 分析需求(首先对第一个字段排序,然后再对第二个字段排序)

| 杂乱的原始数据 | 排序完成的数据 |
|---|---|
| a,1 | a,1 |
| b,1 | a,2 |
| a,2 | a,100 |
| b,6 | b,-3 |
| c,2 | b,-2 |
| b,-2 | b,1 |
| a,100 | b,6 |
| b,-3 | c,-7 |
| c,-7 | c,2 |

2. 分析[MapReduce过程]
1> 数据通过input读入后传给map()
2> map()对数据进行层层过滤,以得到我们想要的数据
3> 过滤过程中可以添加自定义计数器
4> 过滤后写入context,进入shuffle阶段
5> 可以说shuffle阶段的大部分工作发生在map()端
6> 在shuffle中,数据先经过默认分区器(HashPartitioner),默认分区规则是 (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;默认reduce任务数只有一个,所以reduce输出的文件也只有一个。我是这样认为的,并且经过输出测试:就算设置了自定义分区类,只要没有设置reduce任务数(partition数目仍为1),所有数据仍然进入同一个分区,自定义分区类不会生效
7> 分区之后先按分区排序,再对分区内的数据按key排序。我们可以自定义key的数据类型来设置排序规则。比如二次排序,可以自定义一个组合key,在其中定义先按第一个字段排序,如果第一个字段相同,再按第二个字段排序,以达到二次排序的目的。排序之后进入分组阶段,分组默认也是按key进行的,自定义分组需要实现RawComparator
8> reduce端把各个map的输出做merge(归并排序),再按分组规则把数据交给reduce()。其中,分区决定数据进入哪一个reduce任务,reduce任务数决定了输出文件的数目;分组则决定哪些数据在同一次reduce()调用中被处理。分组算是shuffle阶段对程序运行的一个优化,我是这么理解的

3. 分析[二次排序]
1> 从上面的数据可以看出,我们可以自定义一个数据类型来存放第一个和第二个字段,并在其中定义排序规则:先按key中的第一个字段排序,再按第二个字段排序。自定义数据类型需要实现WritableComparable;也可以分别实现Writable和Comparable,但那样需要另外通过job.setSortComparatorClass()指定比较器,哪种方便用哪种
2> 接下来看分区操作。该例只生成一个排好序的文件,不用自定义分区;只有一个reduce任务时,即使设置了自定义分区类也不会走该类。自定义分区需要继承Partitioner(注意是继承),并重写分区规则。如果reduce任务数大于1,就必须按第一个字段分区,否则第一个字段相同的数据可能被分到不同的reduce中
3> 然后是分组操作,出于优化考虑,分组还是有必要的。我们设计的分组规则是按照自定义数据类型的第一个字段进行分组,分组需要实现RawComparator
4> 再考虑哪里还需要优化,需要考虑以下因素:数据源的数据量、字段是否必然存在、字段长度、字段类型、是否使用combiner与自定义压缩、数值是否为负数等。既然比较器中已经直接按第二个字段的数值比较(负数也能正确排序),我想也就没必要通过"先加一个大数、再减去这个大数"的方式来处理负数

效果展示:

| 数据源 | map()后 | shuffle阶段后 | reduce()后 |
|---|---|---|---|
| a,1 | a#1,1 | a#1 [1,2,100] | a 1 |
| b,1 | b#1,1 | b#-3 [-3,-2,1,6] | a 2 |
| a,2 | a#2,2 | c#-7 [-7,2] | a 100 |
| b,6 | b#6,6 | | b -3 |
| c,2 | c#2,2 | | b -2 |
| b,-2 | b#-2,-2 | | b 1 |
| a,100 | a#100,100 | | b 6 |
| b,-3 | b#-3,-3 | | c -7 |
| c,-7 | c#-7,-7 | | c 2 |
二、二次排序示例代码
SSortMr.java ## 主类
============
package com.bigdata_senior.SSortMr;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Secondary-sort job: sorts "first,second" text lines by the first field and then by the
 * integer second field.
 *
 * <p>The mapper emits a composite {@link SecondaryWritable} key, which the shuffle sorts with
 * that key's {@code compareTo}. {@link SecondaryGroupClass} then groups the reduce input by
 * the first field only.
 */
public class SSortMr {

    /** Counter group for input records the mapper skips. */
    private static final String COUNTER_GROUP = "SSortMr";

    /** Input path used when none is given on the command line. */
    private static final String DEFAULT_INPUT =
            "hdfs://hadoop09-linux-01.ibeifeng.com:8020/user/liuwl/tmp/sortmr/input";

    /** Output path used when none is given on the command line. */
    private static final String DEFAULT_OUTPUT =
            "hdfs://hadoop09-linux-01.ibeifeng.com:8020/user/liuwl/tmp/sortmr/output13";

    /**
     * Turns each "first,second" line into key (first, second) with value second.
     *
     * <p>Some lines are counted under {@code MALFORMED_LINES} and skipped instead of failing
     * the task: lines with fewer than two comma-separated fields, and lines whose second
     * field is not an int.
     */
    private static class SSortMapper
            extends Mapper<LongWritable, Text, SecondaryWritable, LongWritable> {
        // Reused across map() calls to avoid per-record allocation.
        private final SecondaryWritable mapOutKey = new SecondaryWritable();
        private final LongWritable mapOutValue = new LongWritable();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split(",");
            if (fields.length < 2) {
                context.getCounter(COUNTER_GROUP, "MALFORMED_LINES").increment(1);
                return;
            }
            int second;
            try {
                // Parse once; the same number is used in the key and as the value.
                second = Integer.parseInt(fields[1].trim());
            } catch (NumberFormatException e) {
                context.getCounter(COUNTER_GROUP, "MALFORMED_LINES").increment(1);
                return;
            }
            mapOutKey.set(fields[0].trim(), second);
            mapOutValue.set(second);
            context.write(mapOutKey, mapOutValue);
        }
    }

    /**
     * Writes one record per input value. The output key is "first#second" and the output
     * value is the original second field.
     *
     * <p>Because of the grouping comparator, one reduce() call sees every record that shares
     * a first field, in ascending order of second. Hadoop refreshes {@code key} in place as
     * {@code values} advances, so the output key is rebuilt inside the loop.
     */
    private static class SSortReduce
            extends Reducer<SecondaryWritable, LongWritable, Text, LongWritable> {
        private final Text reduceOutKey = new Text();

        @Override
        public void reduce(SecondaryWritable key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            for (LongWritable value : values) {
                reduceOutKey.set(key.getFirst() + "#" + key.getSecond());
                context.write(reduceOutKey, value);
            }
        }
    }

    /**
     * Configures and runs the secondary-sort job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @return 0 if the job succeeded, 1 otherwise
     */
    public int run(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());
        // One reduce task (the default) yields a single, fully sorted output file.
        //job.setNumReduceTasks(3);

        // input
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // output
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // mapper
        job.setMapperClass(SSortMapper.class);
        job.setMapOutputKeyClass(SecondaryWritable.class);
        job.setMapOutputValueClass(LongWritable.class);

        // partitioner: unnecessary with one reduce task. If setNumReduceTasks(n > 1) is
        // enabled, enable this as well. SecondaryWritable.hashCode() mixes in the second
        // field, so the default hash partitioner could send records with the same first
        // field to different reducers.
        //job.setPartitionerClass(SecondaryPartionerCLass.class);

        // grouping: one reduce() call per distinct first field
        job.setGroupingComparatorClass(SecondaryGroupClass.class);

        // reducer
        job.setReducerClass(SSortReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    /**
     * Entry point.
     *
     * <p>Uses {@code args[0]} and {@code args[1]} as the input and output paths when both are
     * supplied. Otherwise it falls back to the built-in default HDFS paths.
     */
    public static void main(String[] args) throws Exception {
        String[] paths = (args != null && args.length >= 2)
                ? args
                : new String[] { DEFAULT_INPUT, DEFAULT_OUTPUT };
        int status = new SSortMr().run(paths);
        System.exit(status);
    }
}
SecondaryWritable.java ## 自定义数据类型
======================
package com.bigdata_senior.SSortMr;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;

import org.apache.hadoop.io.WritableComparable;

/**
 * Composite map-output key for the secondary sort.
 *
 * <p>Keys are ordered first by {@code first}, using natural {@link String} order. Keys with
 * equal {@code first} values are then ordered by {@code second}, compared numerically, so
 * negative numbers sort before positive ones without any offset tricks.
 *
 * <p>Serialized form: {@code writeUTF(first)} followed by {@code writeInt(second)}.
 * SecondaryGroupClass's byte-level comparison drops the last 4 bytes to isolate
 * {@code first}, so the int must remain the last field written.
 */
public class SecondaryWritable implements WritableComparable<SecondaryWritable> {
    /** Natural key; also the grouping field. */
    private String first;
    /** Secondary key; ordered within each {@code first} value. */
    private int second;

    /** No-arg constructor; Hadoop instantiates keys reflectively and then calls readFields. */
    public SecondaryWritable() {
    }

    public SecondaryWritable(String first, int second) {
        this.set(first, second);
    }

    /** Sets both fields at once so a single instance can be reused per record. */
    public void set(String first, int second) {
        this.first = first;
        this.second = second;
    }

    public String getFirst() {
        return first;
    }

    public void setFirst(String first) {
        this.first = first;
    }

    public int getSecond() {
        return second;
    }

    public void setSecond(int second) {
        this.second = second;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.first);
        out.writeInt(this.second);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.first = in.readUTF();
        this.second = in.readInt();
    }

    /** Orders by {@code first}, then numerically by {@code second}. */
    @Override
    public int compareTo(SecondaryWritable o) {
        int byFirst = this.first.compareTo(o.first);
        if (byFirst != 0) {
            return byFirst;
        }
        // Integer.compare: no boxing, and no overflow risk as with subtraction.
        return Integer.compare(this.second, o.second);
    }

    @Override
    public String toString() {
        return first + "#" + second;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((first == null) ? 0 : first.hashCode());
        result = prime * result + second;
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        SecondaryWritable other = (SecondaryWritable) obj;
        return Objects.equals(first, other.first) && second == other.second;
    }
}
SecondaryPartionerCLass.java ## 自定义分区规则(已注释不用)
============================
package com.bigdata_senior.SSortMr;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Partitions SecondaryWritable keys by their first field only.
 *
 * <p>The second field is ignored, so every record that shares a first field is sent to the
 * same reduce task.
 */
public class SecondaryPartionerCLass extends Partitioner<SecondaryWritable, LongWritable> {

    @Override
    public int getPartition(SecondaryWritable key, LongWritable value, int numPartitions) {
        final int naturalKeyHash = key.getFirst().hashCode();
        // Clearing the sign bit keeps the modulo result in [0, numPartitions).
        final int nonNegativeHash = naturalKeyHash & Integer.MAX_VALUE;
        return nonNegativeHash % numPartitions;
    }
}
SecondaryGroupClass.java ## 自定义分组规则
========================
package com.bigdata_senior.SSortMr;

import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;

/**
 * Grouping comparator for SecondaryWritable keys.
 *
 * <p>Two keys belong to the same group when their first fields are equal. As a result, one
 * reduce() call receives every record that shares a first field.
 */
public class SecondaryGroupClass implements RawComparator<SecondaryWritable> {

    /** Byte length of the trailing int that SecondaryWritable.write appends. */
    private static final int SECOND_FIELD_BYTES = 4;

    /** Compares deserialized keys by their first field only. */
    @Override
    public int compare(SecondaryWritable o1, SecondaryWritable o2) {
        return o1.getFirst().compareTo(o2.getFirst());
    }

    /**
     * Compares serialized keys by their first field only.
     *
     * <p>The serialized form is {@code writeUTF(first)} followed by {@code writeInt(second)}.
     * Dropping the last 4 bytes therefore leaves only the encoded first field (its length
     * prefix plus its bytes).
     *
     * <p>NOTE(review): the sign of this byte-wise result can differ from the String order used
     * by {@link #compare(SecondaryWritable, SecondaryWritable)}, because the 2-byte length
     * prefix is compared first. Equality, however, is identical.
     */
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        // Start at s1/s2: a key does not necessarily begin at index 0 of its buffer.
        return WritableComparator.compareBytes(
                b1, s1, l1 - SECOND_FIELD_BYTES,
                b2, s2, l2 - SECOND_FIELD_BYTES);
    }
}
另外还可以: ## 但这个对于小数据可用,大数据将非常消耗资源
SSortMr2.java
=============
package com.bigdata_senior.SSortMr2;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Alternative secondary sort. The shuffle sorts by the first field only, and each reducer
 * sorts its values in memory.
 *
 * <p>Every value for a key is buffered in a List inside reduce(). This only suits keys with
 * modest numbers of values.
 */
public class SSortMr2 {

    /** Counter group for input records the mapper skips. */
    private static final String COUNTER_GROUP = "SSortMr2";

    /** Input path used when none is given on the command line. */
    private static final String DEFAULT_INPUT =
            "hdfs://hadoop09-linux-01.ibeifeng.com:8020/user/liuwl/tmp/sortmr/input";

    /** Output path used when none is given on the command line. */
    private static final String DEFAULT_OUTPUT =
            "hdfs://hadoop09-linux-01.ibeifeng.com:8020/user/liuwl/tmp/sortmr/output22";

    /**
     * Emits (first, second) for each "first,second" line.
     *
     * <p>Some lines are counted under {@code MALFORMED_LINES} and skipped: lines with fewer
     * than two fields, and lines whose second field is not a long.
     */
    private static class SSortMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        private final Text mapOutKey = new Text();
        private final LongWritable mapOutValue = new LongWritable();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split(",");
            if (fields.length < 2) {
                context.getCounter(COUNTER_GROUP, "MALFORMED_LINES").increment(1);
                return;
            }
            long second;
            try {
                second = Long.parseLong(fields[1].trim());
            } catch (NumberFormatException e) {
                context.getCounter(COUNTER_GROUP, "MALFORMED_LINES").increment(1);
                return;
            }
            mapOutKey.set(fields[0].trim());
            mapOutValue.set(second);
            context.write(mapOutKey, mapOutValue);
        }
    }

    /** Buffers all values of a key, sorts them ascending, and writes one line per value. */
    private static class SSortReduce extends Reducer<Text, LongWritable, Text, LongWritable> {
        private final LongWritable outValue = new LongWritable();

        @Override
        public void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            List<Long> sorted = new ArrayList<Long>();
            for (LongWritable value : values) {
                // Copy out the primitive; the LongWritable instance itself may be reused.
                sorted.add(value.get());
            }
            Collections.sort(sorted);
            for (long v : sorted) {
                outValue.set(v);
                context.write(key, outValue);
            }
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @return 0 if the job succeeded, 1 otherwise
     */
    public int run(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        // input
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // output
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // mapper
        job.setMapperClass(SSortMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // reducer: output value is a Writable, like every other key/value type in the job
        job.setReducerClass(SSortReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    /**
     * Entry point.
     *
     * <p>Uses {@code args[0]} and {@code args[1]} as the input and output paths when both are
     * supplied. Otherwise it falls back to the built-in default HDFS paths.
     */
    public static void main(String[] args) throws Exception {
        String[] paths = (args != null && args.length >= 2)
                ? args
                : new String[] { DEFAULT_INPUT, DEFAULT_OUTPUT };
        int status = new SSortMr2().run(paths);
        System.exit(status);
    }
}
三、MapReduce join简单理解
1. join(组合)
2. 即把两张或两张以上数据源的数据组合输出
3. 由于学了hive,感觉MapReduce的join不再是重点,因为用MapReduce处理join时:
1> join的表数目不确定
2> 操作繁琐,过滤多样,可能会考虑不全
3> 资源消耗较重
4. MapReduce的join(reduce端join)大致是这样的:
在map()中读入两张表的数据,用自定义数据类型为每条记录打上来源标记,以区分两张表。
按关联键shuffle之后,在reduce()中分别取出两张表的数据,并指定输出结果。
当然处理join的方式还有很多,比如在setup()中把一张(小)表加载进内存集合,直接在map端完成join。
四、MapReduce join代码示例
JoinMr.java ## 主类
===========
package com.bigdata_senior.joinMr;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Reduce-side inner join of customer and order records on the customer id (cid).
 *
 * <p>The two record types are told apart by their field count. A 3-field line is a customer
 * and a 4-field line is an order. Both are keyed by cid and tagged with a
 * {@link JoinWritable}, so the reducer can tell them apart.
 */
public class JoinMr {

    /** Tag for customer records; must match between mapper and reducer. */
    private static final String TAG_CUSTOMER = "customer";
    /** Tag for order records; must match between mapper and reducer. */
    private static final String TAG_ORDER = "order";

    /** Field count of a customer line: cid,cname,phone. */
    private static final int CUSTOMER_FIELDS = 3;
    /** Field count of an order line: cid,<name>,price,date. */
    private static final int ORDER_FIELDS = 4;

    /** Counter group for skipped or unmatched records. */
    private static final String COUNTER_GROUP = "JoinMr";

    /** Input path used when none is given on the command line. */
    private static final String DEFAULT_INPUT =
            "hdfs://hadoop09-linux-01.ibeifeng.com:8020/user/liuwl/tmp/join/input";

    /** Output path used when none is given on the command line. */
    private static final String DEFAULT_OUTPUT =
            "hdfs://hadoop09-linux-01.ibeifeng.com:8020/user/liuwl/tmp/join/output2";

    /**
     * Keys every customer or order line by cid and tags it with its source.
     *
     * <p>Some lines are counted under {@code MALFORMED_LINES} and skipped: lines with neither
     * 3 nor 4 fields, and lines whose cid is not a number.
     */
    private static class JoinMapper
            extends Mapper<LongWritable, Text, LongWritable, JoinWritable> {
        private final LongWritable mapOutputKey = new LongWritable();
        private final JoinWritable mapOutputValue = new JoinWritable();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split(",");
            int length = fields.length;
            if (length != CUSTOMER_FIELDS && length != ORDER_FIELDS) {
                context.getCounter(COUNTER_GROUP, "MALFORMED_LINES").increment(1);
                return;
            }

            long cid;
            try {
                cid = Long.parseLong(fields[0].trim());
            } catch (NumberFormatException e) {
                context.getCounter(COUNTER_GROUP, "MALFORMED_LINES").increment(1);
                return;
            }
            mapOutputKey.set(cid);

            if (length == CUSTOMER_FIELDS) {
                // customer: cname,phone
                mapOutputValue.set(TAG_CUSTOMER, fields[1] + "," + fields[2]);
            } else {
                // order: <name>,price,date
                mapOutputValue.set(TAG_ORDER, fields[1] + "," + fields[2] + "," + fields[3]);
            }
            context.write(mapOutputKey, mapOutputValue);
        }
    }

    /**
     * Writes one "cid,customerInfo,orderInfo" line for every order whose cid has a customer
     * record.
     *
     * <p>This is an inner join. Orders with no matching customer are dropped and counted
     * under {@code ORDERS_WITHOUT_CUSTOMER}, instead of being emitted with "null" customer
     * info. Customers with no orders produce no output.
     */
    private static class JoinReducer
            extends Reducer<LongWritable, JoinWritable, NullWritable, Text> {
        private final Text outputValue = new Text();

        @Override
        public void reduce(LongWritable key, Iterable<JoinWritable> values, Context context)
                throws IOException, InterruptedException {
            String customerInfo = null;
            // Stores the immutable data Strings, not the JoinWritable objects, which may be reused.
            List<String> orderList = new ArrayList<String>();
            for (JoinWritable value : values) {
                if (TAG_CUSTOMER.equals(value.getTag())) {
                    customerInfo = value.getData();
                } else if (TAG_ORDER.equals(value.getTag())) {
                    orderList.add(value.getData());
                }
            }

            if (customerInfo == null) {
                context.getCounter(COUNTER_GROUP, "ORDERS_WITHOUT_CUSTOMER")
                        .increment(orderList.size());
                return;
            }

            for (String order : orderList) {
                outputValue.set(key.get() + "," + customerInfo + "," + order);
                context.write(NullWritable.get(), outputValue);
            }
        }
    }

    /**
     * Configures and runs the join job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @return 0 if the job succeeded, 1 otherwise
     */
    public int run(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        // input
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // output
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // mapper
        job.setMapperClass(JoinMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(JoinWritable.class);

        // reducer
        job.setReducerClass(JoinReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    /**
     * Entry point.
     *
     * <p>Uses {@code args[0]} and {@code args[1]} as the input and output paths when both are
     * supplied. Otherwise it falls back to the built-in default HDFS paths.
     */
    public static void main(String[] args) throws Exception {
        String[] paths = (args != null && args.length >= 2)
                ? args
                : new String[] { DEFAULT_INPUT, DEFAULT_OUTPUT };
        int status = new JoinMr().run(paths);
        System.exit(status);
    }
}
JoinWritable.java ## 自定义数据类型
package com.bigdata_senior.joinMr;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * Tagged map-output value for a reduce-side join.
 *
 * <p>{@code tag} names the source table; JoinMr uses "customer" and "order".
 * {@code data} holds the rest of the record as a comma-separated string.
 * Serialized as two modified-UTF-8 strings: tag first, then data.
 */
public class JoinWritable implements Writable {
    private String tag;
    private String data;

    /** No-arg constructor for reflective instantiation before readFields. */
    public JoinWritable() {
    }

    public JoinWritable(String tag, String data) {
        set(tag, data);
    }

    /** Replaces both tag and data, allowing one instance to be reused per record. */
    public void set(String tag, String data) {
        setTag(tag);
        setData(data);
    }

    public String getTag() {
        return tag;
    }

    public void setTag(String tag) {
        this.tag = tag;
    }

    public String getData() {
        return data;
    }

    public void setData(String data) {
        this.data = data;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(getTag());
        out.writeUTF(getData());
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        setTag(in.readUTF());
        setData(in.readUTF());
    }

    @Override
    public int hashCode() {
        int dataHash = (data == null) ? 0 : data.hashCode();
        int tagHash = (tag == null) ? 0 : tag.hashCode();
        return 31 * (31 + dataHash) + tagHash;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        }
        if (obj == null || obj.getClass() != getClass()) {
            return false;
        }
        JoinWritable that = (JoinWritable) obj;
        return sameText(data, that.data) && sameText(tag, that.tag);
    }

    /** Null-safe String equality. */
    private static boolean sameText(String a, String b) {
        return (a == null) ? b == null : a.equals(b);
    }

    @Override
    public String toString() {
        return tag + "," + data;
    }
}