1,代码
package mr;
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* 使用ArrayWritable
*/
public class TrafficApp4 {
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
Job job = Job.getInstance(conf , TrafficApp4.class.getSimpleName());
job.setJarByClass(TrafficApp4.class);
FileInputFormat.setInputPaths(job, args[0]);
job.setMapperClass(TrafficMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongArrayWritable.class);
job.setReducerClass(TrafficReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongArrayWritable.class);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
public static class TrafficMapper extends Mapper<LongWritable, Text, Text, LongArrayWritable>{
@Override
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, Text, LongArrayWritable>.Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] splited = line.split(" ");
String phonenumber = splited[1];
String upPackNum = splited[6];
String downPackNum = splited[7];
String upPayLoad = splited[8];
String downPayLoad = splited[9];
Text k2 = new Text(phonenumber);
LongArrayWritable v2 = new LongArrayWritable(upPackNum, downPackNum, upPayLoad, downPayLoad);
context.write(k2, v2);
}
}
public static class TrafficReducer extends Reducer<Text, LongArrayWritable, Text, LongArrayWritable>{
@Override
protected void reduce(Text k2, Iterable<LongArrayWritable> v2s,
Reducer<Text, LongArrayWritable, Text, LongArrayWritable>.Context context)
throws IOException, InterruptedException {
long upPackNum = 0L;
long downPackNum = 0L;
long upPayLoad = 0L;
long downPayLoad = 0L;
for (LongArrayWritable v2 : v2s) {
Writable[] values = v2.get();
upPackNum += ((LongWritable)values[0]).get();
downPackNum += ((LongWritable)values[1]).get();
upPayLoad += ((LongWritable)values[2]).get();
downPayLoad += ((LongWritable)values[3]).get();
}
LongArrayWritable v3 = new LongArrayWritable(upPackNum, downPackNum, upPayLoad, downPayLoad);
context.write(k2, v3);
}
}
public static class LongArrayWritable extends ArrayWritable{
/**
* 在调用的时候,首先调用该方法,然后调用set(Writable[])
*/
public LongArrayWritable() {
super(LongWritable.class);
}
/**
* 直接调用该方法即可
* @param values
*/
public LongArrayWritable(LongWritable[] values) {
super(LongWritable.class, values);
}
/**
* 直接调用该方法即可
* @param upPackNum
* @param downPackNum
* @param upPayLoad
* @param downPayLoad
*/
public LongArrayWritable(Long upPackNum, Long downPackNum, Long upPayLoad, Long downPayLoad) {
super(LongWritable.class);
LongWritable[] values = new LongWritable[4];
values[0] = new LongWritable(upPackNum);
values[1] = new LongWritable(downPackNum);
values[2] = new LongWritable(upPayLoad);
values[3] = new LongWritable(downPayLoad);
super.set(values);
}
/**
* 直接调用该方法即可
* @param upPackNum
* @param downPackNum
* @param upPayLoad
* @param downPayLoad
*/
public LongArrayWritable(String upPackNum, String downPackNum, String upPayLoad, String downPayLoad) {
super(LongWritable.class);
LongWritable[] values = new LongWritable[4];
values[0] = new LongWritable(Long.parseLong(upPackNum));
values[1] = new LongWritable(Long.parseLong(downPackNum));
values[2] = new LongWritable(Long.parseLong(upPayLoad));
values[3] = new LongWritable(Long.parseLong(downPayLoad));
super.set(values);
}
@Override
public String toString() {
String[] array = super.toStrings();
return StringUtils.join(array, " ");
}
}
}
2,ArrayWritable的API
org.apache.hadoop.io
Class ArrayWritable
java.lang.Object
org.apache.hadoop.io.ArrayWritable
- 已实现的接口:
- Writable
public class ArrayWritableextends Objectimplements Writable
A Writable for arrays containing instances of a class. The elements of this writable must all be instances of the same class. If this writable will be the input for a Reducer, you will need to create a subclass that sets the value to be of the proper type. For example: public class IntArrayWritable extends ArrayWritable { public IntArrayWritable() { super(IntWritable.class); } }
构造方法摘要 |
ArrayWritable(Class<? extends Writable> valueClass) |
ArrayWritable(Class<? extends Writable> valueClass, Writable[] values) |
ArrayWritable(String[] strings) |
方法摘要 |
Writable[] |
get() |
Class |
getValueClass() |
void |
readFields(DataInput in) Deserialize the fields of this object from in . |
void |
set(Writable[] values) |
Object |
toArray() |
String[] |
toStrings() |
void |
write(DataOutput out) Serialize the fields of this object to out . |
Methods inherited from class java.lang.Object |
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait |
ArrayWritable
public ArrayWritable(Class<? extends Writable> valueClass)
ArrayWritable
public ArrayWritable(Class<? extends Writable> valueClass,
Writable[] values)
ArrayWritable
public ArrayWritable(String[] strings)
getValueClass
public Class getValueClass()
toStrings
public String[] toStrings()
toArray
public Object toArray()
set
public void set(Writable[] values)
get
public Writable[] get()
readFields
public void readFields(DataInput in)
throws IOException
- Description copied from interface:
Writable
- Deserialize the fields of this object from
in
.
For efficiency, implementations should attempt to re-use storage in the existing object where possible.
- Specified by:
readFields
in interface Writable
- Parameters:
in
- DataInput
to deseriablize this object from.- Throws:
IOException
write
public void write(DataOutput out)
throws IOException
- Description copied from interface:
Writable
- Serialize the fields of this object to
out
.
- Specified by:
write
in interface Writable
- Parameters:
out
- DataOuput
to serialize this object into.- Throws:
IOException