1.读取数据文件,赋给对象并输出
1.数据文件:
name,age,score,sex
tom,10,100,女
susa,9,99,男
hua,60,10,dog
2.创建对象并实现接口WritableComparable或Writable
public class Person02 implements WritableComparable<Person02> { private String name; private int age; private int score; private String sex; @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeUTF(this.name); dataOutput.writeInt(this.age); dataOutput.writeInt(this.score); dataOutput.writeUTF(this.sex); } @Override public void readFields(DataInput dataInput) throws IOException { this.name = dataInput.readUTF(); this.age=dataInput.readInt(); this.score=dataInput.readInt(); this.sex=dataInput.readUTF(); }
3.编写MappReduce 方法,如果不需要Reducer方法处理,仅mapp方法也可
public class OnlyMapp { public static class OnMapp extends Mapper<LongWritable, Text,Person02, NullWritable>{ Person02 person; Text text; @Override protected void setup(Context context) throws IOException, InterruptedException { person=new Person02(); text=new Text(); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { if (key.get()!=0){ //设置汉字编码格式 String line=new String(value.getBytes(),0,value.getLength(),"GBK"); String[] split = line.split(","); person.setName(split[0]); person.setAge(Integer.parseInt(split[1])); person.setScore(Integer.parseInt(split[2])); person.setSex(split[3]); context.write(person,NullWritable.get()); } } } public static void main(String[] args) throws Exception { Configuration conf=new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(OnlyMapp.class); job.setMapperClass(OnMapp.class); job.setOutputKeyClass(Person.class); job.setOutputValueClass(NullWritable.class); job.setNumReduceTasks(0); FileInputFormat.setInputPaths(job,"F:\input\person.txt"); Path path=new Path("F:\out3"); FileSystem fs = FileSystem.get(conf); if (fs.exists(path)){ fs.delete(path,true); } FileOutputFormat.setOutputPath(job,path); job.submit(); } }
4.输出结果文件
2.读取Json格式文件,并赋给对象并输出
数据文件:
1.创建对象并实现接口的方法
public class Phone implements Writable { private int uid; private String phone; private String addr;
2.编写MappReduce方法
public class JsonToObject { public static class JSMapp extends Mapper<LongWritable, Text,Phone, NullWritable>{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { ObjectMapper objectMapper = new ObjectMapper(); //设置汉字字符编码 String line=new String(value.getBytes(),0,value.getLength(),"GBK"); Phone phone = objectMapper.readValue(line, Phone.class); context.write(phone,NullWritable.get()); } } public static void main(String[] args) throws Exception { Configuration conf=new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(JsonToObject.class); job.setMapperClass(JSMapp.class); job.setOutputKeyClass(Phone.class); job.setOutputValueClass(NullWritable.class); job.setNumReduceTasks(0); FileInputFormat.setInputPaths(job,"F:\input\phone.json"); Path path=new Path("F:\out4"); FileSystem fs = FileSystem.get(conf); if (fs.exists(path)){ fs.delete(path,true); } FileOutputFormat.setOutputPath(job,path); job.submit(); }
3.输出结果文件