Hadoop's built-in data types:
IntWritable, LongWritable, Text, and the other xxWritable classes.
In some cases a custom data type is more convenient (similar to a POJO in Java).
How:
Implement the WritableComparable interface, i.e. compareTo() for sorting plus write()/readFields() for serialization.
Example scenario:
A score table made up of Chinese, math, and English scores.
A score.txt file is uploaded to the /score directory on HDFS; its content follows the comma-separated format shown below.
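Since the mapper below splits each line on commas and parses three integers, a plausible score.txt (hypothetical values, for illustration only) looks like:

90,95,88
90,90,93
85,100,92
77,82,98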
We want to rank by total score in descending order; when totals tie, order by Chinese, then math, then English.
1. Define a ScoreWritable class implementing the WritableComparable interface:
package com.day07;

import org.apache.hadoop.io.WritableComparable;

import java.io.*;

public class ScoreWritable implements WritableComparable<ScoreWritable> {

    private int chinese;
    private int math;
    private int english;
    private int sum;

    public ScoreWritable() {
    }

    public ScoreWritable(int chinese, int math, int english) {
        this.chinese = chinese;
        this.math = math;
        this.english = english;
        this.sum = chinese + math + english;
    }

    @Override
    public String toString() {
        return "ScoreWritable{" +
                "chinese=" + chinese +
                ", math=" + math +
                ", english=" + english +
                ", sum=" + sum +
                '}';
    }

    public int getChinese() {
        return chinese;
    }

    public void setChinese(int chinese) {
        this.chinese = chinese;
    }

    public int getMath() {
        return math;
    }

    public void setMath(int math) {
        this.math = math;
    }

    public int getEnglish() {
        return english;
    }

    public void setEnglish(int english) {
        this.english = english;
    }

    public int getSum() {
        return sum;
    }

    public void setSum(int sum) {
        this.sum = sum;
    }

    //Comparison: descending by total, then Chinese, then math
    @Override
    public int compareTo(ScoreWritable that) {
        //Compare totals first (higher total sorts first)
        if (this.sum != that.sum) {
            return Integer.compare(that.sum, this.sum);
        }
        //Totals tie: compare Chinese scores (descending)
        if (this.chinese != that.chinese) {
            return Integer.compare(that.chinese, this.chinese);
        }
        //Chinese ties too: compare math (descending). If math is also equal,
        //English must be equal as well, since sum = chinese + math + english.
        return Integer.compare(that.math, this.math);
    }

    //Serialization -- DataOutput (a data stream): lets you define a compact
    //custom layout for the object; this is the stream Hadoop itself uses
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(chinese);
        out.writeInt(math);
        out.writeInt(english);
        out.writeInt(sum);
    }

    //Deserialization: fields must be read in the same order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        this.chinese = in.readInt();
        this.math = in.readInt();
        this.english = in.readInt();
        this.sum = in.readInt();
    }
}
Note:
It is best to override toString(): TextOutputFormat writes each key to the result file by calling toString(), so this is exactly what ends up in the output.
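Before wiring the class into a job, write()/readFields() can be sanity-checked locally with a plain-stream round trip (a minimal sketch; the class name and score values here are just for illustration):

package com.day07;

import java.io.*;

public class ScoreWritableRoundTrip {
    public static void main(String[] args) throws IOException {
        ScoreWritable original = new ScoreWritable(90, 95, 88);

        //Serialize through the same DataOutput contract Hadoop uses
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        //Deserialize into a fresh instance; both lines should print identically
        ScoreWritable copy = new ScoreWritable();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(original);
        System.out.println(copy);
    }
}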
2. Write a ScoreJob class to test the custom ScoreWritable:
package com.day07;

import com.google.common.io.Resources;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class ScoreJob {

    public static class ScoreMapper extends Mapper<LongWritable, Text, ScoreWritable, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //Each input line is "chinese,math,english", e.g. "90,95,88"
            String[] grades = value.toString().split(",");
            ScoreWritable score = new ScoreWritable(Integer.parseInt(grades[0]), Integer.parseInt(grades[1]), Integer.parseInt(grades[2]));
            //Emit the score as the key: the shuffle sort on compareTo does the ranking
            context.write(score, NullWritable.get());
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration coreSiteConf = new Configuration();
        coreSiteConf.addResource(Resources.getResource("core-site-local.xml"));
        //Create a job
        Job job = Job.getInstance(coreSiteConf, "score");
        //Set the class the job is based on
        job.setJarByClass(ScoreJob.class);
        //Alternatively, point at the built jar, e.g. mrdemo/target/mrdemo-1.0-SNAPSHOT.jar:
        //job.setJar("mrdemo/target/mrdemo-1.0-SNAPSHOT.jar");
        //Set the Mapper class; no reducer is set, so the identity reducer writes the sorted keys
        job.setMapperClass(ScoreMapper.class);
        //Map output types
        job.setMapOutputKeyClass(ScoreWritable.class);
        job.setMapOutputValueClass(NullWritable.class);
        //Job/reduce output types (not needed here; TextOutputFormat writes keys via toString())
        /*job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);*/
        //Set the job's input path
        FileInputFormat.addInputPath(job, new Path("/score/"));
        //Delete the output directory if it already exists, otherwise the job fails
        FileSystem fileSystem = FileSystem.get(coreSiteConf);
        if (fileSystem.exists(new Path("/out/"))) {
            fileSystem.delete(new Path("/out/"), true);
        }
        FileOutputFormat.setOutputPath(job, new Path("/out/"));
        //Run the job and, on success, print the result file
        boolean flag = job.waitForCompletion(true);
        if (flag) {
            FSDataInputStream open = fileSystem.open(new Path("/out/part-r-00000"));
            //copyBytes streams the whole file, regardless of its size
            IOUtils.copyBytes(open, System.out, 1024, true);
        }
    }
}
3. Test results, similar to the following:
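With the hypothetical sample input above, the output file would contain one toString() line per record, best scores first:

ScoreWritable{chinese=85, math=100, english=92, sum=277}
ScoreWritable{chinese=90, math=95, english=88, sum=273}
ScoreWritable{chinese=90, math=90, english=93, sum=273}
ScoreWritable{chinese=77, math=82, english=98, sum=257}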