zoukankan      html  css  js  c++  java
  • hadoop二次排序

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.File;
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.RawComparator;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    import org.apache.hadoop.io.WritableUtils;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.Lz4Codec;
    //import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.io.compress.SnappyCodec;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Partitioner;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    import com.hadoop.compression.lzo.LzoCodec;
    
    // Sample input lines (whitespace-separated fields), for example:
    // 002484 18.29
    // 600879 12.89
    public class SecondSotrStr {
    public static class StrPair implements WritableComparable<StrPair> {
    private Text first;
    private Text second;
    private Text third;
    private Text fourth;
    
    public StrPair() {
    set(new Text(), new Text(), new Text(), new Text());
    }
    
    public void set(Text left, Text right, Text third, Text fourth) {
    this.first = left;
    this.second = right;
    this.third = third;
    this.fourth = fourth;
    }
    
    public Text getFirst() {
    return first;
    }
    
    public Text getSecond() {
    return second;
    }
    
    public Text getThird() {
    return third;
    }
    
    public Text getFourth() {
    return fourth;
    }
    
    @Override
    public String toString() {
    return first + "	" + second + "	" + third + "	" + fourth;
    }
    
    @Override
    public void readFields(DataInput in) throws IOException {
    first.readFields(in);
    second.readFields(in);
    third.readFields(in);
    fourth.readFields(in);
    }
    
    @Override
    public void write(DataOutput out) throws IOException {
    first.write(out);
    second.write(out);
    third.write(out);
    fourth.write(out);
    }
    
    @Override
    public int hashCode() {
    return first.hashCode() * 157 + second.hashCode() * 10
    + third.hashCode();
    }
    
    @Override
    public boolean equals(Object right) {
    if (right instanceof StrPair) {
    StrPair r = (StrPair) right;
    return first.equals(r.first) && second.equals(r.second)
    && third.equals(r.third) && fourth.equals(r.fourth);
    } else {
    return false;
    }
    }
    
    /** A Comparator that compares serialized StrPair. */
    public static class Comparator extends WritableComparator {
    private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();
    
    public Comparator() {
    super(StrPair.class);
    }
    
    // 排序比较器,数据全部存在byte数组
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2,
    int l2) {
    // 二进制数组读取
    /*
    * try { //System.out.println("--" + b1[s1]); Integer firstL1 =
    * WritableUtils.decodeVIntSize(b1[s1]) + readVInt(b1, s1);
    * //String str = readSt // System.out.println("firstL1 = " +
    * firstL1); } catch (IOException e) { // TODO Auto-generated
    * catch block e.printStackTrace(); }
    */
    // int intvalue = readInt(b1, s1);
    /*
    * int third = 0; for(int i =s1 + 9; i<= s1+ 12; i++){ third +=
    * (b1[i]&0xff) << (24-8*i); } System.out.println("third = " +
    * third);
    */
    System.out.println("l1 = " + l1);
    return compareBytes(b1, s1, l1, b2, s2, l2);
    /*
    try {
    int firstl1 = WritableUtils.decodeVIntSize(b1[s1])
    + readVInt(b1, s1);
    int firstl2 = WritableUtils.decodeVIntSize(b2[s2])
    + readVInt(b2, s2);
    int cmp = TEXT_COMPARATOR.compare(b1, s1, firstl1, b2, s2,
    firstl2);
    if (cmp != 0)
    return cmp;
    
    
    int firstl12 = WritableUtils.decodeVIntSize(b1[s1 + firstl1])
    + readVInt(b1 , s1 + firstl1);
    int firstl22 = WritableUtils.decodeVIntSize(b2[s2 + firstl2])
    + readVInt(b2, s2 + firstl2);
    cmp = TEXT_COMPARATOR.compare(b1, s1 + firstl1, firstl12, b2, s2 + firstl2,
    firstl22);
    if (cmp != 0) 
    return cmp;
    
    
    
    
    int firstl13 = WritableUtils.decodeVIntSize(b1[s1+ firstl1 + firstl12])
    + readVInt(b1 , s1 + firstl1 + firstl22);
    int firstl23 = WritableUtils.decodeVIntSize(b2[s2 + firstl2 + firstl22])
    + readVInt(b2, s2 + firstl2 + firstl22);
    cmp = TEXT_COMPARATOR.compare(b1, s1+ firstl1 + firstl12, firstl13, b2, s2 + firstl2 + firstl22,
    firstl23);
    //if (cmp != 0)
    return cmp;
    
    
    
    
    
    return TEXT_COMPARATOR.compare(b1, s1 + firstl1, l1
    - firstl1, b2, s2 + firstl2, l1 - firstl2);
    
    
    } catch (IOException e) {
    throw new IllegalArgumentException(e);
    }
    */
    
    }
    }
    
    static { // register this comparator
    WritableComparator.define(StrPair.class, new Comparator());
    
    }
    
    // @Override
    public int compareTo(StrPair o) {/*
    * if (first != o.first) { return first
    * < o.first ? -1 : 1; } else if (second
    * != o.second) { return second <
    * o.second ? -1 : 1; }// else if (third
    * != o.third) { // return third <
    * o.third ? -1 : 1;}
    * 
    * return 0;
    */
    return 0;
    }
    
    }
    
    /**
    * Partition based on the first part of the pair.
    */
    public static class FirstPartitioner extends Partitioner<StrPair, Text> {
    @Override
    //
    public int getPartition(StrPair key, Text value, int numPartitions) {
    return Math.abs(key.getFirst().hashCode() * 127) % numPartitions;
    }
    }
    
    /**
    * Compare only the first part of the pair, so that reduce is called once
    * for each value of the first part.
    */
    
    // 调用这里
    public static class FirstGroupingComparator implements
    RawComparator<StrPair> {
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
    return WritableComparator.compareBytes(b1, s1, Integer.SIZE / 8,
    b2, s2, Integer.SIZE / 8);
    }
    
    @Override
    public int compare(StrPair o1, StrPair o2) {
    System.out.println("-----group2-----");
    Text l = o1.getFirst();
    Text r = o2.getFirst();
    return l.equals(r) ? 0 : 1;
    // return l == r ? 0 : (l < r ? -1 : 1);
    }
    }
    
    /**
    * Read two integers from each line and generate a key, value pair as
    * ((left, right), right).
    */
    public static class MapClass extends
    Mapper<LongWritable, Text, StrPair, NullWritable> {
    
    private final StrPair key = new StrPair();
    private final IntWritable value = new IntWritable();
    
    private Text left = new Text();
    private Text right = new Text();
    private Text third = new Text();
    private Text fourth = new Text();
    
    @Override
    public void map(LongWritable inKey, Text inValue, Context context)
    throws IOException, InterruptedException {
    System.out.println("value" + inValue.toString());
    StringTokenizer itr = new StringTokenizer(inValue.toString());
    if (itr.hasMoreTokens()) {
    left.set((itr.nextToken()));
    if (itr.hasMoreTokens()) {
    right.set(itr.nextToken());
    
    if (itr.hasMoreTokens()) {
    third.set(itr.nextToken());
    if (itr.hasMoreTokens()) {
    fourth.set(itr.nextToken());
    }
    }
    }
    key.set(left, right, third, fourth);
    // value.set(right);
    context.write(key, NullWritable.get());
    }
    }
    }
    
    /**
    * A reducer class that just emits the sum of the input values.
    */
    public static class Reduce extends
    Reducer<StrPair, NullWritable, Text, NullWritable> {
    private static final Text SEPARATOR = new Text(
    "------------------------------------------------");
    private final Text first = new Text();
    
    @Override
    public void reduce(StrPair key, Iterable<NullWritable> values,
    Context context) throws IOException, InterruptedException {
    // Text outkey = new Text(key.to);
    // context.write(SEPARATOR, null);
    // first.set(Integer.toString(key.getFirst()));
    
    // System.out.println("key1 " + key );
    for (NullWritable value : values) {
    System.out.println("key2 " + key);
    context.write(new Text(key.toString()), NullWritable.get());
    }
    }
    }
    
    private static boolean flag;
    
    public static boolean deleteFile(String sPath) {
    flag = false;
    File file = new File(sPath);
    // 路径为文件且不为空则进行删除
    if (file.isFile() && file.exists()) {
    file.delete();
    flag = true;
    }
    return flag;
    }
    
    public static boolean deleteDirectory(String sPath) {
    // 如果sPath不以文件分隔符结尾,自动添加文件分隔符
    if (!sPath.endsWith(File.separator)) {
    sPath = sPath + File.separator;
    }
    File dirFile = new File(sPath);
    // 如果dir对应的文件不存在,或者不是一个目录,则退出
    if (!dirFile.exists() || !dirFile.isDirectory()) {
    return false;
    }
    flag = true;
    // 删除文件夹下的所有文件(包括子目录)
    File[] files = dirFile.listFiles();
    for (int i = 0; i < files.length; i++) {
    // 删除子文件
    if (files[i].isFile()) {
    flag = deleteFile(files[i].getAbsolutePath());
    if (!flag)
    break;
    } // 删除子目录
    else {
    flag = deleteDirectory(files[i].getAbsolutePath());
    if (!flag)
    break;
    }
    }
    if (!flag)
    return false;
    // 删除当前目录
    if (dirFile.delete()) {
    return true;
    } else {
    return false;
    }
    }
    
    public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    /*
    * conf.setBoolean("mapreduce.map.output.compress", true);
    * //conf.setBoolean("mapreduce.output.fileoutputformat.compress",
    * false);
    * conf.setClass("mapreduce.output.fileoutputformat.compress.codec",
    * GzipCodec.class, CompressionCodec.class);
    */
    
    // gzip
    /*
    * conf.setBoolean("mapreduce.map.output.compress", true);
    * conf.setClass("mapreduce.map.output.compression.codec",
    * GzipCodec.class, CompressionCodec.class);
    * conf.setBoolean("mapreduce.output.fileoutputformat.compress", true);
    * conf.setClass("mapreduce.output.fileoutputformat.compress.codec",
    * GzipCodec.class, CompressionCodec.class);
    */
    conf.set("mapreduce.map.log.level", "DEBUG");
    
    // snappy
    /*
    * conf.setBoolean("mapreduce.map.output.compress", true);
    * conf.setClass("mapreduce.map.output.compression.codec",
    * SnappyCodec.class, CompressionCodec.class);
    * conf.setBoolean("mapreduce.output.fileoutputformat.compress", false);
    * conf.setClass("mapreduce.output.fileoutputformat.compress.codec",
    * SnappyCodec.class, CompressionCodec.class);
    */
    
    String[] otherArgs = new GenericOptionsParser(conf, args)
    .getRemainingArgs();
    if (otherArgs.length != 2) {
    System.err.println("Usage: SecondSotrStr <in> <out>");
    System.exit(2);
    }
    
    Path outputDir = new Path(otherArgs[1]);
    FileSystem fs = FileSystem.get(conf);
    if (fs.exists(outputDir)) {
    fs.delete(outputDir, true);
    }
    
    Job job = new Job(conf, "secondary sort");
    job.setJarByClass(SecondSotrStr.class);
    job.setMapperClass(MapClass.class);
    job.setReducerClass(Reduce.class);
    
    /*
    * conf.setBoolean("mapred.output.compress", true); //
    * conf.setClass("mapred.output.compression.codec", GzipCodec.class,
    * CompressionCodec.class);
    * conf.setClass("mapred.output.compression.codec", SnappyCodec.class,
    * CompressionCodec.class);
    * 
    * conf.setBoolean("reduce.output.compress", true); //
    * conf.setClass("mapred.output.compression.codec", GzipCodec.class,
    * CompressionCodec.class);
    * conf.setClass("reduce.output.compression.codec", SnappyCodec.class,
    * CompressionCodec.class);
    * 
    * /* conf.setBoolean("mapreduce.output.compress", true);
    * conf.setClass("mapreduce.output.compression.codec", GzipCodec.class,
    * CompressionCodec.class);
    */
    
    // group and partition by the first int in the pair
    job.setPartitionerClass(FirstPartitioner.class);
    job.setGroupingComparatorClass(FirstGroupingComparator.class);
    
    // the map output is StrPair, IntWritable
    job.setMapOutputKeyClass(StrPair.class);
    job.setMapOutputValueClass(NullWritable.class);
    
    // the reduce output is Text, IntWritable
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);
    
    // lzo
    /*
    * conf.setBoolean("mapreduce.map.output.compress", true);
    * conf.setClass("mapreduce.map.output.compression.codec",
    * LzoCodec.class, CompressionCodec.class);
    * conf.setBoolean("mapreduce.output.fileoutputformat.compress", true);
    * conf.setClass("mapreduce.output.fileoutputformat.compress.codec",
    * LzoCodec.class, CompressionCodec.class);
    */
    // 块压缩
    // job.setOutputFormatClass(SequenceFileOutputFormat.class);
    conf.set("mapred.output.compression.type", "BLOCK");
    
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
    
    }
    

      

  • 相关阅读:
    #maven解决乱码问题
    #jquery隐藏和启用
    date类型时间比较大小
    xml<>编译
    Linux分区有损坏修复
    linux部署相关命令
    Java实现4位数吸血鬼数字算法
    Java冒泡算法及中位数算法
    DT梦工厂 第25课 Scala中curring实战详解
    DT梦工厂 第24讲 scala中sam转换实战详解
  • 原文地址:https://www.cnblogs.com/chengxin1982/p/4076544.html
Copyright © 2011-2022 走看看