  • MapReduce Experiment: Data Cleaning

    Experiment requirements

    https://pan.baidu.com/s/1hTnGnxdvgyNaLwckbKUgJA

    Program source code

    package cleandata;
    
    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class CleanResult {
        public static void main(String[] args) throws IOException,ClassNotFoundException,InterruptedException{
            Job job = Job.getInstance();
            job.setJobName("CleanResult");
            job.setJarByClass(CleanResult.class);
            job.setMapperClass(doMapper.class);
            job.setReducerClass(doReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            // Input file and output directory on HDFS (addresses are specific to this cluster)
            Path in = new Path("hdfs://192.168.12.128:9000/mapreduce/in/result.txt");
            Path out = new Path("hdfs://192.168.12.128:9000/mapreduce/out/clean");
            FileInputFormat.addInputPath(job,in);
            FileOutputFormat.setOutputPath(job,out);
            System.exit(job.waitForCompletion(true) ? 0:1);
        }
        public static class doMapper extends Mapper<Object,Text,Text,IntWritable>{
            // Reused output objects: the cleaned record is the key, its numeric id the value
            public static Text word = new Text();
            public static final IntWritable id = new IntWritable();
            @Override
            protected void map(Object key,Text value,Context context) throws IOException,InterruptedException{
                String[] data = value.toString().split(",");
                // Reformat the time field: keep hh:mm:ss and prepend the fixed date 2016-11-10
                String[] time = data[1].split(":");
                data[1] = "2016-11-10 " + time[1] + ":" + time[2] + ":" + time[3].split(" ")[0];
                // Keep only the digit characters of the traffic field
                String traffic = "";
                for(int i=0;i<data[3].length();i++){
                    if(data[3].charAt(i) >= 48 && data[3].charAt(i) <= 57) { // ASCII codes for '0'..'9'
                        traffic += data[3].charAt(i);
                    }
                }
                data[3] = traffic;
                // Re-join the first five fields with tabs; the sixth field is the record id
                String newValue = data[0];
                for(int i=1;i<5;i++)
                    newValue += "\t" + data[i];
                id.set(Integer.parseInt(data[5]));
                word.set(newValue);
                context.write(word, id);
            }
        }
        
        public static class doReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
            private IntWritable result = new IntWritable();
            @Override
            protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
                // Debug output, then keep the last id seen for this key and write the pair out
                System.out.println("key:" + key);
                for(IntWritable value:values){
                    result.set(value.get());
                }
                System.out.println("value:" + result);
                context.write(key, result);
            }
        }
    
    }
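
    The mapper applies three cleaning rules to each record: reformat the time field, strip non-digit characters from the traffic field, and re-join the fields with tabs. The standalone sketch below replays exactly those rules on a single record so they can be checked without a Hadoop cluster; the sample line is hypothetical and only reflects what the parsing code assumes about result.txt (six comma-separated fields, with the id last).

    package cleandata;
    
    // Standalone sketch of the cleaning rules used in doMapper, applied to one
    // HYPOTHETICAL record (the real field layout comes from the experiment's result.txt).
    public class CleanRuleDemo {
        public static void main(String[] args) {
            // Hypothetical raw record: ip, raw time, day, traffic, type, id
            String line = "1.192.0.1,10/Nov/2016:00:01:02 +0800,10,54B,video,5554";
            String[] data = line.split(",");
    
            // Rule 1: rewrite the time field as "2016-11-10 HH:mm:ss"
            String[] time = data[1].split(":");
            data[1] = "2016-11-10 " + time[1] + ":" + time[2] + ":" + time[3].split(" ")[0];
    
            // Rule 2: keep only the digits of the traffic field
            String traffic = "";
            for (int i = 0; i < data[3].length(); i++) {
                if (data[3].charAt(i) >= '0' && data[3].charAt(i) <= '9') {
                    traffic += data[3].charAt(i);
                }
            }
            data[3] = traffic;
    
            // Rule 3: re-join the first five fields with tabs; the sixth field (id)
            // becomes the IntWritable value in the real job
            String newValue = data[0];
            for (int i = 1; i < 5; i++) {
                newValue += "\t" + data[i];
            }
            System.out.println(newValue + "\t" + Integer.parseInt(data[5]));
        }
    }

    For the real job, the class is packaged into a jar and submitted with hadoop jar, with the two HDFS paths in main adjusted to the local cluster.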

    Importing the cleaned data into Hive:

    Command-line import:

    load data inpath '/mapreduce/out/clean/part-r-00000' into table data;
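
    The LOAD DATA statement assumes the target table data already exists with six tab-delimited columns matching the output of CleanResult. The original post does not show the DDL, so the sketch below is only an assumption about what it might look like, issued over the same HiveServer2 JDBC endpoint used by the Java import below; column names and types are guesses and may need adjusting. The same CREATE TABLE can equally be run in the hive or beeline shell.

    package cleandata;
    
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;
    
    // Hedged sketch: create the target table before running LOAD DATA. Column names
    // and types are ASSUMPTIONS; the only firm requirement is six columns matching
    // the tab-separated fields written by CleanResult.
    public class CreateHiveTable {
        public static void main(String[] args) throws Exception {
            Class.forName("org.apache.hive.jdbc.HiveDriver");
            try (Connection con = DriverManager.getConnection(
                    "jdbc:hive2://192.168.12.128:10000/default", "", "");
                 Statement stmt = con.createStatement()) {
                stmt.execute("CREATE TABLE IF NOT EXISTS data ("
                        + "`ip` STRING, `time` STRING, `day` STRING, "
                        + "`traffic` BIGINT, `type` STRING, `id` INT) "
                        + "ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t'");
            }
        }
    }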

    Java code:

    package cleandata;
    
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;
    
    public class LoadDataFileToHive {
    
        public static void main(String[] args) {
            String JDBC_DRIVER = "org.apache.hive.jdbc.HiveDriver";
            String CONNECTION_URL = "jdbc:hive2://192.168.12.128:10000/default";
            String username = "";
            String password = "";
            Connection con = null;
            
            try {
                Class.forName(JDBC_DRIVER);
                con = DriverManager.getConnection(CONNECTION_URL, username, password);
                Statement stmt = con.createStatement();
                // Same LOAD DATA statement as the command-line import above
                String sql = "load data inpath '/mapreduce/out/clean/part-r-00000' into table data";
                stmt.execute(sql);
                System.out.println("Data imported into Hive successfully!");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // Close the JDBC connection (this also releases the Statement it created)
                if(con != null){
                    try {
                        con.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
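
    Both imports require the Hive JDBC driver on the classpath and a running HiveServer2 at 192.168.12.128:10000. Note that LOAD DATA INPATH (without LOCAL) moves the file inside HDFS, so part-r-00000 no longer sits under /mapreduce/out/clean after the import. As an optional check, a small sketch reusing the same connection settings can count the rows now visible in the table:

    package cleandata;
    
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;
    
    // Optional check: count the rows in the Hive table through the same JDBC
    // connection settings used by LoadDataFileToHive above.
    public class VerifyHiveLoad {
        public static void main(String[] args) throws Exception {
            Class.forName("org.apache.hive.jdbc.HiveDriver");
            try (Connection con = DriverManager.getConnection(
                    "jdbc:hive2://192.168.12.128:10000/default", "", "");
                 Statement stmt = con.createStatement();
                 ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM data")) {
                if (rs.next()) {
                    System.out.println("rows loaded: " + rs.getLong(1));
                }
            }
        }
    }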

    Screenshots of the run results

    Data cleaning completed:

    After the file has been loaded from HDFS into Hive

  • Original post: https://www.cnblogs.com/dream0-0/p/11851540.html