原始数据如下图所示（每行格式为 `id,时间`）：
程序（MapReduce：按 id 分组，对时间排序去重，并以 15 分钟为间隔补齐缺失的时间点）：
Mapper类:
1 public class DemoMapper extends Mapper<LongWritable,Text,IntWritable,Text>{
2 IntWritable k = new IntWritable();
3 Text v = new Text();
4
5 @Override
6 protected void map(LongWritable key,Text value,Mapper<LongWritable,Text,IntWritable,Text>.Context context)
7 throws IOException,InterruptedException{
8 String[] data = value.toString().split(",");
9 k.set(Integer.parseInt(data[0]));
10 try{
11 v.set(Utils.getFixTime(data[1]));
12 context.write(k,v);
13 }catch(ParseException e){
14 e.printStackTrace();
15 }
16 }
17 }
Reducer类:
1 public class DemoReducer extends Reducer<IntWritable,Text,NullWritable,Text>{
2 Text v = new Text();
3
4 @Override
5 protected void reduce(IntWritable key,Iterable<Text> values,Reducer<IntWritable,Text,NullWritable,Text>.Context context)
6 throws IOException,InterruptedException{
7 TreeSet<Long> timeSet = new TreeSet<>();
8 for(Text value : values){
9 try{
10 timeSet.add(getTime(value.toString()));
11 }catch{
12 e.printStackTrace();
13 }
14 }
15 long tmp = -1;
16 for(long time :timeSet){
17 if(tmp == -1){
18 v.set(key.toString()+","+getDate(time));
19 context.write(NullWritable.get(),v);
20 }else{
21 if(time - tmp > 900000){
22 for(int i=0;i<= (time - tmp)/900000;i++){
23 v.set(key.toString()+","+getDate(tmp+900000*i));
24 }
25 }else{
26 v.set(key.toString()+","+getDate(time));
27 context.write(NullWritable.get(),v);
28 }
29 }
30 tmp =time;
31 }
32 }
33 public static long getTime(String str)throws ParseException{
34 SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH-mm-ss");
35 return simpleDateFormat.parse(str).getTime();
36 }
37
38 public static String getDate(long timetmp){
39 SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH-mm-ss");
40 return simpleDateFormat.format(timeStamp);
41 }
42 }
Driver类:
1 public class DemoDriver{
2 public static void main(String[] args)throws IllegalArgumentException,IOException,ClassNotFoundException,InterruptedException{
3 if(args.length <2){
4 System.err.println("you must input two argument!");
5 System.exit(-1);
6 }
7 Configuration conf = Utils.getConf();
8 Job job =Job.getInstance(conf, "fix time");
9 job.setJarByClass(DemoDriver.class);
10 job.setMapperClass(DemoMapper.class);
11 job.setReducerClass(DemoReducer.class);
12 job.setMapOutputKeyClass(IntWritable.class);
13 job.setMapOutputValueClass(Text.class);
14 job.setOutputKeyClass(NullWritable.class);
15 job.setOutputValueClass(Text.class);
16 job.setNumReduceTask(1);
17 for(int i =0;i <args.length-1;i++){
18 FileInputFormat.addInputPath(job,new Path(args[i]));
19 }
20 FileSystem.get(conf).delete(new Path(args[args.length-1]),true);
21 FileOutputFormat.setOutputPath(job,new Path(args[args.length-1]));
22 System.exit(job.waitForCompletion(true)?0:1);
23 }
24 }