zoukankan      html  css  js  c++  java
  • Hadoop—MapReducer统计文件的单词出现的个数

     

    1. MapReduce 统计文件的单词出现的个数

    Mapper: 处理具体文本,发送结果
    Reducer: 合并各个Mapper发送过来的结果
    Job: 制定相关配置,框架

    Mapper

    package cn.itcast.hadoop.mr.wordcount;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.util.StringUtils;
    
    
    // 4个泛型中,前两个是指定mapper输入数据的类型
    // map和 reducer的输入和输出都是key-value对的形式
    // 默认情况下。框架输入的我们mapper的输入数据中,key是要处理的文本中的一行的起始偏移量, 内容就是value
    public class WCMapper extends Mapper <LongWritable, Text, Text, LongWritable> {
    
        // 每读一次数据,就调一次这个方法
        @Override
        protected void map(LongWritable key, Text value,  Context context)
        throws IOException, InterruptedException{
            //具体业务逻辑就像和在这里, 传入数据就是  key, value
    
            //将这一行的内容转化成string类型
            String line = value.toString();
    
            // 对这一行的文本按特定分隔符切分
            String[] words = StringUtils.split(line, ' ');
    
            // 便利这个单词数组, 输出为kv形式   k:单词  v:1
            for (String word : words) {
                context.write(new Text(word), new LongWritable(1));
            }
        }
    }

    Reducer

    package cn.itcast.hadoop.mr.wordcount;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    
        @Override
        protected void reduce(Text Key, Iterable<LongWritable> values, Context context) 
            throws IOException, InterruptedException {
    
            long count = 0;
    
            for (LongWritable value:values) {
                count += value.get();
            }
    
            context.write(Key, new LongWritable(count));
        }
    }

    Job

    package cn.itcast.hadoop.mr.wordcount;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import org.apache.hadoop.io.Text;
    //import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider.Text;
    
    public class WCRunner {
    
        public static void main(String[] args) throws Exception{
    
            Configuration conf = new Configuration();
    
            Job wcJob = Job.getInstance(conf);
    
            wcJob.setJarByClass(WCRunner.class);
    
            //本job使用的mapper和reducer类
            wcJob.setMapperClass(WCMapper.class);
            wcJob.setReducerClass(WCReducer.class);
    
            // 指定reduce的输出数据kv类型
            wcJob.setOutputKeyClass(Text.class);
            wcJob.setOutputValueClass(LongWritable.class);
    
            // 指定mapper的输出数据kv类型
            wcJob.setMapOutputKeyClass(Text.class);
            wcJob.setMapOutputValueClass(LongWritable.class);
    
            //指定要处理的输入数据存放路径
            FileInputFormat.setInputPaths(wcJob, new Path("/wc/srcdata/"));
    
            //指定处理结果的输出数据存放路径
            FileOutputFormat.setOutputPath(wcJob, new Path("/wc/output"));
    
            //将job提交给集群运行
            wcJob.waitForCompletion(true);
        }
    }

    2. Yarn资源调度框架

    Resource Manager:
    Node Manager:

    
    
    1. wcJob.waitforCompleition启动一个RunJar进程,这个进程向RM申请执行一个Job
    2. RM 返回一个Job相关资源的路径staging-dir,和为Job产生的jobID
    3. RunJar提交资源到 HDFS的 staging-dir上
    4. RunJar提交资源完毕之后,上报RM 提交资源完毕
    5. RM下个Job加入RM中的任务队列中
    6. 各个Node Manager通过通信,从RM的任务队列中领取任务
    7. 各个Node Manager初始化  运行资源的容器,从staging-dir上面拉取资源
    8. RM选择一个Node Manager 启动MRAppMaster 来运行map reducer
    9. MRAppMaster向RM注册
    10. MSAppMaster启动Mapper任务
    11. MSAppMaster启动Reducer任务
    12. 任务完成后, 向RM注销自己

    3.几种运行模式
    本地模型运行

    在windows的eclipse里面直接运行main方法,就会将job提交给本地执行器locaJobRunner执行
    – 输入输出数据可以放在本地路径下(c:/wc/src/data/)
    – 输入输出数据也可以放在hdfs中(hdfs://hadoop1:9000/wc/srcdata)

    在linux的ecllipse里面直接运行main 方法,则不需要添加yarn相关的配置,也会提交给localJobRunner执行
    – 输入输出数据可以放在本地路径下(/home/hadoop/wc/srcdata)
    – 输入输出数据也可以放在hdfs中(hdfs://hadoop1:9000/wc/srcdata)

    集群模式运行

    将工程打包成jar包, 上传到服务器,然后用hadoop命令提交 hadoop jar wc.jar cn.itcast.hadoop.mr.wordcount.WCRunner
    在linux的eclipse中直接运行main方法,也可以提交到集群中去运行,但是,必须采取一下措施:
    – 在工程src目录下加入mapred-site.xml 和yarn-site.xml
    – 将工程打成jar包(wc.jar), 同时在main方法中添加一个conf的配置参数 conf.set(“mapreduce.job.jar”, “wc.jar”);

    在windows的eclipse中直接运行main方法,也可以提交集群中运行,但是因为平台不兼容,需要做很多的设置修改
    – 要在windows中存放一份hadoop的安装包(解压好的)
    – 要将其中的lib和bin目录替换成根据你的windows版本重新编译出的文件
    – 再要配置系统环境变量 HDOOP_HOME 和 PATH
    – 修改YarnRunner这个类的源码

  • 相关阅读:
    blocksit
    getdata
    ASP.net 探针
    301重定向
    webapi
    Unity NGUI UIPanel下对粒子的剪裁
    unity3d 之本地推送
    c#之时间戳与DateTime的相互转换
    c#之从服务器下载压缩包,并解压
    Unity3d 开发之 ulua 坑的总结
  • 原文地址:https://www.cnblogs.com/zemul/p/10820750.html
Copyright © 2011-2022 走看看