zoukankan      html  css  js  c++  java
  • MapReduce实例

    一、MapReduce 原理
    MapReduce 是一种变成模式,用于大规模的数据集的分布式运算。通俗的将就是会将任务分给不同的机器做完,然后在收集汇总。
    MapReduce有两个核心:Map,Reduce,它们分别单独计算任务,每个机器尽量计算自己hdfs内部的保存信息,Reduce则将计算结果汇总。

    一、WordCount单词统计

    1.1 数据准备test.txt

    hello hadoop
    wille learn hadoop WordCount
    but the hadoop is not easy
    

    1.2 Map程序:

    package com.ice.hadoop.test.wordcount;
    
    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    
      @Override
      protected void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        String line = value.toString();
        String[] words = line.split(" ");
        for (String word : words) {
          context.write(new Text(word), new IntWritable(1));
        }
      }
    }
    

    这里定义了一个mapper类,其中有一个map方法。MapReduce框架每读到一行数据,就会调用一次这个map方法。
    Mapper<LongWritable, Text, Text, IntWritable>其中的4个类型分别是:输入key类型、输入value类型、输出key类型、输出value类型。
    MapReduce框架读到一行数据侯以key value形式传进来,key默认情况下是mr矿机所读到一行文本的起始偏移量(Long类型),value默认情况下是mr框架所读到的一行的数据内容(String类型)。
    输出也是key value形式的,是用户自定义逻辑处理完成后定义的key,用户自己决定用什么作为key,value是用户自定义逻辑处理完成后的value,内容和类型也是用户自己决定。
    此例中,输出key就是word(字符串类型),输出value就是单词数量(整型)。
    这里的数据类型和我们常用的不一样,因为MapReduce程序的输出数据需要在不同机器间传输,所以必须是可序列化的,例如Long类型,Hadoop中定义了自己的可序列化类型LongWritable,String对应的是Text,int对应的是IntWritable。

    1.3 Reduce程序:

    package com.ice.hadoop.test.wordcount;
    
    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    
      @Override
      protected void reduce(Text key, Iterable<IntWritable> values, Context context)
          throws IOException, InterruptedException {
        Integer count = 0;
        for (IntWritable value : values) {
          count += value.get();
        }
        context.write(key, new IntWritable(count));
      }
    }
    

    这里定义了一个Reducer类和一个reduce方法。当传给reduce方法时,就变为:Reducer<Text, IntWritable, Text, IntWritable> 4个类型分别指:输入key的类型、输入value的类型、输出key的类型、输出value的类型。
    需要注意,reduce方法接收的是:一个字符串类型的key、一个可迭代的数据集。因为reduce任务读取到map任务处理结果是这样的:
    (good,1)(good,1)(good,1)(good,1)
    当传给reduce方法时,就变为:
    key:good
    value:(1,1,1,1)
    所以,reduce方法接收到的是同一个key的一组value。

    1.4 Main程序

    package com.ice.hadoop.test.wordcount;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class WordCountMapReduce {
    
      public static void main(String[] args) throws Exception {
        //创建配置对象
        Configuration conf = new Configuration();
        //创建Job对象
        Job job = Job.getInstance(conf, "wordCount");
        //设置mapper类
        job.setMapperClass(WordcountMapper.class);
        //设置 Reduce类
        job.setReducerClass(WordCountReducer.class);
    
        //设置运行job类
        job.setJarByClass(WordCountMapReduce.class);
    
        //设置map输出的key,value类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        //设置reduce输出的key,value类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
    
        //设置输入路径金额输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        //提交job
        boolean b = job.waitForCompletion(true);
    
        if (!b){
          System.out.println("word count failed!");
        }
      }
    }
    

    编译打包后:

    hdfs dfs -mkdir -p /wordcount/input
    hdfs dfs -put test.txt /wordcount/input
    

    执行wordcount jar
    hadoop jar mapreduce-wordcount-0.0.1-SNAPSHOT.jar com/ice/hadoop/test/wordcount/WordCountMapReduce /wordcount/input /wordcount/output

    执行完成后验证
    hdfs dfs -cat /wordcount/output/*

    二、hadoop 序列化

    hadoop 为什么不使用java序列化
    Hadoop的序列化机制与java的序列化机制不同,它将对象序列化到流中,值得一提的是java的序列化机制是不断的创建对象,但在Hadoop的序列化机制中,用户可以复用对象,这样就减少了java对象的分配和回收,提高了应用效率。

    Hadoop定义了新的序列化接口——writable:

     package org.apache.hadoop.io
    
     import java.io.DataOutput
     import java.io.DataInput
     import java.io.IOException
    
     public interface Writable{
        void write(DataOutput out) throws IOException;
        void readFields(DataInput in) throws IOException;
     }
    

    通过实现 Writable 接口,完成序列化与反序列化。

    但更多的时候,Hadoop要求同时实现序列化与可对比性,因此更常见的情况下需要实现的是 WritableComparable 接口。同时给出默认的构造函数供 MapReduce 进行实例化。下面给出一个自定义Hadoop可序列化类的示例:

    import java.io.*;
    import org.apache.hadoop.io.*;
    public class TextPair implements WritableComparable<TextPair> {
    
      private Text first;
      private Text second;
      public TextPair() {
        set(new Text(), new Text());
      }
    
      public TextPair(String first, String second) {
        set(new Text(first), new Text(second));
      }
    
      public TextPair(Text first, Text second) {
        set(first, second);
      }
    
      public void set(Text first, Text second) {
        this.first = first;
        this.second = second;
      }
    
      public Text getFirst() {
        return first;
      }
    
      public Text getSecond() {
        return second;
      }
    
      @Override
      public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
      }
    
      @Override
      public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
      }
    
      @Override
      public int hashCode() {
        return first.hashCode() * 163 + second.hashCode();
      }
    
      @Override
      public boolean equals(Object o) {
        if (o instanceof TextPair) {
          TextPair tp = (TextPair) o;
          return first.equals(tp.first) && second.equals(tp.second);
        }
        return false;
      }
    
      @Override
      public String toString() {
        return first + "	" + second;
      }
    
      @Override
      public int compareTo(TextPair tp) {
        int cmp = first.compareTo(tp.first);
        if (cmp != 0) {
          return cmp;
        }
        return second.compareTo(tp.second);
      }
    }
    

    2.1 需求与实现思路

    需要统计手机用户流量日志,日志内容实例

    手机号 上行流量 下行流量
    1252548225 200 1100
    1345858685 300 1200
    1862538225 400 1300
    1545858645 100 300
    1502236225 500 1300
    1362858685 300 1100

    要把同一个用户的上行流量、下行流量进行累加,并计算出综合。

    例如上面的13897230503有两条记录,就要对这两条记录进行累加,计算总和,得到:

    13897230503,500,1600,2100

    2.2 实现思路

    • map
      接收日志的一行数据,key为行的偏移量,value为此行数据。
      输出时,应以手机号为key,value应为一个整体,包括:上行流量、下行流量、总流量。
      手机号是字符串类型Text,而这个整体不能用基本数据类型表示,需要我们自定义一个bean对象,并且要实现可序列化。
      key: 13897230503
      value: < upFlow:100, dFlow:300, sumFlow:400 >
    • reduce
      接收一个手机号标识的key,及这个手机号对应的bean对象集合

    例如:
    key:
    13897230503
    value:
    < upFlow:400, dFlow:1300, sumFlow:1700 >,
    < upFlow:100, dFlow:300, sumFlow:400 >
    迭代bean对象集合,累加各项,形成一个新的bean对象,例如:
    < upFlow:400+100, dFlow:1300+300, sumFlow:1700+400 >
    最后输出:
    key: 13897230503
    value: < upFlow:500, dFlow:1600, sumFlow:2100 >

    2.3 map程序

    创建实体并实现Writable

    package com.ice.hadoop.test.flowbean;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.Writable;
    
    /**
     * @author:ice
     * @Date: 2019/2/22 0022
     */
    public class FlowBean implements Writable {
    
      private long upFlow;
      private long dFlow;
      private long sumFlow;
    
      public FlowBean() {
      }
    
      public FlowBean(long upFlow, long dFlow) {
        this.upFlow = upFlow;
        this.dFlow = dFlow;
        this.sumFlow = upFlow + dFlow;
      }
    
      @Override
      public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);//wirte写入的顺序与read读取顺序
        out.writeLong(dFlow);
        out.writeLong(sumFlow);
      }
    
      @Override
      public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        dFlow = in.readLong();
        sumFlow = in.readLong();
      }
    
      public long getUpFlow() {
        return upFlow;
      }
    
      public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
      }
    
      public long getdFlow() {
        return dFlow;
      }
    
      public void setdFlow(long dFlow) {
        this.dFlow = dFlow;
      }
    
      public long getSumFlow() {
        return sumFlow;
      }
    
      public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
      }
    
      @Override
      public String toString() {
        return "FlowBean{" +
            "upFlow=" + upFlow +
            ", dFlow=" + dFlow +
            ", sumFlow=" + sumFlow +
            '}';
      }
    }
    

    MapReduce程序:

    package com.ice.hadoop.test.flowbean;
    
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    /**
     * @author:ice
     * @Date: 2019/2/22 0022
     */
    public class FlowCount {
    
      static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    
        @Override
        protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
          String line = value.toString();
          String[] fields = line.split(" ");
          String phone = fields[0];
          Long upFlow = Long.parseLong(fields[1]);
          Long dFlow = Long.parseLong(fields[2]);
          context.write(new Text(phone), new FlowBean(upFlow, dFlow));
        }
      }
    
      static class FlowCountReduce extends Reducer<Text, FlowBean, Text, FlowBean> {
    
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
    
          long sumUpFlow = 0L;
          long sumDFlow = 0L;
          for (FlowBean bean : values) {
            sumUpFlow += bean.getUpFlow();
            sumDFlow += bean.getdFlow();
          }
    
          FlowBean sumBean = new FlowBean(sumUpFlow, sumDFlow);
          context.write(key, sumBean);
        }
      }
    
      public static void main(String[] args) throws Exception {
        //创建配置对象
        Configuration conf = new Configuration();
        //创建Job对象
        Job job = Job.getInstance(conf, "FlowCount");
        //设置mapper类
        job.setMapperClass(FlowCountMapper.class);
        //设置 Reduce类
        job.setReducerClass(FlowCountReduce.class);
    
        //设置运行job类
        job.setJarByClass(FlowCount.class);
    
        //设置map输出的key,value类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        //设置reduce输出的key,value类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
    
        //设置输入路径金额输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        //提交job
        boolean b = job.waitForCompletion(true);
    
        System.exit(b ? 0 : 1);
      }
    
    }
    

    编译打包步骤是一样的。

    三、合并小文件

    为什么要合并小文件,因为mapReduce会将每一个小文件都当做一个任务,当特别多的小文件时,导致创建非常多的任务从而效率损耗

    如何实现:文件的读取有map负责,为了将小文件合并,需要使用Inputformat,RecordReader,RecordReader负责实现一次读取一个完整文件封装为key value,map接收到文件内容,然后以文件名为key,以文件内容为value,向外输出的格式要注意,要使用SequenceFileOutPutFormat(用来输出对象)。

    因为reduce收到的key value都是对象,不是普通的文本,reduce默认的输出格式是TextOutputFormat,使用它的话,最终输出的内容就是对象ID,所以要使用SequenceFileOutPutFormat进行输出

    3.1 代码实践

    package com.ice.hadoop.test.mergefile;
    
    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.ByteWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    
    
    public class MyInputFormat extends FileInputFormat<NullWritable, ByteWritable> {
    
      @Override
      public org.apache.hadoop.mapreduce.RecordReader<NullWritable, ByteWritable> createRecordReader(
          org.apache.hadoop.mapreduce.InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
          throws IOException, InterruptedException {
        MyRecordReader reader = new MyRecordReader();
        reader.initialize(inputSplit, taskAttemptContext);
        return null;
      }
    
      @Override
      protected boolean isSplitable(JobContext context, Path filename) {
        //设置每个小文件不可分割,保证一个小文件生成一个key-value键值对
        return false;
      }
    }
    
    

    createRecordReader方法中创建一个自定义的reader

    package com.ice.hadoop.test.mergefile;
    
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
    public class MyRecordReader extends RecordReader<NullWritable, BytesWritable> {
    
      private FileSplit fileSplit;
      private Configuration conf;
      private BytesWritable value = new BytesWritable();
      private boolean processed = false;
    
      @Override
      public void initialize(InputSplit inputSplit, TaskAttemptContext context)
          throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) inputSplit;
        this.conf = context.getConfiguration();
      }
    
      @Override
      public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
          byte[] contents = new byte[(int) fileSplit.getLength()];
          Path file = fileSplit.getPath();
          FileSystem fs = file.getFileSystem(conf);
          FSDataInputStream in = null;
    
          try {
            in = fs.open(file);
            IOUtils.readFully(in, contents, 0, contents.length);
            value.set(contents, 0, contents.length);
          } finally {
            IOUtils.closeStream(in);
          }
          processed = true;
          return true;
        }
        return false;
      }
    
      @Override
      public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
      }
    
      @Override
      public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
      }
    
      @Override
      public float getProgress() throws IOException, InterruptedException {
        return processed ? 1.0f : 0.0f;
      }
    
      @Override
      public void close() throws IOException {
    
      }
    }
    
    

    其中有3个核心方法:nextKeyValue、getCurrentKey、getCurrentValue。

    nextKeyValue负责生成要传递给map方法的key和value。getCurrentKey、getCurrentValue是实际获取key和value的。所以RecordReader的核心机制就是:通过nextKeyValue生成key value,然后通过getCurrentKey和getCurrentValue来返回上面构造好的key value。这里的nextKeyValue负责把整个文件内容作为value。

    MapReduce程序:

    package com.ice.hadoop.test.mergefile;
    
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
    
    /**
     * @author:ice
     * @Date: 2019/2/22 0022
     */
    public class ManyToOne {
    
      static class FileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
    
        private Text fileNameKey;
    
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
          InputSplit split = context.getInputSplit();
          Path path = ((FileSplit) split).getPath();
          fileNameKey = new Text(path.toString());
        }
    
        @Override
        protected void map(NullWritable key, BytesWritable value, Context context)
            throws IOException, InterruptedException {
          context.write(fileNameKey, value);
        }
      }
    
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(ManyToOne.class);
    
        job.setInputFormatClass(MyInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
    
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);
        job.setMapperClass(FileMapper.class);
    
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
        job.waitForCompletion(true);
      }
    
    }
    
  • 相关阅读:
    dw通过iis运行asp网站总结
    idea 连接mysql报错:Access denied for user 'root'@'localhost'(using password:YES)。
    idea 为模块添加Tomcat依赖 解决: Intelij IDEA 创建WEB项目时没有Servlet的jar包
    IntelliJ IDEA 高效率配置
    idea发布web项目在tomcat位置问题
    idea中HTML格式化时标签缩进问题
    IDEA 安装破解
    python中map()函数的用法讲解
    mac终端下svn常用命令
    SVN常用命令详解
  • 原文地址:https://www.cnblogs.com/skyice/p/10421419.html
Copyright © 2011-2022 走看看