zoukankan      html  css  js  c++  java
  • day09 MapReduce

    PS:上图为MapReduce原理全解剖,  图上带红色标识的部分是能够自定义的
    1.首先要解决读文件的问题。 mapTask中有个read()方法,专门负责读取键值对,而且是整行整行的读
    2.在读好文件以后,给到Mapper中的map方法,然后进入环形缓冲区(就是一个数组),该缓冲区满80%就溢出(溢出之前都排续好了)一个文件,溢出20% 分区排序,溢出部分用spllier收集数据,然多多个文件合并成一个大文件
    3.合并数据 。 通过框架自带的组件,会把相同的key进行分组,继续合并文件,给reduce任务。reduce再通过框架内部写入HDFS
    4.shuffle其实就是上图 黄色部分 ,红色部分可以自动完成, 黑色部分是由框架自动完成
    --------------------------------------------
    PS:Combinder就是如果有a,1.......好多,然后出来以后在中间就开始合并,这样就可以降低程序执行的效率,其实就是相当于提前Reduce。 其他执行的时候一直循环

    package cn.itcast.bigdata.mr.wcdemo;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    /**
      * 输如为map的-------------------提前合并文件
      * @author: 张政
      * @date: 2016年4月11日 下午7:08:18
      * @package_name: day07.sample
     */
    public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{
    
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    
            int count=0;
            for(IntWritable v: values){
                
                count += v.get();
            }
            
            context.write(key, new IntWritable(count));
            
            
        }
        
        
    }
     
    PS:当有很多小的切片文件的时候,使用combinder这样会减少开启 map的程序,提高效率,程序下面部分指定的开启的文件的大小范围;绑定类也可以不指定

     ------------------------------------MapReduce和yarn的工作机制

    PS:yarn其实是就是一个类似虚拟化隔离的技术,好比我买的服务器要隔离开来,提供一种内部运算的机制,下图就是MapReduce和yarn的的执行顺序

    3.3. MapReduce与YARN

    3.3.1 YARN概述---本身也是集群

         Yarn是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作系统平台,而mapreduce等运算程序则相当于运行于操作系统之上的应用程序

    3.3.2 YARN的重要概念

    1、  yarn并不清楚用户提交的程序的运行机制

    2、  yarn只提供运算资源的调度(用户程序向yarn申请资源,yarn就负责分配资源)

    3、  yarn中的主管角色叫ResourceManager

    4、  yarn中具体提供运算资源的角色叫NodeManager

    5、  这样一来,yarn其实就与运行的用户程序完全解耦,就意味着yarn上可以运行各种类型的分布式运算程序(mapreduce只是其中的一种),比如mapreduce、storm程序,spark程序,tez ……

    6、  所以,spark、storm等运算框架都可以整合在yarn上运行,只要他们各自的框架中有符合yarn规范的资源请求机制即可

    7、  Yarn就成为一个通用的资源调度平台,从此,企业中以前存在的各种运算集群都可以整合在一个物理集群上,提高资源利用率,方便数据共享

     -----------------------------


    PS:
    1.当主函数程序submit的时候,就会按照上述执行。
    PS:
    1.yarn是一种资源调度模式,MapReduce可以使用,其他的框架也可以使用。

    2.2 MAPREDUCE程序运行模式

    2.2.1 本地运行模式

    (1)mapreduce程序是被提交给LocalJobRunner在本地以单进程的形式运行

    (2)而处理的数据及输出结果可以在本地文件系统,也可以在hdfs上

    (3)怎样实现本地运行?写一个程序,不要带集群的配置文件(本质是你的mr程序的conf中是否有mapreduce.framework.name=local以及yarn.resourcemanager.hostname参数)

    (4)本地模式非常便于进行业务逻辑的debug,只要在eclipse中打断点即可

     步骤:

    1.如果在windows下想运行本地模式来测试程序逻辑,需要在windows中配置环境变量:

    %HADOOP_HOME%  =  d:/hadoop-2.6.1

    %PATH% =  %HADOOP_HOME%in

    并且要将d:/hadoop-2.6.1的lib和bin目录替换成windows平台编译的版本

    2.添加测试参数

     

    3.修改执行平台

    4.文件夹  测试擦看,记得要编写测试数据

     

    2.2.2 集群运行模式

    (1)将mapreduce程序提交给yarn集群resourcemanager,分发到很多的节点上并发执行

    (2)处理的数据和输出结果应该位于hdfs文件系统

    (3)提交集群的实现步骤:

    A、将程序打成JAR包,然后在集群的任意一个节点上用hadoop命令启动

         $ hadoop jar wordcount.jar cn.itcast.bigdata.mrsimple.WordCountDriver inputpath outputpath

    B、直接在linux的eclipse中运行main方法

    (项目中要带参数:mapreduce.framework.name=yarn以及yarn的两个基本配置)

    C、如果要在windows的eclipse中提交job给集群,则要修改YarnRunner类

    PS:
    通过对wordcount程序的调试,我明白程序先执行map阶段,再执行reduce阶段。
    在map阶段,主要是对输入的键值对进行分组分割
    在reduce节点,对每个以key为分组进行操作,最后汇总输出文件中

     -------------------------------------------------------------------

    2018-1-4   Java 也可以直接编译  jar包控制执行,下面对   在eclipse远程传递jar包,给集群运行,而不用命令(注意4个关键点)  

    package cn.itcast.bigdata.mr.wcdemo;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    /**
     * 相当于一个yarn集群的客户端
     * 需要在此封装我们的mr程序的相关运行参数,指定jar包
     * 最后提交给yarn
     * @author
     *
     */
    public class WordcountDriver {
        
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            
            //是否运行为本地模式,就是看这个参数值是否为local,默认就是local
            //conf.set("mapreduce.framework.name", "local");
            
            //本地模式运行mr程序时,输入输出的数据可以在本地,也可以在hdfs上
            //到底在哪里,就看以下两行配置你用哪行,默认就是file:///
            /*conf.set("fs.defaultFS", "hdfs://mini1:9000/");*/
            //conf.set("fs.defaultFS", "file:///");
            
            
            
            //运行集群模式,就是把程序提交到yarn中去运行
            //要想运行为集群模式,以下3个参数要指定为集群上的值,    第一点   ,要么把hadoop配置文件放入项目上
            conf.set("mapreduce.framework.name", "yarn");
            conf.set("yarn.resourcemanager.hostname", "192.168.8.10");
            conf.set("fs.defaultFS", "hdfs://192.168.8.10:9000/");
            Job job = Job.getInstance(conf);
            
            job.setJar("d:/wordcount.jar");      //第二点
            //指定本程序的jar包所在的本地路径
            //job.setJarByClass(WordcountDriver.class);
            
            //指定本业务job要使用的mapper/Reducer业务类
            job.setMapperClass(WordcountMapper.class);
            job.setReducerClass(WordcountReducer.class);
            
            //指定mapper输出数据的kv类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            
            //指定最终输出的数据的kv类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            
            //指定需要使用combiner,以及用哪个类作为combiner的逻辑
            /*job.setCombinerClass(WordcountCombiner.class);*/
            job.setCombinerClass(WordcountReducer.class);
            
            //如果不设置InputFormat,它默认用的是TextInputformat.class
            job.setInputFormatClass(CombineTextInputFormat.class);
            CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
            CombineTextInputFormat.setMinInputSplitSize(job, 2097152);
            
            //指定job的输入原始文件所在目录
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            //指定job的输出结果所在目录
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            
            //将job中配置的相关参数,以及job所用的java类所在的jar包,提交给yarn去运行
            /*job.submit();*/
            boolean res = job.waitForCompletion(true);
            System.exit(res?0:1);
            
        }
        
    
    }

    3.将该文件引入到项目中,包名都不能够改

    
    

    
    

    4.运行时的参数设置

    
    

     

    5.测试结果

     ------------------------------------------------------------------------------------------------------------------------------

    1、需求:

    订单数据表t_order:

    id

    date

    pid

    amount

    1001

    20150710

    P0001

    2

    1002

    20150710

    P0001

    3

    1002

    20150710

    P0002

    3

    商品信息表t_product

    id

    pname

    category_id

    price

    P0001

    小米5

    1000

    2

    P0002

    锤子T1

    1000

    3

    假如数据量巨大,两表的数据是以文件的形式存储在HDFS中,需要用mapreduce程序来实现一下SQL查询运算:

    select  a.id,a.date,b.name,b.category_id,b.price from t_order a join t_product b on a.pid = b.id

    2、实现机制:

    通过将关联的条件作为map输出的key,将两表满足join条件的数据并携带数据所来源的文件信息,发往同一个reduce task,在reduce中进行数据的串联

    package cn.itcast.bigdata.mr.rjoin;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.Writable;
    
    public class InfoBean implements Writable {
    
        private int order_id;
        private String dateString;
        private String p_id;
        private int amount;
        private String pname;
        private int category_id;
        private float price;
    
        // flag=0表示这个对象是封装订单表记录
        // flag=1表示这个对象是封装产品信息记录
        private String flag;
    
        public InfoBean() {
        }
    
        public void set(int order_id, String dateString, String p_id, int amount, String pname, int category_id, float price, String flag) {
            this.order_id = order_id;
            this.dateString = dateString;
            this.p_id = p_id;
            this.amount = amount;
            this.pname = pname;
            this.category_id = category_id;
            this.price = price;
            this.flag = flag;
        }
    
        public int getOrder_id() {
            return order_id;
        }
    
        public void setOrder_id(int order_id) {
            this.order_id = order_id;
        }
    
        public String getDateString() {
            return dateString;
        }
    
        public void setDateString(String dateString) {
            this.dateString = dateString;
        }
    
        public String getP_id() {
            return p_id;
        }
    
        public void setP_id(String p_id) {
            this.p_id = p_id;
        }
    
        public int getAmount() {
            return amount;
        }
    
        public void setAmount(int amount) {
            this.amount = amount;
        }
    
        public String getPname() {
            return pname;
        }
    
        public void setPname(String pname) {
            this.pname = pname;
        }
    
        public int getCategory_id() {
            return category_id;
        }
    
        public void setCategory_id(int category_id) {
            this.category_id = category_id;
        }
    
        public float getPrice() {
            return price;
        }
    
        public void setPrice(float price) {
            this.price = price;
        }
    
        public String getFlag() {
            return flag;
        }
    
        public void setFlag(String flag) {
            this.flag = flag;
        }
    
        /**
         * private int order_id; private String dateString; private int p_id;
         * private int amount; private String pname; private int category_id;
         * private float price;
         */
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(order_id);
            out.writeUTF(dateString);
            out.writeUTF(p_id);
            out.writeInt(amount);
            out.writeUTF(pname);
            out.writeInt(category_id);
            out.writeFloat(price);
            out.writeUTF(flag);
    
        }
    
        @Override
        public void readFields(DataInput in) throws IOException {
            this.order_id = in.readInt();
            this.dateString = in.readUTF();
            this.p_id = in.readUTF();
            this.amount = in.readInt();
            this.pname = in.readUTF();
            this.category_id = in.readInt();
            this.price = in.readFloat();
            this.flag = in.readUTF();
    
        }
    
        @Override
        public String toString() {
            return "order_id=" + order_id + ", dateString=" + dateString + ", p_id=" + p_id + ", amount=" + amount + ", pname=" + pname + ", category_id=" + category_id + ", price=" + price + ", flag=" + flag;
        }
    
    
    }
    package cn.itcast.bigdata.mr.rjoin;
    
    import java.io.IOException;
    import java.util.ArrayList;
    
    import org.apache.commons.beanutils.BeanUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    /**
     * 订单表和商品表合到一起
    order.txt(订单id, 日期, 商品编号, 数量)
        1001    20150710    P0001    2
        1002    20150710    P0001    3
        1002    20150710    P0002    3
        1003    20150710    P0003    3
    product.txt(商品编号, 商品名字, 价格, 数量)
        P0001    小米5    1001    2
        P0002    锤子T1    1000    3
        P0003    锤子    1002    4
     * @author: 张政
     * @date: 2016-2016年4月15日-上午8:45:57
     * @package_name: shizhan_03_hadoop
     * @package_name: cn.itcast.bigdata.mr.rjoin
     */
    public class RJoin {
    
        static class RJoinMapper extends Mapper<LongWritable, Text, Text, InfoBean> {
            InfoBean bean = new InfoBean();
            Text k = new Text();
            /**
             * 根据pid进行key的分组
             */
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    
                String line = value.toString();
    
                FileSplit inputSplit = (FileSplit) context.getInputSplit();   //只是另一种获取文件的方式
                String name = inputSplit.getPath().getName();
                // 通过文件名判断是哪种数据
                String pid = "";
                if (name.startsWith("order")) {
                    String[] fields = line.split("	");
                    // id date pid amount
                    pid = fields[2];
                    bean.set(Integer.parseInt(fields[0]), fields[1], pid, Integer.parseInt(fields[3]), "", 0, 0, "0");
    
                } else {
                    String[] fields = line.split("	");
                    // id pname category_id price
                    pid = fields[0];
                    bean.set(0, "", pid, 0, fields[1], Integer.parseInt(fields[2]), Float.parseFloat(fields[3]), "1");
    
                }
                k.set(pid);
                context.write(k, bean);
            }
    
        }
        /**
         * 针对相同的key进行操作,每一个key输出一行
         * @author bee
         *
         */
        static class RJoinReducer extends Reducer<Text, InfoBean, InfoBean, NullWritable> {
    
            @Override
            protected void reduce(Text pid, Iterable<InfoBean> beans, Context context) throws IOException, InterruptedException {
                InfoBean pdBean = new InfoBean();//记录产品中的信息,一个产品只有一个
                ArrayList<InfoBean> orderBeans = new ArrayList<InfoBean>();
    
                for (InfoBean bean : beans) {  //这个是根据pid进行分类
                    if ("1".equals(bean.getFlag())) {    //产品的
                        try {
                            BeanUtils.copyProperties(pdBean, bean);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    } else {
                        InfoBean odbean = new InfoBean();
                        try {
                            BeanUtils.copyProperties(odbean, bean);
                            orderBeans.add(odbean);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
    
                }
    
                // 拼接两类数据形成最终结果
                for (InfoBean bean : orderBeans) {
    
                    bean.setPname(pdBean.getPname());
                    bean.setCategory_id(pdBean.getCategory_id());
                    bean.setPrice(pdBean.getPrice());
    
                    context.write(bean, NullWritable.get());
                }
            }
        }
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            
            conf.set("mapred.textoutputformat.separator", "	");
            
            Job job = Job.getInstance(conf);
    
            // 指定本程序的jar包所在的本地路径
            job.setJarByClass(RJoin.class);
            //job.setJar("d:/rjoin.jar");  这种是通过本地运行直接调试   ,不过我至今没成功,都是先上传到HDFS然后再二次执行
    
        
            // 指定本业务job要使用的mapper/Reducer业务类
            job.setMapperClass(RJoinMapper.class);
            job.setReducerClass(RJoinReducer.class);
    
            // 指定mapper输出数据的kv类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(InfoBean.class);
    
            // 指定最终输出的数据的kv类型
            job.setOutputKeyClass(InfoBean.class);
            job.setOutputValueClass(NullWritable.class);
    
            // 指定job的输入原始文件所在目录
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            // 指定job的输出结果所在目录
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            // 将job中配置的相关参数,以及job所用的java类所在的jar包,提交给yarn去运行
            /* job.submit(); */
            boolean res = job.waitForCompletion(true);
            System.exit(res ? 0 : 1);
    
        }
    }
    PS:简单的总结一下:
    1.这两种方式首先都需要创建好 文件夹和待执行的文件。
    2.当为 上传执行的时候 hadoop jar rjoin.jar cn.itcast.bigdata.mr.rjoin.RJoin /rjoin/input /rjoin/output 传入参数执行
    当为eclipse远程执行的时候,现在本地生产jar包,然后再上传,注意上面的四点



     ---------------------day10

    Map端的join,避免数据倾斜

    PS:思想就是在 DistributeCashe缓冲中,放置一个文件,每个map任务执行的时候都去取本地文件,在map端完成任务的执行。

     下面看代码。    好好体会核心思想

    1001 pd001 300
    1001 pd002 20
    1002 pd003 40
    1003 pd002 50

    pd001,apple
    pd002,banana
    pd003,orange

    package cn.itcast.bigdata.mr.mapsidejoin;
    
    import java.io.BufferedReader;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.URI;
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class MapSideJoin {
    
        public static class MapSideJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
            // 用一个hashmap来加载保存产品信息表
            Map<String, String> pdInfoMap = new HashMap<String, String>();
    
            Text k = new Text();
    
            /**
             * 通过阅读父类Mapper的源码,发现 setup方法是在maptask处理数据之前调用一次 可以用来做一些初始化工作
             * 文件就放入Map阶段中
             */
            @Override
            protected void setup(Context context) throws IOException, InterruptedException {
                BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("pdts.txt")));
                String line;
                while (StringUtils.isNotEmpty(line = br.readLine())) {
                    String[] fields = line.split(",");
                    pdInfoMap.put(fields[0], fields[1]);
                }
                br.close();
            }
    
            // 由于已经持有完整的产品信息表,所以在map方法中就能实现join逻辑了
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                String orderLine = value.toString();
                String[] fields = orderLine.split("	");
                String pdName = pdInfoMap.get(fields[1]);//在表中查找到名字,然后给到赋值到一起
                k.set(orderLine + "	" + pdName);
                context.write(k, NullWritable.get());
            }
    
        }
    
        public static void main(String[] args) throws Exception {
    
            Configuration conf = new Configuration();
    
            Job job = Job.getInstance(conf);
    
            job.setJarByClass(MapSideJoin.class);
    
            job.setMapperClass(MapSideJoinMapper.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
    
            FileInputFormat.setInputPaths(job, new Path("D:/tmp/mapjoininput"));
            FileOutputFormat.setOutputPath(job, new Path("D:/tmp/output"));
    
            // 指定需要缓存一个文件到所有的maptask运行节点工作目录
            /* job.addArchiveToClassPath(archive); */// 缓存jar包到task运行节点的classpath中
            /* job.addFileToClassPath(file); */// 缓存普通文件到task运行节点的classpath中
            /* job.addCacheArchive(uri); */// 缓存压缩包文件到task运行节点的工作目录
            /* job.addCacheFile(uri) */// 缓存普通文件到task运行节点的工作目录
    
            // 将产品表文件缓存到task工作节点的工作目录中去
            job.addCacheFile(new URI("file:/D:/tmp/mapjoincache/pdts.txt"));
    
            //map端join的逻辑不需要reduce阶段,设置reducetask数量为0
            job.setNumReduceTasks(0);
            
            boolean res = job.waitForCompletion(true);
            System.exit(res ? 0 : 1);
    
        }
    
    }

     

     day10

    PS:如果map少,reducetask的任务多的话,就会引起数据倾斜, 一般通过map端的join来解决问题,那么mapReduce有个组件DistributedCache缓存来解决该问题

     

    public class TestDistributedCache {
        static class TestDistributedCacheMapper extends Mapper<LongWritable, Text, Text, Text>{
            FileReader in = null;
            BufferedReader reader = null;
            HashMap<String,String> b_tab = new HashMap<String, String>();
            String localpath =null;
            String uirpath = null;
            
            //是在map任务初始化的时候调用一次
            @Override
            protected void setup(Context context) throws IOException, InterruptedException {
                //通过这几句代码可以获取到cache file的本地绝对路径,测试验证用
                Path[] files = context.getLocalCacheFiles();
                localpath = files[0].toString();
                URI[] cacheFiles = context.getCacheFiles();
                
                
                //缓存文件的用法——直接用本地IO来读取
                //这里读的数据是map task所在机器本地工作目录中的一个小文件
                in = new FileReader("b.txt");
                reader =new BufferedReader(in);
                String line =null;
                while(null!=(line=reader.readLine())){
                    
                    String[] fields = line.split(",");
                    b_tab.put(fields[0],fields[1]);
                    
                }
                IOUtils.closeStream(reader);
                IOUtils.closeStream(in);
                
            }
            
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    
                //这里读的是这个map task所负责的那一个切片数据(在hdfs上)
                 String[] fields = value.toString().split("	");
                 
                 String a_itemid = fields[0];
                 String a_amount = fields[1];
                 
                 String b_name = b_tab.get(a_itemid);
                 
                 // 输出结果  1001    98.9    banan
                 context.write(new Text(a_itemid), new Text(a_amount + "	" + ":" + localpath + "	" +b_name ));
                 
            }
        }
        public static void main(String[] args) throws Exception {
            
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            
            job.setJarByClass(TestDistributedCache.class);
            
            job.setMapperClass(TestDistributedCacheMapper.class);
            
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
            
            //这里是我们正常的需要处理的数据所在路径
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            
            //不需要reducer
            job.setNumReduceTasks(0);
            //分发一个文件到task进程的工作目录
            job.addCacheFile(new URI("hdfs://hadoop-server01:9000/cachefile/b.txt"));
            
            //分发一个归档文件到task进程的工作目录
    //        job.addArchiveToClassPath(archive);
    
            //分发jar包到task节点的classpath下
    //        job.addFileToClassPath(jarfile);
            
            job.waitForCompletion(true);
        }
    }

     PS : 倒排索引,分为两步的MapReduce

    package cn.itcast.bigdata.mr.inverindex;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class InverIndexStepOne {
    
        static class InverIndexStepOneMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    
            Text k = new Text();
            IntWritable v = new IntWritable(1);
    
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    
                String line = value.toString();
    
                String[] words = line.split(" ");
    
                FileSplit inputSplit = (FileSplit) context.getInputSplit();
                String fileName = inputSplit.getPath().getName();
                for (String word : words) {
                    k.set(word + "--" + fileName);
                    context.write(k, v);
    
                }
    
            }
    
        }
    
        static class InverIndexStepOneReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    
            @Override
            protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    
                int count = 0;
                for (IntWritable value : values) {
    
                    count += value.get();
                }
    
                context.write(key, new IntWritable(count));
    
            }
    
        }
    
        public static void main(String[] args) throws Exception {
    
            Configuration conf = new Configuration();
    
            Job job = Job.getInstance(conf);
            job.setJarByClass(InverIndexStepOne.class);
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
            FileInputFormat.setInputPaths(job, new Path("D:/srcdata/inverindexinput"));
            FileOutputFormat.setOutputPath(job, new Path("D:/temp/out"));
            // FileInputFormat.setInputPaths(job, new Path(args[0]));
            // FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            job.setMapperClass(InverIndexStepOneMapper.class);
            job.setReducerClass(InverIndexStepOneReducer.class);
    
            job.waitForCompletion(true);
    
        }
    
    }
    Ps:分解完的形式,第一步是这个样式

    package cn.itcast.bigdata.mr.inverindex;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class IndexStepTwo {
        public static class IndexStepTwoMapper extends Mapper<LongWritable, Text, Text, Text>{
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                String line = value.toString();
                String[] files = line.split("--");
                context.write(new Text(files[0]), new Text(files[1]));
            }
        }
        public static class IndexStepTwoReducer extends Reducer<Text, Text, Text, Text>{
            @Override
            protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                StringBuffer sb = new StringBuffer();
                for (Text text : values) {
                    sb.append(text.toString().replace("	", "-->") + "	");
                }
                context.write(key, new Text(sb.toString()));
            }
        }
        public static void main(String[] args) throws Exception {
            
            if (args.length < 1 || args == null) {
                args = new String[]{"D:/temp/out/part-r-00000", "D:/temp/out2"};
            }
            
            Configuration config = new Configuration();
            Job job = Job.getInstance(config);
            
            job.setMapperClass(IndexStepTwoMapper.class);
            job.setReducerClass(IndexStepTwoReducer.class);
    //        job.setMapOutputKeyClass(Text.class);
    //        job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            
            System.exit(job.waitForCompletion(true) ? 1:0);
        }
    }

    PS: 第二部是这样的形式  ,就得到了想要的结果

    以下是qq的好友列表数据,冒号前是一个用,冒号后是该用户的所有好友(数据中的好友关系是单向的)

    A:B,C,D,F,E,O

    B:A,C,E,K

    C:F,A,D,I

    D:A,E,F,L

    E:B,C,D,M,L

    F:A,B,C,D,E,O,M

    G:A,C,D,E,F

    H:A,C,D,E,O

    I:A,O

    J:B,O

    K:A,C,D

    L:D,E,F

    M:E,F,G

    O:A,H,I,J

    求出哪些人两两之间有共同好友,及他俩的共同好友都有谁?

    解题思路:

    第一步  

    map

    读一行   A:B,C,D,F,E,O

    输出    <B,A><C,A><D,A><F,A><E,A><O,A>

    在读一行   B:A,C,E,K

    输出   <A,B><C,B><E,B><K,B>

    REDUCE

    拿到的数据比如<C,A><C,B><C,E><C,F><C,G>......

    输出:  

    <A-B,C>

    <A-E,C>

    <A-F,C>

    <A-G,C>

    <B-E,C>

    <B-F,C>.....

    第二步

    map

    读入一行<A-B,C>

    直接输出<A-B,C>

    reduce

    读入数据  <A-B,C><A-B,F><A-B,G>.......

    输出: A-B  C,F,G,.....

    package cn.itcast.bigdata.mr.fensi;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class SharedFriendsStepOne {
    
        static class SharedFriendsStepOneMapper extends Mapper<LongWritable, Text, Text, Text> {
    
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                // A:B,C,D,F,E,O
                String line = value.toString();
                String[] person_friends = line.split(":");
                String person = person_friends[0];
                String friends = person_friends[1];
    
                for (String friend : friends.split(",")) {
    
                    // 输出<好友,人>
                    context.write(new Text(friend), new Text(person));
                }
    
            }
    
        }
    
        static class SharedFriendsStepOneReducer extends Reducer<Text, Text, Text, Text> {
    
            @Override
            protected void reduce(Text friend, Iterable<Text> persons, Context context) throws IOException, InterruptedException {
    
                StringBuffer sb = new StringBuffer();
    
                for (Text person : persons) {
                    sb.append(person).append(",");
    
                }
                context.write(friend, new Text(sb.toString()));
            }
    
        }
    
        public static void main(String[] args) throws Exception {
    
            Configuration conf = new Configuration();
    
            Job job = Job.getInstance(conf);
            job.setJarByClass(SharedFriendsStepOne.class);
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            
            job.setMapperClass(SharedFriendsStepOneMapper.class);
            job.setReducerClass(SharedFriendsStepOneReducer.class);
    
            FileInputFormat.setInputPaths(job, new Path("D:/srcdata/friends"));
            FileOutputFormat.setOutputPath(job, new Path("D:/temp/out"));
    
            job.waitForCompletion(true);
    
        }
    
    }

     

    package cn.itcast.bigdata.mr.fensi;
    
    import java.io.IOException;
    import java.util.Arrays;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class SharedFriendsStepTwo {
    
        static class SharedFriendsStepTwoMapper extends Mapper<LongWritable, Text, Text, Text> {
    
            // 拿到的数据是上一个步骤的输出结果
            // A I,K,C,B,G,F,H,O,D,
            // 友 人,人,人
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    
                String line = value.toString();
                String[] friend_persons = line.split("	");
    
                String friend = friend_persons[0];
                String[] persons = friend_persons[1].split(",");
    
                Arrays.sort(persons);
    
                for (int i = 0; i < persons.length - 1; i++) {
                    for (int j = i + 1; j < persons.length; j++) {
                        // 发出 <人-人,好友> ,这样,相同的“人-人”对的所有好友就会到同1个reduce中去
                        context.write(new Text(persons[i] + "-" + persons[j]), new Text(friend));
                    }
    
                }
    
            }
    
        }
    
        static class SharedFriendsStepTwoReducer extends Reducer<Text, Text, Text, Text> {
    
            @Override
            protected void reduce(Text person_person, Iterable<Text> friends, Context context) throws IOException, InterruptedException {
    
                StringBuffer sb = new StringBuffer();
    
                for (Text friend : friends) {
                    sb.append(friend).append(" ");
    
                }
                context.write(person_person, new Text(sb.toString()));
            }
    
        }
    
        public static void main(String[] args) throws Exception {
    
            Configuration conf = new Configuration();
    
            Job job = Job.getInstance(conf);
            job.setJarByClass(SharedFriendsStepTwo.class);
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
    
            job.setMapperClass(SharedFriendsStepTwoMapper.class);
            job.setReducerClass(SharedFriendsStepTwoReducer.class);
    
            FileInputFormat.setInputPaths(job, new Path("D:/temp/out/part-r-00000"));
            FileOutputFormat.setOutputPath(job, new Path("D:/temp/out2"));
    
            job.waitForCompletion(true);
    
        }
    
    }

  • 相关阅读:
    Java生鲜电商平台-微服务架构利弊分析(生鲜小程序/APP)
    实用---eclipse中的代码提示功能
    实验---数据结构中字符串的匹配问题
    实验---实现两个指数递减多项式的和与积
    实验----Java的二维数组的应用及杨辉三角的编写
    你也想成为白帽子么?推荐一份网络安全书单!
    互联网公司的敏捷开发是怎么回事?这一份软件工程书单送给你!
    曾经我也有一个做游戏的梦想,这几本游戏开发的书籍推荐给为未来的游戏工程师
    如何做好Linux服务器运维,你可能需要这一份运维工程师书单!
    如何做好Linux服务器运维,你可能需要这一份运维工程师书单!
  • 原文地址:https://www.cnblogs.com/bee-home/p/8137010.html
Copyright © 2011-2022 走看看