zoukankan      html  css  js  c++  java
  • MapReduce Demo

    功能:统计公司员工一个月内手机上网上行流量、下行流量及总流量。 

    测试数据如下:

    13612345678     6000    1000

    13612345678     2000    3000

    13812345678     2000    100
    13812345678     1500    300
    13512345678     9000    200
    13512345678     500     200
    13112345678     1000    200
    13112345678     800     200

    代码:

       程序入口类:DataCount  

    package cn.terry.mr;
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider.Text;
    public class DataCount {
      
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    Configuration conf=new Configuration();
    Job job=Job.getInstance(conf);
             job.setJarByClass(DataCount.class);
             job.setMapperClass(MRMap.class);  
             FileInputFormat.setInputPaths(job, new Path(args[0]));    
      
             job.setReducerClass(MRReduce.class);
             job.setMapOutputKeyClass(Text.class);
             job.setMapOutputValueClass(DataBean.class);    
             FileOutputFormat.setOutputPath(job, new Path(args[1]));
             job.waitForCompletion(true);
    }         
    }  

    数据实体类:  DataBean.java

    package cn.terry.mr; 
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.Writable;
    public class DataBean implements Writable {
    private String telNo;
    private Long upPayLoad;
    private Long downPayLoad;
    private Long totalPayLoad;
    public String getTelNo() {
    return telNo;
    }
    public void setTelNo(String telNo) {
    this.telNo = telNo;
    }
    public Long getUpPayLoad() {
    return upPayLoad;
    }
    public void setUpPayLoad(Long upPayLoad) {
    this.upPayLoad = upPayLoad;
    }
    public Long getDownPayLoad() {
    return downPayLoad;
    }
    public void setDownPayLoad(Long downPayLoad) {
    this.downPayLoad = downPayLoad;
    }
    public Long getTotalPayLoad() {
    return totalPayLoad;
    }
    public void setTotalPayLoad(Long totalPayLoad) {
    this.totalPayLoad = totalPayLoad;
    }  
    public DataBean() {  
    }
    public DataBean(String telNo, Long upPayLoad, Long downPayLoad) {
     
    this.telNo = telNo;
    this.upPayLoad = upPayLoad;
    this.downPayLoad = downPayLoad;
    this.totalPayLoad=this.upPayLoad+this.downPayLoad;
    }
    //serialize
    @Override
    public void write(DataOutput out) throws IOException {
    // TODO Auto-generated method stub
        out.writeUTF(telNo);
        out.writeLong(upPayLoad);
        out.writeLong(downPayLoad);
        out.writeLong(totalPayLoad);
    }
    //deserrialize
    @Override
    public void readFields(DataInput in) throws IOException {
    // TODO Auto-generated method stub
    this.telNo=in.readUTF();
    this.upPayLoad=in.readLong();
    this.downPayLoad=in.readLong();
    this.totalPayLoad=in.readLong();
    }
    @Override
    public String toString() {
    // TODO Auto-generated method stub
    return  this.upPayLoad+" "+ this.downPayLoad+" " + this.totalPayLoad;
    }
    Map类:MRMap.java  
    package cn.terry.mr;
    import java.io.IOException;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    public  class MRMap extends Mapper<LongWritable,Text,Text,DataBean> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {  
    String line=value.toString();
    String[] fields=line.split(" ");
    String telNo=fields[0];
    Long up=Long.parseLong(fields[1]);
    Long down= Long.parseLong(fields[2]);
    DataBean bean=new DataBean(telNo,up,down);
    context.write(new Text(telNo), bean);
    }   

    Reduce类:MRReduce.java

    package cn.terry.mr;
    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    public class MRReduce extends Reducer<Text,DataBean,Text,DataBean> {
    @Override
    protected void reduce(Text key, Iterable<DataBean> v2,  Context context) throws IOException, InterruptedException {
    long up_sum=0;
    long down_sum=0;
    for(DataBean bean :v2)
    {
    up_sum+=bean.getUpPayLoad();
    down_sum+=bean.getDownPayLoad();
    }
    DataBean bean=new DataBean("",up_sum,down_sum);
    context.write(key, bean);
    }
     运行:
    [root@master bin]# hadoop jar /home/hadoop/mpCount.jar cn.terry.mr.DataCount /data.txt /mrOut

    17/11/08 11:34:25 INFO client.RMProxy: Connecting to ResourceManager at master/x.x.x.x:8032

    17/11/08 11:34:27 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.

    17/11/08 11:34:27 INFO input.FileInputFormat: Total input paths to process : 1

    17/11/08 11:34:28 INFO mapreduce.JobSubmitter: number of splits:1

    17/11/08 11:34:28 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1509957441313_0002

    17/11/08 11:34:29 INFO impl.YarnClientImpl: Submitted application application_1509957441313_0002

    17/11/08 11:34:29 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1509957441313_0002/

    17/11/08 11:34:29 INFO mapreduce.Job: Running job: job_1509957441313_0002

    17/11/08 11:34:46 INFO mapreduce.Job: Job job_1509957441313_0002 running in uber mode : false

    17/11/08 11:34:46 INFO mapreduce.Job: map 0% reduce 0%

    17/11/08 11:34:55 INFO mapreduce.Job: Task Id : attempt_1509957441313_0002_m_000000_0, Status : FAILED Error: java.io.IOException: Initialization of all the collectors failed. Error in last collector was :class com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider$Text at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:415) at org.apache.hadoop.mapred.MapTask.access$100(MapTask.java:81) at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:698) at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:770) at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341) at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)

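    由以上错误可以看出，DataCount 类引用了错误的 Text 类：导入的是 com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider.Text，而不是 Hadoop 的 Text。需要将 DataCount 类中 Text 的导入改为 import org.apache.hadoop.io.Text;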

     再次运行:  

    [root@master bin]# hadoop jar /home/hadoop/mpCount.jar cn.terry.mr.DataCount /data3.txt /MROut4 
    17/11/08 16:23:45 INFO client.RMProxy: Connecting to ResourceManager at master/x.x.x.x:8032
    17/11/08 16:23:46 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
    17/11/08 16:23:47 INFO input.FileInputFormat: Total input paths to process : 1
    17/11/08 16:23:47 INFO mapreduce.JobSubmitter: number of splits:1
    17/11/08 16:23:47 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1509957441313_0008
    17/11/08 16:23:48 INFO impl.YarnClientImpl: Submitted application application_1509957441313_0008
    17/11/08 16:23:48 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1509957441313_0008/
    17/11/08 16:23:48 INFO mapreduce.Job: Running job: job_1509957441313_0008
    17/11/08 16:24:02 INFO mapreduce.Job: Job job_1509957441313_0008 running in uber mode : false
    17/11/08 16:24:02 INFO mapreduce.Job:  map 0% reduce 0%
    17/11/08 16:24:14 INFO mapreduce.Job:  map 100% reduce 0%
    17/11/08 16:24:25 INFO mapreduce.Job:  map 100% reduce 100%
    17/11/08 16:24:26 INFO mapreduce.Job: Job job_1509957441313_0008 completed successfully 

    查看结果:

    [root@master bin]# hdfs dfs -ls /MROut4
    Found 2 items
    -rw-r--r--   2 root supergroup          0 2017-11-08 16:24 /MROut4/_SUCCESS
    -rw-r--r--   2 root supergroup        106 2017-11-08 16:24 /MROut4/part-r-00000
    [root@master bin]# hdfs dfs -cat /MROut4/part-r-00000
    13112345678     1800    400     2200
    13512345678     9500    400     9900
    13612345678     8000    4000    12000
    13812345678     3500    400     3900

     由于我的chrome和IE版本无法兼容cnblogs的插入code和picture功能,抱歉没能将代码及结果以友好的方式呈现。

  • 相关阅读:
    反射的概述_反射应用实例
    日期类之SimpleDateFormat
    StringBuffer & StringBuilder
    String与包装类_字节数组_字符数组间的转换
    两种方法k8s安装dashboard组件
    git学习
    Prometheus搭建
    python学习博客
    Python的全局变量和局部变量
    python参数
  • 原文地址:https://www.cnblogs.com/abcdwxc/p/7804803.html
Copyright © 2011-2022 走看看