  • Implementing sorting with MapReduce

    Along the way I ran into values that could not be converted to int, which I worked around with a try/catch.


    str2 2
    str1 1
    str3 3
    str1 4
    str4 7
    str2 5
    str3 9

    The key and value on each line are separated by a tab, and the goal is to get the following result:

    str1 1,4 

    str2 2,5

    str3 3,9

    str4 7

    In my setup the map and reduce are separate classes, and the map output key is a custom type.
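
    For reference, here is a minimal sketch (my own illustration, not from the original post) of how KeyValueTextInputFormat, used in the driver below, splits each tab-separated line into the key and value the mapper receives; by default it splits at the first tab.

    // Standalone illustration only; LineSplitDemo is a hypothetical class name.
    public class LineSplitDemo {
        public static void main(String[] args) {
            String line = "str1\t1";                // one input line, tab-separated
            int tab = line.indexOf('\t');
            String key = line.substring(0, tab);    // "str1" -> the mapper's key
            String value = line.substring(tab + 1); // "1"    -> the mapper's value
            System.out.println(key + " / " + value);
        }
    }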

    package com.kane.mr;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;


    import org.apache.hadoop.io.WritableComparable;


    //str2 2
    //str1 1
    //str3 3
    //str1 4
    //str4 7
    //str2 5
    //str3 9
    public class IntPair implements WritableComparable<IntPair> {

        private String firstKey; // e.g. "str1"
        private int secondKey;   // e.g. 1

        public String getFirstKey() {
            return firstKey;
        }

        public void setFirstKey(String firstKey) {
            this.firstKey = firstKey;
        }

        public int getSecondKey() {
            return secondKey;
        }

        public void setSecondKey(int secondKey) {
            this.secondKey = secondKey;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(firstKey);
            out.writeInt(secondKey);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            firstKey = in.readUTF();
            secondKey = in.readInt();
        }

        // Comparison on the key itself; the job below also registers separate
        // sort and grouping comparators for this key.
        @Override
        public int compareTo(IntPair o) {
            // int first = o.getFirstKey().compareTo(this.firstKey);
            // if (first != 0) {
            //     return first;
            // } else {
            //     return o.getSecondKey() - this.secondKey;
            // }
            return o.getFirstKey().compareTo(this.getFirstKey());
        }
    }
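
    As a quick sanity check (my own sketch, not part of the original post), the custom key can be round-tripped through write() and readFields() in memory to confirm the two methods stay in sync:

    package com.kane.mr;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    // Hypothetical helper: serialize an IntPair and read it back.
    public class IntPairRoundTrip {
        public static void main(String[] args) throws Exception {
            IntPair original = new IntPair();
            original.setFirstKey("str1");
            original.setSecondKey(4);

            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));               // serialize

            IntPair copy = new IntPair();
            copy.readFields(new DataInputStream(
                    new ByteArrayInputStream(bytes.toByteArray())));   // deserialize

            System.out.println(copy.getFirstKey() + " " + copy.getSecondKey()); // str1 4
        }
    }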

    package com.kane.mr;


    import java.io.IOException;


    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class SortMapper extends Mapper<Object, Text, IntPair, IntWritable> {

        public IntPair intPair = new IntPair();
        public IntWritable intWritable = new IntWritable(0);

        @Override
        protected void map(Object key, Text value, // e.g. key "str1", value "1"
                Context context) throws IOException, InterruptedException {
            // String[] values = value.toString().split("\t");
            System.out.println(value);
            int intValue;
            try {
                intValue = Integer.parseInt(value.toString());
            } catch (NumberFormatException e) {
                intValue = 6;
            } // without the try/catch, converting the value to int kept failing

            intPair.setFirstKey(key.toString());
            intPair.setSecondKey(intValue);
            intWritable.set(intValue);
            context.write(intPair, intWritable); // e.g. key ("str2", 2), value 2
        }
    }
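
    The NumberFormatException the mapper guards against is often caused by stray whitespace in the value, for example a carriage return from Windows line endings. Below is a hedged sketch of the same parse-with-fallback step, with a trim() that may remove the need for the hard-coded 6 (ParseDemo and parseOrDefault are my own names):

    package com.kane.mr;

    // Hypothetical standalone sketch of the mapper's parse-with-fallback logic.
    public class ParseDemo {

        // Try to parse the value; fall back to a default when it is not a number.
        static int parseOrDefault(String s, int fallback) {
            try {
                return Integer.parseInt(s.trim()); // trim() guards against "\r" and spaces
            } catch (NumberFormatException e) {
                return fallback;
            }
        }

        public static void main(String[] args) {
            System.out.println(parseOrDefault("4", 6));   // 4
            System.out.println(parseOrDefault("4\r", 6)); // 4 (would throw without trim)
            System.out.println(parseOrDefault("abc", 6)); // 6, the fallback
        }
    }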


    package com.kane.mr;


    import java.io.IOException;
    import java.util.Iterator;


    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;


    public class SortReducer extends Reducer<IntPair, IntWritable, Text, Text> {

        @Override
        protected void reduce(IntPair key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            StringBuffer combineValue = new StringBuffer();
            Iterator<IntWritable> itr = values.iterator();
            while (itr.hasNext()) {
                int value = itr.next().get();
                if (combineValue.length() > 0) {
                    combineValue.append(","); // comma-separate, no trailing comma
                }
                combineValue.append(value);
            }
            context.write(new Text(key.getFirstKey()), new Text(combineValue.toString()));
        }
    }

    package com.kane.mr;


    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Partitioner;

    public class PartionTest extends Partitioner<IntPair, IntWritable> {

        @Override
        public int getPartition(IntPair key, IntWritable value, int numPartitions) { // numPartitions = number of reducers
            // Mask off the sign bit before taking the modulus; without the
            // parentheses, % binds tighter than & and the result can fall
            // outside the range 0..numPartitions-1.
            return (key.getFirstKey().hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }
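
    A small hedged check (PartitionDemo is my own name) that the partitioner always returns a valid reducer index, even for keys whose hashCode() is negative:

    package com.kane.mr;

    import org.apache.hadoop.io.IntWritable;

    // Hypothetical check: the returned partition must lie in [0, numPartitions).
    public class PartitionDemo {
        public static void main(String[] args) {
            PartionTest partitioner = new PartionTest();

            IntPair key = new IntPair();
            key.setFirstKey("str4");
            key.setSecondKey(7);

            // With 3 reducers the result has to be 0, 1 or 2.
            System.out.println(partitioner.getPartition(key, new IntWritable(7), 3));
        }
    }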


    package com.kane.mr;


    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;


    public class TextComparator extends WritableComparator {

        public TextComparator() {
            super(IntPair.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            IntPair o1 = (IntPair) a;
            IntPair o2 = (IntPair) b;
            return o1.getFirstKey().compareTo(o2.getFirstKey());
        }
    }
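
    To illustrate what the grouping comparator does (my own sketch, not from the post): two keys with the same firstKey but different secondKey compare as equal, so their values are handed to a single reduce() call, which is how str1 ends up with the value list 1,4.

    package com.kane.mr;

    // Hypothetical illustration: (str1, 1) and (str1, 4) are equal for grouping.
    public class GroupingDemo {
        public static void main(String[] args) {
            IntPair a = new IntPair();
            a.setFirstKey("str1");
            a.setSecondKey(1);

            IntPair b = new IntPair();
            b.setFirstKey("str1");
            b.setSecondKey(4);

            TextComparator grouping = new TextComparator();
            System.out.println(grouping.compare(a, b)); // 0 -> same reduce group
        }
    }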

    package com.kane.mr;


    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    @SuppressWarnings("rawtypes")
    public class TextIntCompartor extends WritableComparator {

        protected TextIntCompartor() {
            super(IntPair.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            IntPair o1 = (IntPair) a;
            IntPair o2 = (IntPair) b;
            int first = o1.getFirstKey().compareTo(o2.getFirstKey());
            if (first != 0) {
                return first;
            } else {
                return o1.getSecondKey() - o2.getSecondKey();
            }
        }
    }
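
    The sort comparator is what puts 1 before 4 for str1 before the values reach the reducer; a hedged sketch follows (SortOrderDemo is my own name, placed in the same package so the protected constructor is accessible):

    package com.kane.mr;

    // Hypothetical illustration: same firstKey, so secondKey decides the order.
    public class SortOrderDemo {
        public static void main(String[] args) {
            IntPair a = new IntPair();
            a.setFirstKey("str1");
            a.setSecondKey(1);

            IntPair b = new IntPair();
            b.setFirstKey("str1");
            b.setSecondKey(4);

            TextIntCompartor sorter = new TextIntCompartor();
            System.out.println(sorter.compare(a, b)); // negative -> a sorts before b
        }
    }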



    package com.kane.mr;


    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;


    public class SortMain {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length != 2) {
                System.err.println("Usage: sort <in> <out>");
                System.exit(2);
            }
            Job job = new Job(conf, "Sort");
            job.setJarByClass(SortMain.class);
            job.setInputFormatClass(KeyValueTextInputFormat.class); // each input line is key<TAB>value
            job.setMapperClass(SortMapper.class);
            //job.setCombinerClass(IntSumReducer.class);
            job.setReducerClass(SortReducer.class);

            job.setMapOutputKeyClass(IntPair.class);
            job.setMapOutputValueClass(IntWritable.class);

            job.setSortComparatorClass(TextIntCompartor.class);
            job.setGroupingComparatorClass(TextComparator.class); // group reduce input by the key's firstKey
            job.setPartitionerClass(PartionTest.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            FileInputFormat.addInputPath(job, new Path(otherArgs[0]));   // input path: the first argument after the class name in the hadoop jar command
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); // output path: the second argument
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

    Export the project as a jar and copy it to the Hadoop machine, put sort.txt into HDFS, and then run the job with the command hadoop jar KaneTest/sort.jar com.kane.mr.SortMain /kane/sort.txt /kane/output.

