zoukankan      html  css  js  c++  java
  • 云计算(一)——使用 Hadoop Mapreduce 进行数据处理

    使用 Hadoop Mapreduce 进行数据处理

    1. 综述

      使用HDP(下载: http://zh.hortonworks.com/products/releases/hdp-2-3/#install)搭建环境,进行分布式数据处理。

      项目文件下载,解压文件后将看到项目文件夹。该程序将读取 cloudMR/internal_use/tmp/dataset/titles 目录下的四个文本文件, 文件中的每一行文本都是来自于 wikipedia 的一个标题, 读取每个标题,并使用 cloudMR/internal_use/tmp/dataset/misc/delimiters.txt 中指定的特殊符号分割标题成独立单词,然后将单词转换为全小写,然后将出现在
    cloudMR/internal_use/tmp/dataset/misc/stopwords.txt 中的单词全部删除,最后统计剩余单词的出现次数,并输出。

      程序的编译过程需要在“~/.bashrc”文件内定义自己的环境变量“$hadoop_CLASSPATH”,在“~/.bashrc”文件中添加一行:  

    export hadoop_CLASSPATH="/usr/hdp/2.3.2.0-2950/hadoop/conf:/usr/hdp/2.3.2.0-2950/hadoop/conf:/usr/hdp/2.3.2.0-2950/hadoop/conf:/usr/hdp/2.3.2.0-2950/hadoop/lib/*:/usr/hdp/2.3.2.0-2950/hadoop/.//*:/usr/hdp/2.3.2.0-2950/hadoop-hdfs/./:/usr/hdp/2.3.2.0-2950/hadoop-hdfs/lib/*:/usr/hdp/2.3.2.0-2950/hadoop-hdfs/.//*:/usr/hdp/2.3.2.0-2950/hadoop-yarn/lib/*:/usr/hdp/2.3.2.0-2950/hadoop-yarn/.//*:/usr/hdp/2.3.2.0-2950/hadoop-mapreduce/lib/*:/usr/hdp/2.3.2.0-2950/hadoop-mapreduce/.//*:::/usr/share/java/mysql-connector-java-5.1.17.jar:/usr/share/java/mysql-connector-java-5.1.31-bin.jar:/usr/share/java/mysql-connector-java.jar:/usr/hdp/2.3.2.0-2950/tez/*:/usr/hdp/2.3.2.0-2950/tez/lib/*:/usr/hdp/2.3.2.0-2950/tez/conf:/usr/hdp/current/hadoop-yarn-client/.//*:/usr/hdp/current/hadoop-yarn-client/lib/*"

      

    2. 运行过程

    Step (1): 将项目文件夹放入HDP虚拟机,进入cloudMR文件夹,运行下列命令启动:

    ./start.sh

    要求输入账号,随意输入10位数字即可。再运行下列命令检查 hadoop 是否正常运行:

    hadoop version

    Step (2): 编写 TitleCount.java 文件,完成相应功能。完成后的 TitleCount.java 如下:

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import java.io.*;
    import java.util.*;
    /**
     * Classic "Word Count"
     */
    public class TitleCount extends Configured implements Tool {
        public static void main(String[] args) throws Exception {
            int res = ToolRunner.run(new Configuration(), new TitleCount(), args);
            System.exit(res);
        }
    
        @Override
        public int run(String[] args) throws Exception {
            Job job = Job.getInstance(this.getConf(), "Title Count");
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
    
            job.setMapperClass(TitleCountMap.class);
            job.setReducerClass(TitleCountReduce.class);
    
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            job.setJarByClass(TitleCount.class);
            return job.waitForCompletion(true) ? 0 : 1;
        }
    
        public static String readHDFSFile(String path, Configuration conf) throws IOException{
            Path pt=new Path(path);
            FileSystem fs = FileSystem.get(pt.toUri(), conf);
            FSDataInputStream file = fs.open(pt);
            BufferedReader buffIn=new BufferedReader(new InputStreamReader(file));
    
            StringBuilder everything = new StringBuilder();
            String line;
            while( (line = buffIn.readLine()) != null) {
                everything.append(line);
                everything.append("
    ");
            }
            return everything.toString();
        }
        
        public static class TitleCountMap extends Mapper<Object, Text, Text, IntWritable> {
            Set<String> stopWords = new HashSet<String>();
            String delimiters;
    
            @Override
            protected void setup(Context context) throws IOException,InterruptedException {
    
                Configuration conf = context.getConfiguration();
                
                String delimitersPath = conf.get("delimiters");
                delimiters = readHDFSFile(delimitersPath, conf);
                
                String stopWordsPath = conf.get("stopwords");
                List<String> stopWordsList = Arrays.asList(readHDFSFile(stopWordsPath, conf).split("
    "));
                for(String e : stopWordsList){
                    stopWords.add(e);
                }
            }
    
            @Override
            public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
                StringTokenizer stk = new StringTokenizer(value.toString(),delimiters);
                while(stk.hasMoreTokens()){
                    String e = stk.nextToken().trim().toLowerCase();
                    if(stopWords.contains(e) == false){
                        context.write(new Text(e),new IntWritable(1));
                    }
                }
            }
        }
    
        public static class TitleCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
            @Override
            public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
                int sum = 0;
                for(IntWritable e : values){
                    sum += e.get();
                }
                context.write(key, new IntWritable(sum));
            }
        }
    }

    Step (3): 编译java源文件。为了方便,在cloudMR文件夹中新建output文件夹,用来存放编译生成的.class文件。使用以下命令(在cloudMR文件夹下执行):

    mkdir output
    
    javac -classpath $Hadoop_CLASSPATH -d output TitleCount.java

    进入output文件夹会看到3个.class文件。

    Step (4): 将编译生成的类文件打包。

    首先在 cloudMR 文件夹下新建文本文件 manifest.mf,使用以下命令(在 cloudMR 文件夹下执行):

    touch manifest.mf

    编辑内容为

    Main-Class: TitleCount.class

    manifest.mf 内是一些关于这个包的信息,这里定义了主类。

    再使用下面命令打包(在cloudMR文件夹下执行):

    jar cvfM TitleCount.jar manifest.mf -C output/ .

    这条命令的含义是;

    jar        打包命令

    cvfM       

    TitleCount.jar   打成的包的名字

    manifest.mf     将这个文件打进包里

    -C        -C之后的文件夹内的所有文件打进包里

    output/      将output文件夹内的文件全部打进包里

    .          打成的包TitleCount.jar放在当前文件夹

    注意:打包过程很重要而且易错,请务必按照上文所述步骤进行。

    Step (5): 将 TitleCount.jar 发布。

    在发布(yarn)之前,还要完成准备工作。

    将相关文件:cloudMR/internal_use/tmp/dataset/titles 目录下的四个文本文件、cloudMR/internal_use/tmp/dataset/misc/delimiters.txt 、
    cloudMR/internal_use/tmp/dataset/misc/stopwords.txt 上传到hdfs.

    在 hdfs 的 /user/root/ 文件夹内新建 data 文件夹,将 delimiters.txt、stopwords.txt 放入 data 文件夹,再在 data 文件夹中新建 titles 文件夹,将cloudMR/internal_use/tmp/dataset/titles 目录下的四个文本文件放入 titles 文件夹。

    介绍相关的命令:

    hadoop fs -ls                                         列出hdfs目录,由于没有参数,列出的是当前用户的主目录

    hadoop fs -ls /                                         列出hdfs根目录

    hadoop fs -mkdir data                                       在默认目录下新建data目录

    hadoop fs -mkdir data/titles                               在data目录中新建 titles目录

    hadoop fs -copyFromLocal ./abc.txt  data           上传当前目录(本地)中的 abc.txt 到 hdfs 上的 data 目录

      之后便可以发布了,使用命令:  

    yarn jar TitleCount.jar TitleCount -D delimiters="/user/root/data/delimiters.txt" -D stopwords="/user/root/data/stopwords.txt" data/titles output

    这条命令的含义是;

    yarn                              发布内容

    jar                                要发布的内容为jar包

    TitleCount.jar                发布的内容

    TitleCount                 TitleCount.jar的入口

    -D delimiters="/user/root/data/delimiters.txt" -D stopwords="/user/root/data/stopwords.txt"       -D后跟参数,这里定义了两个参数

    data/titles                     输入文件夹,其内的文件作为Map 的输入

    output                          输出文件存放的位置

      yarn 命令执行完毕后,即可查看运行结果。

  • 相关阅读:
    【JAVA】日志管理slf4j
    【JAVA】从java线程来看java内存模型
    【数据结构】---栈和队列
    【数据结构】---链表
    C++ 第10章 反省程序与C++标准模板库
    回忆C++知识点(1)
    C++ 第8章 多态性
    C++ 第7章 继承与派生
    C++ 第6章 数组、指针、字符串
    C++ 第5章 数据的共享与保护
  • 原文地址:https://www.cnblogs.com/yongheng20/p/5391334.html
Copyright © 2011-2022 走看看