zoukankan      html  css  js  c++  java
  • Java搭建MapReduce输出到DB具体步骤

    1、构建新的作业

    // Take the configuration supplied by ToolRunner (includes any -D options).
    Configuration conf=getConf();
    // JDBC driver class and connection URL of the target MySQL database.
    String driverClassName="com.mysql.cj.jdbc.Driver";
    String dburl="jdbc:mysql://x:3306/mr_test";
    // Store driver, URL, user name and password in the configuration.
    // This is done before Job.getInstance(conf), which works on a copy of conf.
    DBConfiguration.configureDB(conf, driverClassName, dburl, "root", "root");
    Job job=Job.getInstance(conf);
    // Identify the job jar by the class that contains the driver.
    job.setJarByClass(MaxTempSaveDB.class);

    2、设置输入输出

    // The input path is read from the "inpath" configuration property.
    FileInputFormat.addInputPath(job, new Path(conf.get("inpath")));
    // Output goes to table "maxTmp", columns "year" and "tmp".
    DBOutputFormat.setOutput(job, "maxTmp", "year", "tmp");

    3、设置自定义Mapper Reducer

    // Register the custom Mapper and Reducer implementations.
    job.setMapperClass(MaxTempSaveDBMapper.class);
    job.setReducerClass(MaxTempSaveDBReducer.class);

    4、设置Mapper Reducer输出类型

    // Mapper emits IntWritable keys and IntWritable values.
    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(IntWritable.class);
    // Reducer emits YearTmp keys with NullWritable values.
    job.setOutputKeyClass(YearTmp.class);
    job.setOutputValueClass(NullWritable.class);

    5、等待程序运行结束退出JVM

    // Submit the job and block until it finishes; return 0 on success, 1 on failure.
    return job.waitForCompletion(true)?0:1;

    注意事项:

    1、使用DBOutputFormat时,Reducer阶段context.write只有key会被写入DB(value会被忽略),所以需要自定义一个类作为输出key,用来携带要写入表中的各字段数据

    2、作为输出key的类需要实现DBWritable接口(通常同时实现Writable接口),并实现其中的write和readFields方法

    package com.mr;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
    import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import java.io.IOException;
    
    /**
     * MapReduce job that computes the maximum temperature for each year and
     * writes the result rows to a database table through {@link DBOutputFormat}.
     *
     * <p>The input path is read from the {@code inpath} configuration property,
     * e.g. {@code -D inpath=/path/to/input} when launched through
     * {@link ToolRunner}.
     *
     * <p>{@code YearTmp} is declared outside this file; it is used as the reducer
     * output key (presumably it implements both {@code Writable} and
     * {@code DBWritable} -- confirm against its definition).
     */
    public class MaxTempSaveDB extends Configured implements Tool {

        /** Name of the configuration property that holds the input path. */
        private static final String INPUT_PATH_KEY = "inpath";

        /**
         * Mapper: emits one (year, temperature) pair per input line.
         */
        public static class MaxTempSaveDBMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
            /**
             * Splits the line on single spaces and emits field 0 as the key and
             * field 3 as the value, both parsed as integers.
             *
             * @throws NumberFormatException if either field is not an integer
             * @throws ArrayIndexOutOfBoundsException if the line has fewer than four fields
             */
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                String[] info = value.toString().split(" ");
                int year = Integer.parseInt(info[0]);
                int temperature = Integer.parseInt(info[3]);
                context.write(new IntWritable(year), new IntWritable(temperature));
            }
        }

        /**
         * Reducer: picks the highest temperature among all values of one year and
         * emits it wrapped in a {@code YearTmp} key with a {@link NullWritable} value.
         */
        public static class MaxTempSaveDBReducer extends Reducer<IntWritable, IntWritable, YearTmp, NullWritable> {
            @Override
            protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                // Start from the smallest int so that years whose temperatures are
                // all negative still report their true maximum (starting at 0 would
                // silently clamp them to 0).
                int max = Integer.MIN_VALUE;
                for (IntWritable v : values) {
                    max = Math.max(max, v.get());
                }
                YearTmp maxkey = new YearTmp(key.get(), max);
                context.write(maxkey, NullWritable.get());
            }
        }

        /**
         * Entry point: runs the job through {@link ToolRunner}, prints the result
         * code and terminates the JVM with it.
         *
         * @param args command-line arguments, forwarded to {@link ToolRunner}
         * @throws Exception if the job cannot be set up or run
         */
        public static void main(String[] args) throws Exception {
            int result = ToolRunner.run(new MaxTempSaveDB(), args);
            System.out.println(result);
            // Propagate the job outcome to the caller as the process exit status.
            System.exit(result);
        }

        /**
         * Configures and runs the job.
         *
         * @param strings remaining command-line arguments (not used)
         * @return 0 if the job completed successfully, 1 otherwise
         * @throws IllegalArgumentException if the {@code inpath} property is missing or empty
         * @throws Exception if job setup or execution fails
         */
        @Override
        public int run(String[] strings) throws Exception {
            // 1. Build the job. The database settings are stored in the
            //    configuration before the Job is created from it.
            Configuration conf = getConf();
            String driverClassName = "com.mysql.cj.jdbc.Driver";
            String dburl = "jdbc:mysql://x:3306/mr_test";
            DBConfiguration.configureDB(conf, driverClassName, dburl, "root", "root");
            Job job = Job.getInstance(conf);
            job.setJarByClass(MaxTempSaveDB.class);

            // 2. Input path and output table ("maxTmp" with columns "year", "tmp").
            String inputPath = conf.get(INPUT_PATH_KEY);
            if (inputPath == null || inputPath.isEmpty()) {
                // Fail with a message that names the missing property.
                throw new IllegalArgumentException(
                        "Missing required configuration property '" + INPUT_PATH_KEY
                                + "'; pass it with -D " + INPUT_PATH_KEY + "=<path>");
            }
            FileInputFormat.addInputPath(job, new Path(inputPath));
            DBOutputFormat.setOutput(job, "maxTmp", "year", "tmp");

            // 3. Mapper and Reducer.
            job.setMapperClass(MaxTempSaveDBMapper.class);
            job.setReducerClass(MaxTempSaveDBReducer.class);

            // 4. Mapper and Reducer output types.
            job.setMapOutputKeyClass(IntWritable.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setOutputKeyClass(YearTmp.class);
            job.setOutputValueClass(NullWritable.class);

            // 5. Wait for the job to finish and translate the outcome to a result code.
            return job.waitForCompletion(true) ? 0 : 1;
        }
    }
  • 相关阅读:
    CentOS 7 使用NVM管理nodejs(转)
    Linux下Git安装及配置 (转)
    linux添加计划任务(转载)
    centos编译libcurl库找不到ssl的问题
    windows下搭建nginx+php+虚拟主机配置过程(转)
    xocde中宏定义使用
    unity shader vs高亮提示插件
    xcode8 自动打包
    3DMax中如何刷顶点色
    Adreno Profiler调试注意事项
  • 原文地址:https://www.cnblogs.com/qiangang/p/13697906.html
Copyright © 2011-2022 走看看