  • Learning MapReduce in Hadoop (Part 3): IP Deduplication, MaxScore, and TotalScore Examples

    1. IP Deduplication Example

    Data file:

    192.168.10.111
    192.168.10.111
    10.32.100.111
    192.168.21.111
    192.168.10.112
    192.168.10.111
    192.168.11.111
    192.168.12.112
    192.168.11.111
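
    The approach: the mapper emits each IP as the output key with a NullWritable value. The shuffle stage groups identical keys together, so the reducer receives every distinct IP exactly once and only needs to write the key back out.
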
    IPMapper:
    package com.blb.ip;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    public class IPMapper
            extends Mapper<LongWritable, Text, Text, NullWritable> {
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Each line of input is a single IP; use it directly as the output key
            context.write(value, NullWritable.get());
        }
    }
    IPReducer:
    package com.blb.ip;
    
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    public class IPReducer
            extends Reducer<Text, NullWritable, Text, NullWritable> {
    
        // key: a distinct IP (the shuffle has already grouped the duplicates)
        // values: null, null, null ...
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
    
            context.write(key, NullWritable.get());
    
        }
    }
    IPDriver:
    package com.blb.ip;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    public class IPDriver {
    
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
    
            job.setJarByClass(IPDriver.class);
            job.setMapperClass(IPMapper.class);
            job.setReducerClass(IPReducer.class);
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
    
            FileInputFormat.addInputPath(job,
                    new Path("hdfs://192.168.10.131:9000/txt/ip.txt"));
    
            FileOutputFormat.setOutputPath(job,
                    new Path("hdfs://192.168.10.131:9000/result/ip"));
    
            job.waitForCompletion(true);
        }
    }
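
    Run against the data file above with a single reducer, the output should contain the six distinct IPs in sorted key order (with a NullWritable value, TextOutputFormat writes only the key):

    10.32.100.111
    192.168.10.111
    192.168.10.112
    192.168.11.111
    192.168.12.112
    192.168.21.111
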

    2. MaxScore Example

    Data file:

    张三 684
    李四 312
    王五 721
    赵六 548
    田七 470
    王八 668
    陈九 340
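
    The mapper splits each line on the space into a name and a score and emits (name, score); the reducer then scans the scores grouped under each name and keeps the largest one.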

    MaxScoreMapper:

    package com.blb.maxscore;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    import java.io.IOException;

    public class MaxScoreMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Split the line into name and score, emit (name, score)
            String[] arr = value.toString().split(" ");
            context.write(new Text(arr[0]),
                    new IntWritable(Integer.parseInt(arr[1])));
        }
    }

    MaxScoreReducer:

    package com.blb.maxscore;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    import java.io.IOException;

    public class MaxScoreReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // To save memory and avoid constantly creating and destroying
            // objects, MapReduce reuses addresses: the value object is
            // created only once and refilled on every step of the iteration.
            // If we kept a reference instead of copying, e.g. with
            //     values: 684 312 721 548 ...
            //     value.set(684);
            //     value.get() > max.get()  ->  684 > 0  ->  true
            //     max = value;     // both now reference the same address
            //     value.set(312);  // so max's value becomes 312 as well
            //     value.get() > max.get()  ->  312 > 312  ->  false
            // then max and value would share one address for the whole
            // iteration, and max would end up holding whatever the last
            // value was. Copying the int with max.set(...) avoids this.
            IntWritable max = new IntWritable(0);
            for (IntWritable value : values) {
                if (value.get() > max.get())
                    // max = value;   // wrong: aliases the reused object
                    max.set(value.get());
            }
            context.write(key, max);
        }
    }
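
    The aliasing pitfall described in the comments is easy to reproduce outside Hadoop. The following standalone sketch (a hypothetical ReuseDemo class, not part of the job) refills a single IntWritable the way the framework does; keeping a reference ends up tracking the last element, while copying the int finds the true maximum:

    package com.blb.maxscore;

    import org.apache.hadoop.io.IntWritable;

    public class ReuseDemo {

        public static void main(String[] args) {
            int[] scores = {684, 312, 721, 548};

            // Wrong: keep a reference to the reused object
            IntWritable shared = new IntWritable();
            IntWritable max = new IntWritable(0);
            for (int s : scores) {
                shared.set(s);    // the framework refills the same object
                if (shared.get() > max.get())
                    max = shared; // max now aliases shared
            }
            System.out.println(max.get()); // prints 548, the last value

            // Right: copy the primitive out of the reused object
            shared = new IntWritable();
            IntWritable max2 = new IntWritable(0);
            for (int s : scores) {
                shared.set(s);
                if (shared.get() > max2.get())
                    max2.set(shared.get()); // copies the int, no aliasing
            }
            System.out.println(max2.get()); // prints 721, the true maximum
        }
    }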

    MaxScoreDriver:

    package com.blb.maxscore;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import java.io.IOException;

    public class MaxScoreDriver {

        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);

            job.setJarByClass(MaxScoreDriver.class);
            job.setMapperClass(MaxScoreMapper.class);
            job.setReducerClass(MaxScoreReducer.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            FileInputFormat.addInputPath(job,
                    new Path("hdfs://192.168.10.131:9000/txt/score2.txt"));

            FileOutputFormat.setOutputPath(job,
                    new Path("hdfs://192.168.10.131:9000/result/maxscore2"));

            job.waitForCompletion(true);
        }
    }
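
    Note that in the sample data above every name appears only once, so each reduce call sees a single score and the output simply mirrors the input; the max logic only becomes visible when a key carries several values, as in the data for the next example.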

    3. TotalScore Example

    Data file:

    张三 78
    李四 66
    王五 73
    张三 88
    田七 75
    张三 65
    陈九 90
    李四 67
    王五 78
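
    The mapper is the same split-and-emit logic as before; the reducer accumulates a running sum per name instead of a maximum.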

    TotalScoreMapper:

    package com.blb.totalscore;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    import java.io.IOException;

    public class TotalScoreMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Split the line into name and score
            String[] arr = value.toString().split(" ");
            context.write(new Text(arr[0]),
                    new IntWritable(Integer.parseInt(arr[1])));
        }
    }

    TotalScoreReducer:

    package com.blb.totalscore;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    import java.io.IOException;

    public class TotalScoreReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // Accumulate the total score for this student
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    TotalScoreDriver:

    package com.blb.totalscore;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import java.io.IOException;

    public class TotalScoreDriver {

        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);

            job.setJarByClass(TotalScoreDriver.class);
            job.setMapperClass(TotalScoreMapper.class);
            job.setReducerClass(TotalScoreReducer.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            FileInputFormat.addInputPath(job,
                    new Path("hdfs://192.168.10.131:9000/txt/score2/"));

            FileOutputFormat.setOutputPath(job,
                    new Path("hdfs://192.168.10.131:9000/result/totalscore"));

            job.waitForCompletion(true);
        }
    }
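
    With the sample data above, the totals the job should produce (in sorted key order from a single reducer, key and sum separated by a tab) are:

    张三	231
    李四	133
    王五	151
    田七	75
    陈九	90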

     