  2020112401

    I honestly didn't expect to track down the problem in the MapReduce experiment by controlling variables: the LinkedHashMap built from a file of 300-plus MB is simply too large for the Map phase to hold in memory.

    I found this by running the job against two trimmed-down table files and then against the full tables. The real run also brought home MapReduce's advantage on large data sets: for a 1 GB data file, merely importing it into a relational database such as MySQL can take hours, while MapReduce finishes the two-table merge in one to a few minutes.

    Of course, doing the computation with SQL in a relational database has its own advantages: it needs no programming yet supports all sorts of conditional logic, a single statement can accomplish remarkably complex work (though this is generally not advisable), and with modest data volumes it returns results quickly. MapReduce, by contrast, spends noticeable time on setup before the Map phase even starts, and complex operations have to be split across several MapReduce passes. Take the merge-and-sum step of this experiment: with MapReduce it became two jobs, a first pass joining the two files (keyed on invoice ID) and a second pass merging the purchases and sales of identical goods (keyed on goods name + specification + unit). In SQL the whole thing is one JOIN plus SUM and a GROUP BY, schematically `SELECT goods, spec, unit, SUM(amount) FROM zzsfp JOIN zzsfp_hwmx USING (invoice_id) GROUP BY goods, spec, unit` (column names hypothetical), and the merge-and-sort step is just as compact.

    That said, this experiment uses pure MapReduce, so it cannot make the final judgment on its own; all it can do is compare the raw purchase and sale quantities. I kept the unit field to make concrete comparisons easier, but I had no good way to turn it into a numeric feature. So in the end the pipeline just does an ordinary numeric sum and then one sorting pass.

    A proper analysis would need the third table, using it to build a decision tree or train a neural network to evaluate the results.

    Merging the two tables:

    package konoha.pkg.pkg01;
    
    import java.io.IOException;
    import java.net.URISyntaxException;
    import java.util.Vector;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    
    class DealInfo {
        public String buyerId = null;
        public String sellerId = null;
    
        public DealInfo(String buyer, String seller) {
            buyerId = buyer;
            sellerId = seller;
        }
    }
    
    public class MapRedStep1 {
    
        public static class Type0Mapper extends Mapper<Object, Text, Text, Text> {
    
            public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
                // Determine which table this split comes from: the goods-detail
                // table's path contains "hwmx"; the other file is the invoice table.
                String filePath = ((FileSplit) context.getInputSplit()).getPath().toString();
                String line = value.toString();
                // Drop the record's enclosing first and last characters.
                line = line.substring(1, line.length() - 1);
                // System.out.println("[DEBUG]Processing Line:" + line);
                String str[] = line.split(",");
                if (filePath.contains("hwmx")) {
                    // Detail row: tag with "nya"; skip rows whose flag (str[9]) is "Y".
                    if (str.length == 10) {
                        if (!str[9].equals("Y"))
                            context.write(new Text(str[0]),
                                    new Text("nya" + str[2] + "\t" + str[3] + "\t" + str[4] + "," + str[5]));
                    }
                } else {
                    // Invoice row: tag with "kon"; value = buyer ID + seller ID.
                    if (str.length == 9) {
                        context.write(new Text(str[0]), new Text("kon" + str[1] + "\t" + str[2]));
                    }
                }
            }
        }
    
        public static class Type0Reducer extends Reducer<Text, Text, Text, Text> {
            @Override
            public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                String buyer = null;
                String seller = null;
                // Detail rows ("nya") are buffered until the invoice row ("kon") is seen.
                Vector<String> nya = new Vector<String>();
                String tmp = null;
                String str[] = null;
                for (Text val : values) {
                    tmp = val.toString();
                    if (tmp.startsWith("kon")) {
                        str = tmp.substring(3).split("\t");
                        buyer = str[0];
                        seller = str[1];
                    } else if (tmp.startsWith("nya")) {
                        nya.add(tmp.substring(3));
                    }
                }
                if (buyer != null && seller != null) {
                    for (int j = 0; j < nya.size(); j++) {
                        str = nya.get(j).split(",");
                        if (!str[1].equals("null")) {
                            // The seller records a negative quantity, the buyer a positive one.
                            context.write(new Text(seller), new Text(str[0] + "\t" + (-Double.parseDouble(str[1]))));
                            context.write(new Text(buyer), new Text(str[0] + "\t" + Double.parseDouble(str[1])));
                        }
                    }
                }
            }
        }
    
        public static void main(String[] args)
                throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
            Configuration conf = new Configuration();
            System.out.println("[INFO]Starting Job...");
            Job job = new Job(conf, "MyAverageStep1");
            System.out.print("[INFO]Setting Jar By Class...");
            job.setJarByClass(MapRedStep1.class);
            System.out.println("[DONE]");
            System.out.print("[INFO]Setting Mapper Class...");
            job.setMapperClass(Type0Mapper.class);
            System.out.println("[DONE]");
            System.out.print("[INFO]Setting Reducer Class...");
            job.setReducerClass(Type0Reducer.class);
            System.out.println("[DONE]");
            System.out.print("[INFO]Setting Output Key Class...");
            job.setOutputKeyClass(Text.class);
            System.out.println("[DONE]");
            System.out.print("[INFO]Setting Output Value Class...");
            job.setOutputValueClass(Text.class);
            System.out.println("[DONE]");
            System.out.print("[INFO]Setting Input Format Class...");
            job.setInputFormatClass(TextInputFormat.class);
            System.out.println("[DONE]");
            System.out.print("[INFO]Setting OutputFormatClass...");
            job.setOutputFormatClass(TextOutputFormat.class);
            System.out.println("[DONE]");
            System.out.print("[INFO]Creating Path Varable(s)...");
            Path left = new Path("hdfs://localhost:9000/user/test/zzsfp");
            Path right = new Path("hdfs://localhost:9000/user/test/zzsfp_hwmx");
            Path out = new Path("hdfs://localhost:9000/user/test/temp");
            System.out.println("[DONE]");
            System.out.print("[INFO]Adding Left Input Path...");
            FileInputFormat.addInputPath(job, left);
            System.out.println("[DONE]");
            System.out.print("[INFO]Adding Right Input Path...");
            FileInputFormat.addInputPath(job, right);
            System.out.println("[DONE]");
            System.out.print("[INFO]Setting Output Path...");
            FileOutputFormat.setOutputPath(job, out);
            System.out.println("[DONE]");
            System.out.println("[INFO]Process Now Running...");
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
    MapRedStep1.java

    Summing per item:

    package konoha.pkg.pkg01;
    
    import java.io.IOException;
    import java.net.URISyntaxException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    
    public class MapRedStep2 {
        public static class Type0Mapper extends Mapper<Object, Text, Text, DoubleWritable> {
    
            public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
                String line = value.toString();
                String str[] = line.split("\t");
                if (str.length == 5) {
                    // Re-key on the first four fields (company + goods info);
                    // the value is the signed quantity from step 1.
                    context.write(new Text(str[0] + "\t" + str[1] + "\t" + str[2] + "\t" + str[3]),
                            new DoubleWritable(Double.parseDouble(str[4])));
                }
            }
        }
    
        public static class Type0Reducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
            @Override
            public void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                    throws IOException, InterruptedException {
                double sum = 0.0;
                for (DoubleWritable val : values) {
                    sum = sum + val.get();
                }
                context.write(new Text(key), new DoubleWritable(sum));
            }
        }
    
        public static void main(String[] args)
                throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
            Configuration conf = new Configuration();
            System.out.println("[INFO]Starting Job...");
            Job job = new Job(conf, "MyAverageStep2");
            System.out.print("[INFO]Setting Jar By Class...");
            job.setJarByClass(MapRedStep2.class);
            System.out.println("[DONE]");
            System.out.print("[INFO]Setting Mapper Class...");
            job.setMapperClass(Type0Mapper.class);
            System.out.println("[DONE]");
            System.out.print("[INFO]Setting Reducer Class...");
            job.setReducerClass(Type0Reducer.class);
            System.out.println("[DONE]");
            System.out.print("[INFO]Setting Output Key Class...");
            job.setOutputKeyClass(Text.class);
            System.out.println("[DONE]");
            System.out.print("[INFO]Setting Output Value Class...");
            job.setOutputValueClass(DoubleWritable.class);
            System.out.println("[DONE]");
            System.out.print("[INFO]Setting Input Format Class...");
            job.setInputFormatClass(TextInputFormat.class);
            System.out.println("[DONE]");
            System.out.print("[INFO]Setting OutputFormatClass...");
            job.setOutputFormatClass(TextOutputFormat.class);
            System.out.println("[DONE]");
            System.out.print("[INFO]Creating Path Varable(s)...");
            Path in = new Path("hdfs://localhost:9000/user/test/temp/part-r-00000");
            Path out = new Path("hdfs://localhost:9000/user/test/result");
            System.out.println("[DONE]");
            System.out.print("[INFO]Adding Left Input Path...");
            FileInputFormat.addInputPath(job, in);
            System.out.println("[DONE]");
            System.out.print("[INFO]Setting Output Path...");
            FileOutputFormat.setOutputPath(job, out);
            System.out.println("[DONE]");
            System.out.println("[INFO]Process Now Running...");
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
    MapRedStep2.java
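
    Incidentally, because the Step 2 reducer is a pure associative sum, the same class could in principle double as a combiner to pre-aggregate map output and shrink the shuffle. A minimal sketch, one extra line in main() of the listing above (not part of my original runs):

    // Pre-aggregate on the map side before the shuffle. Safe here because
    // summation is associative/commutative and the reducer's input and
    // output types (Text, DoubleWritable) are identical.
    job.setCombinerClass(Type0Reducer.class);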

    A MapReduce pipeline that simply tallies the purchase/sale differences and sorts by them, in three steps: sum the absolute values of each company's differences (variance was not used, since with the largest data values it would be enormous anyway); sort by that total (MapReduce sorts in ascending order, so every value is first negated and then flipped back to positive on output; a descending-comparator alternative is sketched after the second listing); and finally merge with the company table and emit the results:

    package konoha.pkg.pkg01;
    
    import java.io.IOException;
    import java.net.URISyntaxException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    
    public class MapRedExtraStep1_1 {
        public static class Type0Mapper extends Mapper<Object, Text, Text, DoubleWritable> {
    
            public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
                String line = value.toString();
                String str[] = line.split("\t");
                if (str.length == 5) {
                    // Re-key on the company ID alone; the value is the signed quantity.
                    context.write(new Text(str[0]), new DoubleWritable(Double.parseDouble(str[4])));
                }
            }
        }
    
        public static class Type0Reducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
            @Override
            public void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                    throws IOException, InterruptedException {
                // Sum of the absolute differences per company.
                double sum = 0.0;
                for (DoubleWritable val : values) {
                    sum = sum + Math.abs(val.get());
                }
                context.write(new Text(key), new DoubleWritable(sum));
            }
        }
    
        public static void main(String[] args)
                throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
            Configuration conf = new Configuration();
            System.out.println("[INFO]Starting Job...");
            Job job = new Job(conf, "MyAverageExtraStep1_1");
            System.out.print("[INFO]Setting Jar By Class...");
            job.setJarByClass(MapRedExtraStep1_1.class);
            System.out.println("[DONE]");
            System.out.print("[INFO]Setting Mapper Class...");
            job.setMapperClass(Type0Mapper.class);
            System.out.println("[DONE]");
            System.out.print("[INFO]Setting Reducer Class...");
            job.setReducerClass(Type0Reducer.class);
            System.out.println("[DONE]");
            System.out.print("[INFO]Setting Output Key Class...");
            job.setOutputKeyClass(Text.class);
            System.out.println("[DONE]");
            System.out.print("[INFO]Setting Output Value Class...");
            job.setOutputValueClass(DoubleWritable.class);
            System.out.println("[DONE]");
            System.out.print("[INFO]Setting Input Format Class...");
            job.setInputFormatClass(TextInputFormat.class);
            System.out.println("[DONE]");
            System.out.print("[INFO]Setting Output Format Class...");
            job.setOutputFormatClass(TextOutputFormat.class);
            System.out.println("[DONE]");
            System.out.print("[INFO]Creating Path Variable(s)...");
            Path in = new Path("hdfs://localhost:9000/user/test/result/part-r-00000");
            Path out = new Path("hdfs://localhost:9000/user/test/none_output");
            System.out.println("[DONE]");
            System.out.print("[INFO]Adding Input Path...");
            FileInputFormat.addInputPath(job, in);
            System.out.println("[DONE]");
            System.out.print("[INFO]Setting Output Path...");
            FileOutputFormat.setOutputPath(job, out);
            System.out.println("[DONE]");
            System.out.println("[INFO]Process Now Running...");
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
    MapRedExtraStep1_1.java
    package konoha.pkg.pkg01;
    
    import java.io.IOException;
    import java.net.URISyntaxException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    
    public class MapRedExtraStep1_2 {
        public static class Type0Mapper extends Mapper<Object, Text, DoubleWritable, Text> {
    
            public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
                String line = value.toString();
                String str[] = line.split("\t");
                if (str.length == 2) {
                    // Negate the value so the ascending shuffle sort yields a
                    // descending ranking.
                    context.write(new DoubleWritable(-Double.parseDouble(str[1])), new Text(str[0]));
                }
            }
        }
    
        public static class Type0Reducer extends Reducer<DoubleWritable, Text, Text, DoubleWritable> {
            @Override
            public void reduce(DoubleWritable key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {
                for (Text val : values) {
                    // Flip the sign back to positive on output.
                    context.write(val, new DoubleWritable(-key.get()));
                }
            }
        }
    
        public static void main(String[] args)
                throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
            Configuration conf = new Configuration();
            System.out.println("[INFO]Starting Job...");
            Job job = new Job(conf, "MyAverageExtraStep1_2");
            System.out.print("[INFO]Setting Jar By Class...");
            job.setJarByClass(MapRedExtraStep1_2.class);
            System.out.println("[DONE]");
            System.out.print("[INFO]Setting Mapper Class...");
            job.setMapperClass(Type0Mapper.class);
            System.out.println("[DONE]");
            System.out.print("[INFO]Setting Reducer Class...");
            job.setReducerClass(Type0Reducer.class);
            System.out.println("[DONE]");
            System.out.print("[INFO]Setting Output Key Class...");
            job.setOutputKeyClass(Text.class);
            System.out.println("[DONE]");
            System.out.print("[INFO]Setting Output Value Class...");
            job.setOutputValueClass(DoubleWritable.class);
            System.out.println("[DONE]");
            System.out.print("[INFO]Setting Input Format Class...");
            job.setInputFormatClass(TextInputFormat.class);
            System.out.println("[DONE]");
            System.out.print("[INFO]Setting Output Format Class...");
            job.setOutputFormatClass(TextOutputFormat.class);
            System.out.println("[DONE]");
            System.out.print("[INFO]Setting Map Output Key Class...");
            job.setMapOutputKeyClass(DoubleWritable.class);
            System.out.println("[DONE]");
            System.out.print("[INFO]Setting Map Output Value Class...");
            job.setMapOutputValueClass(Text.class);
            System.out.println("[DONE]");
            System.out.print("[INFO]Creating Path Variable(s)...");
            Path in = new Path("hdfs://localhost:9000/user/test/none_output/part-r-00000");
            Path out = new Path("hdfs://localhost:9000/user/test/none_output1");
            System.out.println("[DONE]");
            System.out.print("[INFO]Adding Input Path...");
            FileInputFormat.addInputPath(job, in);
            System.out.println("[DONE]");
            System.out.print("[INFO]Setting Output Path...");
            FileOutputFormat.setOutputPath(job, out);
            System.out.println("[DONE]");
            System.out.println("[INFO]Process Now Running...");
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
    MapRedExtraStep1_2.java
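
    An alternative to the negate-then-flip trick above would be a custom sort comparator that reverses DoubleWritable's natural order. The following is only a sketch against the standard Hadoop API, not something used in this experiment:

    package konoha.pkg.pkg01;
    
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    
    // Sorts DoubleWritable keys in descending order during the shuffle.
    public class DescendingDoubleComparator extends WritableComparator {
        protected DescendingDoubleComparator() {
            super(DoubleWritable.class, true); // true: create key instances for compare()
        }
    
        @SuppressWarnings({ "rawtypes", "unchecked" })
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            return -a.compareTo(b); // invert the natural ascending order
        }
    }
    
    // In main(): job.setSortComparatorClass(DescendingDoubleComparator.class);

    With this in place the mapper could emit the raw positive sums and the reducer would write them through unchanged.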
    package konoha.pkg.pkg01;
    
    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;
    import java.util.HashMap;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    
    public class MapRedExtraStep1_3 {
        // Only items whose absolute difference reaches this threshold are reported.
        public static double limit = 5000;
    
        public static class Type0Mapper extends Mapper<Object, Text, Text, Text> {
            public HashMap<String, String> corp = new HashMap<String, String>();
    
            @Override
            public void setup(Context context) throws IOException {
                // Load the top 300 companies of the sorted ranking from the
                // distributed cache (the file is available in the task's
                // working directory under its base name).
                String fileName = context.getLocalCacheFiles()[0].getName();
                BufferedReader reader = new BufferedReader(new FileReader(fileName));
                String line = null;
                String str[] = null;
                for (int i = 0; i < 300 && null != (line = reader.readLine()); i++) {
                    str = line.split("\t");
                    if (str.length == 2) {
                        corp.put(str[0], "");
                    }
                }
                reader.close();
            }
    
            @Override
            public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
                String line = value.toString();
                String str[] = line.split("\t");
                // Keep only records belonging to the top-300 companies.
                if (str.length == 5 && corp.containsKey(str[0])) {
                    context.write(new Text(str[0]), new Text(str[1] + "\t" + str[2] + "\t" + str[3] + "," + str[4]));
                }
            }
        }
    
        public static class Type0Reducer extends Reducer<Text, Text, Text, Text> {
            @Override
            public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                String str[] = null;
                for (Text val : values) {
                    str = val.toString().split(",");
                    if (str.length == 2 && Math.abs(Double.parseDouble(str[1])) >= limit) {
                        String res = "虚开发票(商品缺失)"; // over-invoicing (goods shortfall)
                        if (Double.parseDouble(str[1]) > 0) {
                            res = "漏开发票(商品溢出)"; // under-invoicing (goods surplus)
                        }
                        context.write(key, new Text(str[0] + "\t" + str[1] + "\t" + res));
                    }
                }
            }
        }
    
        public static void main(String[] args)
                throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
            Configuration conf = new Configuration();
            System.out.println("[INFO]Starting Job...");
            Job job = new Job(conf, "MyAverageExtraStep1_3");
            System.out.print("[INFO]Setting Jar By Class...");
            job.setJarByClass(MapRedExtraStep1_3.class);
            System.out.println("[DONE]");
            System.out.print("[INFO]Setting Mapper Class...");
            job.setMapperClass(Type0Mapper.class);
            System.out.println("[DONE]");
            System.out.print("[INFO]Setting Reducer Class...");
            job.setReducerClass(Type0Reducer.class);
            System.out.println("[DONE]");
            System.out.print("[INFO]Setting Output Key Class...");
            job.setOutputKeyClass(Text.class);
            System.out.println("[DONE]");
            System.out.print("[INFO]Setting Output Value Class...");
            job.setOutputValueClass(Text.class);
            System.out.println("[DONE]");
            System.out.print("[INFO]Setting Input Format Class...");
            job.setInputFormatClass(TextInputFormat.class);
            System.out.println("[DONE]");
            System.out.print("[INFO]Setting Output Format Class...");
            job.setOutputFormatClass(TextOutputFormat.class);
            System.out.println("[DONE]");
            System.out.print("[INFO]Setting Map Output Key Class...");
            job.setMapOutputKeyClass(Text.class);
            System.out.println("[DONE]");
            System.out.print("[INFO]Setting Map Output Value Class...");
            job.setMapOutputValueClass(Text.class);
            System.out.println("[DONE]");
            System.out.print("[INFO]Creating Path Variable(s)...");
            Path in = new Path("hdfs://localhost:9000/user/test/result/part-r-00000");
            URI uri = new URI("hdfs://localhost:9000/user/test/none_output1/part-r-00000");
            Path out = new Path("hdfs://localhost:9000/user/test/none_output2");
            System.out.println("[DONE]");
            System.out.print("[INFO]Adding Input Path...");
            FileInputFormat.addInputPath(job, in);
            System.out.println("[DONE]");
            System.out.print("[INFO]Adding Cache File...");
            job.addCacheFile(uri);
            System.out.println("[DONE]");
            System.out.print("[INFO]Setting Output Path...");
            FileOutputFormat.setOutputPath(job, out);
            System.out.println("[DONE]");
            System.out.println("[INFO]Process Now Running...");
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
    MapRedExtraStep1_3.java
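
    A side note on the cache-file lookup in setup(): getLocalCacheFiles() is deprecated in Hadoop 2.x. On a newer release the non-deprecated route would look roughly like this (a sketch only, relying on the cache file being symlinked into the task's working directory; imports as in the listing above):

    // Inside Type0Mapper.setup(); assumes Hadoop 2.x.
    URI[] cached = context.getCacheFiles(); // original HDFS URIs of the cached files
    String fileName = new Path(cached[0].getPath()).getName(); // symlink name in the working dir
    BufferedReader reader = new BufferedReader(new FileReader(fileName));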

    ----------------------------------------------------------------- Update 26.11.2020 -----------------------------------------------------------------

    I noticed that the voided-invoice check in the first-step merge job was apparently applied the wrong way around (it tested the detail table's flag instead of the invoice table's). Below is the revised MapRedStep1, which fixes that and now also carries most columns of both tables through the merge:

    package konoha.pkg.pkg01;
    
    import java.io.IOException;
    import java.net.URISyntaxException;
    import java.util.Vector;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    
    class DealInfo {
        public String buyerId = null;
        public String sellerId = null;
    
        public DealInfo(String buyer, String seller) {
            buyerId = buyer;
            sellerId = seller;
        }
    }
    
    public class MapRedStep1 {
    
        public static class Type0Mapper extends Mapper<Object, Text, Text, Text> {
    
            public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
                String filePath = ((FileSplit) context.getInputSplit()).getPath().toString();
                String line = value.toString();
                line = line.substring(1, line.length() - 1);
                // System.out.println("[DEBUG]Processing Line:" + line);
                String str[] = line.split(",");
                if (filePath.contains("hwmx")) {
                    // Detail row: pass most columns through, keeping the
                    // quantity (str[5]) last, after the comma.
                    if (str.length == 10) {
                        context.write(new Text(str[0]), new Text("nya" + str[1] + "\t" + str[2] + "\t" + str[3] + "\t"
                                + str[4] + "\t" + str[6] + "\t" + str[7] + "\t" + str[8] + "\t" + str[9] + "," + str[5]));
                    }
                } else {
                    // Invoice row: the void flag lives here (str[8]), not in the
                    // detail table as the first version assumed.
                    if (str.length == 9) {
                        if (!str[8].equals("Y"))
                            context.write(new Text(str[0]), new Text("kon" + str[1] + "\t" + str[2]));
                    }
                }
            }
        }
    
        public static class Type0Reducer extends Reducer<Text, Text, Text, Text> {
            @Override
            public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                String buyer = null;
                String seller = null;
                Vector<String> nya = new Vector<String>();
                String tmp = null;
                String str[] = null;
                for (Text val : values) {
                    tmp = val.toString();
                    if (tmp.startsWith("kon")) {
                        str = tmp.substring(3).split("\t");
                        buyer = str[0];
                        seller = str[1];
                    } else if (tmp.startsWith("nya")) {
                        nya.add(tmp.substring(3));
                    }
                }
                if (buyer != null && seller != null) {
                    for (int j = 0; j < nya.size(); j++) {
                        str = nya.get(j).split(",");
                        if (!str[1].equals("null")) {
                            // The seller records a negative quantity, the buyer a positive one.
                            context.write(new Text(seller), new Text(str[0] + "\t" + (-Double.parseDouble(str[1]))));
                            context.write(new Text(buyer), new Text(str[0] + "\t" + Double.parseDouble(str[1])));
                        }
                    }
                }
            }
        }
    
        public static void main(String[] args)
                throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
            Configuration conf = new Configuration();
            System.out.println("[INFO]Starting Job...");
            Job job = new Job(conf, "MyAverageStep1");
            System.out.print("[INFO]Setting Jar By Class...");
            job.setJarByClass(MapRedStep1.class);
            System.out.println("[DONE]");
            System.out.print("[INFO]Setting Mapper Class...");
            job.setMapperClass(Type0Mapper.class);
            System.out.println("[DONE]");
            System.out.print("[INFO]Setting Reducer Class...");
            job.setReducerClass(Type0Reducer.class);
            System.out.println("[DONE]");
            System.out.print("[INFO]Setting Output Key Class...");
            job.setOutputKeyClass(Text.class);
            System.out.println("[DONE]");
            System.out.print("[INFO]Setting Output Value Class...");
            job.setOutputValueClass(Text.class);
            System.out.println("[DONE]");
            System.out.print("[INFO]Setting Input Format Class...");
            job.setInputFormatClass(TextInputFormat.class);
            System.out.println("[DONE]");
            System.out.print("[INFO]Setting Output Format Class...");
            job.setOutputFormatClass(TextOutputFormat.class);
            System.out.println("[DONE]");
            System.out.print("[INFO]Creating Path Variable(s)...");
            Path left = new Path("hdfs://localhost:9000/user/test/zzsfp");
            Path right = new Path("hdfs://localhost:9000/user/test/zzsfp_hwmx");
            Path out = new Path("hdfs://localhost:9000/user/test/temp");
            System.out.println("[DONE]");
            System.out.print("[INFO]Adding Left Input Path...");
            FileInputFormat.addInputPath(job, left);
            System.out.println("[DONE]");
            System.out.print("[INFO]Adding Right Input Path...");
            FileInputFormat.addInputPath(job, right);
            System.out.println("[DONE]");
            System.out.print("[INFO]Setting Output Path...");
            FileOutputFormat.setOutputPath(job, out);
            System.out.println("[DONE]");
            System.out.println("[INFO]Process Now Running...");
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
    MapRedStep1.java (fixed, and revised to merge most columns of both tables)
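
    Finally, since the full pipeline is several jobs run back to back, the stages can be chained in one driver instead of being launched by hand. A minimal sequential sketch for the first two stages (class and path names taken from the listings above; Job.getInstance assumes Hadoop 2.x; everything else is my own scaffolding):

    package konoha.pkg.pkg01;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    // Runs the two-table merge, then the per-item sum, stopping on failure.
    public class MyAveragePipeline {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String base = "hdfs://localhost:9000/user/test";
    
            Job merge = Job.getInstance(conf, "MyAverageStep1");
            merge.setJarByClass(MapRedStep1.class);
            merge.setMapperClass(MapRedStep1.Type0Mapper.class);
            merge.setReducerClass(MapRedStep1.Type0Reducer.class);
            merge.setOutputKeyClass(Text.class);
            merge.setOutputValueClass(Text.class);
            FileInputFormat.addInputPath(merge, new Path(base + "/zzsfp"));
            FileInputFormat.addInputPath(merge, new Path(base + "/zzsfp_hwmx"));
            FileOutputFormat.setOutputPath(merge, new Path(base + "/temp"));
            if (!merge.waitForCompletion(true))
                System.exit(1); // abort the pipeline if the merge failed
    
            Job sum = Job.getInstance(conf, "MyAverageStep2");
            sum.setJarByClass(MapRedStep2.class);
            sum.setMapperClass(MapRedStep2.Type0Mapper.class);
            sum.setReducerClass(MapRedStep2.Type0Reducer.class);
            sum.setOutputKeyClass(Text.class);
            sum.setOutputValueClass(DoubleWritable.class);
            FileInputFormat.addInputPath(sum, new Path(base + "/temp"));
            FileOutputFormat.setOutputPath(sum, new Path(base + "/result"));
            System.exit(sum.waitForCompletion(true) ? 0 : 1);
        }
    }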