zoukankan      html  css  js  c++  java
  • hadoop-ykt(自定义key)

    1.自定义key对象,实现WritableComparable接口

     1 package com.zhangdan.ykt;
     2 
     3 import java.io.DataInput;
     4 import java.io.DataOutput;
     5 import java.io.IOException;
     6 import org.apache.hadoop.io.WritableComparable;
     7 
     8 /**
     9  * 自定义的key类型
    10  * 
    11  * @author vlab
    12  *
    13  */
    14 public class GdzBeans implements WritableComparable<GdzBeans> {
    15     private String key1;
    16     private String key2;
    17 
    18     public GdzBeans() {// 自定义数据类型必须有一个无参的构造方法,为了防止反射
    19 
    20     }
    21 
    22     public GdzBeans(String key1, String key2) {
    23         this.key1 = key1;
    24         this.key2 = key2;
    25     }
    26 
    27     public String getKey1() {
    28         return key1;
    29     }
    30 
    31     public String getKey2() {
    32         return key2;
    33     }
    34 
    35     public void setKey(String key1, String key2) {
    36         this.key1 = key1;
    37         this.key2 = key2;
    38     }
    39 
    40     public void setKey1(String key1) {
    41         this.key1 = key1;
    42     }
    43 
    44     public void setKey2(String key2) {
    45         this.key2 = key2;
    46     }
    47 
    48     @Override
    49     public void readFields(DataInput in) throws IOException {
    50         this.key1 = in.readUTF();
    51         this.key2 = in.readUTF();
    52 
    53     }
    54 
    55     @Override
    56     public void write(DataOutput out) throws IOException {
    57         out.writeUTF(key1);
    58         out.writeUTF(key2);
    59     }
    60 
    61     @Override
    62     public int compareTo(GdzBeans o) {
    63         int cmp1 = key1.compareTo(o.key1);
    64         if (cmp1 != 0) {
    65             return cmp1;
    66         }
    67         int cmp2 = key2.compareTo(o.key2);
    68         return cmp2;
    69     }
    70 
    71     @Override
    72     public boolean equals(Object obj) {
    73         if (obj instanceof GdzBeans) {
    74             GdzBeans pw = (GdzBeans) obj;
    75             return key1.equals(pw.key1) && key2.equals(pw.key2);
    76         }
    77         return false;
    78     }
    79 
    80     @Override
    81     public int hashCode() {
    82         return this.key1.hashCode() * 3 + this.key2.hashCode() * 5;
    83     }
    84 
    85     @Override
    86     public String toString() {
    87         return this.key1 + "," + this.key2;
    88     }
    89 
    90 }
     自定义的key要实现WritableComparable,然后复写对应的几个方法:
    Writable---write()、readFields()(序列化/反序列化),另外还要复写Object的toString()、equals()、hashCode();
    Comparable---compareTo(),这个是关键,在这里可以对key排序,之前总不明白这个是什么个逻辑,出了很多错误,比较坑.

    2.mapreduce

     1 package com.zhangdan.ykt;
     2 
     3 import java.io.IOException;
     4 
     5 import org.apache.hadoop.conf.Configuration;
     6 import org.apache.hadoop.fs.Path;
     7 import org.apache.hadoop.io.IntWritable;
     8 import org.apache.hadoop.io.LongWritable;
     9 import org.apache.hadoop.io.Text;
    10 import org.apache.hadoop.mapreduce.Job;
    11 import org.apache.hadoop.mapreduce.Mapper;
    12 import org.apache.hadoop.mapreduce.Reducer;
    13 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    14 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    15 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    16 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    17 
    18 public class GetConnectionCount {
    19     private static final String Input_Path = "hdfs://192.168.1.104:9000/user/vlab/yktdata/pid07_result04/connection/part-r-*";
    20     private static final String Output_Path = "hdfs://192.168.1.104:9000/user/vlab/yktdata/pid07_result04/connectioncount";
    21     // private static final String Input_Path =
    22     // "hdfs://192.168.1.104:9000/user/vlab/yktdata/pid06_result09/connection/part-r-*";
    23     // private static final String Output_Path =
    24     // "hdfs://192.168.1.104:9000/user/vlab/yktdata/pid06_result09test/connectioncount";
    25 
    26     public static class MyMap extends Mapper<LongWritable, Text, GdzBeans, IntWritable> {
    27         IntWritable one = new IntWritable(1);
    28 
    29         public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    30             String line = value.toString();
    31             // System.out.println(line);
    32             String[] tt;
    33             tt = line.split(",");
    34             context.write(new GdzBeans(tt[0], tt[1]), one);
    35         }
    36     }
    37 
    38     public static class MyReduce extends Reducer<GdzBeans, IntWritable, GdzBeans, IntWritable> {
    39         public void reduce(GdzBeans key, Iterable<IntWritable> values, Context context)
    40                 throws IOException, InterruptedException {
    41             int n = 0;
    42             for (IntWritable i : values) {
    43                 n += i.get();
    44             }
    45             if (key.getKey1() != null && key.getKey2() != null && key.getKey1().trim() != ""
    46                     && key.getKey2().trim() != "")
    47                 context.write(key, new IntWritable(n));
    48         }
    49     }
    50 
    51     public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    52         Configuration conf = new Configuration();
    53         conf.set("mapred.textoutputformat.ignoreseparator", "true"); // 去掉key/value对的自动补tab符
    54         conf.set("mapred.textoutputformat.separator", ",");// 设置以“,”为key/value的分隔符
    55         Job job = new Job(conf);
    56         job.setJarByClass(GdzTest.class);
    57         job.setJobName("GetConnectionCount");
    58 
    59         job.setMapperClass(MyMap.class);
    60         job.setMapOutputKeyClass(GdzBeans.class);
    61         job.setMapOutputValueClass(IntWritable.class);
    62 
    63         job.setReducerClass(MyReduce.class);
    64         job.setCombinerClass(MyReduce.class);
    65         job.setOutputKeyClass(GdzBeans.class);
    66         job.setOutputValueClass(IntWritable.class);
    67 
    68         job.setInputFormatClass(TextInputFormat.class);
    69         job.setOutputFormatClass(TextOutputFormat.class);
    70 
    71         FileInputFormat.addInputPath(job, new Path(Input_Path));
    72         FileOutputFormat.setOutputPath(job, new Path(Output_Path));
    73 
    74         System.exit(job.waitForCompletion(true) ? 0 : 1);
    75 
    76     }
    77 }
     这个这里有很多改进,很多之前不理解的也慢慢理解了,但是像这个我想实现二次排序还是没有得到比较满意的解决,
    看了好多大家写的二次排序的代码,但是总是和我这情况有点不一致,复杂的类我还得慢慢适应。
  • 相关阅读:
    osg编译日志
    MFC加载大型osg模型
    osg gdal加载tif数据文件
    osg创建灯光
    ubuntu挂载新硬盘
    MFC加载osg模型
    nginx中Geoip_module模块的使用
    centos中释放缓存的方法
    python连接kafka生产者,消费者脚本
    python初始化环境记录
  • 原文地址:https://www.cnblogs.com/xunyingFree/p/5147595.html
Copyright © 2011-2022 走看看