zoukankan      html  css  js  c++  java
  • 大数据之mapreduce小实战

    手写wordcount的程序

    1、pom.xml

    <dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs-client -->
    <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.3</version>
    </dependency>
    </dependencies>

    2、新建Mapper类

    package com.hsiehchou.wordcount;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import java.io.IOException;
    /**
    * 海量数据
    *
    * hello hsiehchou
    * nihao
    *
    * 数据的输入与输出以Key value进行传输
    * keyIN:LongWritable(Long) 数据的起始偏移量
    * valuewIN:具体数据
    *
    * mapper需要把数据传递到reducer阶段(<hello,1>)
    * keyOut:单词 Text
    * valueOut:出现的次数IntWritable
    *
    */
    public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    //对数据进行打散 ctrl+o
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    //1、接入数据 hello nihao
    String line = value.toString();
    //2、对数据进行切分
    String[] words = line.split(" ");
    //3、写出以<hello,1>
    for (String w:words){
    //写出reducer端
    context.write(new Text(w), new IntWritable(1));
    }
    }
    }

    3、新建Reducer类

    package com.hsiehchou.wordcount;
    import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    import java.io.IOException;
    /**
    * reducer阶段接收的是Mapper输出的数据
    * mapper的输出是reducer输入
    *
    * keyIn:mapper输出的key的类型
    * valueIn:mapper输出的value的类型
    *
    * reducer端输出的数据类型,想要一个什么样的结果<hello,1888>
    * keyOut:Text
    * valueOut:IntWritalble
    *
    */
    public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    //key-->单词 value-->次数
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    //1、记录出现的次数
    int sum = 0;
    for (IntWritable v:values){
    sum += v.get();
    }
    //2、l累加求和输出
    context.write(key, new IntWritable(sum));
    }
    }

    4、新建驱动类

    package com.hsiehchou.wordcount;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import java.io.IOException;
    public class WordCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    //1、创建job任务
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    //2、指定jar包位置
    job.setJarByClass(WordCountDriver.class);
    //3、关联使用的Mapper类
    job.setMapperClass(WordCountMapper.class);
    //4、关联使用的Reducer类
    job.setReducerClass(WordCountReducer.class);
    //5、设置mapper阶段输出的数据类型
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    //6、设置reducer阶段输出的数据类型
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    //7、设置数据输入的路径
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    //8设置数据输出的路径
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    //9、提交任务
    boolean rs = job.waitForCompletion(true);
    System.exit(rs ? 0:1);
    }
    }
    运行结果
    [root@hsiehchou121 ~]# hadoop jar mapreduce-1.0-SNAPSHOT.jar com.hsiehchou.wordcount.WordCountDriver /wc/in /wc/out
    [root@hsiehchou121 ~]# hdfs dfs -cat /wc/out/part-r-00000
    fd 1
    fdgs 1
    fdsbv 1
    gd 1
    hello 3

    5、IDEA的相关使用

    Ctrl+O导入相关未实现的方法
    Maven中的Lifecycle的package可以直接打包成jar

    案例分析

    需求:运营商流量日志
    10086
    计算每个用户当前使用的总流量
    思路?总流量 = 上行流量+下行流量
    三个字段:手机号 上行流量 下行流量
    技术选型:PB+
    数据分析:海量数据(存储hdfs)
    海量数据计算(分布式计算框架MapReduce)

    实现

    FlowBean类
    package com.hsiehchou.logs;
    import org.apache.hadoop.io.Writable;
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    /**
    * 封装数据类型需要怎么做
    * hadoop数据类型实现了序列化接口
    * 如果自定义需要实现这个序列化接口
    */
    public class FlowBean implements Writable {
    //定义属性:上行流量 下行流量 总流量总和
    private long upFlow;
    private long dfFlow;
    private long flowsum;
    public FlowBean(){}
    public FlowBean(long upFlow, long dfFlow){
    this.upFlow = upFlow;
    this.dfFlow = dfFlow;
    this.flowsum = upFlow + dfFlow;
    }
    public long getUpFlow(){
    return upFlow;
    }
    public void setUpFlow(long upFlow){
    this.upFlow = upFlow;
    }
    public long getDfFlow(){
    return dfFlow;
    }
    public void setDfFlow(long dfFlow){
    this.dfFlow = dfFlow;
    }
    public long getFlowsum(){
    return flowsum;
    }
    public void setFlowsum(long flowsum){
    this.flowsum = flowsum;
    }
    //序列化
    public void write(DataOutput out) throws IOException {
    out.writeLong(upFlow);
    out.writeLong(dfFlow);
    out.writeLong(flowsum);
    }
    //反序列化
    public void readFields(DataInput in) throws IOException {
    upFlow = in.readLong();
    dfFlow = in.readLong();
    flowsum = in.readLong();
    }
    @Override
    public String toString() {
    return upFlow + " " + dfFlow + " " + flowsum;
    }
    }
    FlowCountMapper类
    package com.hsiehchou.logs;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import java.io.IOException;
    /**
    * keyIN:
    * valueIN:
    *
    * 思路:根据想要的结果的kv类型 手机号 流量总和(上行+下行)自定义类
    * keyOut:
    * valueOut:
    */
    public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    //1、接入数据源
    String line = value.toString();
    //2、切割
    String[] fields = line.split(" ");
    //3、拿到关键字段
    String phoneNr = fields[1];
    long upFlow = Long.parseLong(fields[fields.length - 3]);
    long dfFlow = Long.parseLong(fields[fields.length - 2]);
    //4、写出到reducer
    context.write(new Text(phoneNr), new FlowBean(upFlow,dfFlow));
    }
    }
    FlowCountReducer类
    package com.hsiehchou.logs;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    import java.io.IOException;
    public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
    long upFlow_sum = 0;
    long dfFlow_sum = 0;
    for (FlowBean v:values){
    upFlow_sum += v.getUpFlow();
    dfFlow_sum += v.getDfFlow();
    }
    FlowBean rsSum = new FlowBean(upFlow_sum, dfFlow_sum);
    //输出结果
    context.write(key, rsSum);
    }
    }
    FlowCountDriver类
    package com.hsiehchou.logs;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import java.io.IOException;
    public class FlowCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    //1.创建job任务
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    //2.指定kjar包位置
    job.setJarByClass(FlowCountDriver.class);
    //3.关联使用的Mapper
    job.setMapperClass(FlowCountMapper.class);
    //4.关联使用的Reducer类
    job.setReducerClass(FlowCountReducer.class);
    //5.设置mapper阶段输出的数据类型
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(FlowBean.class);
    //6.设置reducer阶段输出的数据类型
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(FlowBean.class);
    //优化含有大量小文件的数据
    //设置读取数据切片的类
    job.setInputFormatClass(CombineTextInputFormat.class);
    //最大切片大小8M
    CombineTextInputFormat.setMaxInputSplitSize(job, 8388608);
    //最小切片大小6M
    CombineTextInputFormat.setMinInputSplitSize(job, 6291456);
    //7.设置数据输入的路径
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    //8.设置数据输出的路径
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    //9.提交任务
    boolean rs = job.waitForCompletion(true);
    System.exit(rs? 0:1);
    }
    }
    运行结果
    [root@hsiehchou121 ~]# hdfs dfs -mkdir -p /flow/in
    [root@hsiehchou121 ~]# hdfs dfs -put HTTP_20180313143750.dat /flow/in
    [root@hsiehchou121 ~]# hadoop jar mapreduce-1.0-SNAPSHOT.jar com.hsiehchou.logs.FlowCountDriver /flow/in /flow/out
    [root@hsiehchou121 ~]# hdfs dfs -cat /flow/out/part-r-00000
    13480253104 120 1320 1440
    13502468823 735 11349 12084
    13510439658 1116 954 2070
    13560436326 1136 94 1230
    13560436666 1136 94 1230
    13560439658 918 4938 5856
    13602846565 198 910 1108
    13660577991 660 690 1350
    13719199419 240 0 240
    13726130503 299 681 980
    13726238888 2481 24681 27162
    13760778710 120 120 240
    13822544101 264 0 264
    13884138413 4116 1432 5548
    13922314466 3008 3720 6728
    13925057413 11058 4243 15301
    13926251106 240 0 240
    13926435656 132 1512 1644
    15013685858 369 338 707
    15889002119 938 380 1318
    15920133257 316 296 612
    18212575961 1527 2106 3633
    18320173382 9531 212 9743

    小文件优化

    如果企业中存在海量的小文件数据
    TextInputFormat按照文件规划切片,文件不管多小都是一个单独的切片,启动mapt
    ask任务去执行,这样会产生大量的maptask,浪费资源。
    优化手段:
    小文件合并大文件,如果不动这个小文件内容。

  • 相关阅读:
    oracle expdp和impdp常用命令选项
    oracle expdp导出远程数据到本地
    oracle目录操作
    反射
    设置查询对话框的F7
    Uncaught TypeError: timeout.close is not a function. when try to use clearInterval
    timestamp to time 时间戳转日期
    react+antd 选项卡切换
    react antd Warning: must set key for <rc-animate> children
    微信企业号报Error: echostr校验失败,请您检查是否正确解密并输出明文
  • 原文地址:https://www.cnblogs.com/hsiehchou/p/10403452.html
Copyright © 2011-2022 走看看