使用 Hadoop Mapreduce 进行数据处理
1. 综述
使用HDP(下载: http://zh.hortonworks.com/products/releases/hdp-2-3/#install)搭建环境,进行分布式数据处理。
项目文件下载,解压文件后将看到项目文件夹。该程序将读取 cloudMR/internal_use/tmp/dataset/titles 目录下的四个文本文件, 文件中的每一行文本都是来自于 wikipedia 的一个标题, 读取每个标题,并使用 cloudMR/internal_use/tmp/dataset/misc/delimiters.txt 中指定的特殊符号分割标题成独立单词,然后将单词转换为全小写,然后将出现在
cloudMR/internal_use/tmp/dataset/misc/stopwords.txt 中的单词全部删除,最后统计剩余单词的出现次数,并输出。
程序的编译过程需要在“~/.bashrc”文件内定义自己的环境变量“$hadoop_CLASSPATH”,在“~/.bashrc”文件中添加一行:
export hadoop_CLASSPATH="/usr/hdp/2.3.2.0-2950/hadoop/conf:/usr/hdp/2.3.2.0-2950/hadoop/conf:/usr/hdp/2.3.2.0-2950/hadoop/conf:/usr/hdp/2.3.2.0-2950/hadoop/lib/*:/usr/hdp/2.3.2.0-2950/hadoop/.//*:/usr/hdp/2.3.2.0-2950/hadoop-hdfs/./:/usr/hdp/2.3.2.0-2950/hadoop-hdfs/lib/*:/usr/hdp/2.3.2.0-2950/hadoop-hdfs/.//*:/usr/hdp/2.3.2.0-2950/hadoop-yarn/lib/*:/usr/hdp/2.3.2.0-2950/hadoop-yarn/.//*:/usr/hdp/2.3.2.0-2950/hadoop-mapreduce/lib/*:/usr/hdp/2.3.2.0-2950/hadoop-mapreduce/.//*:::/usr/share/java/mysql-connector-java-5.1.17.jar:/usr/share/java/mysql-connector-java-5.1.31-bin.jar:/usr/share/java/mysql-connector-java.jar:/usr/hdp/2.3.2.0-2950/tez/*:/usr/hdp/2.3.2.0-2950/tez/lib/*:/usr/hdp/2.3.2.0-2950/tez/conf:/usr/hdp/current/hadoop-yarn-client/.//*:/usr/hdp/current/hadoop-yarn-client/lib/*"
2. 运行过程
Step (1): 将项目文件夹放入HDP虚拟机,进入cloudMR文件夹,运行下列命令启动:
./start.sh
要求输入账号,随意输入10位数字即可。再运行下列命令检查 hadoop 是否正常运行:
hadoop version
Step (2): 编写 TitleCount.java 文件,完成相应功能。完成后的 TitleCount.java 如下:
import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import java.io.*; import java.util.*; /** * Classic "Word Count" */ public class TitleCount extends Configured implements Tool { public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new TitleCount(), args); System.exit(res); } @Override public int run(String[] args) throws Exception { Job job = Job.getInstance(this.getConf(), "Title Count"); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setMapperClass(TitleCountMap.class); job.setReducerClass(TitleCountReduce.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.setJarByClass(TitleCount.class); return job.waitForCompletion(true) ? 0 : 1; } public static String readHDFSFile(String path, Configuration conf) throws IOException{ Path pt=new Path(path); FileSystem fs = FileSystem.get(pt.toUri(), conf); FSDataInputStream file = fs.open(pt); BufferedReader buffIn=new BufferedReader(new InputStreamReader(file)); StringBuilder everything = new StringBuilder(); String line; while( (line = buffIn.readLine()) != null) { everything.append(line); everything.append(" "); } return everything.toString(); } public static class TitleCountMap extends Mapper<Object, Text, Text, IntWritable> { Set<String> stopWords = new HashSet<String>(); String delimiters; @Override protected void setup(Context context) throws IOException,InterruptedException { Configuration conf = context.getConfiguration(); String delimitersPath = conf.get("delimiters"); delimiters = readHDFSFile(delimitersPath, conf); String stopWordsPath = conf.get("stopwords"); List<String> stopWordsList = Arrays.asList(readHDFSFile(stopWordsPath, conf).split(" ")); for(String e : stopWordsList){ stopWords.add(e); } } @Override public void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer stk = new StringTokenizer(value.toString(),delimiters); while(stk.hasMoreTokens()){ String e = stk.nextToken().trim().toLowerCase(); if(stopWords.contains(e) == false){ context.write(new Text(e),new IntWritable(1)); } } } } public static class TitleCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> { @Override public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for(IntWritable e : values){ sum += e.get(); } context.write(key, new IntWritable(sum)); } } }
Step (3): 编译java源文件。为了方便,在cloudMR文件夹中新建output文件夹,用来存放编译生成的.class文件。使用以下命令(在cloudMR文件夹下执行):
mkdir output
javac -classpath $Hadoop_CLASSPATH -d output TitleCount.java
进入output文件夹会看到3个.class文件。
Step (4): 将编译生成的类文件打包。
首先在 cloudMR 文件夹下新建文本文件 manifest.mf,使用以下命令(在 cloudMR 文件夹下执行):
touch manifest.mf
编辑内容为
Main-Class: TitleCount.class
manifest.mf 内是一些关于这个包的信息,这里定义了主类。
再使用下面命令打包(在cloudMR文件夹下执行):
jar cvfM TitleCount.jar manifest.mf -C output/ .
这条命令的含义是;
jar 打包命令
cvfM
TitleCount.jar 打成的包的名字
manifest.mf 将这个文件打进包里
-C -C之后的文件夹内的所有文件打进包里
output/ 将output文件夹内的文件全部打进包里
. 打成的包TitleCount.jar放在当前文件夹
注意:打包过程很重要而且易错,请务必按照上文所述步骤进行。
Step (5): 将 TitleCount.jar 发布。
在发布(yarn)之前,还要完成准备工作。
将相关文件:cloudMR/internal_use/tmp/dataset/titles 目录下的四个文本文件、cloudMR/internal_use/tmp/dataset/misc/delimiters.txt 、
cloudMR/internal_use/tmp/dataset/misc/stopwords.txt 上传到hdfs.
在 hdfs 的 /user/root/ 文件夹内新建 data 文件夹,将 delimiters.txt、stopwords.txt 放入 data 文件夹,再在 data 文件夹中新建 titles 文件夹,将cloudMR/internal_use/tmp/dataset/titles 目录下的四个文本文件放入 titles 文件夹。
介绍相关的命令:
hadoop fs -ls 列出hdfs目录,由于没有参数,列出的是当前用户的主目录
hadoop fs -ls / 列出hdfs根目录
hadoop fs -mkdir data 在默认目录下新建data目录
hadoop fs -mkdir data/titles 在data目录中新建 titles目录
hadoop fs -copyFromLocal ./abc.txt data 上传当前目录(本地)中的 abc.txt 到 hdfs 上的 data 目录
之后便可以发布了,使用命令:
yarn jar TitleCount.jar TitleCount -D delimiters="/user/root/data/delimiters.txt" -D stopwords="/user/root/data/stopwords.txt" data/titles output
这条命令的含义是;
yarn 发布内容
jar 要发布的内容为jar包
TitleCount.jar 发布的内容
TitleCount TitleCount.jar的入口
-D delimiters="/user/root/data/delimiters.txt" -D stopwords="/user/root/data/stopwords.txt" -D后跟参数,这里定义了两个参数
data/titles 输入文件夹,其内的文件作为Map 的输入
output 输出文件存放的位置
yarn 命令执行完毕后,即可查看运行结果。