zoukankan      html  css  js  c++  java
  • 大数据技术之_05_Hadoop学习_04_MapReduce_Hadoop企业优化(重中之重)+HDFS小文件优化方法+MapReduce扩展案例+倒排索引案例(多job串联)+TopN案例+找博客共同粉丝案例+常见错误及解决方案

    第6章 Hadoop企业优化(重中之重)6.1 MapReduce 跑的慢的原因6.2 MapReduce优化方法6.2.1 数据输入6.2.2 Map阶段6.2.3 Reduce阶段6.2.4 I/O传输6.2.5 数据倾斜问题6.2.6 常用的调优参数6.3 HDFS小文件优化方法6.3.1 HDFS小文件弊端6.3.2 HDFS小文件解决方案第7章 MapReduce扩展案例7.1 倒排索引案例(多job串联)7.2 TopN案例7.3 找博客共同粉丝案例第8章 常见错误及解决方案


    第6章 Hadoop企业优化(重中之重)

    6.1 MapReduce 跑的慢的原因

    6.2 MapReduce优化方法

      MapReduce优化方法主要从六个方面考虑:数据输入、Map阶段、Reduce阶段、IO传输、数据倾斜问题和常用的调优参数。

    6.2.1 数据输入

    6.2.2 Map阶段

    6.2.3 Reduce阶段

    1、


    2、

    6.2.4 I/O传输

    6.2.5 数据倾斜问题

    1、


    2、

    6.2.6 常用的调优参数

    1、资源相关参数
    (1)以下参数是在用户自己的MR应用程序中配置就可以生效(mapred-default.xml)


    (2)应该在YARN启动之前就配置在服务器的配置文件中才能生效(yarn-default.xml)

    (3)Shuffle性能优化的关键参数,应在YARN启动之前就配置好(mapred-default.xml)

    2、容错相关参数(MapReduce性能优化)

    6.3 HDFS小文件优化方法

    6.3.1 HDFS小文件弊端

      HDFS上每个文件都要在NameNode上建立一个索引,这个索引的大小约为150byte,这样当小文件比较多的时候,就会产生很多的索引文件,一方面会大量占用NameNode的内存空间,另一方面就是索引文件过大使得索引速度变慢

    6.3.2 HDFS小文件解决方案

    小文件的优化无非以下几种方式:
      (1)在数据采集的时候,就将小文件或小批数据合成大文件再上传HDFS。
      (2)在业务处理之前,在HDFS上使用MapReduce程序对小文件进行合并。
      (3)在MapReduce处理时,可采用CombineTextInputFormat提高效率。
    1、


    2、

    第7章 MapReduce扩展案例

    7.1 倒排索引案例(多job串联)

    1、需求
      有大量的文本(文档、网页),需要建立搜索索引,如下图所示。
    (1)数据输入
      


    (2)期望输出数据
    atguigu    c.txt-->2   b.txt-->2   a.txt-->3   
    pingping    c.txt-->1   b.txt-->3   a.txt-->1   
    ss    c.txt-->1   b.txt-->1   a.txt-->2   

    2、需求分析


    3、第一次处理
    (1)第一次处理,编写OneIndexMapper类
    package com.atguigu.mr.index;

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;

    public class OneIndexMapper extends Mapper<LongWritableTextTextIntWritable{

        String name;
        Text k = new Text();
        IntWritable v = new IntWritable();

        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException 
    {
            // 获取文件名称
            FileSplit split = (FileSplit) context.getInputSplit();
            name = split.getPath().getName();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException 
    {

            // atguigu pingping

            // 1、获取一行数据
            String line = value.toString();

            // 2、切割
            String[] fields = line.split(" ");

            for (String word : fields) {
                // 3、拼接
                k.set(word + "---" + name); // atguigu---a.txt
                v.set(1);
                // 4、写出
                context.write(k, v); // <atguigu---a.txt,1>
            }
        }
    }

    (2)第一次处理,编写OneIndexReducer类

    package com.atguigu.mr.index;

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class OneIndexReducer extends Reducer<TextIntWritableTextIntWritable{

        IntWritable v = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Context context)
     throws IOException, InterruptedException 
    {

            // 1、累加求和
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }

            v.set(sum);

            // 2、写出
            context.write(key, v);
        }
    }

    (3)第一次处理,编写OneIndexDriver类

    package com.atguigu.mr.index;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class OneIndexDriver {

        public static void main(String[] args) throws Exception {

            // 输入输出路径需要根据自己电脑上实际的输入输出路径设置
            args = new String[] { "d:/temp/atguigu/0529/input/inputoneindex""d:/temp/atguigu/0529/output17" };

            Configuration conf = new Configuration();

            Job job = Job.getInstance(conf);
            job.setJarByClass(OneIndexDriver.class);

            job.setMapperClass(OneIndexMapper.class);
            job.setReducerClass(OneIndexReducer.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            job.waitForCompletion(true);
        }
    }

    (4)查看第一次输出结果

    atguigu---a.txt    3
    atguigu---b.txt    2
    atguigu---c.txt    2
    pingping---a.txt    1
    pingping---b.txt    3
    pingping---c.txt    1
    ss---a.txt    2
    ss---b.txt    1
    ss---c.txt    1

    4、第二次处理
    (1)第二次处理,编写TwoIndexMapper类

    package com.atguigu.mr.index;

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class TwoIndexMapper extends Mapper<LongWritableTextTextText{

        Text k = new Text();
        Text v = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException 
    {
            // 输入为:
            // atguigu--a.txt   3
            // atguigu--b.txt   2
            // atguigu--c.txt   2
            // 输出为:(atguigu,a.txt   3)atguigu   c.txt-->2   b.txt-->2   a.txt-->3

            // 1、获取一行数据
            String line = value.toString();

            // 2、用“--”切割
            String[] fields = line.split("--"); // 结果为:(atguigu,a.txt   3)

            // 3、封装数据
            k.set(fields[0]);
            v.set(fields[1]);

            // 4、写出
            context.write(k, v);
        }
    }

    (2)第二次处理,编写TwoIndexReducer类

    package com.atguigu.mr.index;

    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class TwoIndexReducer extends Reducer<TextTextTextText{

        Text v = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException 
    {
            // 输入为:(atguigu,a.txt   3)(atguigu,b.txt    2)(atguigu,c.txt    2)
            // 输出为:atguigu  c.txt-->2   b.txt-->2   a.txt-->3

            StringBuffer sb = new StringBuffer();

            // 拼接
            for (Text value : values) {
                sb.append(value.toString().replace(" ""-->") + " ");
            }

            // 封装
            v.set(sb.toString());

            // 写出
            context.write(key, v);
        }
    }

    (3)第二次处理,编写TwoIndexDriver类

    package com.atguigu.mr.index;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class TwoIndexDriver {

        public static void main(String[] args) throws Exception {

            // 输入输出路径需要根据自己电脑上实际的输入输出路径设置
            args = new String[] { "d:/temp/atguigu/0529/input/inputtowindex""d:/temp/atguigu/0529/output18" };

            Configuration config = new Configuration();
            Job job = Job.getInstance(config);

            job.setJarByClass(TwoIndexDriver.class);
            job.setMapperClass(TwoIndexMapper.class);
            job.setReducerClass(TwoIndexReducer.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            boolean result = job.waitForCompletion(true);
            System.exit(result ? 0 : 1);
        }
    }

    (4)第二次查看最终结果

    atguigu    c.txt-->2   b.txt-->2   a.txt-->3
    pingping    c.txt-->1   b.txt-->3   a.txt-->1
    ss    c.txt-->1   b.txt-->1   a.txt-->2

    7.2 TopN案例

    1、需求
      对需求2.3输出结果进行加工,输出流量使用量在前10的用户信息。
    (1)输入数据 (2)输出数据


    2、需求分析
      同上图。
    3、实现代码
    (1)编写FlowBean类
    package com.atguigu.mr.topn;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.WritableComparable;

    public class FlowBean implements WritableComparable<FlowBean{

        private long upFlow; // 上行流量
        private long downFlow; // 下行流量
        private long sumFlow; // 总流量

        public FlowBean() {
            super();
        }

        public FlowBean(long upFlow, long downFlow) {
            super();
            this.upFlow = upFlow;
            this.downFlow = downFlow;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(upFlow);
            out.writeLong(downFlow);
            out.writeLong(sumFlow);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.upFlow = in.readLong();
            this.downFlow = in.readLong();
            this.sumFlow = in.readLong();
        }

        public long getUpFlow() {
            return upFlow;
        }

        public void setUpFlow(long upFlow) {
            this.upFlow = upFlow;
        }

        public long getDownFlow() {
            return downFlow;
        }

        public void setDownFlow(long downFlow) {
            this.downFlow = downFlow;
        }

        public long getSumFlow() {
            return sumFlow;
        }

        public void setSumFlow(long sumFlow) {
            this.sumFlow = sumFlow;
        }

        @Override
        public String toString() {
            return upFlow + " " + downFlow + " " + sumFlow;
        }

        public void set(long downFlow2, long upFlow2) {
            downFlow = downFlow2;
            upFlow = upFlow2;
            sumFlow = downFlow2 + upFlow2;
        }

        @Override
        public int compareTo(FlowBean bean) {

            int result;

            // 按照总流量大小,倒序排列
            if (this.sumFlow > bean.getSumFlow()) {
                result = -1;
            } else if (this.sumFlow < bean.getSumFlow()) {
                result = 1;
            } else {
                result = 0;
            }

            return result;
        }
    }

    (2)编写TopNMapper类

    package com.atguigu.mr.topn;

    import java.io.IOException;
    import java.util.Iterator;
    import java.util.TreeMap;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class TopNMapper extends Mapper<LongWritableTextFlowBeanText{

        // 定义一个TreeMap作为存储数据的容器(天然按key排序,降序)
        private TreeMap<FlowBean, Text> flowMap = new TreeMap<FlowBean, Text>();
        private FlowBean kBean;

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            kBean = new FlowBean();
            Text v = new Text();

            // 13470253144  180 180 360

            // 1、获取一行
            String line = value.toString();

            // 2、切割
            String[] fields = line.split(" ");

            // 3、封装数据
            String phoneNum = fields[0];
            long upFlow = Long.parseLong(fields[1]);
            long downFlow = Long.parseLong(fields[2]);
            long sumFlow = Long.parseLong(fields[3]);

            kBean.setUpFlow(upFlow);
            kBean.setDownFlow(downFlow);
            kBean.setSumFlow(sumFlow);

            v.set(phoneNum);

            // 4、向TreeMap中添加数据
            flowMap.put(kBean, v);

            // 5、限制TreeMap的数据量,超过10条就删除掉流量最小的一条数据
            if (flowMap.size() > 10) {
                // flowMap.remove(flowMap.firstKey()); // 升序删除第一个
                flowMap.remove(flowMap.lastKey()); // 降序删除最后一个
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {

            // 6、遍历TreeMap集合,输出数据
            Iterator<FlowBean> bean = flowMap.keySet().iterator();

            while (bean.hasNext()) {
                FlowBean k = bean.next();
                context.write(k, flowMap.get(k));
            }
        }
    }

    (3)编写TopNReducer类

    package com.atguigu.mr.topn;

    import java.io.IOException;
    import java.util.Iterator;
    import java.util.TreeMap;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class TopNReducer extends Reducer<FlowBeanTextTextFlowBean{

        // 定义一个TreeMap作为存储数据的容器(天然按key排序)
        TreeMap<FlowBean, Text> flowMap = new TreeMap<FlowBean, Text>();

        @Override
        protected void reduce(FlowBean key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException 
    {

            for (Text value : values) {
                FlowBean bean = new FlowBean();
                bean.set(key.getDownFlow(), key.getUpFlow());

                // 1、向treeMap集合中添加数据
                flowMap.put(bean, new Text(value));

                // 2、限制TreeMap数据量,超过10条就删除掉流量最小的一条数据
                if (flowMap.size() > 10) {
                    // flowMap.remove(flowMap.firstKey()); // 升序删除第一个
                    flowMap.remove(flowMap.lastKey()); // 降序删除最后一个
                }
            }
        }

        @Override
        protected void cleanup(Reducer<FlowBean, Text, Text, FlowBean>.Context context)
                throws IOException, InterruptedException 
    {

            // 3、遍历集合,输出数据
            Iterator<FlowBean> bean = flowMap.keySet().iterator();

            while (bean.hasNext()) {
                FlowBean v = bean.next();
                context.write(new Text(flowMap.get(v)), v);
            }
        }
    }

    (4)编写TopNDriver类

    package com.atguigu.mr.topn;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class TopNDriver {

        public static void main(String[] args) throws Exception {

            args = new String[] { "d:/temp/atguigu/0529/input/inputtopn""d:/temp/atguigu/0529/output20" };

            // 1、获取配置信息,或者job对象实例
            Configuration configuration = new Configuration();
            Job job = Job.getInstance(configuration);

            // 6、指定本程序的jar包所在的本地路径
            job.setJarByClass(TopNDriver.class);

            // 2、指定本业务job要使用的mapper/reducer业务类
            job.setMapperClass(TopNMapper.class);
            job.setReducerClass(TopNReducer.class);

            // 3、指定mapper输出数据的kv类型
            job.setMapOutputKeyClass(FlowBean.class);
            job.setMapOutputValueClass(Text.class);

            // 4、指定最终输出的数据的kv类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowBean.class);

            // 5、指定job的输入原始文件所在目录
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            // 7、将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
            boolean result = job.waitForCompletion(true);
            System.exit(result ? 0 : 1);
        }
    }

    7.3 找博客共同粉丝案例

    1、需求
      以下是博客的粉丝列表数据,冒号前是一个用户,冒号后是该用户的所有粉丝(数据中的粉丝关系是单向的)
      求出哪些人两两之间有共同粉丝,及他俩的共同粉丝都有谁?
    (1)数据输入

    A:B,C,D,F,E,O
    B:A,C,E,K
    C:F,A,D,I
    D:A,E,F,L
    E:B,C,D,M,L
    F:A,B,C,D,E,O,M
    G:A,C,D,E,F
    H:A,C,D,E,O
    I:A,O
    J:B,O
    K:A,C,D
    L:D,E,F
    M:E,F,G
    O:A,H,I,J

    2、需求分析
    先求出A、B、C、……等是谁的粉丝
    第一次输出结果

    A    I,K,C,B,G,F,H,O,D,
    B    A,F,J,E,
    C    A,E,B,H,F,G,K,
    D    G,C,K,A,L,F,E,H,
    E    G,M,L,H,A,F,B,D,
    F    L,M,D,C,G,A,
    G    M,
    H    O,
    I    O,C,
    J    O,
    K    B,
    L    D,E,
    M    E,F,
    O    A,H,I,J,F,

    第二次输出结果

    A-B    E C 
    A-C    D F 
    A-D    E F 
    A-E    D B C 
    A-F    O B C D E 
    A-G    F E C D 
    A-H    E C D O 
    A-I    O 
    A-J    O B 
    A-K    D C 
    A-L    F E D 
    A-M    E F 
    B-C    A 
    B-D    A E 
    B-E    C 
    B-F    E A C 
    B-G    C E A 
    B-H    A E C 
    B-I    A 
    B-K    C A 
    B-L    E 
    B-M    E 
    B-O    A 
    C-D    A F 
    C-E    D 
    C-F    D A 
    C-G    D F A 
    C-H    D A 
    C-I    A 
    C-K    A D 
    C-L    D F 
    C-M    F 
    C-O    I A 
    D-E    L 
    D-F    A E 
    D-G    E A F 
    D-H    A E 
    D-I    A 
    D-K    A 
    D-L    E F 
    D-M    F E 
    D-O    A 
    E-F    D M C B 
    E-G    C D 
    E-H    C D 
    E-J    B 
    E-K    C D 
    E-L    D 
    F-G    D C A E 
    F-H    A D O E C 
    F-I    O A 
    F-J    B O 
    F-K    D C A 
    F-L    E D 
    F-M    E 
    F-O    A 
    G-H    D C E A 
    G-I    A 
    G-K    D A C 
    G-L    D F E 
    G-M    E F 
    G-O    A 
    H-I    O A 
    H-J    O 
    H-K    A C D 
    H-L    D E 
    H-M    E 
    H-O    A 
    I-J    O 
    I-K    A 
    I-O    A 
    K-L    D 
    K-O    A 
    L-M    E F

    3、代码实现
    (1)第一次Mapper类

    package com.atguigu.mr.friends;

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class OneShareFriendsMapper extends Mapper<LongWritableTextTextText>{

        Text k = new Text();
        Text v = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException 
    {
            // A:B,C,D,F,E,O

            // 1、获取一行
            String line = value.toString();

            // 2、切割
            String[] fields = line.split(":");

            // 3、获取用户和用户的粉丝
            String user = fields[0]; // person = A
            String[] friends = fields[1].split(","); // firends = [B, C, D, F, E, O]

            // 封装
            v.set(user);

            // 4、写出去
            for (String friend : friends) {
                k.set(friend);
                context.write(k, v); // <粉丝,用户>  <B,A><C,A><D,A>
            }
        }
    }

    (2)第一次Reducer类

    package com.atguigu.mr.friends;

    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class OneShareFriendsReducer extends Reducer<TextTextTextText{

        Text v = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException 
    {

            StringBuffer sb = new StringBuffer();

            // <B,A><C,A><D,A>
            // 1、拼接
            for (Text user : values) {
                sb.append(user).append(","); // 
            }

            v.set(sb.toString());

            // 2、写出
            context.write(key, v); // A I,K,C,B,G,F,H,O,D,
        }
    }

    (3)第一次Driver类

    package com.atguigu.mr.friends;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class OneShareFriendsDriver {
        public static void main(String[] args) throws Exception {

            // 0、根据自己电脑路径重新配置
            args = new String[] { "d:/temp/atguigu/0529/input/inputfriend""d:/temp/atguigu/0529/output21" };

            // 1、获取job对象
            Configuration configuration = new Configuration();
            Job job = Job.getInstance(configuration);

            // 2、指定jar包运行的路径
            job.setJarByClass(OneShareFriendsDriver.class);

            // 3、指定map/reduce使用的类
            job.setMapperClass(OneShareFriendsMapper.class);
            job.setReducerClass(OneShareFriendsReducer.class);

            // 4、指定map输出的数据类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            // 5、指定最终输出的数据类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            // 6、指定job的输入原始所在目录
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            // 7、提交
            boolean result = job.waitForCompletion(true);

            System.exit(result ? 0 : 1);
        }
    }

    (4)第二次Mapper类

    package com.atguigu.mr.friends;

    import java.io.IOException;
    import java.util.Arrays;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class TwoShareFriendsMapper extends Mapper<LongWritableTextTextText{



        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException 
    {
            // A   I,K,C,B,G,F,H,O,D,
            // 粉丝    用户,用户,用户

            // 1、获取一行
            String line = value.toString();

            // 2、切割
            String[] friend_users = line.split(" ");

            // A
            String friend = friend_users[0];
            // I,K,C,B,G,F,H,O,D,
            String[] users = friend_users[1].split(",");

            Arrays.sort(users); // B,C,D,F,G,H,I,K,O

            for (int i = 0; i < users.length - 1; i++) {
                for (int j = i + 1; j < users.length; j++) {
                    context.write(new Text(users[i] + "-" + users[j]), new Text(friend));
                }
            }
        }
    }

    (5)第二次Reducer类

    package com.atguigu.mr.friends;

    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class TwoShareFriendsReducer extends Reducer<TextTextTextText{

        @Override
        protected void reduce(Text key, Iterable<Text> values,Context context)
                throws IOException, InterruptedException 
    {

            StringBuffer sb = new StringBuffer();

            for (Text friend : values) {
                sb.append(friend).append(" ");
            }

            context.write(key, new Text(sb.toString()));
        }
    }

    (6)第二次Driver类

    package com.atguigu.mr.friends;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class TwoShareFriendsDriver {
        public static void main(String[] args) throws Exception {

            // 0、根据自己电脑路径重新配置
            args = new String[] { "d:/temp/atguigu/0529/input/inputfriends""d:/temp/atguigu/0529/output22" };

            // 1、获取job对象
            Configuration configuration = new Configuration();
            Job job = Job.getInstance(configuration);

            // 2、指定jar包运行的路径
            job.setJarByClass(TwoShareFriendsDriver.class);

            // 3、指定map/reduce使用的类
            job.setMapperClass(TwoShareFriendsMapper.class);
            job.setReducerClass(TwoShareFriendsReducer.class);

            // 4、指定map输出的数据类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            //
            // 5、指定最终输出的数据类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            // 6、指定job的输入原始所在目录
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            // 7、提交
            boolean result = job.waitForCompletion(true);
            System.exit(result ? 0 : 1);
        }
    }

    第8章 常见错误及解决方案

    1)导包容易出错。尤其Text和CombineTextInputFormat。
    2)Mapper中第一个输入的参数必须是LongWritable或者NullWritable,不可以是IntWritable,报的错误是类型转换异常。
    3)java.lang.Exception: java.io.IOException: Illegal partition for 13926435656(4),说明Partition和ReduceTask个数没对上,调整ReduceTask个数。
    4)如果分区数不是1,但是reducetask为1,是否执行分区过程。
      答案是:不执行分区过程。因为在MapTask的源码中,执行分区的前提是先判断ReduceNum个数是否大于1。不大于1肯定不执行。
    5)在Windows环境编译的jar包导入到Linux环境中运行:

    hadoop jar wc.jar com.atguigu.mapreduce.wordcount.WordCountDriver /user/atguigu/ /user/atguigu/output

    报如下错误:

    Exception in thread "main" java.lang.UnsupportedClassVersionError: com/atguigu/mapreduce/wordcount/WordCountDriver : Unsupported major.minor version 52.0

      原因是Windows环境用的jdk1.7,Linux环境用的jdk1.8。
      解决方案:统一jdk版本。
    6)缓存pd.txt小文件案例中,报找不到pd.txt文件
      原因:大部分为路径书写错误。还有就是要检查pd.txt.txt的问题。还有个别电脑写相对路径找不到pd.txt,可以修改为绝对路径。
    7)报类型转换异常。
      通常都是在驱动函数中设置Map输出和最终输出时编写错误。
      Map输出的key如果没有排序,也会报类型转换异常。
    8)集群中运行wc.jar时出现了无法获得输入文件。
      原因:WordCount案例的输入文件不能放在 HDFS 集群的根目录。
    9)出现了如下相关异常

    Exception in thread "main" java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
        at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
        at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:609)
        at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:977)
    java.io.IOException: Could not locate executable nullinwinutils.exe in the Hadoop binaries.
        at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:356)
        at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:371)
        at org.apache.hadoop.util.Shell.<clinit>(Shell.java:364)

    解决方案一:拷贝hadoop.dll文件(文件位置:D:workHadoophadoop-2.7.2in)到Windows目录C:WindowsSystem32。个别同学电脑还需要修改Hadoop源码。
    解决方案二:创建如下包名,并将NativeIO.java拷贝到该包名下


    10)自定义Outputformat时,注意在RecordWirter中的close()方法必须关闭流资源。否则输出的文件内容中数据为空。
        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            if (atguigufos != null) {
                atguigufos.close();
            }
            if (otherfos != null) {
                otherfos.close();
            }
        }
  • 相关阅读:
    (二)ElasticSearch-Rest命令操作
    (一)ElasticSearch介绍
    (四)SpringCloudAlibaba处理分布式事务-Seata
    网络设备及组网知识
    GITLAB使用
    Ionic APP开发
    开发工具快捷键大全
    web socket接口测试 Jmeter压力测试工具
    VMware Workstation pro无法在Windows上运行,检查可在Windows上运行的此应用的更新版本
    克隆管理员账号
  • 原文地址:https://www.cnblogs.com/chenmingjun/p/10409820.html
Copyright © 2011-2022 走看看