When people learn Hadoop MapReduce, the first program is WordCount 90% of the time, so here I want to share my second MapReduce program instead. For anyone learning a programming language, code is sometimes one of the best ways to communicate.
package com.zhongxin.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;
import java.math.BigDecimal;
import java.util.regex.Pattern;

/**
 * Principal and interest already received per user
 * Created by DingYS on 2017/11/21.
 */
public class UserReceiveAmount {

    public static class Map extends Mapper<LongWritable, Text, Text, Text> {
        private Text outKey = new Text();
        private Text outValue = new Text();
        private Pattern pattern = Pattern.compile(",");

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // interest
            BigDecimal interest = new BigDecimal(0);
            // principal
            BigDecimal capital = new BigDecimal(0);
            String[] splits = pattern.split(String.valueOf(value));
            String ownerType = splits[2];
            String fundsDirection = splits[6];
            String tradeType = splits[5];
            String penaltyAmount = splits[15];
            String tradeAmount = splits[7];
            String tradeShare = splits[8];
            String ownerCustomNo = splits[1];
            if ("USER".equals(ownerType) && "INCR".equals(fundsDirection) && !Pattern.matches("CURRENT_.*?", tradeType)) {
                if ("INTEREST".equals(tradeType) && ("null".equals(penaltyAmount) || "".equals(penaltyAmount) || "0.00".equals(penaltyAmount))) {
                    interest = new BigDecimal(Double.parseDouble(tradeAmount)).setScale(2, BigDecimal.ROUND_HALF_UP);
                } else {
                    interest = new BigDecimal(Double.parseDouble(tradeAmount)).subtract(new BigDecimal(Double.parseDouble(tradeShare))).setScale(2, BigDecimal.ROUND_HALF_UP);
                    capital = new BigDecimal(Double.parseDouble(tradeShare)).setScale(2, BigDecimal.ROUND_HALF_UP);
                }
                outKey.set(ownerCustomNo);
                // map output value format: "interest,capital"
                outValue.set(String.valueOf(interest) + "," + String.valueOf(capital));
                context.write(outKey, outValue);
            }
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {

        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            Text outValue = new Text();
            BigDecimal interest = new BigDecimal(0);
            BigDecimal capital = new BigDecimal(0);
            for (Text value : values) {
                String[] splits = value.toString().split(",");
                interest = interest.add(new BigDecimal(Double.parseDouble(splits[0]))).setScale(2, BigDecimal.ROUND_HALF_UP);
                capital = capital.add(new BigDecimal(Double.parseDouble(splits[1]))).setScale(2, BigDecimal.ROUND_HALF_UP);
            }
            outValue.set(String.valueOf(interest) + "\t" + String.valueOf(capital));
            context.write(key, outValue);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        Job job = Job.getInstance(config);
        job.setJobName("userReceiveAmount");
        job.setJarByClass(UserReceiveAmount.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}
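To run it, package the class into a jar and submit it with hadoop jar; a minimal sketch (the jar name and HDFS paths below are placeholders, adjust them to your environment):

hadoop jar user-receive-amount.jar com.zhongxin.mr.UserReceiveAmount /input/tradeRecords /output/userReceiveAmount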
The key to understanding this MapReduce program is: map reads the input one line at a time, and all records with the same key are delivered to the same reduce call.
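For example (the customer numbers and amounts below are made up, just to illustrate the shuffle): if the mapper emits

(10000001, "10.50,200.00")
(10000002, "5.00,100.00")
(10000001, "3.20,0.00")

then the reduce call for key 10000001 receives the values ["10.50,200.00", "3.20,0.00"] and sums them to 13.70 interest and 200.00 principal, while key 10000002 is handled in a separate reduce call.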
The version above writes the aggregated results to HDFS; below is a version that writes them to HBase instead. Here is the code:
package com.zhongxin.mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.regex.Pattern;
/**
* Principal and interest already received per user
* Created by DingYS on 2017/11/21.
*/
public class UserReceiveAmount {
public static class Map extends Mapper<LongWritable,Text,Text,Text>{
private Text outKey = new Text();
private Text outValue = new Text();
private Pattern pattern = Pattern.compile(",");
@Override
public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{
// interest
BigDecimal interest = new BigDecimal(0);
// principal
BigDecimal capital = new BigDecimal(0);
String[] splits = pattern.split(String.valueOf(value));
String ownerType = splits[2];
String fundsDirection = splits[6];
String tradeType = splits[5];
String penaltyAmount = splits[15];
String tradeAmount = splits[7];
String tradeShare = splits[8];
String ownerCustomNo = splits[1];
if("USER".equals(ownerType) && "INCR".equals(fundsDirection) && !Pattern.matches("CURRENT_.*?",tradeType)){
if("INTEREST".equals(tradeType) && ("null".equals(penaltyAmount) || "".equals(penaltyAmount) ||"0.00".equals(penaltyAmount))){
interest =new BigDecimal(Double.parseDouble(tradeAmount)).setScale(2,BigDecimal.ROUND_HALF_UP);
}else{
interest = new BigDecimal(Double.parseDouble(tradeAmount)).subtract(new BigDecimal(Double.parseDouble(tradeShare))).setScale(2,BigDecimal.ROUND_HALF_UP);
capital = new BigDecimal(Double.parseDouble(tradeShare)).setScale(2,BigDecimal.ROUND_HALF_UP);
}
outKey.set(ownerCustomNo);
// map output value format: "interest,capital"
outValue.set(String.valueOf(interest) + "," + String.valueOf(capital));
context.write(outKey,outValue);
}
}
}
public static class Reduce extends TableReducer<Text,Text,ImmutableBytesWritable> {
ImmutableBytesWritable k = new ImmutableBytesWritable();
public void reduce(Text key,Iterable<Text> values,Context context) throws IOException,InterruptedException{
BigDecimal interest = new BigDecimal(0);
BigDecimal capital = new BigDecimal(0);
for(Text value:values){
String[] splits = value.toString().split(",");
interest = interest.add(new BigDecimal(Double.parseDouble(splits[0]))).setScale(2,BigDecimal.ROUND_HALF_UP);
capital = capital.add(new BigDecimal(Double.parseDouble(splits[1]))).setScale(2,BigDecimal.ROUND_HALF_UP);
}
String family = "info";
Put put = new Put(String.valueOf(key).getBytes());
put.addColumn(family.getBytes(),"interest".getBytes(),String.valueOf(interest).getBytes());
put.addColumn(family.getBytes(),"capital".getBytes(),String.valueOf(capital).getBytes());
k.set(key.copyBytes()); // copyBytes() returns only the valid bytes; getBytes() may include stale trailing bytes in the backing array
context.write(k,put);
}
}
public static void main(String[] args) throws Exception{
Configuration config = HBaseConfiguration.create();
Job job = Job.getInstance(config,"userReceiveAmount");
job.setJarByClass(UserReceiveAmount.class);
FileInputFormat.addInputPath(job,new Path(args[0]));
job.setMapperClass(Map.class);
TableMapReduceUtil.initTableReducerJob("userReceiveAmount",Reduce.class,job);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Mutation.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
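One thing this job assumes is that the target table already exists: TableOutputFormat will not create it for you. A sketch of creating it in the HBase shell, using the table and column family names from the code above:

create 'userReceiveAmount', 'info'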
Note: the rowkey needs to be a String. If what you have is a Text, call Text.toString() on it first (Text overrides toString()); in my tests String.valueOf() works as well.
Put put = new Put(rowkey.getBytes());
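A minimal sketch of the safe conversions when starting from a Text key (the variable name key is illustrative): Text.getBytes() returns the whole backing buffer, which can be longer than the actual content, so convert to String (or use copyBytes()) first.

// key is an org.apache.hadoop.io.Text
Put put = new Put(key.toString().getBytes());       // Text overrides toString()
// or: new Put(String.valueOf(key).getBytes());     // equivalent
// or: new Put(key.copyBytes());                    // copies only the first getLength() bytes
// avoid: new Put(key.getBytes());                  // backing array may contain stale trailing bytes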