zoukankan      html  css  js  c++  java
  • MapReduce(分布式计算)_01

    1-MapReduce介绍

     

     ===========================================================================================================================

    2-MapReduce的构思和框架结构

     ===============================================================================================================

    3-MapReduce的编程规范

     ===================================================================================================================

    4-MapReduce案例-wordcount-步骤分析

     ======================================================================================================================

    5-MapReduce案例-wordcount-准备工作

     

     

     =========================================================================================================================

    6-MapReduce案例-wordcount-Map阶段代码

     

     

     WordCountMapper.java

    package com.mapreduce;

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;


    /**
    * Mapper的泛型:
    * KEYIN:k1的类型 有偏移量 LongWritable
    * VALUEIN:v1的类型 一行的文本数据 Text
    * KEYOUT:k2的类型 每个单词 Text
    * VALUEOUT:v2的类型 固定值1 LongWritable
    *
    */
    public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
    /**
    * map方法是将k1和v1转为k2和v2
    * key:是k1
    * value:是v1
    * context:表示MapReduce上下文对象
    */
    /**
    * k1 v1
    * 0 hello,world
    * 11 hello,hadoop
    * ------------------------------------------
    * k2 v2
    * hello 1
    * world 1
    * hadoop 1
    */
    @Override
    protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {
    Text text=new Text();
    LongWritable writable = new LongWritable();
    //1.对每一行数据进行字符串拆分
    String line = value.toString();
    String[] split = line.split(",");
    //2.遍历数组,获取一个单词

    //靠context来连接
    for (String word : split) {
    text.set(word);
    writable.set(1);
    context.write(text,writable);
    }
    }
    }

    ============================================================================================================

    7-MapReduce案例-wordcount-Reduce阶段代码 
    WordCountReducer.java

    package com.mapreduce;

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    /**
     * Word-count reducer: adds up the per-occurrence counts emitted by the mapper.
     *
     * <p>Type parameters of {@link Reducer}:
     * KEYIN = k2, a word ({@link Text});
     * VALUEIN = element type of the v2 collection ({@link LongWritable});
     * KEYOUT = k3, the word ({@link Text});
     * VALUEOUT = v3, how many times the word occurred ({@link LongWritable}).
     */
    public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

        /**
         * Turns (k2, collection of v2) into (k3, v3) by summing the collection.
         *
         * <pre>
         * k2      v2            k3      v3
         * hello   [1, 1]    --> hello   2
         * world   [1, 1]        world   2
         * hadoop  [1, 1, 1]     hadoop  3
         * </pre>
         *
         * @param key     k2, the word
         * @param values  every v2 value grouped under this word
         * @param context MapReduce context that receives the (k3, v3) pair
         */
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            long total = 0L;
            for (LongWritable partial : values) {
                total = total + partial.get();
            }
            LongWritable occurrences = new LongWritable(total);
            context.write(key, occurrences);
        }
    }

    ===================================================================================================

    8-MapReduce案例-wordcount-JobMain代码   (主程序类的编写)

    MapReduce主程序的固定模板

    // Create the job object; super.getConf() is the Configuration the driver receives via ToolRunner.
    Job job = Job.getInstance(super.getConf(),"mapreduce_wordcount");

    JobMain.java

    package com.mapreduce;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    /**
     * Driver for the word-count job: wires WordCountMapper and WordCountReducer together
     * and submits the job.
     */
    public class JobMain extends Configured implements Tool {

        /**
         * Input directory. hdfs:// URIs must name the NameNode RPC address (8020 by default),
         * not the web-UI port 50070, and must not contain a nested "http://" scheme.
         * Assumes the NameNode at 192.168.187.100 uses the default RPC port — adjust to the
         * cluster's fs.defaultFS if it differs.
         */
        private static final String INPUT_PATH = "hdfs://192.168.187.100:8020/wordcount";

        /** Output directory; it must NOT exist yet, otherwise the job is rejected. */
        private static final String OUTPUT_PATH = "hdfs://192.168.187.100:8020/wordcount_out";

        /**
         * Configures and runs the job, blocking until it finishes.
         *
         * @param args command-line arguments left over by ToolRunner (unused)
         * @return 0 if the job succeeded, 1 otherwise
         */
        @Override
        public int run(String[] args) throws Exception {
            Job job = Job.getInstance(super.getConf(), "mapreduce_wordcount");

            // Required when running on the cluster from a packaged jar.
            job.setJarByClass(JobMain.class);

            // Step 1: input format producing (k1, v1) = (line offset, line text).
            job.setInputFormatClass(TextInputFormat.class);
            TextInputFormat.addInputPath(job, new Path(INPUT_PATH));

            // Step 2: mapper and its (k2, v2) output types.
            job.setMapperClass(WordCountMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);

            // Steps 3-6 (shuffle): default partitioning, sorting, combining and grouping.

            // Step 7: reducer and its (k3, v3) output types.
            job.setReducerClass(WordCountReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);

            // Step 8: output format and location.
            job.setOutputFormatClass(TextOutputFormat.class);
            TextOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

            // Submit and wait for completion, printing progress.
            boolean succeeded = job.waitForCompletion(true);
            return succeeded ? 0 : 1;
        }

        /** Launches the job through ToolRunner; prints and exits with the job status (0 = success). */
        public static void main(String[] args) throws Exception {
            Configuration configuration = new Configuration();
            int exitCode = ToolRunner.run(configuration, new JobMain(), args);
            System.out.println(exitCode);
            // Propagate the status so `hadoop jar` callers and schedulers can detect failures.
            System.exit(exitCode);
        }
    }

    =============================================================================================

     9-MapReduce案例-wordcount-集群运行

     

    运行成功:

     

     ======================================================================================================================

    10-MapReduce分区-原理

     ===========================================================================================================

    11-MapReduce分区-代码实现

    PartitonerOwn.java

    package com.mapreduce;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    /**
     * Custom partitioner: words of 5 or more characters go to partition 0 and all shorter
     * words go to partition 1, so the job needs at least two reduce tasks.
     */
    public class PartitonerOwn extends Partitioner<Text, LongWritable> {

        /** Minimum word length (in chars) that sends a word to partition 0. */
        private static final int LONG_WORD_MIN_LENGTH = 5;

        /**
         * Picks the reduce partition for one (k2, v2) pair.
         *
         * @param text          k2, the word
         * @param longWritable  v2, the count (ignored)
         * @param numPartitions number of reduce tasks (ignored)
         * @return 0 for words of length >= 5, otherwise 1
         */
        @Override
        public int getPartition(Text text, LongWritable longWritable, int numPartitions) {
            int wordLength = text.toString().length();
            return wordLength < LONG_WORD_MIN_LENGTH ? 1 : 0;
        }
    }

    ------------------------------------------------------------------------------------------------------------------------------------------------------------

    JobMain.java

    package com.mapreduce;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    /**
     * Driver for the word-count job with a custom partitioner: long and short words are
     * written by two separate reduce tasks (see PartitonerOwn).
     */
    public class JobMain extends Configured implements Tool {

        /**
         * Configures and runs the job, blocking until it finishes.
         *
         * @param args command-line arguments left over by ToolRunner (unused)
         * @return 0 if the job succeeded, 1 otherwise
         */
        @Override
        public int run(String[] args) throws Exception {
            Job job = Job.getInstance(super.getConf(), "mapreduce_wordcount");

            // Required when running on the cluster from a packaged jar.
            job.setJarByClass(JobMain.class);

            // Step 1: input format producing (k1, v1) = (line offset, line text).
            job.setInputFormatClass(TextInputFormat.class);
            TextInputFormat.addInputPath(job, new Path("hdfs://node01:8020/wordcount"));

            // Step 2: mapper and its (k2, v2) output types.
            job.setMapperClass(WordCountMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);

            // Steps 3-6 (shuffle): CUSTOM partitioning via PartitonerOwn; sorting,
            // combining and grouping keep their defaults.
            job.setPartitionerClass(PartitonerOwn.class);

            // Step 7: reducer and its (k3, v3) output types.
            job.setReducerClass(WordCountReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);

            // PartitonerOwn returns partition 0 or 1, so two reduce tasks are required;
            // keep this value in sync with the partitioner.
            job.setNumReduceTasks(2);

            // Step 8: output format and location (wordcount_out must not exist yet).
            job.setOutputFormatClass(TextOutputFormat.class);
            TextOutputFormat.setOutputPath(job, new Path("hdfs://node01:8020/wordcount_out"));

            // Submit and wait for completion, printing progress.
            boolean succeeded = job.waitForCompletion(true);
            return succeeded ? 0 : 1;
        }

        /** Launches the job through ToolRunner; prints and exits with the job status (0 = success). */
        public static void main(String[] args) throws Exception {
            Configuration configuration = new Configuration();
            int exitCode = ToolRunner.run(configuration, new JobMain(), args);
            System.out.println(exitCode);
            // Propagate the status so `hadoop jar` callers and schedulers can detect failures.
            System.exit(exitCode);
        }
    }

    ------------------------------------------------------------------------------------------------------------------------------------------------------------

    修改代码之后,重新打成jar包,上传到服务器,然后运行该jar包

     

     

     ==============================================================================================================

    12-MapReduce排序-概述

     ==============================================================================================================

    13-MapReduce排序-流程分析

     注意:k1为每一行的偏移量

     

     ================================================================================================================

    14-MapReduce排序-实现比较器和序列化代码

     PairWritable.java

    package com.mapreduce_sort;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;


    import org.apache.hadoop.io.WritableComparable;

    /**
     * Composite map-output key (k2) made of a string and an int; records are ordered by
     * {@code first} and then by {@code second}.
     *
     * <p>Hadoop instantiates keys reflectively, so the implicit no-arg constructor must stay.
     */
    public class PairWritable implements WritableComparable<PairWritable> {
        private String first;
        private int second;

        public String getFirst() {
            return first;
        }

        public void setFirst(String first) {
            this.first = first;
        }

        public int getSecond() {
            return second;
        }

        public void setSecond(int second) {
            this.second = second;
        }

        /** Renders the pair as "first second" (the text written by TextOutputFormat). */
        @Override
        public String toString() {
            return first + ' ' + second;
        }

        /** Deserializes the fields in the same order {@link #write} serialized them. */
        @Override
        public void readFields(DataInput dataInput) throws IOException {
            this.first = dataInput.readUTF();
            this.second = dataInput.readInt();
        }

        /** Serializes the pair into the byte stream. */
        @Override
        public void write(DataOutput dataOutput) throws IOException {
            dataOutput.writeUTF(first);
            dataOutput.writeInt(second);
        }

        /**
         * Sort order: lexicographic by {@code first} (e.g. "abb" < "abc"), then numeric
         * by {@code second}.
         */
        @Override
        public int compareTo(PairWritable other) {
            int result = this.first.compareTo(other.first);
            if (result == 0) {
                // Integer.compare cannot overflow, unlike this.second - other.second.
                return Integer.compare(this.second, other.second);
            }
            return result;
        }

        /** Equal exactly when {@link #compareTo} returns 0 (for non-null {@code first}). */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof PairWritable)) {
                return false;
            }
            PairWritable other = (PairWritable) obj;
            return second == other.second
                    && (first == null ? other.first == null : first.equals(other.first));
        }

        /**
         * Value-based hash consistent with {@link #equals}. The default HashPartitioner
         * partitions by key.hashCode(), so the identity hash would send equal keys to
         * different reducers when more than one reduce task is used.
         */
        @Override
        public int hashCode() {
            int firstHash = (first == null) ? 0 : first.hashCode();
            return 31 * firstHash + second;
        }
    }

    ----------------------------------------------------------------------------------------------------------------------------------------------------------------

    SortMapper.java

    package com.mapreduce_sort;

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class SortMapper extends Mapper<LongWritable, Text, PairWritable, Text>{
    @Override
    protected void map(LongWritable key, Text value,Context context)
    throws IOException, InterruptedException {
    //1.对每一行数据进行拆分,然后封装到PairWritable对象中,作为A2
    String[] split = value.toString().split(" "); 
    PairWritable pairWritable = new PairWritable();
    pairWritable.setFirst(split[0]);
    pairWritable.setSecond(Integer.parseInt(split[1].trim()));

    //2.将k2和v2写入上下文中
    context.write(pairWritable, value);
    }
    }

    ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

    SortReducer.java

    package com.mapreduce_sort;

    import java.io.IOException;

    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    /**
     * Sort reducer: writes each key once per value grouped under it, so duplicate input
     * lines (e.g. two "a 1" lines) are both kept in the output.
     */
    public class SortReducer extends Reducer<PairWritable, Text, PairWritable, NullWritable> {

        /**
         * @param key     k2, the parsed (first, second) pair
         * @param values  the original lines that produced this key
         * @param context receives (key, NullWritable) once per value
         */
        @Override
        protected void reduce(PairWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // NullWritable.get() returns the empty placeholder value object.
            NullWritable noValue = NullWritable.get();
            for (Text ignored : values) {
                context.write(key, noValue);
            }
        }
    }

    ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

    JobMain.java

    package com.mapreduce_sort;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    /**
     * Driver for the sort job: the shuffle orders PairWritable keys (first, then second)
     * and SortReducer writes them out as text.
     */
    public class JobMain extends Configured implements Tool {

        /** Input directory containing "word number" lines. */
        private static final String INPUT_PATH = "hdfs://node01:8020/input/sort";

        /** Output directory; it must NOT exist yet, otherwise the job is rejected. */
        private static final String OUTPUT_PATH = "hdfs://node01:8020/sort_out";

        /**
         * Configures and runs the job, blocking until it finishes.
         *
         * @param args command-line arguments left over by ToolRunner (unused)
         * @return 0 if the job succeeded, 1 otherwise
         */
        @Override
        public int run(String[] args) throws Exception {
            Job job = Job.getInstance(super.getConf(), "mapreduce_sort");

            // Required when running on the cluster from a packaged jar.
            job.setJarByClass(JobMain.class);

            // Step 1: input format producing (k1, v1) = (line offset, line text).
            job.setInputFormatClass(TextInputFormat.class);
            TextInputFormat.addInputPath(job, new Path(INPUT_PATH));

            // Step 2: mapper and its (k2, v2) output types.
            job.setMapperClass(SortMapper.class);
            job.setMapOutputKeyClass(PairWritable.class);
            job.setMapOutputValueClass(Text.class);

            // Steps 3-6 (shuffle): default partitioning, sorting, combining and grouping.
            // The default sort compares keys with PairWritable.compareTo, which is what
            // produces the ordered output.

            // Step 7: reducer and its (k3, v3) output types.
            job.setReducerClass(SortReducer.class);
            job.setOutputKeyClass(PairWritable.class);
            job.setOutputValueClass(NullWritable.class);

            // Step 8: output format and location (sort_out must not exist yet).
            job.setOutputFormatClass(TextOutputFormat.class);
            TextOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

            // Submit and wait for completion, printing progress.
            boolean succeeded = job.waitForCompletion(true);
            return succeeded ? 0 : 1;
        }

        /** Launches the job through ToolRunner and exits with its status (0 = success). */
        public static void main(String[] args) throws Exception {
            Configuration configuration = new Configuration();
            int exitCode = ToolRunner.run(configuration, new JobMain(), args);
            System.exit(exitCode);
        }
    }

    =====================================================================================================================

    16-MapReduce排序-集群运行

     

     

  • 相关阅读:
    JSP/Servlet相关
    mysql 相关问题解决
    Git常用
    利用JDBC连接MySQL并使用MySQL
    memcache、redis原理对比
    Python 2.7.x 和 3.x 版本的重要区别
    python 单例模式
    python 装饰器原理及用法
    python 冒泡排序
    python 迭代器和生成器
  • 原文地址:https://www.cnblogs.com/curedfisher/p/12581753.html
Copyright © 2011-2022 走看看