zoukankan      html  css  js  c++  java
  • 自定义MapReduce方法

    (前期准备:hadoop搭建成功,hdfs和yarn都得配置好)

    1.首先自定义的MapReduce方法要对hdfs中存储的文件进行操作,所以要先在pom.xml中配置包,使maven支持hdfs文件操作

            <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.3</version>
            </dependency>
          <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.3</version>
        </dependency>
          <dependency>
              <groupId>jdk.tools</groupId>
              <artifactId>jdk.tools</artifactId>
              <version>1.8</version>
              <scope>system</scope>
              <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
          </dependency>

     其中hadoop-common 和hadoop-client 是maven从仓库自动加载,tools需要自己找到本地jdk目录,tools.jar在lib下,只得注意的是hadoop的版本和jdk的版本要注意

    2.自定义MapReduce 方法是用继承的方法重写方法,所以要加载MapReduce的包

          <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>2.7.3</version>
            <scope>test</scope>
        </dependency>

    3. 以重写Wordcount为例

      (1)重写map,继承mapper,重写其中的map方法

    
    
    import java.io.IOException;
    
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;


    public
    class MyMap extends Mapper<LongWritable, Text, Text, IntWritable> { @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { // TODO 自动生成的方法存根 //读行的值 String line = value.toString(); //按空格截取字符串的值 放入数组 String[] words = line.split(" "); for (String word : words) { //trim 去前后空格 写入到reduce 输入的key 和 value context.write(new Text(word.trim()), new IntWritable(1)); } } }

             1)需要注意的是:导包一定要导对!!!!

         ·Mapper对应的是 import org.apache.hadoop.mapreduce.Mapper;

               ·Text 对应的是import org.apache.hadoop.io.Text;

       2)Mapper<KEY,VALUE,KEY,VALUE>前两个是输入的key和value值,后俩个是输出到reduce的key和value值。

                   ·第一个KEY是偏移量,是每一行第一个的偏移量,在MapReduce中是LongWritable,其                  在文件系统中持久化

                   ·第一个value 是每一行的值

                   ·第二个key和value是mapper的输出类型

      (2)重写reduce,继承Reducer,重写reduce方法

        

    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class MyReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            // TODO 自动生成的方法存根
            int sum = 0;
            //reduce自动整合相同KEY Iterable迭代器 将key所携带的value值存入迭代器
            for (IntWritable intWritable : values) {
                sum += intWritable.get();//将value存入的1累加
            }
            //输出结果
            context.write(key, new IntWritable(sum));
        }
    }

          ·注意第一个KEY 和 VALUE的类型要和map输出的类型一致

            ·输入类型要用到迭代器,其作用类似于数组,将相同key值存在一个数组里

      (3)实现job类,来实现自定义的map和reduce,继承和实现接口重写run,其中用到了反射的原理

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    //继承和实现接口重写run
    public class MyJob extends Configured implements Tool {
        public static void main(String[] args) throws Exception {
            MyJob myJob=new MyJob();
            ToolRunner.run(myJob, null);
        }
        @Override
        public int run(String[] args) throws Exception {
            // TODO 自动生成的方法存根
            //配置连接
            Configuration conf=new Configuration();
            conf.set("fs.defaultFS", "hdfs://192.168.146.110:9000");
            Job job=Job.getInstance(conf);
            
            //反射调用
            job.setJarByClass(MyJob.class);
            job.setMapperClass(MyMap.class);
            job.setReducerClass(MyReduce.class);
            
            //输出 KEY 类型 输出 value 类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            
            //输入路径
            FileInputFormat.addInputPath(job, new Path("/hadoop/hadoop.txt"));
            //输出路径
            FileOutputFormat.setOutputPath(job, new Path("/hadoop/out"));
            //提交作业
            job.waitForCompletion(true);
            return 0;
        }
    }

     总结原理(个人理解):

      MapReduce分为两个步骤,map阶段和reduce阶段

      (1)map阶段:数据的读取阶段,读取HDFS中的文件,每一行解析成一个<KEY,VALUE>,,分区存放

       (2)reduce阶段:数据的处理阶段,整合键值对中相同KEY,输出整合后的VALUE。

  • 相关阅读:
    前后端分离的若依(ruoyi)基本使用
    Redis (error) NOAUTH Authentication required.解决方法
    Creating Server TCP listening socket 127.0.0.1:6379: bind: No error。。。启动遇到问题的解决办法
    持续集成
    Java compiler level does not match the version of the installed Java project facet问题解决
    控制台报错:Unknown database 'xxxxx'
    走进docker-machine
    走进docker-compose
    java新手学习路线
    Spring WebFlux 教程:如何构建反应式 Web 应用程序
  • 原文地址:https://www.cnblogs.com/wf1647790534/p/9325271.html
Copyright © 2011-2022 走看看