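I: Preparation
1. Secondary sort
Here, point 1 covers the custom data type.
Points 2 and 3 cover partitioning and grouping in the shuffle phase, and also show how the program is written.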
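2. RawComparator class
3. Key points of secondary sort
Composite key: the key is a composite field, defined as a custom data type
Implement WritableComparable
Keep the original partitioning unchanged by defining a custom partitioning rule
Extend Partitioner
Keep the original grouping unchanged by defining a custom grouping rule
Implement RawComparator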
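The three key points above can be sketched without a cluster. What follows is a minimal plain-Java simulation, not Hadoop code: the class `SecondarySortSketch`, the nested `Pair` type, and the sample rows are hypothetical names and data chosen for illustration. It sorts composite keys by both fields, partitions on the first field only, and groups adjacent keys that share the same first field.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of secondary sort; no Hadoop classes are used.
class SecondarySortSketch {

    // Composite key: the natural key (first) plus the value to sort by (second).
    static final class Pair {
        final String first;
        final int second;

        Pair(String first, int second) {
            this.first = first;
            this.second = second;
        }
    }

    // Sort order: by first, then by second (the role of compareTo on the key).
    static final Comparator<Pair> SORT =
            Comparator.comparing((Pair p) -> p.first).thenComparingInt(p -> p.second);

    // Partition rule: depends on first only, so equal firsts land in one partition.
    static int partition(Pair key, int numPartitions) {
        return (key.first.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    // Sorts the keys, then groups keys whose first fields are equal
    // (the role of the grouping comparator).
    static Map<String, List<Integer>> sortAndGroup(List<Pair> keys) {
        List<Pair> sorted = new ArrayList<>(keys);
        sorted.sort(SORT);
        Map<String, List<Integer>> groups = new LinkedHashMap<>();
        for (Pair p : sorted) {
            groups.computeIfAbsent(p.first, k -> new ArrayList<>()).add(p.second);
        }
        return groups;
    }

    public static void main(String[] args) {
        // Hypothetical sample rows.
        List<Pair> keys = Arrays.asList(
                new Pair("b", 7), new Pair("a", 12), new Pair("a", -3), new Pair("b", 1));
        System.out.println(sortAndGroup(keys)); // {a=[-3, 12], b=[1, 7]}
    }
}
```

Because the partition rule and the grouping rule both ignore the second field, every record with the same first field still reaches the same reduce call, which is what "keeping the original partitioning and grouping unchanged" means here.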
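4. Input data
5. Requirement
Normally only one sort takes place: the first field (the key) is sorted, but the second field is left unsorted in the output.
The goal here is that, once the first key has been sorted, the second key is sorted as well.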
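To make the requirement concrete, here is a small sketch that contrasts the two orderings. The rows are hypothetical sample data in the comma-separated `first,second` form that the mapper later in this article splits on; the class name `RequirementSketch` is likewise made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Contrasts "sorted by the first field only" with "sorted by first, then second".
class RequirementSketch {

    // Stable sort on the first field only: rows with equal first keep input order.
    static List<String> sortByFirstOnly(List<String> rows) {
        List<String> out = new ArrayList<>(rows);
        out.sort(Comparator.comparing((String r) -> r.split(",")[0]));
        return out;
    }

    // Sort on the first field, then numerically on the second field.
    static List<String> sortByFirstThenSecond(List<String> rows) {
        List<String> out = new ArrayList<>(rows);
        out.sort(Comparator.comparing((String r) -> r.split(",")[0])
                .thenComparingInt(r -> Integer.parseInt(r.split(",")[1])));
        return out;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("b,7", "a,12", "a,3", "b,1"); // hypothetical rows
        System.out.println(sortByFirstOnly(rows));       // [a,12, a,3, b,7, b,1]
        System.out.println(sortByFirstThenSecond(rows)); // [a,3, a,12, b,1, b,7]
    }
}
```

The first line of output corresponds to the ordinary single sort; the second line is the result the secondary sort is meant to produce.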
II: First sort
3. Program that outputs the first sort
MAPPER--------------
REDUCER------------
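4. Result
III: Secondary sort
5. The map and reduce program
6. The custom type program
It needs to implement the WritableComparable interface.
Its inputs are a String and an int.
7. Custom grouping comparator
It needs to implement RawComparator.
Both methods do the same thing: each returns the result of comparing first.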
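The byte-level method is the less obvious of the two, so here is a minimal sketch of why it also compares only first. It assumes the key is serialized as `writeUTF(first)` followed by `writeInt(second)`, as in the custom type shown later in this article. The class `RawGroupingSketch` and its `compareBytes` helper are hypothetical stand-ins written in plain Java; the helper mimics a lexicographic unsigned byte comparison and is not Hadoop's own implementation.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Shows why comparing all but the last 4 bytes compares only the first field.
class RawGroupingSketch {

    // Serializes a key as writeUTF(first) followed by writeInt(second).
    static byte[] serialize(String first, int second) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeUTF(first);  // 2-byte length prefix + modified UTF-8 bytes
        out.writeInt(second); // always 4 bytes
        out.flush();
        return bytes.toByteArray();
    }

    // Lexicographic comparison of unsigned bytes over the given ranges.
    static int compareBytes(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        int n = Math.min(l1, l2);
        for (int i = 0; i < n; i++) {
            int x = b1[s1 + i] & 0xff;
            int y = b2[s2 + i] & 0xff;
            if (x != y) {
                return x - y;
            }
        }
        return l1 - l2;
    }

    // Grouping comparison: ignore the trailing 4-byte int of each key.
    static int compareFirstOnly(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return compareBytes(b1, s1, l1 - 4, b2, s2, l2 - 4);
    }

    public static void main(String[] args) throws IOException {
        byte[] k1 = serialize("a", 3);
        byte[] k2 = serialize("a", 12);
        byte[] k3 = serialize("b", 3);
        System.out.println(compareFirstOnly(k1, 0, k1.length, k2, 0, k2.length) == 0); // true
        System.out.println(compareFirstOnly(k1, 0, k1.length, k3, 0, k3.length) < 0);  // true
    }
}
```

In this sketch each key sits in its own array, so the start offsets are 0. A raw comparator should not rely on that: the offsets it receives mark where each key begins inside a larger buffer, so they need to be passed through unchanged.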
8. Defining the partitioning rule
Extend Partitioner
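The partitioning rule used later in this article is `(hashCode & Integer.MAX_VALUE) % num`. The sketch below, with a hypothetical class name `PartitionSketch` and made-up hash values, shows why the mask is there: in Java the remainder of a negative number is negative, which would not be a valid partition index.

```java
// Shows why the hash is masked with Integer.MAX_VALUE before taking the remainder.
class PartitionSketch {

    // Clear the sign bit, then take the remainder: the result is in [0, numPartitions).
    static int partitionFor(int hash, int numPartitions) {
        return (hash & Integer.MAX_VALUE) % numPartitions;
    }

    // Convenience overload: partition by the hash of the first field only.
    static int partitionFor(String first, int numPartitions) {
        return partitionFor(first.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        System.out.println(-7 % 3);              // -1: not a valid partition index
        System.out.println(partitionFor(-7, 3)); // 1
        System.out.println(partitionFor("a", 3) == partitionFor("a", 3)); // true
    }
}
```

Since only first feeds into the hash, two keys that differ only in second always get the same partition.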
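9. Run result
IV: Optimization points
Partitioning, for example, counts as an optimization, but the optimization discussed here concerns the handling of positive and negative numbers.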
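The article does not spell out the details of this optimization, so the following is only one plausible reading, offered as an assumption: when integers are compared as raw bytes rather than as numbers, negative values sort after positive ones unless the encoding is adjusted. The sketch uses a sign-bit flip, which is a common technique and not necessarily the author's method (the custom type later in this article adds and subtracts `Integer.MAX_VALUE` instead). The class and method names are hypothetical.

```java
import java.nio.ByteBuffer;

// Shows how negative ints misorder under unsigned byte comparison, and one common fix.
class SignedBytesSketch {

    // Big-endian 4-byte encoding, the same layout DataOutput.writeInt produces.
    static byte[] toBytes(int v) {
        return ByteBuffer.allocate(4).putInt(v).array();
    }

    // Lexicographic comparison of two 4-byte arrays, treating bytes as unsigned.
    static int compareUnsigned(byte[] a, byte[] b) {
        for (int i = 0; i < 4; i++) {
            int x = a[i] & 0xff;
            int y = b[i] & 0xff;
            if (x != y) {
                return x - y;
            }
        }
        return 0;
    }

    // Flipping the sign bit maps the int range onto an unsigned range in the same order.
    static int flipSign(int v) {
        return v ^ Integer.MIN_VALUE;
    }

    public static void main(String[] args) {
        // Without adjustment, -1 (FF FF FF FF) compares greater than 1 (00 00 00 01).
        System.out.println(compareUnsigned(toBytes(-1), toBytes(1)) > 0);                     // true
        // With the sign bit flipped, the byte order matches the numeric order.
        System.out.println(compareUnsigned(toBytes(flipSign(-1)), toBytes(flipSign(1))) < 0); // true
        // The transformation is its own inverse, so the original value is recoverable.
        System.out.println(flipSign(flipSign(-1)) == -1);                                     // true
    }
}
```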
V: Reorganized version
1. Project structure
2. Program code
RealSecondSort.java
package com.senior.sort;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class RealSecondSort extends Configured implements Tool {

    // Mapper: parses "first,second" lines and emits (composite key, second).
    public static class SortMapper extends Mapper<LongWritable, Text, PariWritable, IntWritable> {
        private PariWritable mapoutkey = new PariWritable();
        private IntWritable mapoutvalue = new IntWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String valueStr = value.toString();
            String[] strs = valueStr.split(",");
            mapoutkey.set(strs[0], Integer.valueOf(strs[1]));
            mapoutvalue.set(Integer.valueOf(strs[1]));
            context.write(mapoutkey, mapoutvalue);
        }
    }

    // Reducer: writes (first, second) for every value in the group.
    public static class SortReducer extends Reducer<PariWritable, IntWritable, Text, IntWritable> {
        private Text outkey = new Text();

        @Override
        protected void reduce(PariWritable key, Iterable<IntWritable> value, Context context)
                throws IOException, InterruptedException {
            for (IntWritable str : value) {
                outkey.set(key.getFirst());
                context.write(outkey, str);
            }
        }
    }

    // Driver
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(RealSecondSort.class);

        // input
        Path inpath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inpath);

        // output
        Path outpath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outpath);

        // map
        job.setMapperClass(SortMapper.class);
        job.setMapOutputKeyClass(PariWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

        // shuffle: custom partitioner and grouping comparator
        job.setPartitionerClass(PartitionNum.class);
        job.setGroupingComparatorClass(GroupingComparator.class);

        // reduce
        job.setReducerClass(SortReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // submit
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    // Main
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Hard-coded input and output paths override the command-line arguments.
        args = new String[]{
            "hdfs://linux-hadoop01.ibeifeng.com:8020/user/beifeng/mapreduce/wordcount/inputSortData",
            "hdfs://linux-hadoop01.ibeifeng.com:8020/user/beifeng/mapreduce/wordcount/outputSortData2"
        };
        int status = ToolRunner.run(conf, new RealSecondSort(), args);
        System.exit(status);
    }
}
PariWritable.java
The interface used here is explained in the section on Hadoop serialization below.
package com.senior.sort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class PariWritable implements WritableComparable<PariWritable> {
    private String first;
    // Stored with an offset of Integer.MAX_VALUE; see setSecond and getSecond.
    private Integer second;

    public PariWritable() {}

    public PariWritable(String first, Integer second) {
        set(first, second);
    }

    // set get
    public String getFirst() {
        return first;
    }

    public void setFirst(String first) {
        this.first = first;
    }

    public Integer getSecond() {
        return second - Integer.MAX_VALUE;
    }

    public void setSecond(Integer second) {
        this.second = second + Integer.MAX_VALUE;
    }

    public void set(String first, Integer second) {
        // Go through the setters so the offset added in setSecond is always
        // the one removed again in getSecond.
        setFirst(first);
        setSecond(second);
    }

    // Deserialization: fields are read in the order they were written.
    public void readFields(DataInput input) throws IOException {
        this.first = input.readUTF();
        this.second = input.readInt();
    }

    // Serialization: first as UTF, then second as a 4-byte int.
    public void write(DataOutput output) throws IOException {
        output.writeUTF(first);
        output.writeInt(second);
    }

    // Sort order: by first, then by second.
    public int compareTo(PariWritable o) {
        int comp = this.first.compareTo(o.getFirst());
        if (0 != comp) {
            return comp;
        }
        return getSecond().compareTo(o.getSecond());
    }

    // Note: second is printed in its stored (offset) form.
    @Override
    public String toString() {
        return "PariWritable [first=" + first + ", second=" + second + "]";
    }
}
PartitionNum.java
package com.senior.sort;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class PartitionNum extends Partitioner<PariWritable, IntWritable> {

    @Override
    public int getPartition(PariWritable key, IntWritable value, int num) {
        return (key.getFirst().hashCode() & Integer.MAX_VALUE) % num;
    }
}
GroupingComparator.java
One point in this program becomes clear after a careful read of the section below.
package com.senior.sort;

import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;

public class GroupingComparator implements RawComparator<PariWritable> {

    // Object comparison: group by first only.
    public int compare(PariWritable o1, PariWritable o2) {
        return o1.getFirst().compareTo(o2.getFirst());
    }

    // Raw comparison: skip the trailing 4-byte int (second) and compare the rest.
    // The ranges must start at the offsets s1 and s2, not at 0.
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return WritableComparator.compareBytes(b1, s1, l1 - 4, b2, s2, l2 - 4);
    }
}
3. Result
VI: Hadoop serialization
1. Overview
The programs above rely on serialization, so this section summarizes that topic.
2. Features of serialization
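- Compact: minimizes bandwidth use and speeds up data exchange
- Fast: inter-process communication moves large amounts of data through the serialization mechanism, so the cost of serializing and deserializing must be kept low
- Cross-language: supports data exchange between different languages, such as C++
- Extensible: when the system protocol is upgraded and class definitions change, the serialization mechanism needs to support those upgrades and changes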
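As a rough illustration of the "compact" point, the sketch below compares a hand-written field encoding (the style a `write` method uses) with Java's built-in object serialization for the same data. The class `CompactnessSketch` and its `Row` type are hypothetical and exist only for this comparison; no Hadoop classes are involved.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Compares the size of a hand-written field encoding with Java's built-in serialization.
class CompactnessSketch {

    // Hypothetical record used only for this comparison.
    static final class Row implements Serializable {
        private static final long serialVersionUID = 1L;
        final String first;
        final int second;

        Row(String first, int second) {
            this.first = first;
            this.second = second;
        }
    }

    // Field-by-field encoding: only the field values are written.
    static byte[] writeFields(Row row) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeUTF(row.first);
        out.writeInt(row.second);
        out.flush();
        return bytes.toByteArray();
    }

    // Java's built-in serialization: also writes a stream header and class metadata.
    static byte[] javaSerialize(Row row) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(row);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        Row row = new Row("a", 3);
        System.out.println(writeFields(row).length);                              // 7
        System.out.println(javaSerialize(row).length > writeFields(row).length); // true
    }
}
```

The 7 bytes are the 2-byte length prefix, 1 byte for "a", and 4 bytes for the int.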
3. Writable
public interface Writable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}
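A round trip through these two methods can be sketched in plain Java. To keep the example runnable without Hadoop, it declares a local stand-in interface with the same two method signatures; the `PairKey` class and the `roundTrip` helper are hypothetical names for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Serializes an object with write and rebuilds it with readFields.
class WritableRoundTripSketch {

    // Local stand-in with the same two methods as the interface shown above.
    interface Writable {
        void write(DataOutput out) throws IOException;
        void readFields(DataInput in) throws IOException;
    }

    // Hypothetical two-field type; fields are read in the order they were written.
    static final class PairKey implements Writable {
        String first = "";
        int second;

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(first);
            out.writeInt(second);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            first = in.readUTF();
            second = in.readInt();
        }
    }

    // Writes a key to a byte array and reads it back into a fresh instance.
    static PairKey roundTrip(String first, int second) throws IOException {
        PairKey original = new PairKey();
        original.first = first;
        original.second = second;

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        PairKey copy = new PairKey();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        return copy;
    }

    public static void main(String[] args) throws IOException {
        PairKey copy = roundTrip("a", -3);
        System.out.println(copy.first + "," + copy.second); // a,-3
    }
}
```

Note that `readFields` fills in an existing instance rather than returning a new one, which is why the type needs a no-argument constructor.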
4. Other interfaces
public interface RawComparator<T> extends Comparator<T> {
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
}
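import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;

class MyGrouper implements RawComparator<StartEndDate> {

    @Override
    public int compare(StartEndDate o1, StartEndDate o2) {
        // Long.compare avoids the overflow of casting a long difference to int.
        return Long.compare(o1.getStartDate().getTime(), o2.getEndDate().getTime());
    }

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        // Compare only the first 8 bytes of each serialized key.
        return WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);
    }
}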