首先在hadoop中建立input文件夹放几个文件,里边写点东西。比如我放了三个,分别写的是
第一个
hello hadoop
bye hadoop
第二个
hello world
bye world
第三个
hello bigdata
然后就有下边这段代码做单词统计:
1 import java.io.File; 2 import java.io.IOException; 3 import java.net.URI; 4 import java.net.URISyntaxException; 5 6 import org.apache.hadoop.conf.Configuration; 7 import org.apache.hadoop.fs.FileSystem; 8 import org.apache.hadoop.fs.Path; 9 import org.apache.hadoop.io.LongWritable; 10 import org.apache.hadoop.io.Text; 11 import org.apache.hadoop.mapreduce.Job; 12 import org.apache.hadoop.mapreduce.Mapper; 13 import org.apache.hadoop.mapreduce.Reducer; 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 15 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 16 17 public class WorldCount { 18 19 static final String INPUT_PATH = "hdfs://masters:9000/user/hadoop/input"; 20 static final String OUTPUT_PATH = "hdfs://masters:9000/user/hadoop/output"; 21 public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException { 22 23 //添加以下的代码,就可以联通,不知道咋回事 24 String path = new File(".").getCanonicalPath(); 25 System.getProperties().put("hadoop.home.dir", path); 26 new File("./bin").mkdirs(); 27 new File("./bin/winutils.exe").createNewFile(); 28 29 Configuration conf = new Configuration(); 30 Path outpath = new Path(OUTPUT_PATH); 31 32 Job job = new Job(conf, "WorldCount"); 33 34 FileInputFormat.setInputPaths(job, INPUT_PATH); 35 FileOutputFormat.setOutputPath(job, outpath); 36 37 //检测输出路径是否存在,如果存在就删除,否则会报错 38 FileSystem fileSystem = FileSystem.get(new URI(OUTPUT_PATH), conf); 39 if(fileSystem.exists(outpath)){ 40 fileSystem.delete(outpath, true); 41 } 42 43 job.setMapperClass(MyMapper.class); 44 job.setReducerClass(MyReducer.class); 45 job.setOutputKeyClass(Text.class); 46 job.setOutputValueClass(LongWritable.class); 47 job.waitForCompletion(true); 48 } 49 50 //输入,map,即拆分过程 51 static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{ 52 53 /* 54 * 输入为(key,value)输出为(value,count数量) 55 * 所以LongWritable, Text, Text, LongWritable分别代表 key(行号) value value count 56 * 其中LongWritable和Text是hadoop定义的类型,分别代表long和string两种类型 57 * */ 58 protected void map(LongWritable k1, Text v1, Context context)throws IOException, InterruptedException{ 59 String[] splits = v1.toString().split(" ");//按照空格拆分 60 for(String str: splits){ 61 System.out.println("---" + str); 62 context.write(new Text(str), new LongWritable(1));//拆分出来的形式为(“单词”,出现次数(这里默认为1)) 63 } 64 } 65 } 66 67 //输出,reduce,汇总过程 68 static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{ 69 protected void reduce( 70 Text k2, //输出的内容,即value 71 Iterable<LongWritable> v2s, //是一个longwritable类型的数组,所以用了Iterable这个迭代器,且元素为v2s 72 org.apache.hadoop.mapreduce.Reducer<Text, LongWritable, Text, LongWritable>.Context context) 73 //这里一定设置好,不然输出会变成单个单词,从而没有统计数量 74 throws IOException, InterruptedException { 75 //列表求和 初始为0 76 long times = 0L; 77 for(LongWritable count:v2s){ 78 times += count.get(); 79 } 80 context.write(k2, new LongWritable(times)); 81 } 82 } 83 }
然后就成了,看下结果
第23行到第27行不写就会报错,我也不知道咋回事,如果哪个大牛知道咋回事,非常期待留言解答。