zoukankan      html  css  js  c++  java
  • hadoop的wordcount例子运行

    可以通过一个简单的例子来说明MapReduce到底是什么:

      我们要统计一个大文件中的各个单词出现的次数。由于文件太大。我们把这个文件切分成如果小文件,然后安排多个人去统计。这个过程就是”Map”。然后把每个人统计的数字合并起来,这个就是“Reduce"。

      上面的例子如果在MapReduce去做呢,就需要创建一个任务job,由job把文件切分成若干独立的数据块,并分布在不同的机器节点中。然后通过分散在不同节点中的Map任务以完全并行的方式进行处理。MapReduce会对Map的输出地行收集,再将结果输出送给Reduce进行下一步的处理。

      对于一个任务的具体执行过程,会有一个名为"JobTracker"的进程负责协调MapReduce执行过程中的所有任务。若干条TaskTracker进程用来运行单独的Map任务,并随时将任务的执行情况汇报给JobTracker。如果一个TaskTracker汇报任务失败或者长时间未对本身任务进行汇报,JobTracker会启动另外一个TaskTracker重新执行单独的Map任务。

    下面的具体的代码实现:      

    1. 编写wordcount的相关job

       (1)eclipse下创建相关maven项目,依赖jar包如下(也可参照hadoop源码包下的hadoop-mapreduce-examples项目的pom配置)

      注意:要配置一个maven插件maven-jar-plugin,并指定mainClass

    <dependencies>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.11</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.5.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.5.2</version>
        </dependency>
      </dependencies>
      
      <build>
          <plugins>
              <plugin>
        <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-jar-plugin</artifactId>
          <configuration>
           <archive>
             <manifest>
               <mainClass>com.xxx.demo.hadoop.wordcount.WordCount</mainClass>
             </manifest>
           </archive>
         </configuration>
        </plugin>
          </plugins>
      </build>

     (2)根据MapReduce的运行机制,一个job至少要编写三个类分别用来完成Map逻辑、Reduce逻辑、作业调度这三件事。

    • Map的代码可继承org.apache.hadoop.mapreduce.Mapper类

             

    public static class TokenizerMapper
           extends Mapper<Object, Text, Text, IntWritable>{
    
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
         //由于该例子未用到key的参数,所以该处key的类型就简单指定为Object
        public void map(Object key, Text value, Context context
                        ) throws IOException, InterruptedException {
          StringTokenizer itr = new StringTokenizer(value.toString());
          while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
          }
        }
      }
    

      

    • Reduce的代码可继承org.apache.hadoop.mapreduce.Reducer类
    public class IntSumReducer
           extends Reducer<Text,IntWritable,Text,IntWritable> {
        private IntWritable result = new IntWritable();
    
        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context
                           ) throws IOException, InterruptedException {
          int sum = 0;
          for (IntWritable val : values) {
            sum += val.get();
          }
          result.set(sum);
          context.write(key, result);
        }
      }
    

      

    • 编写main方法进行作业调度
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true) ;
        //System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    

      

    2. 上传数据文件到hadoop集群环境

    执行mvn install把项目打成jar文件然后上传到linux集群环境,使用hdfs dfs -mkdir命令在hdfs文件系统中创建相应的命令,使用hdfs dfs -put 把需要处理的数据文件上传到hdfs系统中,示例:hdfs dfs -put ${linux_path/数据文件} ${hdfs_path}

    3. 执行job

       在集群环境中执行命令: hadoop jar ${linux_path}/wordcount.jar ${hdfs_input_path}  ${hdfs_output_path}

    4. 查看统计结果

        hdfs dfs -cat ${hdfs_output_path}/输出文件名

    以上的方式在未启动hadoop集群环境时,是以Local模式运行,此时HDFS和YARN都不起作用。下面是在伪分布式模式下执行mapreduce job时需要做的工作,先把官网上列的步骤摘录出来:

    ----------------------------------------------------------------------------------------------------------------------

    配置主机名
    # vi /etc/sysconfig/network
    例如:
    NETWORKING=yes
    HOSTNAME=master


    vi /etc/hosts
    填入以下内容
    127.0.0.1 localhost

    配置ssh免密码互通
    ssh-keygen -t rsa
    # cat?~/.ssh/id_rsa.pub?>>?~/.ssh/authorized_keys

    配置core-site.xml文件(位于${HADOOP_HOME}/etc/hadoop/

    <configuration>
        <property>
            <name>fs.defaultFS</name>
            <value>hdfs://localhost:9000</value>
        </property>
    </configuration>

    配置hdfs-site.xml文件
    <configuration>
        <property>
            <name>dfs.replication</name>
            <value>1</value>
        </property>
    </configuration>

    下面的命令可以在单机伪分布模式下运行mapreduce的job

    1. Format the filesystem:
        $ bin/hdfs namenode -format
    2. Start NameNode daemon and DataNode daemon:
        $ sbin/start-dfs.sh

      The hadoop daemon log output is written to the $HADOOP_LOG_DIR directory (defaults to $HADOOP_HOME/logs).

    3. Browse the web interface for the NameNode; by default it is available at:
      • NameNode - http://localhost:50070/
    4. Make the HDFS directories required to execute MapReduce jobs:
        $ bin/hdfs dfs -mkdir /user
        $ bin/hdfs dfs -mkdir /user/<username>
    5. Copy the input files into the distributed filesystem:
        $ bin/hdfs dfs -put etc/hadoop input
    6. Run some of the examples provided:
        $ bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.5.2.jar grep input output 'dfs[a-z.]+'
    7. Examine the output files:

      Copy the output files from the distributed filesystem to the local filesystem and examine them:

        $ bin/hdfs dfs -get output output
        $ cat output/*

      or

      View the output files on the distributed filesystem:

        $ bin/hdfs dfs -cat output/*
    8. When you're done, stop the daemons with:
        $ sbin/stop-dfs.sh
  • 相关阅读:
    C#泛型
    C#接口
    C#委托和事件
    Unity Ray 射线
    C#学习基础
    Graph | Eulerian path
    Careercup | Chapter 8
    Leetcode | Pow(x, n)
    Leetcode | Gray code
    分布式缓存
  • 原文地址:https://www.cnblogs.com/hzhuxin/p/4995629.html
Copyright © 2011-2022 走看看