zoukankan      html  css  js  c++  java
  • MapReduce分布式编程框架

    一、MapReduce分布式编程框架及yarn集群搭建

    1.大数据解决的问题?    
        海量数据的存储:hadoop->分布式文件系统HDFS
        海量数据的计算:hadoop->分布式计算框架MapReduce
        
    2.什么是MapReduce?
        分布式程序的编程框架,java-->ssh ssm,目的:简化开发!
        是基于hadoop的数据分析应用的核心框架。
        mapreduce的功能:将用户编写的业务逻辑代码和自带默认组件整合成一个完整的
        分布式运算程序,并发的运行在hadoop集群上。
        
    3.MapReduce的优缺点
        优点:
        (1)易于编程
        (2)良好的拓展性
        (3)高容错性
        (4)适合处理PB级别以上的离线处理
    
        缺点:
        (1)不擅长做实时计算
        (2)不擅长做流式计算(mr的数据源是静态的)
        (3)不支持DAG(有向图)计算(spark)
        
    4.自动化调度平台yarn(MapReduce程序的运行平台)
        MapReduce程序应该在多台机器上运行启动,而且要先执行maptask,等待每个maptask都处理完成后
        还要启动很多个reducetask,这个过程要用户手动调用任务不太现实,
        需要一个自动化的任务调度平台-->hadoop当中2.x中提供了一个分布式调度平台-YARN
        
    5.搭建yarn集群
        (1)修改配置文件 yarn-site.xml
        <property>
            <name>yarn.resourcemanager.hostname</name>
            <value>hd09-1</value>
        </property>
    
        <property>
            <name>yarn.nodemanager.aux-services</name>
            <value>mapreduce_shuffle</value>
        </property>2)然后复制到每台机器  $PWD 当前目录
        scp yarn-site.xml root@hd09-2:$PWD
        scp yarn-site.xml root@hd09-3:$PWD
        
        (3)修改slaves文件
        
        然后在hd09-1上,修改hadoop的slaves文件,列入要启动nodemanager的机器
        然后将hd09-1到所有机器的免密登陆配置好
        
        (4)脚本启动yarn集群:
        启动:
        start-yarn.sh
        停止:
        stop-yarn.sh
    
        (5)访问web端口
        启动完成后,可以在windows上用浏览器访问resourcemanager的web端口:
        http://hd09-1:8088

    二、WordCount代码实现

    1.Mapper类

    package com.css.wc;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    /**
     * 思路?
     * wordcount单词计数
     * <单词,1>
     * 
     * 数据传输
     * 
     * KEYIN:数据的起始偏移量0~10 11~20 21~30
     * VALUEIN:数据
     * 
     * KEYOUT:mapper输出到reduce阶段k的类型
     * VALUEOUT:mapper输出到reduce阶段v的类型
     * <China,1><Beijing,1><love,1>
     */
    public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
        // key 起始偏移量 value 数据 context 上下文
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // 1.读取数据
            String line = value.toString();
            // 2.切割 China love
            String[] words = line.split(" ");
            // 3.循环的写到下一个阶段<China,1><love,1>
            for (String w : words) {
                context.write(new Text(w), new IntWritable(1));
            }
        }
    }

    2.Reducer类

    package com.css.wc;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    /**
     * 汇总<China,2> <Beijing,2> <love,2>
     */
    public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            // 1.统计单词出现的次数
            int sum = 0;
            // 2.累加求和
            for (IntWritable count : values) {
                // 拿到值累加
                sum += count.get();
            }
            // 3.结果输出
            context.write(key, new IntWritable(sum));
        }    
    }

    3.Driver类

    package com.css.wc;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    /**
     * 驱动类
     */
    public class WordCountDriver {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            // 1.获取job信息
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            // 2.获取jar包
            job.setJarByClass(WordCountDriver.class);
            // 3.获取自定义的mapper与reducer类
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WordCountReducer.class);
            // 4.设置map输出的数据类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            // 5.设置reduce输出的数据类型(最终的数据类型)
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            // 6.设置输入存在的路径与处理后的结果路径       
            FileInputFormat.setInputPaths(job, new Path("c:/in1019"));
            FileOutputFormat.setOutputPath(job, new Path("c:/out1019"));
            // 7.提交任务
            boolean rs = job.waitForCompletion(true);
            System.out.println(rs ? 0 : 1);
        }
    }

    4.输入的文件words.txt

    I love Beijing
    I love China
    Beijing is the capital of China

    5.输出的文件part-r-00000

    Beijing    2
    China    2
    I    2
    capital    1
    is    1
    love    2
    of    1
    the    1
  • 相关阅读:
    今天还要去一次北仑
    重归漫漫长路
    双休日,累
    调整心情,迎接新的挑战
    多喝点水,对身体有好处
    丈人生病住院了
    WPF,DataGrid数据绑定
    AXIS2简介
    心事一件件的了掉,希望一切都能恢复到正常
    驾车是种乐趣,也是种累
  • 原文地址:https://www.cnblogs.com/areyouready/p/9853431.html
Copyright © 2011-2022 走看看