import java.io.DataInput; import java.io.DataOutput; import java.io.File; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.Lz4Codec; //import org.apache.hadoop.io.compress.GzipCodec; import org.apache.hadoop.io.compress.SnappyCodec; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.util.GenericOptionsParser; import com.hadoop.compression.lzo.LzoCodec; // 002484 18.29 // 600879 12.89 public class SecondSotrStr { public static class StrPair implements WritableComparable<StrPair> { private Text first; private Text second; private Text third; private Text fourth; public StrPair() { set(new Text(), new Text(), new Text(), new Text()); } public void set(Text left, Text right, Text third, Text fourth) { this.first = left; this.second = right; this.third = third; this.fourth = fourth; } public Text getFirst() { return first; } public Text getSecond() { return second; } public Text getThird() { return third; } public Text getFourth() { return fourth; } @Override public String toString() { return first + " " + second + " " + third + " " + fourth; } @Override public void readFields(DataInput in) throws IOException { first.readFields(in); second.readFields(in); third.readFields(in); fourth.readFields(in); } @Override public void write(DataOutput out) throws IOException { first.write(out); second.write(out); third.write(out); fourth.write(out); } @Override public int hashCode() { return first.hashCode() * 157 + second.hashCode() * 10 + third.hashCode(); } @Override public boolean equals(Object right) { if (right instanceof StrPair) { StrPair r = (StrPair) right; return first.equals(r.first) && second.equals(r.second) && third.equals(r.third) && fourth.equals(r.fourth); } else { return false; } } /** A Comparator that compares serialized StrPair. */ public static class Comparator extends WritableComparator { private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator(); public Comparator() { super(StrPair.class); } // 排序比较器,数据全部存在byte数组 public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { // 二进制数组读取 /* * try { //System.out.println("--" + b1[s1]); Integer firstL1 = * WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1); * //String str = readSt // System.out.println("firstL1 = " + * firstL1); } catch (IOException e) { // TODO Auto-generated * catch block e.printStackTrace(); } */ // int intvalue = readInt(b1, s1); /* * int third = 0; for(int i =s1 + 9; i<= s1+ 12; i++){ third += * (b1[i]&0xff) << (24-8*i); } System.out.println("third = " + * third); */ System.out.println("l1 = " + l1); return compareBytes(b1, s1, l1, b2, s2, l2); /* try { int firstl1 = WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1); int firstl2 = WritableUtils.decodeVIntSize(b2[s2]) + readVInt(b2, s2); int cmp = TEXT_COMPARATOR.compare(b1, s1, firstl1, b2, s2, firstl2); if (cmp != 0) return cmp; int firstl12 = WritableUtils.decodeVIntSize(b1[s1 + firstl1]) + readVInt(b1 , s1 + firstl1); int firstl22 = WritableUtils.decodeVIntSize(b2[s2 + firstl2]) + readVInt(b2, s2 + firstl2); cmp = TEXT_COMPARATOR.compare(b1, s1 + firstl1, firstl12, b2, s2 + firstl2, firstl22); if (cmp != 0) return cmp; int firstl13 = WritableUtils.decodeVIntSize(b1[s1+ firstl1 + firstl12]) + readVInt(b1 , s1 + firstl1 + firstl22); int firstl23 = WritableUtils.decodeVIntSize(b2[s2 + firstl2 + firstl22]) + readVInt(b2, s2 + firstl2 + firstl22); cmp = TEXT_COMPARATOR.compare(b1, s1+ firstl1 + firstl12, firstl13, b2, s2 + firstl2 + firstl22, firstl23); //if (cmp != 0) return cmp; return TEXT_COMPARATOR.compare(b1, s1 + firstl1, l1 - firstl1, b2, s2 + firstl2, l1 - firstl2); } catch (IOException e) { throw new IllegalArgumentException(e); } */ } } static { // register this comparator WritableComparator.define(StrPair.class, new Comparator()); } // @Override public int compareTo(StrPair o) {/* * if (first != o.first) { return first * < o.first ? -1 : 1; } else if (second * != o.second) { return second < * o.second ? -1 : 1; }// else if (third * != o.third) { // return third < * o.third ? -1 : 1;} * * return 0; */ return 0; } } /** * Partition based on the first part of the pair. */ public static class FirstPartitioner extends Partitioner<StrPair, Text> { @Override // public int getPartition(StrPair key, Text value, int numPartitions) { return Math.abs(key.getFirst().hashCode() * 127) % numPartitions; } } /** * Compare only the first part of the pair, so that reduce is called once * for each value of the first part. */ // 调用这里 public static class FirstGroupingComparator implements RawComparator<StrPair> { @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { return WritableComparator.compareBytes(b1, s1, Integer.SIZE / 8, b2, s2, Integer.SIZE / 8); } @Override public int compare(StrPair o1, StrPair o2) { System.out.println("-----group2-----"); Text l = o1.getFirst(); Text r = o2.getFirst(); return l.equals(r) ? 0 : 1; // return l == r ? 0 : (l < r ? -1 : 1); } } /** * Read two integers from each line and generate a key, value pair as * ((left, right), right). */ public static class MapClass extends Mapper<LongWritable, Text, StrPair, NullWritable> { private final StrPair key = new StrPair(); private final IntWritable value = new IntWritable(); private Text left = new Text(); private Text right = new Text(); private Text third = new Text(); private Text fourth = new Text(); @Override public void map(LongWritable inKey, Text inValue, Context context) throws IOException, InterruptedException { System.out.println("value" + inValue.toString()); StringTokenizer itr = new StringTokenizer(inValue.toString()); if (itr.hasMoreTokens()) { left.set((itr.nextToken())); if (itr.hasMoreTokens()) { right.set(itr.nextToken()); if (itr.hasMoreTokens()) { third.set(itr.nextToken()); if (itr.hasMoreTokens()) { fourth.set(itr.nextToken()); } } } key.set(left, right, third, fourth); // value.set(right); context.write(key, NullWritable.get()); } } } /** * A reducer class that just emits the sum of the input values. */ public static class Reduce extends Reducer<StrPair, NullWritable, Text, NullWritable> { private static final Text SEPARATOR = new Text( "------------------------------------------------"); private final Text first = new Text(); @Override public void reduce(StrPair key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { // Text outkey = new Text(key.to); // context.write(SEPARATOR, null); // first.set(Integer.toString(key.getFirst())); // System.out.println("key1 " + key ); for (NullWritable value : values) { System.out.println("key2 " + key); context.write(new Text(key.toString()), NullWritable.get()); } } } private static boolean flag; public static boolean deleteFile(String sPath) { flag = false; File file = new File(sPath); // 路径为文件且不为空则进行删除 if (file.isFile() && file.exists()) { file.delete(); flag = true; } return flag; } public static boolean deleteDirectory(String sPath) { // 如果sPath不以文件分隔符结尾,自动添加文件分隔符 if (!sPath.endsWith(File.separator)) { sPath = sPath + File.separator; } File dirFile = new File(sPath); // 如果dir对应的文件不存在,或者不是一个目录,则退出 if (!dirFile.exists() || !dirFile.isDirectory()) { return false; } flag = true; // 删除文件夹下的所有文件(包括子目录) File[] files = dirFile.listFiles(); for (int i = 0; i < files.length; i++) { // 删除子文件 if (files[i].isFile()) { flag = deleteFile(files[i].getAbsolutePath()); if (!flag) break; } // 删除子目录 else { flag = deleteDirectory(files[i].getAbsolutePath()); if (!flag) break; } } if (!flag) return false; // 删除当前目录 if (dirFile.delete()) { return true; } else { return false; } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); /* * conf.setBoolean("mapreduce.map.output.compress", true); * //conf.setBoolean("mapreduce.output.fileoutputformat.compress", * false); * conf.setClass("mapreduce.output.fileoutputformat.compress.codec", * GzipCodec.class, CompressionCodec.class); */ // gzip /* * conf.setBoolean("mapreduce.map.output.compress", true); * conf.setClass("mapreduce.map.output.compression.codec", * GzipCodec.class, CompressionCodec.class); * conf.setBoolean("mapreduce.output.fileoutputformat.compress", true); * conf.setClass("mapreduce.output.fileoutputformat.compress.codec", * GzipCodec.class, CompressionCodec.class); */ conf.set("mapreduce.map.log.level", "DEBUG"); // snappy /* * conf.setBoolean("mapreduce.map.output.compress", true); * conf.setClass("mapreduce.map.output.compression.codec", * SnappyCodec.class, CompressionCodec.class); * conf.setBoolean("mapreduce.output.fileoutputformat.compress", false); * conf.setClass("mapreduce.output.fileoutputformat.compress.codec", * SnappyCodec.class, CompressionCodec.class); */ String[] otherArgs = new GenericOptionsParser(conf, args) .getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: SecondSotrStr <in> <out>"); System.exit(2); } Path outputDir = new Path(otherArgs[1]); FileSystem fs = FileSystem.get(conf); if (fs.exists(outputDir)) { fs.delete(outputDir, true); } Job job = new Job(conf, "secondary sort"); job.setJarByClass(SecondSotrStr.class); job.setMapperClass(MapClass.class); job.setReducerClass(Reduce.class); /* * conf.setBoolean("mapred.output.compress", true); // * conf.setClass("mapred.output.compression.codec", GzipCodec.class, * CompressionCodec.class); * conf.setClass("mapred.output.compression.codec", SnappyCodec.class, * CompressionCodec.class); * * conf.setBoolean("reduce.output.compress", true); // * conf.setClass("mapred.output.compression.codec", GzipCodec.class, * CompressionCodec.class); * conf.setClass("reduce.output.compression.codec", SnappyCodec.class, * CompressionCodec.class); * * /* conf.setBoolean("mapreduce.output.compress", true); * conf.setClass("mapreduce.output.compression.codec", GzipCodec.class, * CompressionCodec.class); */ // group and partition by the first int in the pair job.setPartitionerClass(FirstPartitioner.class); job.setGroupingComparatorClass(FirstGroupingComparator.class); // the map output is StrPair, IntWritable job.setMapOutputKeyClass(StrPair.class); job.setMapOutputValueClass(NullWritable.class); // the reduce output is Text, IntWritable job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // lzo /* * conf.setBoolean("mapreduce.map.output.compress", true); * conf.setClass("mapreduce.map.output.compression.codec", * LzoCodec.class, CompressionCodec.class); * conf.setBoolean("mapreduce.output.fileoutputformat.compress", true); * conf.setClass("mapreduce.output.fileoutputformat.compress.codec", * LzoCodec.class, CompressionCodec.class); */ // 块压缩 // job.setOutputFormatClass(SequenceFileOutputFormat.class); conf.set("mapred.output.compression.type", "BLOCK"); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }