我的环境是:hadoop-0.20.2,Eclipse SDK-3.3.2。
源数据为:
Apr 23 11:49:54 hostapd: wlan0: STA 14:7d:c5:9e:fb:84
Apr 23 11:49:54 hostapd: wlan0: STA 14:7d:c5:9e:fb:84
Apr 23 11:49:54 hostapd: wlan0: STA 14:7d:c5:9e:fb:84
Apr 23 11:49:54 hostapd: wlan0: STA 14:7d:c5:9e:fb:84
Apr 23 11:49:54 hostapd: wlan0: STA 14:7d:c5:9e:fb:84
Apr 23 11:49:54 hostapd: wlan0: STA 14:7d:c5:9e:fb:84
想要获取的数据是:
Apr 23 14:7d:c5:9e:fb:84
Apr 23 14:7d:c5:9e:fb:84
Apr 23 14:7d:c5:9e:fb:84
Apr 23 14:7d:c5:9e:fb:84
Apr 23 14:7d:c5:9e:fb:84
Apr 23 14:7d:c5:9e:fb:84
运行时输入的参数是:
hdfs的输入和输出目录:即 hdfs://cMaster:/user/joe/in hdfs://cMaster:/user/joe/out
源代码:
package hadoop; import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.io.*; import org.apache.hadoop.util.*; public class test extends Configured implements Tool{ enum Counter{ LINESKIP, } public static class Map extends Mapper<LongWritable,Text,NullWritable,Text>{ public void map(LongWritable key,Text value,Context context)throws IOException,InterruptedException{ String line=value.toString(); try{ String [] lineSplit=line.split(" "); String month=lineSplit[0]; String time=lineSplit[1]; String mac=lineSplit[6]; Text out=new Text(month+' '+time+' '+mac); context.write(NullWritable.get(),out); }catch(java.lang.ArrayIndexOutOfBoundsException e){ context.getCounter(Counter.LINESKIP).increment(1); return; } } } public int run(String[] args)throws Exception{ Configuration conf=getConf(); Job job=new Job(conf,"test"); job.setJarByClass(test.class); FileInputFormat.addInputPath(job,new Path(args[0])); FileOutputFormat.setOutputPath(job,new Path(args[1])); job.setMapperClass(Map.class); job.setOutputFormatClass(TextOutputFormat.class); job.setOutputKeyClass(NullWritable.class); job.waitForCompletion(true); return job.isSuccessful()?0:1; } public static void main(String[] args)throws Exception{ int res=ToolRunner.run(new Configuration(),new test(),args); System.exit(res); } }