Writing a Hadoop MapReduce job that reads Hive table data and schema through HCatalog
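
    This post shows how a Hadoop MapReduce job can read one partition of a Hive table through HCatalog, use the table's schema to decode each record, and count rows grouped by the value of a field passed on the command line. It comes in five parts: the driver, the mapper, the reducer, a small properties utility, and the MapReduce configuration file.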

    1. Driver

    package com.kangaroo.hadoop.drive;
    
    import java.util.Map;
    import java.util.Properties;
    
    import com.kangaroo.hadoop.mapper.AggregateMapper;
    import com.kangaroo.hadoop.reducer.AggregateReducer;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.kangaroo.hadoop.utils.PropertiesUtil;
    
    
    public class DriveMain extends Configured implements Tool {
    
        private static final Logger logger = LoggerFactory.getLogger(DriveMain.class);
        private Configuration conf;
        private PropertiesUtil propUtil;
    
        public DriveMain() {
            this.conf = new Configuration();
            this.propUtil = new PropertiesUtil("configure.properties");
        }
    
        public int run(String[] args) throws Exception {
            try {
                logger.info("MapReduce Job Beginning.");
                String dbName = args[0];
                String tableName = args[1];
                String partition = args[2];
                String sumField = args[3];
                String outPath = args[4];
                String partFilter = partitionFormat(partition);
                logger.info("[Params] dbName:{}; tableName:{}, partition:{}, sumField:{}, outPath:{}, partFilter:{}",
                        dbName, tableName, partition, sumField, outPath, partFilter);
                this.conf.set("sumField", sumField);
                this.setMapRedConfiguration();
                Job job = this.setJobConfiguration(this.conf);
                HCatInputFormat.setInput(job, dbName, tableName, partFilter);
                logger.info("setInput successfully.");
                FileOutputFormat.setOutputPath(job, new Path(outPath));
                logger.info("setOutput successfully.");
                return (job.waitForCompletion(true) ? 0 : 1);
            } catch (Exception ex) {
                logger.error(ex.getMessage(), ex);
                throw ex;
            }
        }
    
        private Job setJobConfiguration(Configuration conf) throws Exception {
            try {
                logger.info("enter setJobConfiguration");
                Job job = Job.getInstance(conf);
                job.setJarByClass(DriveMain.class);
                job.setInputFormatClass(HCatInputFormat.class);
                job.setMapperClass(AggregateMapper.class);
                job.setReducerClass(AggregateReducer.class);
    
                job.setMapOutputKeyClass(Text.class);
                job.setMapOutputValueClass(Text.class);
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(Text.class);
                job.setNumReduceTasks(1);
                logger.info("setJobConfiguration successfully.");
                return job;
            } catch (Exception ex) {
                logger.error("setJobConfiguration: " + ex.getMessage());
                throw new Exception(ex);
            }
        }
    
        /** Copies every mapred* property from configure.properties into the job Configuration. */
        private void setMapRedConfiguration() {
            try {
                Properties properties = propUtil.getProperties();
                logger.info("Loaded MapReduce configuration successfully.");
                for (Map.Entry<Object, Object> entry : properties.entrySet()) {
                    if (entry.getKey().toString().startsWith("mapred")) {
                        conf.set(entry.getKey().toString(), entry.getValue().toString());
                        logger.info("[MR][Config] key:{}, value:{}", entry.getKey(), entry.getValue());
                    }
                }
                logger.info("[MR][Config] Set MapReduce configuration successfully.");
            } catch (Exception e) {
                // Log and continue: the job can still run with the cluster's default MR settings.
                logger.error("setMapRedConfiguration: " + e.getMessage(), e);
            }
        }
    
    
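        /**
         * Converts a slash-delimited partition spec such as "2017/07/09/10" into an
         * HCatalog partition filter, e.g. year='2017' and month='07' and day='09' and hour='10'.
         * Specs that already reference pt/dt partition keys are passed through unchanged.
         */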
        private String partitionFormat(String partition) {
            if (partition.contains("pt") || partition.contains("dt")) {
                return partition;
            }
            String[] items = partition.split("/");
            String[] keys = {"year", "month", "day", "hour"};
            StringBuilder format = new StringBuilder();
            for (int i = 0; i < items.length; i++) {
                format.append(keys[i]).append("='").append(items[i]).append("'");
                if (i < items.length - 1) {
                    format.append(" and ");
                }
            }
            return format.toString();
        }
    
        public static void main(String[] args) throws Exception {
            int exitCode = ToolRunner.run(new DriveMain(), args);
            System.exit(exitCode);
        }
    
    }

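    With the driver in place, the job is launched with hadoop jar. A minimal sketch of the command line, in which the jar name, database, table, field, and output path are hypothetical placeholders:

        hadoop jar kangaroo-mr.jar com.kangaroo.hadoop.drive.DriveMain \
            my_db my_table 2017/07/09/10 uid /tmp/aggregate_out

    For a slash-delimited partition argument like this, partitionFormat expands it into the filter year='2017' and month='07' and day='09' and hour='10' before handing it to HCatInputFormat.setInput.
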
    2. Mapper

    package com.kangaroo.hadoop.mapper;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hive.hcatalog.data.HCatRecord;
    import org.apache.hive.hcatalog.data.schema.HCatSchema;
    import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    @SuppressWarnings("rawtypes")
    public class AggregateMapper extends Mapper<WritableComparable, HCatRecord, Text, Text> {
    
        private static final Logger logger = LoggerFactory.getLogger(AggregateMapper.class);
    
        private HCatSchema schema;
        private String sumField;
        private Text outKey;
        private Text outValue;
    
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            outKey = new Text();
            outValue = new Text();
            schema = HCatInputFormat.getTableSchema(context.getConfiguration());
            // Read the aggregation field once instead of on every map() call.
            sumField = context.getConfiguration().get("sumField");
        }
    
        @Override
        protected void map(WritableComparable key, HCatRecord value, Context context) throws IOException, InterruptedException {
            Map<String, String> recordMap = new HashMap<String, String>();
            for (String fieldName : schema.getFieldNames()) {
                // HCatRecord.get returns null for SQL NULL columns, so guard before toString().
                Object rawValue = value.get(fieldName, schema);
                String fieldValue = rawValue == null ? "" : rawValue.toString();
                logger.debug("fieldName={}, fieldValue={}", fieldName, fieldValue);
                recordMap.put(fieldName, fieldValue);
            }
            // Emit one (fieldValue, "1") pair per record so the reducer can sum the counts.
            outKey.set(recordMap.get(sumField));
            outValue.set("1");
            context.write(outKey, outValue);
        }
    }

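    Besides value.get(fieldName, schema).toString(), HCatRecord also exposes typed getters that return null for SQL NULL columns. A small sketch; the uid and cnt column names are hypothetical:

        String uid = value.getString("uid", schema);    // string column, null when the cell is NULL
        Integer cnt = value.getInteger("cnt", schema);  // int column, null when the cell is NULL
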
    3. Reducer

    package com.kangaroo.hadoop.reducer;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hive.hcatalog.data.schema.HCatSchema;
    import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    
    @SuppressWarnings("rawtypes")
    public class AggregateReducer extends Reducer<Text, Text, Text, Text> {
        protected static final Logger logger = LoggerFactory.getLogger(AggregateReducer.class);
        HCatSchema schema;
        Text outKey;
        Text outValue;
    
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            schema = HCatInputFormat.getTableSchema(context.getConfiguration());
            // The Text objects must be created before reduce() uses them.
            outKey = new Text();
            outValue = new Text();
        }
    
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (Text value : values) {
                sum += Integer.parseInt(value.toString());
            }
            // Emit one (key, total) pair per distinct key.
            outKey.set(key);
            outValue.set(String.valueOf(sum));
            context.write(outKey, outValue);
        }
    }

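    Because the reducer's input and output types are both (Text, Text) and it simply sums string-encoded counts, it could also serve as a combiner to cut shuffle traffic. An optional one-line tweak in the driver, not part of the original job:

        job.setCombinerClass(AggregateReducer.class);
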
    4. PropertiesUtil

    package com.kangaroo.hadoop.utils;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Properties;
    
    
    public class PropertiesUtil {
        private String filePath;
    
        public PropertiesUtil() {
            this.filePath = "configure.properties";
        }
    
        public PropertiesUtil(String filePath) {
            this.filePath = filePath;
        }
    
        public Properties getProperties() throws IOException {
            InputStream inStream = null;
            try {
                inStream = PropertiesUtil.class.getClassLoader()
                        .getResourceAsStream(this.filePath);
                if (inStream == null) {
                    // getResourceAsStream returns null when the file is not on the classpath.
                    throw new IOException("Resource not found on classpath: " + this.filePath);
                }
                Properties prop = new Properties();
                prop.load(inStream);
                return prop;
            } finally {
                if (inStream != null) {
                    inStream.close();
                }
            }
        }
    }

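    Note that getResourceAsStream resolves configure.properties against the classpath; in a typical Maven layout (an assumption here, not shown in the post) the file sits in src/main/resources and is bundled into the job jar.
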
    5. Configuration (configure.properties)

    mapred.job.queue.name=root.XXX
    mapred.jar=./XXX.jar
    mapred.map.tasks=300
    mapred.reduce.tasks=100
    #mapred.map.capacity=1
    #mapred.reduce.capacity=1
    mapred.job.priority=HIGH
    mapred.job.name=XXX
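
    These mapred.* keys are the legacy Hadoop 1 names; on YARN the equivalents are mapreduce.job.queuename, mapreduce.job.name, and so on. Either prefix is picked up by setMapRedConfiguration above, since "mapreduce" also starts with "mapred".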
Original post: https://www.cnblogs.com/kangoroo/p/7139391.html