  • Hadoop MR Programming

    Developing a Hadoop job requires defining a Mapper, a Reducer, and a Job driver (which launches the MR job and passes in its parameters). The code example below implements two things (a small worked example of the transformation follows this list):

    1) Convert a comma-delimited file into a "|"-delimited file;

    2) Merge small files, combining the output into reducerNum files.
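
    For example, a hypothetical 11-field comma-delimited record (the values below are made up; assume its input split path contains fpd_bak) is transformed by the mapper as follows: commas become pipes, two NULL columns are appended, and the record is keyed by its 7th field (objid):

        input value : 1001,20171225,570,A,B,C,OBJ123,X,Y,Z,W
        map key     : OBJ123
        map value   : 1001|20171225|570|A|B|C|OBJ123|X|Y|Z|W|NULL|NULL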

    DataMap.java

    package com.dx.fpd_load;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    public class DataMap extends Mapper<LongWritable, Text, Text, Text> {
        private final Text key = new Text();
    
        @Override
        protected void map(LongWritable longWritable, Text value, Context context) throws IOException, InterruptedException {
            // Skip empty input lines.
            if (value.getLength() == 0) {
                return;
            }
    
            
            // Replace the comma delimiters with "|" and pad two trailing NULL columns.
            String newValue = value.toString().replace(",", "|") + "|NULL|NULL";
            // split() takes a regex, so the pipe must be escaped.
            String[] newValues = newValue.split("\\|");
    
            // Upper-cased string form of the input split; for a FileSplit this
            // typically contains the file path, which is matched below.
            String filePath = context.getInputSplit().toString().toUpperCase();
    
            // Only process records whose input path contains "fpd_bak" and that have more than 10 fields.
            if (filePath.contains("fpd_bak".toUpperCase()) && newValues.length > 10) {
                key.set(newValues[6]); //objid
    
                context.write(key, new Text(newValue));
            }
        }
    }
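
    A note on the split call above: String.split() takes a regular expression, so the pipe character has to be escaped as "\\|". A minimal standalone sketch (the class below is hypothetical and not part of the job) showing the difference:

    package com.dx.fpd_load;
    
    public class SplitDemo {
        public static void main(String[] args) {
            String record = "a,b,c".replace(",", "|") + "|NULL|NULL"; // "a|b|c|NULL|NULL"
    
            // Unescaped "|" is an empty alternation in regex terms and splits
            // between every character instead of on the pipe.
            System.out.println(record.split("|").length);
    
            // Escaping the pipe splits on the literal "|" character: a, b, c, NULL, NULL.
            System.out.println(record.split("\\|").length); // 5
        }
    }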

    DataReducer.java

    package com.dx.fpd_load;
    
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
    
    import java.io.IOException;
    
    public class DataReducer extends Reducer<Text, Text, NullWritable, Text> {
        private MultipleOutputs<NullWritable, Text> multipleOutputs;
    
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            multipleOutputs = new MultipleOutputs<>(context);
        }
    
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Read the target Hive partition values once per key group instead of once per record.
            String[] p_days = context.getConfiguration().getStrings("p_day");
            String[] p_cities = context.getConfiguration().getStrings("p_city");
            String p_day = (p_days != null) ? p_days[0] : "p_day";
            String p_city = (p_cities != null) ? p_cities[0] : "p_city";

            for (Text text : values) {
                String data = text.toString();

                // Write every record to the named output "fpdload", directly under the
                // target partition directory, with file name prefix "fpd_data".
                multipleOutputs.write("fpdload", NullWritable.get(), new Text(data),
                        "/thetenet/my_hive_db/fpd_new/p_day=" + p_day + "/p_city=" + p_city + "/fpd_data");
            }
        }
    
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            multipleOutputs.close();
        }
    }
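
    Because p_day and p_city are fixed for the whole job, they can also be cached once per reduce task instead of being looked up in every reduce() call. A minimal variant of the fields and setup() above (an optional refactor, not in the original post; same configuration keys):

        private MultipleOutputs<NullWritable, Text> multipleOutputs;
        private String p_day = "p_day";   // fall back to the literal defaults used above
        private String p_city = "p_city";
    
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            multipleOutputs = new MultipleOutputs<>(context);
            // Cache the partition values once per task; reduce() then only builds the output path.
            String[] p_days = context.getConfiguration().getStrings("p_day");
            String[] p_cities = context.getConfiguration().getStrings("p_city");
            if (p_days != null) { p_day = p_days[0]; }
            if (p_cities != null) { p_city = p_cities[0]; }
        }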

    DataJob.java

    package com.dx.fpd_load;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    public class DataJob {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            // Expected arguments: <p_city> <p_day> <reducerNum> <inputPath> <outputPath>
            String p_city = otherArgs[0];
            String p_day = otherArgs[1];
            String reducerNum = otherArgs[2];
            String inputPath = otherArgs[3];
            String outputPath = otherArgs[4];
    
            if (p_day == null) {
                throw new Exception("p_day is null");
            }
            conf.set("p_day", p_day);
            if (p_city == null) {
                throw new Exception("p_city is null");
            }
            conf.set("p_city", p_city);
    
            Job job = Job.getInstance(conf);
            job.setJobName("LoadDataIntoFPD_p_city" + p_city + "_p_day_" + p_day);
            job.setJarByClass(DataJob.class);
            job.setMapperClass(DataMap.class);
            job.setReducerClass(DataReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Text.class);
            job.setNumReduceTasks(Integer.parseInt(reducerNum));
    
            MultipleOutputs.addNamedOutput(job, "fpdload", TextOutputFormat.class, NullWritable.class, Text.class);
    
            FileInputFormat.addInputPath(job, new Path(inputPath));
            FileOutputFormat.setOutputPath(job, new Path(outputPath));
    
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
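
    One optional tweak (not in the original driver): since every record is written through MultipleOutputs, the regular output under outputPath normally ends up as empty part-r-* files. Hadoop's LazyOutputFormat can suppress them so the job output directory stays clean:

        import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
    
        // In main(), before submitting the job: the default part files are then
        // only created if something is written via the standard context.write(),
        // which this job never does.
        LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);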

    Submission script:

    #!/usr/bin/env bash
    source /app/mylinux/login.sh
    # Usage: ./submit_fpdload.sh <p_day> <p_city> <reducer_number>
    # e.g.   ./submit_fpdload.sh 20171225 570 400
    DAY=$1
    CITY=$2
    REDUCER_NUMBER=$3
    
    JAR="/app/mylinux/service/dx-1.0-SNAPSHOT.jar"
    
    MAIN_CLASS="com.dx.fpd_load.DataJob"
    INPUT_PATH="/thetenet/my_hive_db/fpd_bak/p_day=$DAY/p_city=$CITY/"
    OUT_DIR="/thetenet/my_hive_db/fpd_load_out/"
    
    # Remove the target partition directory and any previous job output before rerunning.
    hadoop fs -rm -r /thetenet/my_hive_db/fpd_new/p_day=$DAY/p_city=$CITY/
    hadoop fs -rm -r $OUT_DIR
    
    
    time yarn jar $JAR $MAIN_CLASS $CITY $DAY $REDUCER_NUMBER $INPUT_PATH $OUT_DIR
    
    # Optionally register the freshly written directory as a Hive partition:
    #beeline -e "
    #alter table my_hive_db.fpd_new add if not exists partition(p_day=$DAY,p_city=$CITY)
    #location '/thetenet/my_hive_db/fpd_new/p_day=$DAY/p_city=$CITY/';"
    
    echo "Complete..."
  • Original article: https://www.cnblogs.com/yy3b2007com/p/8555107.html