0. 说明
Windows 下开发 MapReduce 程序部署到集群
1. 前提
在本地开发的时候保证 resource 中包含以下配置文件,从集群的配置文件中拷贝
在 resource 中新建 mapred-site.xml(仅做测试使用,打包的时候删掉)
<?xml version="1.0"?> <configuration> <property> <name>mapreduce.framework.name</name> <value>yarn</value> </property> </configuration>
在 resource 中新建 yarn-site.xml(仅做测试使用,打包的时候删掉)
<?xml version="1.0"?> <configuration> <!-- Site specific YARN configuration properties --> <property> <name>yarn.resourcemanager.hostname</name> <value>s101</value> </property> <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> </configuration>
2. 代码编写
[2.1 WCMapper.java]
package hadoop.mr.wc; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * Mapper 程序 */ public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> { /** * map 函数,被调用过程是通过 while 循环每行调用一次 */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 将 value 变为 String 格式 String line = value.toString(); // 将一行文本进行截串 String[] arr = line.split(" "); for (String word : arr) { context.write(new Text(word), new IntWritable(1)); } } }
[2.2 WCReducer.class]
package hadoop.mr.wc; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; /** * Reducer 类 */ public class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> { /** * 通过迭代所有的 key 进行聚合 */ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable value : values) { sum += value.get(); } context.write(key,new IntWritable(sum)); } }
[2.3 WCApp.class]
package hadoop.mr.wc; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * Word Count APP */ public class WCApp { public static void main(String[] args) throws Exception { // 初始化配置文件 Configuration conf = new Configuration(); // 仅在本地开发时使用 // conf.set("fs.defaultFS", "file:///"); // 通过配置文件初始化 job Job job = Job.getInstance(conf); // 设置 job 名称 job.setJobName("Word Count"); // job 入口函数类 job.setJarByClass(WCApp.class); // 设置 mapper 类 job.setMapperClass(WCMapper.class); // 设置 reducer 类 job.setReducerClass(WCReducer.class); // 设置 map 的输出 K-V 类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 设置 reduce 的输出 K-V 类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 设置输入路径和输出路径 // Path pin = new Path("E:/test/wc/1.txt"); // Path pout = new Path("E:/test/wc/out"); Path pin = new Path(args[0]); Path pout = new Path(args[1]); FileInputFormat.addInputPath(job, pin); FileOutputFormat.setOutputPath(job, pout); // 执行 job job.waitForCompletion(true); } }
3. 打包项目 & 测试
使用 Maven 打包程序如下图所示:
将打包好的 jar 包和测试文件通过 Xftp 上传到服务器
将测试文件上传到 HDFS ,命令略
开启 Hadoop 集群,命令略
执行以下命令
hadoop jar myhadoop-1.0-SNAPSHOT.jar hadoop.mr.wc.WCApp /testdata/1.txt /testdata/out
命令行下可以看到结果,Web UI 查看 http://s101:8088
4. 总结
Mapreduce 作业放在集群上运行分为以下步骤:
1. 编写测试代码,测试其单机模式的运行
2. 改造代码,使其能运行在集群上,改变参数的设置方式
3. 打成 jar 包,发送到服务器中
4. 在服务器中进行测试