zoukankan      html  css  js  c++  java
  • MapReduce高级

    1-MapReduce-计数器

     

    SortMapper.java

    package com.mapreduce_sort;

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Counter;
    import org.apache.hadoop.mapreduce.Mapper;

    public class SortMapper extends Mapper<LongWritable, Text, PairWritable, Text>{
    @Override
    protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {
    //自定义计数器
    Counter counter = context.getCounter("MR_COUNT", "MapReduceCounter");//类型:"MR_COUNT", 变量:"MapReduceCounter"
    counter.increment(1L);//1L为每次执行map方法就计数一次信息,L为long类型


    //1.对每一行数据进行拆分,然后封装到PairWritable对象中,作为A2
    String[] split = value.toString().split(" ");
    PairWritable pairWritable = new PairWritable();
    pairWritable.setFirst(split[0]);
    pairWritable.setSecond(Integer.parseInt(split[1].trim()));

    //2.将k2和v2写入上下文中
    context.write(pairWritable, value);
    }
    }

     

    SortReduce.java

    package com.mapreduce_sort;

    import java.io.IOException;

    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class SortReducer extends Reducer<PairWritable,Text, PairWritable, NullWritable>{

    /**
    * a 1 <a 1,a 1>
    * a 1
    *
    */
    //自定义计数器:使用枚举
    public static enum MyCounter{//定义了两个计数器
    REDUCE_INPUT_KEY_RECORDS,REDUCE_INPUT_VALUE_RECORDS
    }

    @Override
    protected void reduce(PairWritable key, Iterable<Text> values,Context context)throws IOException, InterruptedException {
    //统计Reduce阶段key的个数
    context.getCounter(MyCounter.REDUCE_INPUT_KEY_RECORDS).increment(1L);

    //处理有两个a 1
    for (Text value : values) {
    //统计Reduce阶段value的个数
    context.getCounter(MyCounter.REDUCE_INPUT_VALUE_RECORDS).increment(1L);

    //NullWritable.get();.get()表示获取空对象
    context.write(key, NullWritable.get());
    }
    }
    }

     

     ====================================================================================================================================================

    2-MapReduce-Combiner规约-原理分析

     ============================================================================================================================================

    3-MapReduce-Combiner规约-代码实现

     WordCountMapper.java

    package com.mapreduce_combiner;

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;


    /**
    * Mapper的泛型:
    * KEYIN:k1的类型 有偏移量 LongWritable
    * VALUEIN:v1的类型 一行的文本数据 Text
    * KEYOUT:k2的类型 每个单词 Text
    * VALUEOUT:v2的类型 固定值1 LongWritable
    *
    */
    public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
    /**
    * map方法是将k1和v1转为k2和v2
    * key:是k1
    * value:是v1
    * context:表示MapReduce上下文对象
    */
    /**
    * k1 v1
    * 0 hello,world
    * 11 hello,hadoop
    * ------------------------------------------
    * k2 v2
    * hello 1
    * world 1
    * hadoop 1
    */
    @Override
    protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {
    Text text=new Text();
    LongWritable writable = new LongWritable();
    //1.对每一行数据进行字符串拆分
    String line = value.toString();
    String[] split = line.split(",");
    //2.遍历数组,获取一个单词

    //靠context来连接
    for (String word : split) {
    text.set(word);
    writable.set(1);
    context.write(text,writable);
    }
    }
    }

    ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

    WordCountReducer.java

    package com.mapreduce_combiner;

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    /**
    * KEYIN:k2 Text 每个单词
    * VALUE:v2 LongWritable 集合中泛型的类型
    * KEYOUT:k3 Text 每个单词
    * VALUEOUT LongWritable 每个单词出现的次数
    */
    public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
    /**
    * reduce方法的作用是将k2和v2转为k3和v3
    * key:k2
    * value:集合
    * context:MapReduce的上下文对象
    */
    /**
    * 新 k2 v2
    * hello <1,1>
    * world <1,1>
    * hadoop <1,1,1>
    * -----------------------------
    * k3 v3(遍历集合相加)
    * hello 2
    * world 2
    * hadoop 3
    */
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values,
    Context context) throws IOException, InterruptedException {
    long count=0;
    //1.遍历values集合
    for (LongWritable value : values) {
    //2.将集合中的值相加
    count+=value.get();

    }
    //3:将k3和v3写入上下文中
    context.write(key, new LongWritable(count));
    }
    }

     ==========================================================================================================

    MyCombiner.java

    package com.mapreduce_combiner;

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    //规约(减少网络传输数据量,提高网络传输效率)
    public class MyCombiner extends Reducer<Text, LongWritable, Text, LongWritable>{
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values,
    Context context) throws IOException, InterruptedException {
    long count=0;
    //1.遍历values集合
    for (LongWritable value : values) {
    //2.将集合中的值相加
    count+=value.get();

    }
    //3:将k3和v3写入上下文中
    context.write(key, new LongWritable(count));
    }

    }

    -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

    JobMain.java

    package com.mapreduce_combiner;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class JobMain extends Configured implements Tool{

    @Override
    public int run(String[] arg0) throws Exception {
    //创建一个任务对象
    Job job = Job.getInstance(super.getConf(),"mapreduce_wordcount");

    //打包在集群运行时,需要做一个配置
    job.setJarByClass(JobMain.class);

    //设置任务对象
    //第一步:设置读取文件的类:K1和V1(读取原文件TextInputFormat)
    job.setInputFormatClass(TextInputFormat.class);
    //设置从哪里读
    TextInputFormat.addInputPath(job,new Path("hdfs://node01:8020/wordcount"));
    //第二步:设置Mapper类
    job.setMapperClass(WordCountMapper.class);
    //设置Map阶段的输出类型: k2和v2的类型
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(LongWritable.class);
    //进入Shuffle阶段,采取默认分区,默认排序,默认规约,默认分组
    //第三,四,五,六步,采取默认分区,默认排序,默认规约,默认分组
    //设置我们的规约类
    job.setCombinerClass(MyCombiner.class);


    //第七步:设置Reducer类
    job.setReducerClass(WordCountReducer.class);
    //设置reduce阶段的输出类型
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);

    //第八步: 设置输出类
    job.setOutputFormatClass(TextOutputFormat.class);
    //设置输出的路径
    //注意:wordcount_out这个文件夹一定不能存在
    TextOutputFormat.setOutputPath(job, new Path("hdfs://node01:8020/wordcount_combiner"));

    boolean b= job.waitForCompletion(true);//固定写法;一旦任务提交,处于等待任务完成状态
    return b?0:1;
    }

    public static void main(String[] args) throws Exception {
    Configuration configuration = new Configuration();
    //启动一个任务
    //返回值0:执行成功
    int run = ToolRunner.run(configuration, new JobMain(), args);
    System.out.println(run);
    }
    }

     

     

     加入规约代码后执行截图:

     ===================================================================================================================================================

    4-MapReduce-流量统计求和-步骤分析(手机流量统计案例)

    data_flow.dat

    1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 游戏娱乐 24 27 2481 24681 200
    1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 jd.com 京东购物 4 0 264 0 200
    1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 taobao.com 淘宝购物 2 4 132 1512 200
    1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 cnblogs.com 技术门户 4 0 240 0 200
    1363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 视频网站 15 12 1527 2106 200
    1363157995074 84138413 5C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4 122.72.52.12 未知 20 16 4116 1432 200
    1363157993055 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 sougou.com 综合门户 18 15 1116 954 200
    1363157995033 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 信息安全 20 20 3156 2936 200
    1363157983019 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 baidu.com 综合搜索 4 0 240 0 200
    1363157984041 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com 站点统计 24 9 6960 690 200
    1363157973098 15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎 28 27 3659 3538 200
    1363157986029 15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站点统计 3 3 1938 180 200
    1363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 zhilian.com 招聘门户 15 9 918 4938 200
    1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 csdn.net 技术门户 3 3 180 180 200
    1363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash2-http.qq.com 综合门户 15 12 1938 2910 200
    1363157995093 13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 图片大全 12 12 3008 3720 200
    1363157982040 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 综合门户 57 102 7335 110349 200
    1363157986072 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 200
    1363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 200
    1363157988072 13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 http://youku.com/ 视频网站 2 2 120 120 200
    1363157985079 13823070001 20-7C-8F-70-68-1F:CMCC 120.196.100.99 img.qfc.cn 图片浏览 6 3 360 180 200
    1363157985069 13600217502 00-1F-64-E2-E8-B1:CMCC 120.196.100.55 www.baidu.com 综合门户 18 138 1080 186852 200

     ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

    只关心手机号和包数和流量

    -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

    流程图:

     ==============================================================================================================================

    5-MapReduce-流量统计求和-FlowBean和Mapper代码编写

     FlowCountMapper.java

    package com.mapreduce_flowcount;

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    @Override
    protected void map(LongWritable key, Text value,Context context)
    throws IOException, InterruptedException {
    //1.拆分手机号
    String[] split = value.toString().split(" ");
    String phoneNum=split[1];
    //2.获取四个流量字段
    FlowBean flowBean = new FlowBean();
    flowBean.setUpFlow(Integer .parseInt(split[6]));
    flowBean.setDownFlow(Integer .parseInt(split[7]));
    flowBean.setUpCountFlow(Integer .parseInt(split[8]));
    flowBean.setDownCountFlow(Integer .parseInt(split[9]));

    //3.将k2和v2写入上下文中
    context.write(new Text(phoneNum), flowBean);
    }
    }

    ------------------------------------------------------------------------------------------------------------------------------------------------------------------

    FlowCountReducer.java

    package com.mapreduce_flowcount;

    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
    throws IOException, InterruptedException {
    //封装新的FlowBean
    FlowBean flowBean = new FlowBean();
    Integer upFlow = 0; // 上行包
    Integer downFlow = 0; // 下行包
    Integer upCountFlow = 0; // 上行流量
    Integer downCountFlow = 0;// 下行流量

    for (FlowBean value : values) {
    upFlow += value.getUpFlow();
    downFlow += value.getDownFlow();
    upCountFlow += value.getUpCountFlow();
    downCountFlow += value.getDownCountFlow();
    }
    flowBean.setUpFlow(upFlow);
    flowBean.setDownFlow(downFlow);
    flowBean.setUpCountFlow(upCountFlow);
    flowBean.setDownCountFlow(downCountFlow);

    //将k3和v3写入上下文中
    context.write(key, flowBean);
    }

    }

    ------------------------------------------------------------------------------------------------------------------------------------------

    FlowBean.java

    package com.mapreduce_flowcount;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.Writable;

    public class FlowBean implements Writable{
    private Integer upFlow; //上行包
    private Integer downFlow; //下行包
    private Integer upCountFlow; //上行流量
    private Integer downCountFlow;//下行流量
    public Integer getUpFlow() {
    return upFlow;
    }
    public void setUpFlow(Integer upFlow) {
    this.upFlow = upFlow;
    }
    public Integer getDownFlow() {
    return downFlow;
    }
    public void setDownFlow(Integer downFlow) {
    this.downFlow = downFlow;
    }
    public Integer getUpCountFlow() {
    return upCountFlow;
    }
    public void setUpCountFlow(Integer upCountFlow) {
    this.upCountFlow = upCountFlow;
    }
    public Integer getDownCountFlow() {
    return downCountFlow;
    }
    public void setDownCountFlow(Integer downCountFlow) {
    this.downCountFlow = downCountFlow;
    }
    //序列化格式的时候调用toString方法
    @Override
    public String toString() {
    return upFlow + " " + downFlow + " " + upCountFlow + " " + downCountFlow;
    }
    //反序列化
    @Override
    public void readFields(DataInput dataInput) throws IOException {
    this.upFlow=dataInput.readInt();
    this.downFlow=dataInput.readInt();
    this.upCountFlow=dataInput.readInt();
    this.downCountFlow=dataInput.readInt();
    }
    //序列化
    @Override
    public void write(DataOutput dataOutput) throws IOException {
    dataOutput.writeInt(upFlow);
    dataOutput.writeInt(downFlow);
    dataOutput.writeInt(upCountFlow);
    dataOutput.writeInt(downCountFlow);
    }
    }

    -----------------------------------------------------------------------------------------------------------------------------------------

    JobMain.java

    package com.mapreduce_flowcount;

    import java.io.InputStream;
    import java.io.OutputStream;
    import java.util.Set;

    import javax.lang.model.SourceVersion;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class JobMain extends Configured implements Tool{


    @Override
    public int run(String[] arg0) throws Exception {
    //创建一个任务对象
    Job job = Job.getInstance(super.getConf(),"mapreduce_flowcount");

    //打包在集群运行时,需要做一个配置
    job.setJarByClass(JobMain.class);

    //设置任务对象
    //第一步:设置读取文件的类:K1和V1(读取原文件TextInputFormat)
    job.setInputFormatClass(TextInputFormat.class);
    //设置从哪里读
    TextInputFormat.addInputPath(job,new Path("hdfs://node01:8020/input/flowcount"));
    //第二步:设置Mapper类
    job.setMapperClass(FlowCountMapper.class);
    //设置Map阶段的输出类型: k2和v2的类型
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(FlowBean.class);
    //进入Shuffle阶段,采取默认分区,默认排序,默认规约,默认分组
    //第三,四,五,六步,采取默认分区,默认排序,默认规约,默认分组



    //第七步:设置Reducer类
    job.setReducerClass(FlowCountReducer.class);
    //设置reduce阶段的输出类型
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(FlowBean.class);

    //第八步: 设置输出类
    job.setOutputFormatClass(TextOutputFormat.class);
    //设置输出的路径
    //注意:wordcount_out这个文件夹一定不能存在
    TextOutputFormat.setOutputPath(job, new Path("hdfs://node01:8020/out/flowcount_out"));

    boolean b= job.waitForCompletion(true);//固定写法;一旦任务提交,处于等待任务完成状态
    return b?0:1;
    }

    public static void main(String[] args) throws Exception {
    Configuration configuration = new Configuration();
    //启动一个任务
    //返回值0:执行成功
    int run = ToolRunner.run(configuration, new JobMain(), args);
    System.out.println(run);
    }


    }

    -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

     

    ==========================================================================================================

     11-MapReduce-流量统计求和-分区代码实现(以手机号分区)

    复制FlowCount代码修改

    FlowPartition.java

    package com.flowcount_sort;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    public class FlowPartition extends Partitioner<Text, FlowBean> {

    @Override
    public int getPartition(Text text, FlowBean flowBean, int i) {
    //判断手机号以那个数字开头然后返回不同的分区编号
    if (text.toString().startsWith("135")) {
    return 0;
    } else if (text.toString().startsWith("136")) {
    return 1;
    } else if (text.toString().startsWith("137")){
    return 2;
    }else {
    return 3;
    }
    }
    }

    ------------------------------------------------------------------------------------------------------------------------------------------------------------------

    Jobmain1.java

    package com.flowcount_sort;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    public class JobMain1 extends Configured implements Tool{


    @Override
    public int run(String[] arg0) throws Exception {
    //创建一个任务对象
    Job job = Job.getInstance(super.getConf(),"mapreduce_flowcount_partition");

    //打包在集群运行时,需要做一个配置
    job.setJarByClass(JobMain1.class);

    //设置任务对象
    //第一步:设置读取文件的类:K1和V1(读取原文件TextInputFormat)
    job.setInputFormatClass(TextInputFormat.class);
    //设置从哪里读
    TextInputFormat.addInputPath(job,new Path("hdfs://node01:8020/input/flowcount"));
    //第二步:设置Mapper类
    job.setMapperClass(FlowCountMapper.class);
    //设置Map阶段的输出类型: k2和v2的类型
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(FlowBean.class);
    //进入Shuffle阶段,采取默认分区,默认排序,默认规约,默认分组
    //第三,四,五,六步,采取默认分区,默认排序,默认规约,默认分组
    //设置分区类
    job.setPartitionerClass(FlowPartition.class);
    //设置Reduce个数
    job.setNumReduceTasks(4);


    //第七步:设置Reducer类
    job.setReducerClass(FlowCountReducer.class);
    //设置reduce阶段的输出类型
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(FlowBean.class);

    //第八步: 设置输出类
    job.setOutputFormatClass(TextOutputFormat.class);
    //设置输出的路径
    //注意:wordcount_out这个文件夹一定不能存在
    TextOutputFormat.setOutputPath(job, new Path("hdfs://node01:8020/out/flowcount_out_partition"));

    boolean b= job.waitForCompletion(true);//固定写法;一旦任务提交,处于等待任务完成状态
    return b?0:1;
    }

    public static void main(String[] args) throws Exception {
    Configuration configuration = new Configuration();
    //启动一个任务
    //返回值0:执行成功
    int run = ToolRunner.run(configuration, new JobMain1(), args);
    System.out.println(run);
    }
    }

    -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

     

  • 相关阅读:
    js 原生ajax实现
    layer 查看图片
    c# 操作XML
    C# 扩展方法
    c# 依赖注入
    visual studio 快捷键
    HIS系统结算后,没有更新单据状态为“已结算”
    网络流四·最小路径覆盖 HihoCoder
    飞行员配对(二分图最大匹配) 51Nod
    开心的小Q 51Nod
  • 原文地址:https://www.cnblogs.com/curedfisher/p/12603200.html
Copyright © 2011-2022 走看看