zoukankan      html  css  js  c++  java
  • Hadoop系列(七)Hadoop三大核心之MapReduce-程序编写


    接下来以一个简单的WordCount为例子,介绍Java版本的MapReduce的程序编写。

    mapreduce程序主要分三部分:1.map部分,2.reduce部分,3.提交部分。

    1. 准备部分

    hadoop中,针对数据类型自成一体,与java的数据类型对应。封装在hadoop.io包中,主要分为基本类型和其它类型。

    • 基本数据类型
    数据类型 hadoop数据类型 Java数据类型
    布尔 BooleanWritable boolean
    整型 IntWritable int
    长整型 LongWritable long
    浮点型 FloatWritable float
    双精浮点 DoubleWritable double
    字节 ByteWritable byte
    • 其它类型
    数据类型 hadoop数据类型 Java数据类型
    字符串 Text String
    数组 ArrayWritable Array
    Map MapWritable Map

    2. jar包依赖

    创建一个maven工程,pom.xml文件中,添加以下依赖

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.6.0</version>
        </dependency>
    
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.6.0</version>
        </dependency>
    
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.0</version>
        </dependency>
    </dependencies>
    

    3. Map部分

    映射部分,将数据逐条处理

    首先,需要继承Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>类

    四个泛型参数分别代表:输入key 输入value 输出key 输出value

    然后重写mapper的map方法

    map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException

    主体代码:

    public class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable> {
    
        /**
         *  流程,输入key和value,map的结果写入到context中
         */
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //读取一行数据
            String line = value.toString();
            //因为英文字母是以“ ”为间隔的,因此使用“ ”分隔符将一行数据切成多个单词并存在数组中
            String str[] = line.split(" ");
            //循环迭代字符串,将一个单词变成<key,value>形式,及<"hello",1>
            for (String s : str) {
                context.write(new Text(s), new IntWritable(1));
            }
        }
    
    }
    

    4.Reduce部分

    归并部分,将map处理和shuffle之后的数据进行归并。shuffle过程由hadoop控制

    Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>

    四个泛型参数分别代表:输入key 输入value 输出key 输出value

    然后重写reducer的reduce方法

    reduce(KEYIN key, Iterable<VALUEIN> values, Context context) throws IOException, InterruptedException

    主体代码:

    public class WordCountReduce extends Reducer<Text,IntWritable,Text, IntWritable> {
        /**
         *  流程,输入key和value,map的结果写入到context中
         */
        @Override
        public void reduce(Text key, Iterable<IntWritable> values,Context context)throws IOException,InterruptedException{
            int count = 0;
    
            for(IntWritable value: values) {
                count++;
            }
            context.write(key,new IntWritable(count));
        }
    }
    

    5.提交部分

    mapreduce的入口

    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        // 构造一个job对象来封装本mapreduce业务到所有信息
        Job mrJob = Job.getInstance(conf);
        // 指定本job工作用到的jar包位置
        mrJob.setJarByClass(WordCount.class);
        // 指定本job用到的mapper类
        mrJob.setMapperClass(WordCountMap.class);
        // 指定本job用到的reducer类
        mrJob.setReducerClass(WordCountReduce.class);
    
        // 指定mapper输出的kv类型
        mrJob.setMapOutputKeyClass(Text.class);
        mrJob.setMapOutputValueClass(IntWritable.class);
    
    
        // 指定reducer输出到kv数据类型(setOutputKeyClass 会对mapper和reducer都起作用,如果上面mapper不设置的话)
        mrJob.setOutputKeyClass(Text.class);
        mrJob.setOutputValueClass(IntWritable.class);
    
    
        FileInputFormat.addInputPath(mrJob,new Path(args[0]));
    
        //设置mapreduce程序的输出路径,MapReduce的结果都是输入到文件中
        FileOutputFormat.setOutputPath(mrJob,new Path(args[1]));
    
        // 最后提交任务
        boolean waitForCompletion = mrJob.waitForCompletion(true);
        System.exit(waitForCompletion ? 0 : 1);
    }
    

    6.打包提交

    将程序打成jar包,提交到hadoop集群中,然后使用命令行进行任务提交

    对于输入输出路径,均可接受本地路径和hdfs路径。

    本地路径前缀:file://

    hdfs路径前缀:hdfs://

    /hadoop-3.1.2/bin/hadoop jar my_mapreduce-1.0-SNAPSHOT.jar com.breakthrough.wordcount.WordCount file:///home/hadoop/english.txt file:///home/hadoop/output
    
  • 相关阅读:
    halcon学习笔记——(5)HDevelop language(异常处理)
    halcon学习笔记——(7)HALCON标定后的二维测量
    halcon学习笔记——(4)HDevelop language(结构语句)
    halcon学习笔记——(3)HDevelop language(基本语句)
    halcon学习笔记——(2)HDevelop language(基本数据类型)
    VMware虚拟机克隆Linux系统后找不到eth0网卡的问题
    完全分布模式安装Hadoop
    Hadoop小程序数据筛选
    SpringMVC访问静态资源
    Win7下IE9访问QC
  • 原文地址:https://www.cnblogs.com/valjeanshaw/p/11679956.html
Copyright © 2011-2022 走看看