一、MapReduce程序编写:
(1)继承Mapper类,重写map方法:
org.apache.hadoop.mapreduce.Mapper
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 切单词 String line = value.toString(); String[] words = line.split(" "); for(String word:words){ context.write(new Text(word), new IntWritable(1)); } } }
- KEYIN :是map task读取到的数据的key的类型,是一行的起始偏移量Long
- VALUEIN:是map task读取到的数据的value的类型,是一行的内容String
- KEYOUT:是用户的自定义map方法要返回的结果kv数据的key的类型,在wordcount逻辑中,我们需要返回的是单词String
- VALUEOUT:是用户的自定义map方法要返回的结果kv数据的value的类型,在wordcount逻辑中,我们需要返回的是整数Integer
但是,在mapreduce中,map产生的数据需要传输给reduce,需要进行序列化和反序列化,而jdk中的原生序列化机制产生的数据量比较冗余,就会导致数据在mapreduce运行过程中传输效率低下所以,hadoop专门设计了自己的序列化机制,那么,mapreduce中传输的数据类型就必须实现hadoop自己的序列化接口。
hadoop为jdk中的常用基本类型Long String Integer Float等数据类型封住了自己的实现了hadoop序列化接口的类型:
LongWritable,Text,IntWritable,FloatWritable
(2)继承Reducer类,重写reduce方法:
import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{ @Override protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException { int count = 0; Iterator<IntWritable> iterator = values.iterator(); while(iterator.hasNext()){ IntWritable value = iterator.next(); count += value.get(); } context.write(key, new IntWritable(count)); } }
(3)写job提交类:
package cn.edu360.mr.wc; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * 如果要在hadoop集群的某台机器上启动这个job提交客户端的话 * conf里面就不需要指定 fs.defaultFS mapreduce.framework.name * * 因为在集群机器上用 hadoop jar xx.jar cn.edu360.mr.wc.JobSubmitter2 命令来启动客户端main方法时, * hadoop jar这个命令会将所在机器上的hadoop安装目录中的jar包和配置文件加入到运行时的classpath中 * * 那么,我们的客户端main方法中的new Configuration()语句就会加载classpath中的配置文件,自然就有了 * fs.defaultFS 和 mapreduce.framework.name 和 yarn.resourcemanager.hostname 这些参数配置 * * @author ThinkPad * */ public class JobSubmitterLinuxToYarn { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://hdp-01:9000"); conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem"); // 没指定默认文件系统 // 没指定mapreduce-job提交到哪运行 Job job = Job.getInstance(conf); job.setJarByClass(JobSubmitterLinuxToYarn.class); job.setMapperClass(WordcountMapper.class); job.setReducerClass(WordcountReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.setInputPaths(job, new Path("/wordcount/input")); FileOutputFormat.setOutputPath(job, new Path("/wordcount/output")); job.setNumReduceTasks(3); boolean res = job.waitForCompletion(true); System.exit(res?0:1); } }