package s26;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.util.StringTokenizer;
/**
 * Word-count mapper.
 *
 * <p>Splits each input line into tokens using {@link StringTokenizer}'s default
 * delimiters (whitespace) and emits one {@code (token, 1)} pair per token.
 */
public class MyMap extends Mapper<LongWritable,Text,Text,IntWritable> {
    /** Constant count written for every token; it is only ever read. */
    private static final IntWritable ONE = new IntWritable(1);

    /**
     * Output key reused across calls so that no Writable is allocated per
     * input record; its content is overwritten before every write.
     */
    private final Text word = new Text();

    /**
     * Emits {@code (token, 1)} for every token found in {@code value}.
     *
     * @param key     input key supplied by the framework; not used here
     * @param value   one input record, tokenized via its string form
     * @param context sink that receives the emitted pairs
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        StringTokenizer tokens = new StringTokenizer(value.toString());
        while (tokens.hasMoreTokens()) {
            word.set(tokens.nextToken());
            context.write(word, ONE);
        }
    }
}
package s26;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Word-count reducer.
 *
 * <p>Adds up every count received for a key and emits the key together with
 * that total.
 */
public class MyRed extends Reducer<Text,IntWritable,Text,IntWritable> {
    /**
     * Writes {@code (key, sum of values)} to the context.
     *
     * @param key     the key whose values are being combined
     * @param values  the counts received for {@code key}
     * @param context sink that receives the single output pair
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        final IntWritable total = new IntWritable(sumOf(values));
        context.write(key, total);
    }

    /** Returns the int sum of all entries in {@code counts}; 0 when there are none. */
    private static int sumOf(Iterable<IntWritable> counts) {
        int acc = 0;
        for (IntWritable count : counts) {
            acc = acc + count.get();
        }
        return acc;
    }
}
-----------
package s26;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;
/**
 * Driver that configures and submits the word-count job built from
 * {@link MyMap} and {@link MyRed}.
 *
 * <p>Usage: {@code MyJob [<in> <out>]}. When no arguments are given, the
 * built-in default input and output paths are used.
 */
public class MyJob {
    /** Value assigned to {@code fs.defaultFS} for the job configuration. */
    private static final String DEFAULT_FS = "hdfs://master:9000";
    /** Input path used when no command-line arguments are supplied. */
    private static final String DEFAULT_INPUT = "hdfs://master:9000/root/experiment/datas";
    /** Output path used when no command-line arguments are supplied. */
    private static final String DEFAULT_OUTPUT = "hdfs://master:9000/root/experiment/output";

    /**
     * Configures the job, removes a pre-existing output directory, submits
     * the job and waits for it to finish.
     *
     * <p>Exits with status 2 on a wrong argument count and with status 1 when
     * the job does not complete successfully; returns normally on success.
     *
     * @param args either empty (use the default paths) or exactly
     *             {@code <in> <out>}
     * @throws IOException            on file-system or job-submission I/O errors
     * @throws ClassNotFoundException propagated from {@code Job.waitForCompletion}
     * @throws InterruptedException   propagated from {@code Job.waitForCompletion}
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        /* Fall back to the default paths only when the caller supplied none,
           so that explicit command-line paths are honored. */
        if (args.length == 0) {
            args = new String[]{DEFAULT_INPUT, DEFAULT_OUTPUT};
        }
        if (args.length != 2) {
            System.err.println("Usage:please put <in> <out>");
            System.exit(2);
        }

        /* Create the configuration and set the default file system. */
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", DEFAULT_FS);

        /* Create and initialize the job. */
        Job job = Job.getInstance(conf);
        /* Class used to locate the job jar. */
        job.setJarByClass(MyJob.class);
        job.setJobName("MyJob");

        /* Key and value classes of the job output. */
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        /* Mapper and reducer implementations. */
        job.setMapperClass(MyMap.class);
        job.setReducerClass(MyRed.class);

        /* Input and output formats. */
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        Path input = new Path(args[0]);
        Path output = new Path(args[1]);

        /* Resolve the file system from the output path itself, so the
           existence check and delete act on the file system the path
           actually belongs to rather than on the configured default. */
        FileSystem fs = output.getFileSystem(conf);
        /* Delete the output path (recursively) if it already exists. */
        if (fs.exists(output)) {
            fs.delete(output, true);
        }

        /* Register the input and output paths with the job. */
        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, output);

        /* Submit the job, wait for it to finish, and report failure through
           the process exit status. */
        if (!job.waitForCompletion(true)) {
            System.exit(1);
        }
    }
}