Sorting and Grouping
- When sorting takes place in the map and reduce phases, only k2 is compared; v2 does not take part in the sort. To have v2 participate in the ordering as well, k2 and v2 must be assembled into a new class and that class used as k2, so that both values take part in the comparison (this is exactly what the InfoBean class below does).
Case study
Data:
(Columns: account, income, expense, date.)
zhangsan@163.com    6000    0      2014-02-20
lisi@163.com        2000    0      2014-02-20
lisi@163.com        0       100    2014-02-20
zhangsan@163.com    3000    0      2014-02-20
wangwu@126.com      9000    0      2014-02-20
wangwu@126.com      0       200    2014-02-20
Data analysis
- SumStep (the first job aggregates income and expense per account):

| User | Income | Expense | Balance |
|------|--------|---------|---------|
| lisi@163.com | 2000 | 100 | 1900 |
| wangwu@126.com | 9000 | 200 | 8800 |
| zhangsan@163.com | 9000 | 0 | 9000 |

- SortStep (the second job sorts by income in descending order, then by expense):

| User | Income | Expense | Balance |
|------|--------|---------|---------|
| zhangsan@163.com | 9000 | 0 | 9000 |
| wangwu@126.com | 9000 | 200 | 8800 |
| lisi@163.com | 2000 | 100 | 1900 |
The InfoBean class
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class InfoBean implements WritableComparable<InfoBean>{
private String accout;  // account (e-mail address)
private double incom;   // income
private double zhichu;  // expense
private double jieyu;   // balance = income - expense
@Override
public void write(DataOutput out) throws IOException {
// serialize the fields; the write order must match readFields()
out.writeUTF(accout);
out.writeDouble(incom);
out.writeDouble(zhichu);
out.writeDouble(jieyu);
}
@Override
public void readFields(DataInput in) throws IOException {
// deserialize the fields in the same order as write()
this.accout=in.readUTF();
this.incom=in.readDouble();
this.zhichu=in.readDouble();
this.jieyu=in.readDouble();
}
@Override
public int compareTo(InfoBean infoBean) {
// sort by income in descending order; when incomes are equal, sort by expense in ascending order
if(this.incom == infoBean.getIncom()){
return this.zhichu > infoBean.getZhichu() ? 1 : -1;
}else{
return this.incom > infoBean.getIncom() ? -1 : 1;
}
}
public String getAccout() {
return accout;
}
public void setAccout(String accout) {
this.accout = accout;
}
public double getIncom() {
return incom;
}
public void setIncom(double incom) {
this.incom = incom;
}
public double getZhichu() {
return zhichu;
}
public void setZhichu(double zhichu) {
this.zhichu = zhichu;
}
public double getJieyu() {
return jieyu;
}
public void setJieyu(double jieyu) {
this.jieyu = jieyu;
}
@Override
public String toString() {
return incom +" " + zhichu +" " + jieyu;
}
// populate the bean; the balance (jieyu) is derived as income minus expense
public void set(String accout,double incom,double zhichu){
this.accout=accout;
this.incom=incom;
this.zhichu=zhichu;
this.jieyu=incom - zhichu;
}
}
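
The effect of compareTo can be checked locally before running any job. The following is a minimal sketch, assuming InfoBean is on the classpath and using the sample figures from the tables above; the class name InfoBeanSortCheck is made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Local check of InfoBean's ordering (illustration only, not part of the MapReduce jobs).
public class InfoBeanSortCheck {
    public static void main(String[] args) {
        InfoBean lisi = new InfoBean();
        lisi.set("lisi@163.com", 2000, 100);
        InfoBean wangwu = new InfoBean();
        wangwu.set("wangwu@126.com", 9000, 200);
        InfoBean zhangsan = new InfoBean();
        zhangsan.set("zhangsan@163.com", 9000, 0);

        List<InfoBean> beans = new ArrayList<>();
        Collections.addAll(beans, lisi, wangwu, zhangsan);

        // compareTo orders by income descending, then expense ascending, so the
        // expected order is: zhangsan (9000/0), wangwu (9000/200), lisi (2000/100).
        Collections.sort(beans);
        for (InfoBean bean : beans) {
            System.out.println(bean.getAccout() + "\t" + bean);
        }
    }
}
```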
The SumStep class
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class SumStep {
public static void main(String[] args) throws Exception {
// configure the aggregation job: sum income and expense per account
Job job=Job.getInstance(new Configuration());
job.setJarByClass(SumStep.class);
job.setMapperClass(SumMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(InfoBean.class);
FileInputFormat.setInputPaths(job, args[0]);
job.setReducerClass(sumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(InfoBean.class);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
public static class SumMapper extends Mapper<LongWritable, Text, Text, InfoBean>{
private Text k2 = new Text();
private InfoBean v2 = new InfoBean();
@Override
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, Text, InfoBean>.Context context)
throws IOException, InterruptedException {
String line=value.toString();
// each input line: account, income, expense, date; split on whitespace so that
// tab- or space-separated records both parse
String[] hang=line.split("\\s+");
String accout=hang[0];
double incom=Double.parseDouble(hang[1]);
double zhichu=Double.parseDouble(hang[2]);
k2.set(accout);
v2.set(accout, incom, zhichu);
context.write(k2, v2);
}
}
public static class sumReducer extends Reducer<Text, InfoBean, Text, InfoBean>{
private InfoBean v3 = new InfoBean();
@Override
protected void reduce(Text k2, Iterable<InfoBean> v2,
Reducer<Text, InfoBean, Text, InfoBean>.Context context)
throws IOException, InterruptedException {
// accumulate income and expense for this account
double sumIncom = 0;
double sumZhichu = 0;
for(InfoBean infoBean : v2){
sumIncom += infoBean.getIncom();
sumZhichu += infoBean.getZhichu();
}
// the account is already the output key, so the bean's account field is left empty
v3.set("", sumIncom, sumZhichu);
context.write(k2, v3);
}
}
}
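
With the default TextOutputFormat, each line that SumStep writes consists of the Text key, a tab, and then InfoBean.toString(), whose fields are separated by spaces. For example, the line for lisi would look roughly like `lisi@163.com<TAB>2000.0 100.0 1900.0`. SortStep has to parse these lines again, which is why its mapper splits on arbitrary whitespace rather than on a single space.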
The SortStep class
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class SortStep {
public static void main(String[] args) throws Exception {
// configure the sorting job: InfoBean is the map output key, so the framework sorts records by InfoBean.compareTo()
Job job=Job.getInstance(new Configuration());
job.setJarByClass(SortStep.class);
job.setMapperClass(SortMapper.class);
job.setMapOutputKeyClass(InfoBean.class);
job.setMapOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
job.setReducerClass(SortReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(InfoBean.class);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
public static class SortMapper extends
Mapper<LongWritable, Text, InfoBean, NullWritable>{
private InfoBean k2 = new InfoBean();
@Override
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, InfoBean, NullWritable>.Context context)
throws IOException, InterruptedException {
// SumStep separates its key and value with a tab and the value fields with spaces,
// so split on any whitespace here
String line = value.toString();
String[] hang=line.split("\\s+");
String accout = hang[0];
double incom = Double.parseDouble(hang[1]);
double zhichu = Double.parseDouble(hang[2]);
// the whole record becomes the key so that income and expense drive the sort
k2.set(accout, incom, zhichu);
context.write(k2,NullWritable.get());
}
}
public static class SortReducer extends Reducer<InfoBean, NullWritable, Text, InfoBean>{
private Text k3=new Text();
@Override
protected void reduce(InfoBean k2, Iterable<NullWritable> v2,
Reducer<InfoBean, NullWritable, Text, InfoBean>.Context context)
throws IOException, InterruptedException {
// move the account back into the output key; the bean is written as the value
k3.set(k2.getAccout());
context.write(k3, k2);
}
}
}
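
The two jobs are meant to run one after the other, with SortStep reading the directory that SumStep wrote. Since both main methods block on waitForCompletion, a minimal driver sketch could simply call them in sequence; the class name IncomeDriver and the argument layout below are assumptions for illustration, not part of the original example.

```java
// Hypothetical driver chaining the two jobs; class name and argument layout are assumed.
public class IncomeDriver {
    public static void main(String[] args) throws Exception {
        // args[0] = raw input, args[1] = SumStep output (and SortStep input), args[2] = final output
        SumStep.main(new String[]{args[0], args[1]});
        SortStep.main(new String[]{args[1], args[2]});
    }
}
```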