zoukankan      html  css  js  c++  java
  • 自己的第一个MapReduce程序

    数据源:来自互联网招聘hadoop岗位的薪资数据,其中几行示例数据如下:

    美团    3-5年经验   15-30k  北京    【够牛就来】hadoop高级工程...
    北信源  3-5年经验   15-20k  北京    Java高级工程师(有Hadoo...
    蘑菇街  3-5年经验   10-24k   杭州    hadoop开发工程师
    晶赞科技    1-3年经验   10-30k  上海    hadoop研发工程师
    秒针系统    3-5年经验   10-20k  北京    hadoop开发工程师
    搜狐    1-3年经验   18-23k  北京    大数据平台开发工程师(Hadoo...
    执御    1-3年经验   8-14k   杭州    hadoop工程师
    KK唱响  3-5年经验   15-30k  杭州    高级hadoop开发工程师
    晶赞科技    1-3年经验   12-30k  上海    高级数据分析师(hadoop)
    亿玛在线(北京)科技有限公司    3-5年经验    18-30k  北京    hadoop工程师
    酷讯    1-3年经验   10-20k  北京    hadoop Engineer/...
    游族网络    5-10年经验  20-35k  上海    hadoop研发工程师
    易车公司    3-5年经验   15-30k  北京    hadoop工程师
    爱图购  1-3年经验   8-15k   杭州    hadoop开发工程师
    晶赞科技    3-5年经验   15-33k  上海    hadoop研发工程师

    功能需求:基于这份数据统计Hadoop工程师各工作年限段的薪资水平,输出如下:

    1-3年经验    8-30k
    3-5年经验    10-33k
    5-10年经验    20-35k 
    import java.io.IOException;
    import java.util.regex.Pattern;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    
    /**
     * 基于样本数据做Hadoop工程师薪资统计:计算各工作年限段的薪水范围
     */
    public class SalaryCount extends Configured implements Tool {
    
        public static class SalaryMapper extends Mapper< LongWritable, Text, Text, Text> {
             public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    //红色自己编写
    //示例数据:美团 3-5年经验 15-30k 北京 hadoop高级工程 String line = value.toString();//读取每行数据         String[] record = line.split( "\s+");//使用空格正则解析数据         //key=record[1]:输出3-5年经验        //value=record[2]:15-30k       //作为Mapper输出,发给 Reduce 端         if(record.length >= 3){         context.write( new Text(record[1]), new Text(record[2])
           );
        } } }
    public static class SalaryReducer extends Reducer< Text, Text, Text, Text> { public void reduce(Text Key, Iterable< Text> Values, Context context) throws IOException, InterruptedException { int low = 0;//记录最低工资 int high = 0;//记录最高工资 int count = 1; //针对同一个工作年限(key),循环薪资集合(values),并拆分value值,统计出最低工资low和最高工资high for (Text value : Values) {     String[] arr = value.toString().split("-");     int l = filterSalary(arr[0]);     int h = filterSalary(arr[1]);     if(count==1 || l< low){      low = l;     }     if(count==1 || h>high){      high = h;      }     count++;       }       context.write(Key, new Text(low + "-" +high + "k"));   } } //正则表达式提取工资值 public static int filterSalary(String salary) { String sal = Pattern.compile("[^0-9]").matcher(salary).replaceAll(""); return Integer.parseInt(sal); } @Override public int run(String[] args) throws Exception { Configuration conf = new Configuration();//读取配置文件 Path out = new Path(args[1]); FileSystem hdfs = out.getFileSystem(conf); if (hdfs.isDirectory(out)) {//删除已经存在的输出目录 hdfs.delete(out, true); } Job job = new Job(conf, "SalaryCount" );//新建一个任务 job.setJarByClass(SalaryCount.class);// 主类 FileInputFormat.addInputPath(job, new Path(args[0]));// 文件输入路径 FileOutputFormat.setOutputPath(job, new Path(args[1]));// 文件输出路径 job.setMapperClass(SalaryMapper.class);// Mapper job.setReducerClass(SalaryReducer.class);// Reducer job.setOutputKeyClass(Text.class);//输出结果key类型 job.setOutputValueClass(Text.class);//输出结果的value类型 job.waitForCompletion(true);//等待完成退出作业 return 0; } /** * @param args 输入文件、输出路径,可在Eclipse中Run Configurations中配Arguments,如: * hdfs://single.hadoop.dajiangtai.com:9000/junior/salary.txt * hdfs://single.hadoop.dajiangtai.com:9000/junior/salary-out/ */ public static void main(String[] args) throws Exception { try { int res = ToolRunner.run(new Configuration(), new SalaryCount(), args); System.exit(res); } catch (Exception e) { e.printStackTrace(); } } }

    总结,通过左右对比可看出,70%的代码几乎一样,重要的是修改map、reduce部分,以及run方法中的部分设置。好了,在你搭建好的Eclipse开发环境中跑一下这个程序看看结果吧。

  • 相关阅读:
    在java中使用ffmpeg将amr格式的语音转为mp3格式
    keras实现不同形态的模型
    TF版本的Word2Vec和余弦相似度的计算
    TensorFlow中的两种conv2d方法和kernel_initializer
    CNN中的padding
    记一次sqoop同步到mysql
    理解HDFS
    TensorFlow中的优化算法
    使用TensorFlow实现分类
    使用TensorFlow实现回归预测
  • 原文地址:https://www.cnblogs.com/bob-wzb/p/5145211.html
Copyright © 2011-2022 走看看