I honestly did not expect that isolating one variable at a time would pinpoint the problem in my MapReduce experiment: the LinkedHashMap built from a 300-plus MB file is simply too large for the Map phase.
I found this by testing separately with two reduced table files and then with the full ones. Once the job ran for real, the advantage of MapReduce on large data sets also became clear: for a 1 GB data file, a relational database such as MySQL can take hours just for the import, while MapReduce finishes the two-table join in one to a few minutes.
Of course, doing the computation with SQL in a relational database has its own strengths: no programming is required, all kinds of logical conditions are supported, and a single statement can accomplish remarkably complex work (though cramming everything into one statement is generally not recommended); for small data sets it also returns results quickly. MapReduce, by contrast, spends considerable time on setup before the Map phase even starts, and more complex operations have to be split across several MapReduce passes. Take the join-and-aggregate step of this experiment: with MapReduce it took two jobs, the first joining the two files (keyed on invoice ID) and the second aggregating purchases and sales of the same product (keyed on product name + specification + unit). In SQL the same thing is one statement: a JOIN, a SUM, and a GROUP BY; the aggregate-and-sort step is just as concise.
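As an aside, the multiple passes do not have to be submitted by hand one at a time: a single driver can run them back to back. Below is a minimal sketch of such a chained driver, assuming the MapRedStep1/MapRedStep2 classes listed later in this post and their hard-coded HDFS paths (the ChainDriver class name is my own invention):

package konoha.pkg.pkg01;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical driver that chains the two passes; it reuses the mapper and
// reducer classes from MapRedStep1/MapRedStep2 below and the same paths.
public class ChainDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Pass 1: join the invoice table and the goods-detail table on invoice ID.
        Job join = Job.getInstance(conf, "JoinStep");
        join.setJarByClass(ChainDriver.class);
        join.setMapperClass(MapRedStep1.Type0Mapper.class);
        join.setReducerClass(MapRedStep1.Type0Reducer.class);
        join.setOutputKeyClass(Text.class);
        join.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(join, new Path("hdfs://localhost:9000/user/test/zzsfp"));
        FileInputFormat.addInputPath(join, new Path("hdfs://localhost:9000/user/test/zzsfp_hwmx"));
        FileOutputFormat.setOutputPath(join, new Path("hdfs://localhost:9000/user/test/temp"));
        if (!join.waitForCompletion(true)) {
            System.exit(1); // abort the pipeline if the join fails
        }

        // Pass 2: aggregate per product, consuming pass 1's output.
        Job sum = Job.getInstance(conf, "SumStep");
        sum.setJarByClass(ChainDriver.class);
        sum.setMapperClass(MapRedStep2.Type0Mapper.class);
        sum.setReducerClass(MapRedStep2.Type0Reducer.class);
        sum.setOutputKeyClass(Text.class);
        sum.setOutputValueClass(DoubleWritable.class);
        FileInputFormat.addInputPath(sum, new Path("hdfs://localhost:9000/user/test/temp"));
        FileOutputFormat.setOutputPath(sum, new Path("hdfs://localhost:9000/user/test/result"));
        System.exit(sum.waitForCompletion(true) ? 0 : 1);
    }
}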
However, since this experiment uses pure MapReduce, it cannot make the final judgment on its own; all it can do is compare the raw incoming and outgoing quantities. I kept the unit column to make concrete comparison easier, but I had no good way to turn it into a numeric feature. So in the end I simply summed the values and sorted them once.
A proper analysis would require bringing in the third table to build a decision tree or train a neural network on the results.
Joining the two tables:
package konoha.pkg.pkg01;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Vector;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// Leftover from an earlier design; currently unused.
class DealInfo {
    public String buyerId = null;
    public String sellerId = null;

    public DealInfo(String buyer, String seller) {
        buyerId = buyer;
        sellerId = seller;
    }
}

public class MapRedStep1 {

    public static class Type0Mapper extends Mapper<Object, Text, Text, Text> {

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // The input split's path tells us which of the two tables this line comes from.
            String filePath = ((FileSplit) context.getInputSplit()).getPath().toString();
            String line = value.toString();
            line = line.substring(1, line.length() - 1); // strip the wrapping quote characters
            // System.out.println("[DEBUG]Processing Line:" + line);
            String str[] = line.split(",");
            if (filePath.contains("hwmx")) {
                // Goods-detail table: tag with "nya", key on invoice ID,
                // value = item name/spec/unit plus the amount after the comma.
                // (See the 26.11.2020 update: this flag check was later moved to zzsfp.)
                if (str.length == 10) {
                    if (!str[9].equals("Y"))
                        context.write(new Text(str[0]),
                                new Text("nya" + str[2] + " " + str[3] + " " + str[4] + "," + str[5]));
                }
            } else {
                // Invoice table: tag with "kon", key on invoice ID, value = buyer + seller IDs.
                if (str.length == 9) {
                    context.write(new Text(str[0]), new Text("kon" + str[1] + " " + str[2]));
                }
            }
        }
    }

    public static class Type0Reducer extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Reduce-side join: per invoice ID, one "kon" record supplies the
            // buyer/seller pair and the "nya" records supply the line items.
            String buyer = null;
            String seller = null;
            Vector<String> nya = new Vector<String>();
            String tmp = null;
            String str[] = null;
            for (Text val : values) {
                tmp = val.toString();
                if (tmp.startsWith("kon")) {
                    str = tmp.substring(3).split(" ");
                    buyer = str[0];
                    seller = str[1];
                } else if (tmp.startsWith("nya")) {
                    nya.add(tmp.substring(3));
                }
            }
            if (buyer != null && seller != null) {
                for (int j = 0; j < nya.size(); j++) {
                    str = nya.get(j).split(",");
                    if (!str[1].equals("null")) {
                        // Outgoing goods count negative for the seller, positive for the buyer.
                        context.write(new Text(seller), new Text(str[0] + " " + (-Double.parseDouble(str[1]))));
                        context.write(new Text(buyer), new Text(str[0] + " " + Double.parseDouble(str[1])));
                    }
                }
            }
        }
    }

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        Configuration conf = new Configuration();
        System.out.println("[INFO]Starting Job...");
        Job job = new Job(conf, "MyAverageStep1");
        System.out.print("[INFO]Setting Jar By Class...");
        job.setJarByClass(MapRedStep1.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Mapper Class...");
        job.setMapperClass(Type0Mapper.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Reducer Class...");
        job.setReducerClass(Type0Reducer.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Output Key Class...");
        job.setOutputKeyClass(Text.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Output Value Class...");
        job.setOutputValueClass(Text.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Input Format Class...");
        job.setInputFormatClass(TextInputFormat.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Output Format Class...");
        job.setOutputFormatClass(TextOutputFormat.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Creating Path Variable(s)...");
        Path left = new Path("hdfs://localhost:9000/user/test/zzsfp");
        Path right = new Path("hdfs://localhost:9000/user/test/zzsfp_hwmx");
        Path out = new Path("hdfs://localhost:9000/user/test/temp");
        System.out.println("[DONE]");
        System.out.print("[INFO]Adding Left Input Path...");
        FileInputFormat.addInputPath(job, left);
        System.out.println("[DONE]");
        System.out.print("[INFO]Adding Right Input Path...");
        FileInputFormat.addInputPath(job, right);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Output Path...");
        FileOutputFormat.setOutputPath(job, out);
        System.out.println("[DONE]");
        System.out.println("[INFO]Process Now Running...");
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Summing per item:
package konoha.pkg.pkg01;

import java.io.IOException;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MapRedStep2 {
    public static class Type0Mapper extends Mapper<Object, Text, Text, DoubleWritable> {

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String str[] = line.split(" ");
            // The first four tokens identify the product line as emitted by
            // Step 1; the last token is the signed amount.
            if (str.length == 5) {
                context.write(new Text(str[0] + " " + str[1] + " " + str[2] + " " + str[3]),
                        new DoubleWritable(Double.parseDouble(str[4])));
            }
        }
    }

    public static class Type0Reducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        @Override
        public void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            // Net balance per product line: purchases count positive, sales negative.
            double sum = 0.0;
            for (DoubleWritable val : values) {
                sum = sum + val.get();
            }
            context.write(new Text(key), new DoubleWritable(sum));
        }
    }

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        Configuration conf = new Configuration();
        System.out.println("[INFO]Starting Job...");
        Job job = new Job(conf, "MyAverageStep2");
        System.out.print("[INFO]Setting Jar By Class...");
        job.setJarByClass(MapRedStep2.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Mapper Class...");
        job.setMapperClass(Type0Mapper.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Reducer Class...");
        job.setReducerClass(Type0Reducer.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Output Key Class...");
        job.setOutputKeyClass(Text.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Output Value Class...");
        job.setOutputValueClass(DoubleWritable.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Input Format Class...");
        job.setInputFormatClass(TextInputFormat.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Output Format Class...");
        job.setOutputFormatClass(TextOutputFormat.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Creating Path Variable(s)...");
        Path in = new Path("hdfs://localhost:9000/user/test/temp/part-r-00000");
        Path out = new Path("hdfs://localhost:9000/user/test/result");
        System.out.println("[DONE]");
        System.out.print("[INFO]Adding Input Path...");
        FileInputFormat.addInputPath(job, in);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Output Path...");
        FileOutputFormat.setOutputPath(job, out);
        System.out.println("[DONE]");
        System.out.println("[INFO]Process Now Running...");
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
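Since the Step 2 reducer does nothing but add numbers, and addition is associative and commutative, the same class should also work as a combiner, pre-aggregating map output to shrink the shuffle. This is an untested one-line suggestion that would go in main() next to the existing setReducerClass call:

// Untested suggestion: pre-aggregate map output on each map task before the
// shuffle. Safe for this job because summation is associative and commutative.
job.setCombinerClass(Type0Reducer.class);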
A plain MapReduce pipeline that totals the purchase/sale differences and sorts by them, in three steps: (1) sum the absolute values of the differences (companies with the largest totals stand out even without resorting to variance); (2) sort by that total (MapReduce sorts keys in ascending order, so every value is first negated and then flipped back to positive on output); (3) join against the company ranking and emit the final result:
package konoha.pkg.pkg01;

import java.io.IOException;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MapRedExtraStep1_1 {
    public static class Type0Mapper extends Mapper<Object, Text, Text, DoubleWritable> {

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String str[] = line.split(" ");
            // Key on the company field only; the value is the per-product balance.
            if (str.length == 5) {
                context.write(new Text(str[0]), new DoubleWritable(Double.parseDouble(str[4])));
            }
        }
    }

    public static class Type0Reducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        @Override
        public void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            // Sum the absolute differences so deficits and surpluses cannot cancel out.
            double sum = 0.0;
            for (DoubleWritable val : values) {
                sum = sum + Math.abs(val.get());
            }
            context.write(new Text(key), new DoubleWritable(sum));
        }
    }

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        Configuration conf = new Configuration();
        System.out.println("[INFO]Starting Job...");
        Job job = new Job(conf, "MyAverageExtraStep1_1");
        System.out.print("[INFO]Setting Jar By Class...");
        job.setJarByClass(MapRedExtraStep1_1.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Mapper Class...");
        job.setMapperClass(Type0Mapper.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Reducer Class...");
        job.setReducerClass(Type0Reducer.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Output Key Class...");
        job.setOutputKeyClass(Text.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Output Value Class...");
        job.setOutputValueClass(DoubleWritable.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Input Format Class...");
        job.setInputFormatClass(TextInputFormat.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Output Format Class...");
        job.setOutputFormatClass(TextOutputFormat.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Creating Path Variable(s)...");
        Path in = new Path("hdfs://localhost:9000/user/test/result/part-r-00000");
        Path out = new Path("hdfs://localhost:9000/user/test/none_output");
        System.out.println("[DONE]");
        System.out.print("[INFO]Adding Input Path...");
        FileInputFormat.addInputPath(job, in);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Output Path...");
        FileOutputFormat.setOutputPath(job, out);
        System.out.println("[DONE]");
        System.out.println("[INFO]Process Now Running...");
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
package konoha.pkg.pkg01;

import java.io.IOException;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MapRedExtraStep1_2 {
    public static class Type0Mapper extends Mapper<Object, Text, DoubleWritable, Text> {

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String str[] = line.split(" ");
            // Negate the total so the framework's ascending key sort
            // yields a descending ranking.
            if (str.length == 2) {
                context.write(new DoubleWritable(-Double.parseDouble(str[1])), new Text(str[0]));
            }
        }
    }

    public static class Type0Reducer extends Reducer<DoubleWritable, Text, Text, DoubleWritable> {
        @Override
        public void reduce(DoubleWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Flip the sign back to positive on output.
            for (Text val : values) {
                context.write(val, new DoubleWritable(-key.get()));
            }
        }
    }

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        Configuration conf = new Configuration();
        System.out.println("[INFO]Starting Job...");
        Job job = new Job(conf, "MyAverageExtraStep1_2");
        System.out.print("[INFO]Setting Jar By Class...");
        job.setJarByClass(MapRedExtraStep1_2.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Mapper Class...");
        job.setMapperClass(Type0Mapper.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Reducer Class...");
        job.setReducerClass(Type0Reducer.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Output Key Class...");
        job.setOutputKeyClass(Text.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Output Value Class...");
        job.setOutputValueClass(DoubleWritable.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Input Format Class...");
        job.setInputFormatClass(TextInputFormat.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Output Format Class...");
        job.setOutputFormatClass(TextOutputFormat.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Map Output Key Class...");
        job.setMapOutputKeyClass(DoubleWritable.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Map Output Value Class...");
        job.setMapOutputValueClass(Text.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Creating Path Variable(s)...");
        Path in = new Path("hdfs://localhost:9000/user/test/none_output/part-r-00000");
        Path out = new Path("hdfs://localhost:9000/user/test/none_output1");
        System.out.println("[DONE]");
        System.out.print("[INFO]Adding Input Path...");
        FileInputFormat.addInputPath(job, in);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Output Path...");
        FileOutputFormat.setOutputPath(job, out);
        System.out.println("[DONE]");
        System.out.println("[INFO]Process Now Running...");
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
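The negate-on-input, negate-on-output trick above does the job, but Hadoop also allows overriding the key ordering directly, which would make the sign flipping unnecessary. A sketch of that alternative (the class name is my own; it would be registered in main() with job.setSortComparatorClass(DescendingDoubleComparator.class)):

package konoha.pkg.pkg01;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Sorts DoubleWritable keys in descending order during the shuffle,
// replacing the manual negation in MapRedExtraStep1_2.
public class DescendingDoubleComparator extends WritableComparator {
    public DescendingDoubleComparator() {
        super(DoubleWritable.class, true); // true = instantiate keys for compare()
    }

    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // Invert the natural (ascending) order of the two keys.
        return -((DoubleWritable) a).compareTo((DoubleWritable) b);
    }
}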
package konoha.pkg.pkg01;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MapRedExtraStep1_3 {
    // Flag items whose absolute difference reaches this threshold.
    public static double limit = 5000;

    public static class Type0Mapper extends Mapper<Object, Text, Text, Text> {
        // Used as a set of company IDs: only the keys matter.
        public HashMap<String, String> corp = new HashMap<String, String>();

        @Override
        public void setup(Context context) throws IOException {
            // Load the top 300 companies from the cached ranking
            // produced by the sort step.
            String fileName = context.getLocalCacheFiles()[0].getName();
            BufferedReader reader = new BufferedReader(new FileReader(fileName));
            String line = null;
            String str[] = null;
            for (int i = 0; i < 300 && null != (line = reader.readLine()); i++) {
                str = line.split(" ");
                if (str.length == 2) {
                    corp.put(str[0], "");
                }
            }
            reader.close();
        }

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Keep only the per-product records of the top-ranked companies.
            String line = value.toString();
            String str[] = line.split(" ");
            if (str.length == 5 && corp.containsKey(str[0])) {
                context.write(new Text(str[0]), new Text(str[1] + " " + str[2] + " " + str[3] + "," + str[4]));
            }
        }
    }

    public static class Type0Reducer extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String str[] = null;
            for (Text val : values) {
                str = val.toString().split(",");
                if (str.length == 2 && Math.abs(Double.parseDouble(str[1])) >= limit) {
                    // Negative balance: suspected fake invoicing (goods shortfall).
                    String res = "虚开发票(商品缺失)";
                    if (Double.parseDouble(str[1]) > 0) {
                        // Positive balance: suspected unissued invoices (goods surplus).
                        res = "漏开发票(商品溢出)";
                    }
                    context.write(key, new Text(str[0] + " " + str[1] + " " + res));
                }
            }
        }
    }

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        Configuration conf = new Configuration();
        System.out.println("[INFO]Starting Job...");
        Job job = new Job(conf, "MyAverageExtraStep1_3");
        System.out.print("[INFO]Setting Jar By Class...");
        job.setJarByClass(MapRedExtraStep1_3.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Mapper Class...");
        job.setMapperClass(Type0Mapper.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Reducer Class...");
        job.setReducerClass(Type0Reducer.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Output Key Class...");
        job.setOutputKeyClass(Text.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Output Value Class...");
        job.setOutputValueClass(Text.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Input Format Class...");
        job.setInputFormatClass(TextInputFormat.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Output Format Class...");
        job.setOutputFormatClass(TextOutputFormat.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Map Output Key Class...");
        job.setMapOutputKeyClass(Text.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Map Output Value Class...");
        job.setMapOutputValueClass(Text.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Creating Path Variable(s)...");
        Path in = new Path("hdfs://localhost:9000/user/test/result/part-r-00000");
        URI uri = new URI("hdfs://localhost:9000/user/test/none_output1/part-r-00000");
        Path out = new Path("hdfs://localhost:9000/user/test/none_output2");
        System.out.println("[DONE]");
        System.out.print("[INFO]Adding Input Path...");
        FileInputFormat.addInputPath(job, in);
        System.out.println("[DONE]");
        System.out.print("[INFO]Adding Cache File URI...");
        job.addCacheFile(uri);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Output Path...");
        FileOutputFormat.setOutputPath(job, out);
        System.out.println("[DONE]");
        System.out.println("[INFO]Process Now Running...");
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
----------------------------------------------------------------------------------- Update 26.11.2020 -----------------------------------------------------------------------------------
I noticed that the void-invoice check in the Step 1 join seems to have been applied to the wrong table. The revised MapRedStep1 below fixes that (the flag is now checked on the zzsfp invoice table rather than the hwmx detail table) and also carries most of the columns of both tables through the join:
package konoha.pkg.pkg01;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Vector;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// Leftover from an earlier design; currently unused.
class DealInfo {
    public String buyerId = null;
    public String sellerId = null;

    public DealInfo(String buyer, String seller) {
        buyerId = buyer;
        sellerId = seller;
    }
}

public class MapRedStep1 {

    public static class Type0Mapper extends Mapper<Object, Text, Text, Text> {

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String filePath = ((FileSplit) context.getInputSplit()).getPath().toString();
            String line = value.toString();
            line = line.substring(1, line.length() - 1); // strip the wrapping quote characters
            // System.out.println("[DEBUG]Processing Line:" + line);
            String str[] = line.split(",");
            if (filePath.contains("hwmx")) {
                // Goods-detail table: now forwards most of its columns;
                // the amount (field 5) stays last, after the comma.
                if (str.length == 10) {
                    context.write(new Text(str[0]), new Text("nya" + str[1] + " " + str[2] + " " + str[3] + " "
                            + str[4] + " " + str[6] + " " + str[7] + " " + str[8] + " " + str[9] + "," + str[5]));
                }
            } else {
                // Invoice table: the void-invoice flag is now checked here,
                // on field 8 of zzsfp, instead of on the detail table.
                if (str.length == 9) {
                    if (!str[8].equals("Y"))
                        context.write(new Text(str[0]), new Text("kon" + str[1] + " " + str[2]));
                }
            }
        }
    }

    public static class Type0Reducer extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Reduce-side join, unchanged: one "kon" record supplies the
            // buyer/seller pair, the "nya" records supply the line items.
            String buyer = null;
            String seller = null;
            Vector<String> nya = new Vector<String>();
            String tmp = null;
            String str[] = null;
            for (Text val : values) {
                tmp = val.toString();
                if (tmp.startsWith("kon")) {
                    str = tmp.substring(3).split(" ");
                    buyer = str[0];
                    seller = str[1];
                } else if (tmp.startsWith("nya")) {
                    nya.add(tmp.substring(3));
                }
            }
            if (buyer != null && seller != null) {
                for (int j = 0; j < nya.size(); j++) {
                    str = nya.get(j).split(",");
                    if (!str[1].equals("null")) {
                        context.write(new Text(seller), new Text(str[0] + " " + (-Double.parseDouble(str[1]))));
                        context.write(new Text(buyer), new Text(str[0] + " " + Double.parseDouble(str[1])));
                    }
                }
            }
        }
    }

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        Configuration conf = new Configuration();
        System.out.println("[INFO]Starting Job...");
        Job job = new Job(conf, "MyAverageStep1");
        System.out.print("[INFO]Setting Jar By Class...");
        job.setJarByClass(MapRedStep1.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Mapper Class...");
        job.setMapperClass(Type0Mapper.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Reducer Class...");
        job.setReducerClass(Type0Reducer.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Output Key Class...");
        job.setOutputKeyClass(Text.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Output Value Class...");
        job.setOutputValueClass(Text.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Input Format Class...");
        job.setInputFormatClass(TextInputFormat.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Output Format Class...");
        job.setOutputFormatClass(TextOutputFormat.class);
        System.out.println("[DONE]");
        System.out.print("[INFO]Creating Path Variable(s)...");
        Path left = new Path("hdfs://localhost:9000/user/test/zzsfp");
        Path right = new Path("hdfs://localhost:9000/user/test/zzsfp_hwmx");
        Path out = new Path("hdfs://localhost:9000/user/test/temp");
        System.out.println("[DONE]");
        System.out.print("[INFO]Adding Left Input Path...");
        FileInputFormat.addInputPath(job, left);
        System.out.println("[DONE]");
        System.out.print("[INFO]Adding Right Input Path...");
        FileInputFormat.addInputPath(job, right);
        System.out.println("[DONE]");
        System.out.print("[INFO]Setting Output Path...");
        FileOutputFormat.setOutputPath(job, out);
        System.out.println("[DONE]");
        System.out.println("[INFO]Process Now Running...");
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}