zoukankan      html  css  js  c++  java
  • Hadoop MapReduce -wordcount学习

    (一)MapReduce简单介绍

    1. MapReduce是一种分布式的计算模型,主要用于搜索领域,解决海量数据的计算问题
    2. 它主要由两个阶段组成:Map和Reduce,用户只要实现map()和reduce()两个函数,就可以实现分布式计算
    3. 这两个函数的形参是key,value对,表示函数的输入信息

    (二)步骤分析

    1. map过程

        1.1 读取输入文件内容,解析成key、value对。对输入的文件的每一行,解析成key、value对。每一个键值对调用一次map函数。
      
        2.2 写出自己的算法逻辑,对输入的key、value对进行处理,转换成新的key、value输出。
      
    2. reduce过程

       2.1 在reduce之前,shuffle会对多个map任务的输出进行合并、排序。
       2.2 在reduce函数中写出自己的算法逻辑,对输入的key、value对进行处理,转换成新的key、value输出。
       2.3 把输出保存到文件当中。
      

    (三)WordCount例子学习

    1. 准备需要的文件

      在这里,我写了一个source.txt文件并且通过

        hdfs dfs -put /home/txp/桌面/source.txt /wc/srcdata

    上传到HDFS中去(当然此时已经启动hadoop集群)

    1. 定义一个WcMapper类继承Mapper
          public class WcMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
    
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
                throws IOException, InterruptedException {
            //接受数据
            String line=value.toString();
            //切分数据
            String [] words=StringUtils.split(line," ");
            //循环
            for (String w :words) {
                //出现一次  记个1
                context.write(new Text(w), new LongWritable(1));
            }
        }
    }

    对文件中的数据是一行一行的处理。

    1. 定义一个WcReducer类继承Reducer
     public class WcReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
        @Override
        protected void reduce(Text key, Iterable<LongWritable> value,
                Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
            //定义一个计算器
            long count=0;
            //循环
            for(LongWritable v:value) {
                count+=v.get();
            }
            //输出
            context.write(key, new LongWritable(count));
        }
    }

    4.创建WordCount 类具体实现

       public class WordCount {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Configuration conf=new Configuration();
            //conf.set("mapreduce.job.jar", "wc.jar");
            //新建一个job对象
            Job job = Job.getInstance(conf);
    
            //加载main方法所在的类
            job.setJarByClass(WordCount.class);
            //设置mapper与reducer类
            job.setMapperClass(WcMapper.class);
            job.setReducerClass(WcReducer.class);
    
            //Combiner组件
            //指定job所使用的combiner类定义
            job.setCombinerClass(WcReducer.class);
    
            //设置mapper输出类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);
            //设置reducer输出类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);
    
            FileInputFormat.setInputPaths(job, new Path("hdfs://Master:9000/wc/srcdata"));
    
            //如果已经有了output了  删除
            FileSystem fs=FileSystem.get(conf);
            Path output=new Path("/wc/output");
            if(fs.exists(output)) {
                fs.delete(output,true);
            }
            //指定处理之后的结果输出到那个路径
    
            FileOutputFormat.setOutputPath(job, new Path("hdfs://Master:9000/wc/output"));
    
            boolean res = job.waitForCompletion(true);
    
            System.out.println(res?1:0);
        }
    }

    5.运行模式

    5.1提交到集群中运行
        将开发好的该程序打成jar包,上传到服务器,执行命令
    
         hadoop  jar  包名.主类 输入目录  输出目录
    5.2在本地运行
        直接在WordCount中启动,这种也可以在yarn集群中运行
    
    希望在知识中书写人生的代码
  • 相关阅读:
    Python-流程控制之if判断
    Python-流程控制之循环
    Python-基本运算符
    Python-基本运算符
    Python-数据类型的基本使用
    python2中与用户交互
    Python-内存管理
    vue 替换表格中的数据
    实现单例的三个方法
    django----框架介绍
  • 原文地址:https://www.cnblogs.com/tongxupeng/p/10259554.html
Copyright © 2011-2022 走看看