zoukankan      html  css  js  c++  java
  • 【Hadoop离线基础总结】MapReduce入门

    MapReduce入门


    Mapreduce思想

    • 概述
      MapReduce的思想核心分而治之,适用于大量复杂的任务处理场景(大规模数据处理场景)。
      最主要的特点就是把一个大的问题划分成很多小的子问题并且每个小的子问题的求取思路与我们大问题的求取思路一样
      最主要有两个阶段:一个map阶段,负责拆分;一个是reduce阶段,负责聚合

    • 思想模型
      在这里插入图片描述
      一个文件切块(Split)对应一个mapTask
      mapreduce没有block的概念,默认一个切块(Split)对应block块的大小(128M)
      MapReduce处理的数据类型是<key,value>键值对


    MapReduce运行流程

    • 可控的八个步骤
      1.读取文件,解析成<key,value>对。  这里是<k1,v1>
      2.接收<k1,v1>,自定义我们的map逻辑,然后转换成新的<k2,v2>进行输出,往下发送。  这里发送出去的是<k2,v2>
      3.分区。相同key的value发送到同一个reduce里面去,key合并,value形成一个集合
      4.排序。
      5.规约。
      6.分组。
      7.接收<k2,v2>,自定义reduce逻辑,转换成新的<k3,v3>进行输出
      8.将<k3,v3>进行输出

    • 运行流程图
      在这里插入图片描述

    • 运行流程代码
      定义一个主类,用来描述job并提交job

    package cn.itcast.wordcount;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    /**
     * 定义一个主类,用来描述job并提交job
     * 除了javaweb的程序打成一个war包进行运行,其他程序都是打成一个jar包进行运行
     * 运行一个jar包需要main方法,作为程序的入口类
     */
    public class MainCount extends Configured implements Tool {
        /**
         * 程序入口类
         *
         * @param args
         */
        public static void main(String[] args) throws Exception {
            //创建Configuration对象
            Configuration configuration = new Configuration();
            //执行ToolRunner得到一个int类型的返回值,表示程序的退出状态码(如果退出状态码是0代表运行成功,其他数字代表运行失败)
            int run = ToolRunner.run(configuration, new MainCount(), args);
            //程序退出
            System.exit(run);
    
        }
    
        /**
         * 这个run方法很重要,这里面主要通过job对象来组装程序,就是组装步骤的八个类(每一个步骤都是一个类)
         *
         * @param args
         * @return
         * @throws Exception
         */
        @Override
        public int run(String[] args) throws Exception {
            /*
            第一步:读取文件,解析成<key,value>对
            从父类里获取Configuration
            getInstance需要两个参数。一个是Configuration配置文件,一个是jobName(可以随便起)
             */
            Job job = Job.getInstance(super.getConf(), "xxx");
    
            //指定输入类
            job.setInputFormatClass(TextInputFormat.class);
            //TextInputFormat.addInputPath(job,new Path("hdfs://node01:8020/wordcount"));
            TextInputFormat.setInputPaths(job, new Path("file:////Volumes/赵壮备份/大数据离线课程资料/3.大数据离线第三天/wordcount/input"));
    
    
            //第二步:设置mapper类
            job.setMapperClass(WordCountMapper.class);
            //设置k2和v2的类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
    
            /**
             * 第三步到第六步省略
             * 分区   相同key的value发到同一个reduce,key合并,valuex形成一个集合
             * 排序
             * 规约
             * 分组
             */
    
            //第七步:设置reduce类
            job.setReducerClass(WordCountReducer.class);
            //设置k3和v3的类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            //第八步:设置输出类以及输出路径
            job.setOutputFormatClass(TextOutputFormat.class);
            //TextOutputFormat.setOutputPath(job,new Path("hdfs://node01:8020/wordcount_out"));
            TextOutputFormat.setOutputPath(job, new Path("file:////Volumes/赵壮备份/大数据离线课程资料/3.大数据离线第三天/wordcount/output/"));
    
            //将任务提交到集群上去
            boolean b = job.waitForCompletion(true);
            return b ? 0 : 1;
        }
    }
    
    

    定义一个mapper类

    package cn.itcast.wordcount;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    import java.io.InputStream;
    
    /**
     * 设置Mapper类
     * 自定义Map逻辑,把<k1,v1>转换成新的<k2,v2>进行输出,往下发送
     */
    public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        /**
         * 重写map方法,实现自定义逻辑,接收<k1,v1>,转换成新的<k2,v2>
         *
         * @param key     k1
         * @param value   v1
         * @param context 上下文对象(承上启下,衔接上面的组件和下面的组件)
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //第一步:切开第一行数据
            String line = value.toString();
            //转换成<k2,v2>往下发送
            String[] split = line.split(",");
            //循环遍历数据
            for (String word : split) {
                //创建k2和v2对象
                Text k2 = new Text(word);
                IntWritable v2 = new IntWritable(1);
                //通过write方法将数据往下发送
                context.write(k2, v2);
            }
        }
    }
    

    定义一个reducer类

    package cn.itcast.wordcount;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    
    public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        /**
         * 重写reduce方法
         *
         * @param key     k2
         * @param values  集合 v2
         * @param context 上下文对象
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            //循环遍历集合
            int count = 0;
            for (IntWritable value : values) {
                //IntWritable没有"+"方法,需要转换成int类型
                int num = value.get();
                count += num;
            }
            //输出 <k3,v3>类型
            context.write(key, new IntWritable(count));
        }
    }
    
    • 如果遇到此问题
      Caused by:
      org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=admin, access=WRITE, inode="/":root:supergroup:drwxr-xr-x

    第一种解决方法:关闭HDFS权限

    cd /export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop
    
    vim hdfs-site.com
    

    修改hdfs-site.com配置文件,将true改为false即可

    <property>
        <name>dfs.permissions</name>
        <value>false</value>
    </property>
    

    第二种解决方法:将代码打包成jar包,放到集群上执行

    yarn jar hadoop_hdfs_operate-1.0-SNAPSHOT.jar cn.itcast.hdfs.demo1.JobMain
    

    第三种解决方法:在本地系统运行

    //修改输入和输出路径
     TextInputFormat.setInputPaths(job, new Path("file:////Volumes/赵壮备份/大数据离线课程资料/3.大数据离线第三天/wordcount/input"));
    
     TextOutputFormat.setOutputPath(job, new Path("file:////Volumes/赵壮备份/大数据离线课程资料/3.大数据离线第三天/wordcount/output/"));
    
    

  • 相关阅读:
    NodeJs实现图片上传
    初步认识Express框架渲染视图
    在javascript中使用replace
    javascript作用域与闭包
    eviews面板数据的操作
    我做的python常用的小技巧
    css3学习笔记
    div垂直居中的问题
    display和visibility的区别
    关于call和apply的那点事儿
  • 原文地址:https://www.cnblogs.com/zzzsw0412/p/12772492.html
Copyright © 2011-2022 走看看