zoukankan      html  css  js  c++  java
  • Hadoop编程实践

    项目文件:GitHub

    需求一:

    package test.dataclean;
    
    import java.io.IOException;
    import java.net.URI;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    /*
     * @ author:Kouch
     * 
     *  “清洗”思路:
     *      1 map: 获取的一行数据;判断一行字符串长度;
     *      2 reduce:
     * 
     *  注:结合具体需求;
     */
    
    public class DataHandle1 {
        
        //map
        public static class Map extends Mapper<Object,Text,Text,Text>{
            
            private static Text line=new Text();
            public void map(Object key,Text value,Context context) throws IOException, InterruptedException {
                line=value;
                //测试
                System.out.println("内容:"+line);
                
                //一行字符串长度;
                String str=line.toString();
                //System.out.println("zhuan:"+str);
                
                if(str.length()>20) {
                    context.write(line, new Text(""));
                }
            }
        }
        
        //reduce
        public static class Reduce extends Reducer<Text,Text,Text,Text>{
            public void reduce(Text key,Iterable<Text>values,Context context) throws IOException, InterruptedException {
                
                //测试
                //System.out.println("内容:"+key);
                context.write(key, new Text(""));
            }
        }
        
        
        //main
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            
            //配置类
            Configuration conf=new Configuration();
            conf.set("mapred.job.tracker", "localhost:9000");
            
            //获取传参
            //方式一:
            String[] ioArgs=new String[] {"input/dailydata1.txt","out"};
            String[] otherArgs=new GenericOptionsParser(conf,ioArgs).getRemainingArgs();
            if(otherArgs.length!=2) {
                System.err.println("Usage:Data Clean <in> <out> - path?");
                System.exit(2);
            }
            
            //判断输出文件是否存在;存在-删除;
            String url="hdfs://localhost:9000/user/kouch/"+ioArgs[1];
            FileSystem fs=FileSystem.get(URI.create(url), conf);
            if(fs.delete(new Path(url), true)) {//true:文件夹下所有文件;false:如果此文件存在其他文件就不删除
                System.out.println("删除"+url);
            }
            
            //Job设置
            Job job=Job.getInstance();
            job.setJarByClass(Deduplication.class);
            job.setMapperClass(Map.class);
            job.setCombinerClass(Reduce.class);
            job.setReducerClass(Reduce.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            
            //设置输入输出目录
            FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
            
            //等待job完成之后再返回结果并退出程序
            System.exit(job.waitForCompletion(true)?0:1);
            
        }
        
        
    }

    需求二:

    package test.dataclean;
    
    import java.io.IOException;
    import java.net.URI;
    import java.util.Iterator;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    /*
     * @ author:Kouch
     * 
     *  “清洗”思路:
     *      1 map: 获取的一行数据;去除错误数据;截取有效字段;输入context;
     *      2 reduce:
     * 
     *  注:结合具体需求;
     *  
     *  统计:get/post/head 请求;
     */
    
    public class DataHandle2 {
        
        //map
        public static class Map extends Mapper<Object,Text,Text,IntWritable>{
            private static final IntWritable one = new IntWritable(1);
            private static Text line=new Text();
            public void map(Object key,Text value,Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
                line=value;
                //测试
                //System.out.println("内容:"+line);
                String str=line.toString();
                
                if(!(str.indexOf("%")>0)) {
                    //System.out.println("内容:"+line);
                    String[] strs=str.split(""");
                    //System.out.println("内容:"+strs[1]);
                    String need=strs[1];
                    
                    if(need.startsWith("G")) {
                        //System.out.println("G");
                        context.write(new Text("Get"), one);
                    }else if(need.startsWith("H")) {
                        //System.out.println("H");
                        context.write(new Text("Head"), one);
                    }else if(need.startsWith("P")){
                        //System.out.println("P");
                        context.write(new Text("Post"), one);
                    }else {
                        
                    }
                }
                
            }
        }
        
        //reduce
        public static class Reduce extends Reducer<Text,IntWritable,Text,IntWritable>{
            
            private IntWritable result = new IntWritable();
            
            public void reduce(Text key,Iterable<IntWritable>values,Context context) throws IOException, InterruptedException {
                
                //测试
                //System.out.println("内容:"+key);
                
                int sum=0;
                
                //迭代累计频率;
                IntWritable val;
                for(Iterator i$ = values.iterator(); i$.hasNext(); sum += val.get()) {
                    val = (IntWritable)i$.next();
                }
    
                this.result.set(sum);
                context.write(key, this.result);
            }
        }
        
        
        //main
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            
            //配置类
            Configuration conf=new Configuration();
            conf.set("mapred.job.tracker", "localhost:9000");
            
            //获取传参
            //方式一:
            String[] ioArgs=new String[] {"input/daya2.txt","out3"};
            String[] otherArgs=new GenericOptionsParser(conf,ioArgs).getRemainingArgs();
            if(otherArgs.length!=2) {
                System.err.println("Usage:Data Clean <in> <out> - path?");
                System.exit(2);
            }
            
            //判断输出文件是否存在;存在-删除;
            String url="hdfs://localhost:9000/user/kouch/"+ioArgs[1];
            FileSystem fs=FileSystem.get(URI.create(url), conf);
            if(fs.delete(new Path(url), true)) {//true:文件夹下所有文件;false:如果此文件存在其他文件就不删除
                System.out.println("删除"+url);
            }
            
            //Job设置
            Job job=Job.getInstance();
            job.setJarByClass(Deduplication.class);
            job.setMapperClass(Map.class);
            job.setCombinerClass(Reduce.class);
            job.setReducerClass(Reduce.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            
            //设置输入输出目录
            FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
            
            //等待job完成之后再返回结果并退出程序
            System.exit(job.waitForCompletion(true)?0:1);
            
        }
        
        
    }
    ...................................................
  • 相关阅读:
    Linux入门之常用命令(14) kill
    【JVM命令系列】jstack
    【JVM命令系列】javap
    【JVM命令系列】jmap
    Hadoop安全(2)——————UserGroupInformation
    Hadoop安全(1)——————美团Hadoop安全实践
    软件测试流程进阶----两年软件测试总结
    每个程序员应该阅读的10本书籍
    成为优秀程序员的黄金10条法则
    Java异常的深入研究与分析
  • 原文地址:https://www.cnblogs.com/floakss/p/11455797.html
Copyright © 2011-2022 走看看