zoukankan      html  css  js  c++  java
  • Hadoop工程师面试题(1)--MapReduce实现单表汇总统计

    数据源格式描述:

    输入t1.txt源数据,数据文件分隔符”*&*”,字段说明如下:

    字段序号 字段英文名称 字段中文名称 字段类型 字段长度
    1 TIME_ID 时间(到时) 字符型 12
    2 Session 会话时长 数值型 8
    3 MSISDN 用户号码 字符型 11
    4 SP_DOMAIN SP域名 数值型 64
    5 USER_AGENT_ORIGN 终端字串 字符型 128
    6 USER_AGENT 终端类别 字符型 64
    7 UPSTREAM_VOL 上行流量 数值型 8
    8 DOWNSTREAM_VOL 下行流量 数值型 8
    9 URL_CNT 访问次数 数值型 20

    用mapreduce实现单表汇总:

    在数据源的基础上,根据终端类型汇总出总流量及访问次数。汇总模型字段说明如下:

    字段序号 字段英文名称 字段中文名称 字段类型 字段长度
    1 USER_AGENT 终端类型 字符型
    2 TOT_FLUX 总流量 数值型 30
    3 URL_CNT 访问次数 数值型 30

    代码如下:

    package mianshi;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

    import com.google.protobuf.TextFormat;

    public class Test1 {

        /**
         * @param args
         * @throws IOException
         * @throws InterruptedException
         * @throws ClassNotFoundException
         */
        public static void main(String[] args) throws Exception {
            //创建配置文件
            Configuration conf=new Configuration();
            //创建job
            Job job = new Job(conf,Test1.class.getName());
            //设置jar包运行
            job.setJarByClass(Test1.class);
            //设置输入路径
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            //设置输入格式
            job.setInputFormatClass(TextInputFormat.class);
            //设置自定义Mapper
            job.setMapperClass(MyMapper.class);
            //设置Map输出的Value类型,也就是V2
            job.setMapOutputValueClass(Model.class);
            //设置Map输出的Key类型,也就是K2
            job.setMapOutputKeyClass(Text.class);
            //设置分区类型
            job.setPartitionerClass(HashPartitioner.class);
            //设置Rudece任务数
            job.setNumReduceTasks(1);
            //设置自定义Reduce类
            job.setReducerClass(MyReducer.class);
            //设置输出K3的类型
            job.setOutputKeyClass(Text.class);
            //设置输出的V3类型
            job.setOutputValueClass(Model.class);
            //设置输出的格式
            job.setOutputFormatClass(TextOutputFormat.class);
            //指定输出路径
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            //提交job
            job.waitForCompletion(true);

        }
        static class MyMapper extends Mapper<LongWritable, Text, Text, Model>{
            @Override
            protected void map(LongWritable k1, Text v1,Context context)
                    throws IOException, InterruptedException {
                /**
                 * 切割字符串有点意思!
                 * “*”是特殊字符,需要用[]
                 * "&"需要用\转义
                 *
                 *    
                 */
                String[] split = v1.toString().split("[*]\&[*]");
                Text user_agent = new Text(split[5]);
                Long tot_flux = new Long(split[6])+new Long(split[7]);
                Long url_cnt = new Long(split[8]);
                Model v2 = new Model(tot_flux, url_cnt);
                context.write(user_agent, v2);
               
            }
        }
       
        static class MyReducer extends Reducer<Text, Model, Text, Model>{
           
            @Override
            protected void reduce(Text k2, Iterable<Model> v2s,Context context)
                    throws IOException, InterruptedException {
               
                //定义计数器
                long sum_flux =0L;
                long sum_url = 0L;
                for(Model model : v2s){
                    sum_flux += model.tot_flux;
                    sum_url += model.url_cnt;
                }
                Model v3 = new Model(sum_flux,sum_url);
                context.write(k2, v3);
            }
           
        }
       

    }

    /**
    * 自定义类型必须实现Writable
    * @author Sky
    *
    */
    class Model implements Writable{
       
        long tot_flux;
        long url_cnt;
       
        public Model(){}
        public Model(Long tot_flux,Long url_cnt){
            this.tot_flux = tot_flux;
            this.url_cnt = url_cnt;
        }

        public void write(DataOutput out) throws IOException {
            //序列化出去
            out.writeLong(tot_flux);
            out.writeLong(url_cnt);
        }

        public void readFields(DataInput in) throws IOException {
            //和序列化出去的一样
            this.tot_flux = in.readLong();
            this.url_cnt = in.readLong();
           
        }
       
        //必须覆写toString方法,否则输出的值是内存值
        @Override
        public String toString() {
            return tot_flux+" "+url_cnt;
        }
       
       
    }

    文章参考论坛:超人hadoop网络学院论坛

  • 相关阅读:
    029- 位运算符
    028- 三目运算符
    027- 字符串链接运算符
    026- 布尔运算符
    lucene 结合数据库做搜索
    JDK 1.8判断集合种的元素是否存在相同
    Springboot 集成jpa使用
    json 的使用 Java对象转json
    Java 短信发送
    1 eclipse 离线安装activiti插件
  • 原文地址:https://www.cnblogs.com/luguoyuanf/p/3631930.html
Copyright © 2011-2022 走看看