首先复习一下hadoop中hdfs常用的命令
/**
* hadoop fs -mkdir 创建HDFS目录
* hadoop fs -ls 列出HDFS目录
* hadoop fs -copyFromLocal 使用-copyFromLocal 复制本地(local)文件到HDFS
* hadoop fs -put 使用-put 复制本地(local)文件到HDFS
* hadoop fs -cat 列出HDFS目录下的文件内容
* hadoop fs -copyToLocal
* 使用-copyToLocal 将HDFS上文件复制到本地(local)
* hadoop fs -get 使用-get 将HDFS上文件复制到本地(local)
* hadoop fs -cp 复制HDFs文件
* hadoop fs -rm 删除HDFS文件
*/
一、maven配置
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.wu</groupId> <artifactId>HighTem</artifactId> <version>1.0-SNAPSHOT</version> <!--此程序需要以Hadoop文件作为输入文件,以Hadoop文件作为输出文件,因此需要用到文件系统,于是需要引入hadoop-hdfs包;--> <!--我们需要向Map-Reduce集群提交任务,需要用到Map-Reduce的客户端,于是需要导入hadoop-mapreduce-client-jobclient包;--> <!--另外,在处理数据的时候会用到一些hadoop的数据类型例如IntWritable和Text等,--> <!--因此需要导入hadoop-common包。于是运行此程序所需要的相关依赖有以下几个:--> <dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.4.0</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-jobclient</artifactId> <version>2.4.0</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.4.0</version> </dependency> </dependencies> </project>
二、程序
package com.wu; /** * 数据格式 年份日期+温度 * 2014010114 * 2014010216 * 2014010317 * 2014010410 * 2014010506 * 2012010609 * 2012010732 * 2012010812 * 2012010919 * 2012011023 * 2001010116 * 2001010212 * 2001010310 * 2001010411 * 2001010529 * 2013010619 * 2013010722 * 2013010812 * 2013010929 * 2013011023 */ /** * 1、首先创建input.txt文件,将上面的数据复制进去 * 2、将input.txt文件上传到hdfs文件系统中 * hadoop fs -mkdir /test #新建一个test的目录 * hadoop fs -put /home/hadoop/runfile/input.txt /test/ #把对应的文件传入到hdfs里面的test目录下 * hadoop fs -ls /test #查询对应目录的信息 * * 3、将下面的代码进行打包,并在hadoop环境下运行 */ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; /** * @Auther: wuyilong * @Date: 2019/5/24 15:55 * @Description: 一个简单的hadoop例子 */ public class Temperature { /** * 四个泛型类型分别代表: * KeyIn Mapper的输入数据的Key,这里是每行文字的起始位置(0,11,...) * ValueIn Mapper的输入数据的Value,这里是每行文字 * KeyOut Mapper的输出数据的Key,这里是每行文字中的“年份” * ValueOut Mapper的输出数据的Value,这里是每行文字中的“气温” */ static class TempMapper extends Mapper<LongWritable, Text, Text, IntWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String year = line.substring(0, 4); int temperature = Integer.parseInt(line.substring(8)); context.write(new Text(year), new IntWritable(temperature)); } } /** * 四个泛型类型分别代表: * KeyIn Reducer的输入数据的Key,这里是每行文字中的“年份” * ValueIn Reducer的输入数据的Value,这里是每行文字中的“气温” * KeyOut Reducer的输出数据的Key,这里是不重复的“年份” * ValueOut Reducer的输出数据的Value,这里是这一年中的“最高气温” */ static class TempReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int maxValue = Integer.MIN_VALUE; StringBuilder sb = new StringBuilder(); for(IntWritable value : values) { maxValue = Math.max(maxValue, value.get()); sb.append(value).append(", "); } context.write(key, new IntWritable(maxValue)); } } /** * * @param args * @throws Exception */ public static void main(String[] args) throws Exception { String dst = "hdfs://localhost:9000/intput.txt";// 1输入路径 String dstOut = "hdfs://localhost:9000/output";// 2输出路径,必须是不存在的,空文件加也不行 Configuration hadoopConfig = new Configuration(); hadoopConfig.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); hadoopConfig.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); Job job = new Job(hadoopConfig); // job执行作业的输入和输出文件的路径 FileInputFormat.addInputPath(job, new Path(dst)); FileOutputFormat.setOutputPath(job, new Path(dstOut)); // 指定Mapper和Reducer两个阶段的处理类 job.setMapperClass(TempMapper.class); job.setReducerClass(TempReducer.class); // 设置最后输出的Key和Value的类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 执行job,直到完成 job.waitForCompletion(true); System.out.println("finished"); } }