本文要完成的任务是:从一个文件中读取数据并去重,然后统计每位作者发布的文章数量和总阅读量,最后把统计结果保存到另一个文件中。
首先创建一个 FlowBean 类,作为在 Hadoop 节点之间传输的自定义可序列化数据类型:
package flow;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

/**
 * Per-author statistics record: the author's user name, the total number of
 * reads and the number of articles.
 *
 * <p>The class implements Hadoop's {@link WritableComparable} so that it can be
 * serialized between tasks and used as a key. Fields are serialized in the
 * order {@code userName}, {@code numerRead}, {@code numberArticle}.
 */
public class FlowBean implements WritableComparable<FlowBean> {

    private String userName;
    private long numerRead;
    private long numberArticle;

    /**
     * No-arg constructor. Deserialization instantiates the bean reflectively
     * and therefore needs it to be declared explicitly.
     */
    public FlowBean() {
    }

    /**
     * Convenience constructor that initializes every field.
     *
     * @param userName      author's user name
     * @param numerRead     number of reads
     * @param numberArticle number of articles
     */
    public FlowBean(String userName, long numerRead, long numberArticle) {
        this.userName = userName;
        this.numerRead = numerRead;
        this.numberArticle = numberArticle;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public long getNumerRead() {
        return numerRead;
    }

    public void setNumerRead(long numerRead) {
        this.numerRead = numerRead;
    }

    public long getNumberArticle() {
        return numberArticle;
    }

    public void setNumberArticle(long numberArticle) {
        this.numberArticle = numberArticle;
    }

    /**
     * Serializes the bean to the given stream.
     *
     * <p>{@code userName} must be set before calling this method:
     * {@link DataOutput#writeUTF(String)} does not accept {@code null}.
     *
     * @param out stream to write to
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(userName);
        out.writeLong(numerRead);
        out.writeLong(numberArticle);
    }

    /**
     * Restores the bean's fields from the given stream.
     *
     * <p>The fields must be read in exactly the order in which
     * {@link #write(DataOutput)} wrote them.
     *
     * @param in stream to read from
     * @throws IOException if the underlying stream fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        userName = in.readUTF();
        numerRead = in.readLong();
        numberArticle = in.readLong();
    }

    /**
     * Returns the three fields separated by single spaces:
     * {@code userName numerRead numberArticle}.
     */
    @Override
    public String toString() {
        return userName + " " + numerRead + " " + numberArticle;
    }

    /**
     * Orders beans by read count, highest first.
     *
     * <p>Ties are broken by user name and then by article count, so that the
     * method returns 0 only for beans that are {@link #equals(Object) equal}.
     * Always returning a non-zero value for equal read counts would make both
     * {@code a.compareTo(b)} and {@code b.compareTo(a)} positive, which
     * violates the {@link Comparable} contract and can break sorting.
     *
     * @param o the bean to compare with
     * @return a negative value, zero or a positive value as this bean sorts
     *         before, together with or after {@code o}
     */
    @Override
    public int compareTo(FlowBean o) {
        // Arguments are swapped on purpose: larger read counts sort first.
        int byReads = Long.compare(o.numerRead, numerRead);
        if (byReads != 0) {
            return byReads;
        }
        int byName = compareNullable(userName, o.userName);
        if (byName != 0) {
            return byName;
        }
        return Long.compare(numberArticle, o.numberArticle);
    }

    /** Compares two possibly-null strings, ordering {@code null} first. */
    private static int compareNullable(String left, String right) {
        if (left == null) {
            return right == null ? 0 : -1;
        }
        if (right == null) {
            return 1;
        }
        return left.compareTo(right);
    }

    /** Two beans are equal when all three fields are equal. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof FlowBean)) {
            return false;
        }
        FlowBean other = (FlowBean) obj;
        return numerRead == other.numerRead
                && numberArticle == other.numberArticle
                && Objects.equals(userName, other.userName);
    }

    /**
     * Hash derived from the field values only, so that equal beans hash
     * identically in every JVM (the default identity hash does not).
     */
    @Override
    public int hashCode() {
        return Objects.hash(userName, numerRead, numberArticle);
    }
}
然后创建 Mapper 和 Reducer,并在同一个驱动类中依次运行"去重"和"统计"两个作业:
package flow;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import flow.RemoveDup.RemoveDupMapper;
import flow.RemoveDup.RemoveDupReducer;

/**
 * Driver that runs two MapReduce jobs in sequence:
 * <ol>
 *   <li>"RemoveDup" removes duplicate lines from the input file;</li>
 *   <li>"FlowSum" reads the de-duplicated lines and sums, per user name, the
 *       read counts and the number of records.</li>
 * </ol>
 *
 * <p>When exactly three arguments are given they are used as the input path,
 * the intermediate (de-duplicated) directory and the final output directory.
 * With any other argument count the built-in default paths are used.
 */
public class FlowSumRunner extends Configured implements Tool {

    /** Default input file of the de-duplication job. */
    private static final String DEFAULT_INPUT_PATH = "/Users/lihu/Desktop/crawle/tap.txt";

    /** Default output of the de-duplication job and input of the summing job. */
    private static final String DEFAULT_DEDUP_PATH = "/Users/lihu/Desktop/crawle/quchong";

    /** Default output directory of the summing job. */
    private static final String DEFAULT_OUTPUT_PATH = "/Users/lihu/Desktop/crawle/logs";

    /** Counter group used by {@link FlowSumMapper} to report skipped lines. */
    private static final String COUNTER_GROUP = "FlowSum";

    /** Counter name for input lines that could not be parsed. */
    private static final String MALFORMED_COUNTER = "MALFORMED_LINES";

    /**
     * Emits every input line as a key with a {@link NullWritable} value, so
     * that identical lines end up in the same reduce group.
     */
    public static class RemoveDupMapper extends Mapper<Object, Text, Text, NullWritable> {

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(value, NullWritable.get());
        }
    }

    /**
     * Writes each distinct key exactly once, which removes duplicate lines.
     * The same class is also registered as the combiner of the first job.
     */
    public static class RemoveDupReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

        @Override
        public void reduce(Text key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }

    /**
     * Parses one space-separated line and emits the user name as key and a
     * {@link FlowBean} carrying the read count and an article count of 1.
     *
     * <p>FlowBean is our custom data type. Because it is transferred between
     * Hadoop nodes it has to follow Hadoop's serialization mechanism, i.e. it
     * must implement the corresponding Hadoop serialization interface.
     *
     * @author duanhaitao@itcast.cn
     */
    public static class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Take one line of input.
            String line = value.toString();

            // Split it into fields; field 1 is the user name, field 2 the read count.
            String[] fields = StringUtils.split(line, " ");
            if (fields.length < 3) {
                // Too few fields: skip the line instead of failing the whole task.
                context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
                return;
            }

            String username = fields[1];
            long userRead;
            try {
                userRead = Long.parseLong(fields[2]);
            } catch (NumberFormatException e) {
                // Non-numeric read count: count the line as malformed and skip it.
                context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
                return;
            }

            // Wrap the data as a key/value pair and emit it.
            context.write(new Text(username), new FlowBean(username, userRead, 1));
            // Debug trace written to the task's standard output.
            System.out.println(username + " " + userRead);
        }
    }

    /**
     * Sums the read counts and article counts of all beans of one user and
     * emits a single {@link FlowBean} with the totals as key and an empty
     * {@link Text} as value.
     */
    public static class FlowSumReducer extends Reducer<Text, FlowBean, FlowBean, Text> {

        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context)
                throws IOException, InterruptedException {
            long totalReads = 0;
            long totalArticles = 0;
            for (FlowBean bean : values) {
                totalReads += bean.getNumerRead();
                totalArticles += bean.getNumberArticle();
            }
            context.write(new FlowBean(key.toString(), totalReads, totalArticles), new Text());
        }
    }

    /**
     * Configures and runs the de-duplication job followed by the summing job.
     *
     * @param args optional {@code <input> <dedupDir> <outputDir>}; the default
     *             paths are used unless exactly three arguments are supplied
     * @return 0 when both jobs succeed, 1 when either job fails
     * @throws Exception if a job cannot be configured or submitted
     */
    @Override
    public int run(String[] args) throws Exception {
        // Use the configuration injected by ToolRunner so that generic options
        // (-D, -conf, ...) take effect; fall back to a fresh one otherwise.
        Configuration conf = getConf();
        if (conf == null) {
            conf = new Configuration();
        }

        String inputPath = DEFAULT_INPUT_PATH;
        String dedupPath = DEFAULT_DEDUP_PATH;
        String outputPath = DEFAULT_OUTPUT_PATH;
        if (args != null && args.length == 3) {
            inputPath = args[0];
            dedupPath = args[1];
            outputPath = args[2];
        }

        // Job 1: remove duplicate lines.
        Job dedupJob = Job.getInstance(conf, "RemoveDup");
        // The mapper and reducer used here are the classes nested in this driver.
        dedupJob.setJarByClass(FlowSumRunner.class);
        dedupJob.setMapperClass(RemoveDupMapper.class);
        dedupJob.setCombinerClass(RemoveDupReducer.class);
        dedupJob.setReducerClass(RemoveDupReducer.class);
        dedupJob.setOutputKeyClass(Text.class);
        dedupJob.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(dedupJob, new Path(inputPath));
        FileOutputFormat.setOutputPath(dedupJob, new Path(dedupPath));

        // The second job reads the first job's output, so stop on failure and
        // report it with a non-zero exit code.
        if (!dedupJob.waitForCompletion(true)) {
            return 1;
        }

        // Job 2: aggregate reads and article counts per user.
        Job sumJob = Job.getInstance(conf, "FlowSum");
        sumJob.setJarByClass(FlowSumRunner.class);
        sumJob.setMapperClass(FlowSumMapper.class);
        sumJob.setReducerClass(FlowSumReducer.class);
        sumJob.setMapOutputKeyClass(Text.class);
        sumJob.setMapOutputValueClass(FlowBean.class);
        // Must match what FlowSumReducer emits: FlowBean key, Text value.
        sumJob.setOutputKeyClass(FlowBean.class);
        sumJob.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(sumJob, new Path(dedupPath));
        FileOutputFormat.setOutputPath(sumJob, new Path(outputPath));

        return sumJob.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Entry point: runs the driver through {@link ToolRunner} and exits with
     * the code returned by {@link #run(String[])}.
     */
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new FlowSumRunner(), args);
        System.exit(res);
    }
}