zoukankan      html  css  js  c++  java
  • MR 数据过滤

    MR:

    package com.euphe.filter;
    
    import com.euphe.util.HUtils;
    import com.euphe.util.Utils;
    import com.euphe.util.standardUtil.Location;
    import com.euphe.util.standardUtil.StringListTools;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    import org.apache.hadoop.util.Tool;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    
    import static com.euphe.util.standardUtil.Shufflter.shufflter;
    
    public class FilterJob extends Configured implements Tool {
        public static class Map extends Mapper<Object, Text, Text, Text> {
            private static Text text = new Text();
    
            public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
                String line = value.toString();
                List<String> FirstList = new ArrayList<String>();
                FirstList = StringListTools.StringToList(line,"	");
                String time = FirstList.get(Location.DATE_TIME);
                context.write(new Text(time), new Text(line));//时间作为key,一行作为value
            }
        }
    
        public static class Reduce extends Reducer<Text, Text, NullWritable, Text> {
            public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                try{
                    String line = "";
                    String result = "";
                    boolean fflg = false;
                    List<String> resultList = new ArrayList<String>();
                    for(Text value : values){//对一个分区的每个数据进行处理
                        line = value.toString();
                        fflg = shufflter(line);//对这行进行过滤
    
                        if(fflg){
                            resultList.add(line);//满足条件添加进list
                        }
                    }
                    result = StringListTools.ListToString(resultList, "
    ");
                    context.write(NullWritable.get(), new Text(result));
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        }
    
        @Override
        public int run(String[] args) throws Exception {
            Configuration conf = HUtils.getConf();
            conf.set("mapreduce.job.jar", Utils.getRootPathBasedPath("WEB-INF/jars/filter.jar"));//打包运行
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();//解析命令行参数
            if (otherArgs.length !=2) {//要求必须有输入和输出路径两个参数
                System.err.println("Usage: com.euphe.filter.FilterJob <in> <out>");
                System.exit(2);
            }
            Job job =  Job.getInstance(conf,"Filter input  :"+otherArgs[0]+" to "+otherArgs[1]);
            job.setJarByClass(FilterJob.class);
            job.setMapperClass(FilterJob.Map.class);
            job.setReducerClass(FilterJob.Reduce.class);
            job.setNumReduceTasks(1);
    
            //设置map输出的key value
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            //设置reducer输出的key,value类型
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Text.class);
    
            FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
            FileOutputFormat.setOutputPath(job,new Path(otherArgs[1]));
            FileSystem.get(conf).delete(new Path(otherArgs[1]), true);//调用任务前先删除输出目录
            return job.waitForCompletion(true) ? 0 : 1;
        }
    }

    shufflter函数:

    package com.euphe.util.standardUtil;
    
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * Line filter used by the reduce stage.
     */
    public class Shufflter {
        /** Separator between the fields of one record. */
        private static final String FIELD_SEPARATOR = "\t";

        /**
         * Decides whether a record is kept.
         *
         * <p>A record is rejected only when all four inspected fields carry their
         * placeholder value: {@code "null"} at {@code Location.DK1} and
         * {@code "unknown"} at {@code Location.osFamily},
         * {@code Location.uaFamily} and {@code Location.type}. A record that is
         * {@code null} or too short to contain all four fields is kept, which is
         * the result the previous exception-based implementation produced as
         * well, but without printing a stack trace for every such record.
         *
         * @param line one tab-separated record
         * @return {@code false} if all four fields are placeholders, {@code true} otherwise
         */
        public static boolean shufflter(String line) {
            List<String> fields = StringListTools.StringToList(line, FIELD_SEPARATOR);

            // Highest index read below; computed so that no ordering of the
            // Location constants is assumed.
            int lastIndex = Math.max(Math.max(Location.DK1, Location.osFamily),
                    Math.max(Location.uaFamily, Location.type));
            if (fields == null || fields.size() <= lastIndex) {
                return true;
            }

            boolean allPlaceholders = "null".equals(fields.get(Location.DK1))
                    && "unknown".equals(fields.get(Location.osFamily))
                    && "unknown".equals(fields.get(Location.uaFamily))
                    && "unknown".equals(fields.get(Location.type));
            return !allPlaceholders;
        }
    }

    Location工具:

    package com.euphe.util.standardUtil;
    
    /**
     * Index constants used to pick individual fields out of the list produced by
     * splitting a record.
     */
    public class Location {
        // Positions of the attributes in the raw data
        public static final int DATE_TIME = 0;
    
        // Positions of the attributes inspected in the shufflter stage
        public static final int DK1 = 5;
        public static final int osFamily = 8;
        public static final int uaFamily = 9;
        public static final int type = 13;
    }

    StringList工具:

    package com.euphe.util.standardUtil;
    
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * Helpers for converting between a delimited string and a list of strings.
     */
    public class StringListTools {
        /**
         * Splits one line into its items.
         *
         * <p>The separator is passed to {@link String#split(String)} and is
         * therefore interpreted as a regular expression; trailing empty items are
         * dropped by that method.
         *
         * @param str       the line to split, may be {@code null}
         * @param seperator regular expression that separates the items
         * @return a new mutable list of the items, or {@code null} if {@code str} is {@code null}
         */
        public static List<String> StringToList(String str, String seperator) {
            if (str == null) {
                return null;
            }

            String[] parts = str.split(seperator);
            List<String> items = new ArrayList<String>(parts.length);
            for (String part : parts) {
                items.add(part);
            }
            return items;
        }

        /**
         * Joins the items of a list, placing the separator between neighbours.
         *
         * <p>An empty list yields an empty string (previously this case threw an
         * {@link IndexOutOfBoundsException}).
         *
         * @param tempList  the items to join, may be {@code null}
         * @param seperator text inserted between two items
         * @return the joined text, or {@code null} if {@code tempList} is {@code null}
         */
        public static String ListToString(List<String> tempList, String seperator) {
            if (tempList == null) {
                return null;
            }

            // StringBuilder avoids re-copying the accumulated text on every item.
            StringBuilder joined = new StringBuilder();
            boolean first = true;
            for (String item : tempList) {
                if (!first) {
                    joined.append(seperator);
                }
                joined.append(item);
                first = false;
            }
            return joined.toString();
        }
    }
  • 相关阅读:
    20170411linux常用命令
    20170411oracle常用命令
    20170411-oracle 查询指定节点下的所有子节点包括直到叶子节点
    20170329oracle安装教程
    20170329plsql连接oracle
    20170329001怎么让plsql窗口列表保持
    Eclipse 换主题、皮肤、配色,换黑色主题护眼
    zbb20170303使用ssh一直找不到session,报错not found session in current thread
    zbb20170303_ant_build.xml详解
    hdu Farm Irrigation
  • 原文地址:https://www.cnblogs.com/xym4869/p/8960951.html
Copyright © 2011-2022 走看看