zoukankan      html  css  js  c++  java
  • 5大视频网站数据分析mapreduce


    一、需求
     自定义输入格式 完成统计任务 输出多个文件

    输入数据:5个网站的 每天电视剧的 播放量 收藏数 评论数 踩数 赞数

    输出数据:按网站类别 统计每个电视剧的每个指标的总量

    任务目标:自定义输入格式 完成统计任务 输出多个文件

    二、数据

    部分数据

    这里写图片描述

    三、思路

    第一步:定义一个电视剧热度数据的bean。

    第二步:定义一个读取热度数据的InputFormat类。

    第三步:写MapReduce统计程序

    第四步:上传tvplay.txt数据集到HDFS,并运行程序

    四、代码

    1.利用WritableComparable接口,自定义一个TVWritable类,实现WritableComparable类,将各个参数封装起来,便于计算。
    package com.pc.hadoop.pc.tv;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.WritableComparable;

    public class TVWritable implements WritableComparable

    {
        //定义5个成员变量
        private int view;
        private int collection;
        private int comment;
        private int diss;
        private int up;

        //构造函数
        public TVWritable(){}


        //定义一个set方法,用this关键字对封装好的数据进行引用
        public void set(int view,int collection,int comment, int diss,int up)
        {

            this.view = view;
            this.collection = collection;
            this.comment = comment;
            this.diss = diss;
            this.up = up;
        }

        //使用get和set对封装好的数据进行存取
        public int getView()
        {
            return view;
        }
        public void setView(int view)
        {
            this.view = view;
        }


        public int getCollection()
        {
            return collection;
        }
        public void setCollection(int collection)
        {
            this.collection = collection;
        }


        public int getComment()
        {
            return comment;
        }
        public void setComment(int comment)
        {
            this.comment = comment;
        }


        public int getDiss()
        {
            return diss;
        }
        public void setDiss(int diss)
        {
            this.diss = diss;
        }


        public int getUp()
        {
            return up;
        }
        public void setUp(int up)
        {
            this.up = up;
        }

        //实现WritableComparaqble的redafields()方法,以便该数据能被序列化后完成网络传输或文件输入。
        @Override
        public void readFields(DataInput in) throws IOException
        {
            // TODO Auto-generated method stub

            view = in.readInt();
            collection = in.readInt();
            comment = in.readInt();
            diss = in.readInt();
            up = in.readInt();

        }

        //实现WritableComparaqble的write()方法,以便该数据能被反序列化后完成网络传输或文件输入。
        @Override
        public void write(DataOutput out) throws IOException
        {
            // TODO Auto-generated method stub
            out.writeInt(view);
            out.writeInt(collection);
            out.writeInt(comment);
            out.writeInt(diss);
            out.writeInt(up);
        }

        //使用compareTo对其中的数据进行比较
        @Override
        public int compareTo(Object o)
        {
            // TODO Auto-generated method stub
            return 0;
        }

    }
    2.自定义一个TVInputFormat类取继承FileInputFormat文件输入格式这个父类,然后对createRecordReader()方法进行重写,其实质则是重写TVRecordReader()这个方法,
    得到其返回值,利用TVRecordReader()这个方法去继承RecordReader()这个方法。
    package com.pc.hadoop.pc.tv;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.util.LineReader;

    public class TVInputFormat extends FileInputFormat<Text,TVWritable>
    {
        protected boolean isSplitable()
        {
            return false;
        }

        @Override
        public RecordReader<Text, TVWritable> createRecordReader(InputSplit inputsplit, TaskAttemptContext context) throws IOException, InterruptedException
        {
            // TODO Auto-generated method stub
            return new TVRecordReader();
        }

        public static class TVRecordReader extends RecordReader<Text,TVWritable>
        {
            public LineReader in; //自定义行读取器
            public Text lineKey; //声明key类型
            public TVWritable lineValue; //自定义value
            public Text line; //每行数据类型

            //
            @Override
            public void close() throws IOException
            {
                // TODO Auto-generated method stub
                if(in != null)
                {
                    in.close();
                }

            }

            //获取当前key
            @Override
            public Text getCurrentKey() throws IOException, InterruptedException
            {
                // TODO Auto-generated method stub
                return lineKey;
            }

            //获取当前value
            @Override
            public TVWritable getCurrentValue() throws IOException, InterruptedException

            {
                // TODO Auto-generated method stub
                return lineValue;
            }

            //获取当前进程
            @Override
            public float getProgress() throws IOException, InterruptedException

            {
                // TODO Auto-generated method stub
                return 0;
            }

            //初始化
            @Override
            public void initialize(InputSplit inputsplit, TaskAttemptContext context) throws IOException, InterruptedException

            {
                // TODO Auto-generated method stub

                FileSplit split = (FileSplit) inputsplit;//获取分片内容
                Configuration job = context.getConfiguration();//读取配置信息
                Path file = split.getPath();//获取路径
                FileSystem fs = file.getFileSystem(job);//获取文件系统

                FSDataInputStream filein = fs.open(file);//通过文件系统打开文件,对文件进行读取
                in = new LineReader(filein,job);
                lineKey = new Text();//新建一个Text实例作为自定义输入格式的key
                lineValue = new TVWritable();
                line = new Text();



            }

            @Override
            public boolean nextKeyValue() throws IOException, InterruptedException

            {
                // TODO Auto-generated method stub
                int lineSize = in.readLine(line);
                if(lineSize == 0)
                return false;
                //读取每行数据解数组i
                String[] i = line.toString().split(" ");
                if(i.length != 7)

                {
                    throw new IOException("Invalid record received");
                }
                //自定义key和value的值
                lineKey.set(i[0]+" "+i[1]);//电视剧名称和所属视频网站
                lineValue.set(Integer.parseInt(i[2].trim()),
                Integer.parseInt(i[3].trim()),
                Integer.parseInt(i[4].trim()),
                Integer.parseInt(i[5].trim()),
                Integer.parseInt(i[6].trim() ));
                return true;

            }
        }
    }

    3.使用MapperReducer对输入的数据进行进行相应的处理输出想要得到的结果。
     在reduce在定义一个多输出的对象MultipleOutputs
    /**
         * @input Params Text TvPlayData
         * @output Params Text TvPlayData
         * @author yangjun
         * @function 直接输出
         */
        public static class TVPlayMapper extends
                Mapper<Text, TVWritable, Text, TVWritable> {
            @Override
            protected void map(Text key, TVWritable value, Context context)
                    throws IOException, InterruptedException {
                context.write(key, value);
            }
        }
        /**
         * @input Params Text TvPlayData
         * @output Params Text Text
         * @author yangjun
         * @fuction 统计每部电视剧的 点播数 收藏数等  按source输出到不同文件夹下
         */
        public static class TVPlayReducer extends
                Reducer<Text, TVWritable, Text, Text> {
            private Text m_key = new Text();
            private Text m_value = new Text();
            private MultipleOutputs<Text, Text> mos;

            protected void setup(Context context) throws IOException,
                    InterruptedException {
                mos = new MultipleOutputs<Text, Text>(context);
            }//将 MultipleOutputs 的初始化放在 setup() 中,因为在 setup() 只会被调用一次
    //定义reduce() 方法里的 multipleOutputs.write(…)。你需要把以前的 context.write(…) 替换成现在的这个
            protected void reduce(Text Key, Iterable<TVWritable> Values,
                    Context context) throws IOException, InterruptedException {
                 int view = 0;
                 int collection = 0;
                 int comment = 0;
                 int diss = 0;
                 int up = 0;
                for (TVWritable a:Values) {
                     view += a.getView();
                     collection += a.getCollection();
                     comment +=a.getComment();
                     diss += a.getDiss();
                     up += a.getUp();
                }
                //tvname  source
                String[] records = Key.toString().split(" ");
                // 1优酷2搜狐3土豆4爱奇艺5迅雷看看
                String source = records[1];// 媒体类别
                m_key.set(records[0]);
                m_value.set(view+" "+collection+" "+comment+" "+diss+" "+up);
                if (source.equals("1")) {
                    mos.write("youku", m_key, m_value);
                } else if (source.equals("2")) {
                    mos.write("souhu", m_key, m_value);
                } else if (source.equals("3")) {
                    mos.write("tudou", m_key, m_value);
                } else if (source.equals("4")) {
                    mos.write("aiqiyi", m_key, m_value);
                } else if (source.equals("5")) {
                    mos.write("xunlei", m_key, m_value);
                }
            }

            protected void cleanup(Context context) throws IOException,
                    InterruptedException {
                mos.close();   //关闭 MultipleOutputs,也就是关闭 RecordWriter,并且是一堆 RecordWriter,因为这里会有很多 reduce 被调用。
            }
        }

    4 运行run函数对作业进行运行,并自定义输出MultipleOutputs函数调用addNameoutput方法对其进行设置多路径的输出。
    @Override
        public int run(String[] args) throws Exception {
            Configuration conf = new Configuration();// 配置文件对象
            Path mypath = new Path(args[1]);
            FileSystem hdfs = mypath.getFileSystem(conf);// 创建输出路径
            if (hdfs.isDirectory(mypath)) {
                hdfs.delete(mypath, true);
            }

            Job job = new Job(conf, "tvplay");// 构造任务
            job.setJarByClass(TVplay.class);// 设置主类

            job.setMapperClass(TVPlayMapper.class);// 设置Mapper
            job.setMapOutputKeyClass(Text.class);// key输出类型
            job.setMapOutputValueClass(TVWritable.class);// value输出类型
            job.setInputFormatClass(TVInputFormat.class);//自定义输入格式

            job.setReducerClass(TVPlayReducer.class);// 设置Reducer
            job.setOutputKeyClass(Text.class);// reduce key类型
            job.setOutputValueClass(Text.class);// reduce value类型
            // 自定义文件输出格式,通过路径名(pathname)来指定输出路径
            MultipleOutputs.addNamedOutput(job, "youku", TextOutputFormat.class,
                    Text.class, Text.class);
            MultipleOutputs.addNamedOutput(job, "souhu", TextOutputFormat.class,
                    Text.class, Text.class);
            MultipleOutputs.addNamedOutput(job, "tudou", TextOutputFormat.class,
                    Text.class, Text.class);
            MultipleOutputs.addNamedOutput(job, "aiqiyi", TextOutputFormat.class,
                    Text.class, Text.class);
            MultipleOutputs.addNamedOutput(job, "xunlei", TextOutputFormat.class,
                    Text.class, Text.class);

            FileInputFormat.addInputPath(job, new Path(args[0]));// 输入路径
            FileOutputFormat.setOutputPath(job, new Path(args[1]));// 输出路径
            job.waitForCompletion(true);
            return 0;
        }
        public static void main(String[] args) throws Exception {
            String[] args0 = { "hdfs://pc1:9000/home/hadoop/tvplay/tvplay.txt",
                    "hdfs://pc1:9000/home/hadoop/tvplay/out/" };
            int ec = ToolRunner.run(new Configuration(), new TVplay(), args0);
            //public static int run(Configuration conf,Tool tool, String[] args),可以在job运行的时候指定配置文件或其他参数
            //这个方法调用tool的run(String[])方法,并使用conf中的参数,以及args中的参数,而args一般来源于命令行。
            System.exit(ec);
        }

  • 相关阅读:
    随机色块
    JQ命令汇总
    JQ选择器
    cookie
    tab切换
    Ajax跨域
    RocksDB介绍:一个比LevelDB更彪悍的引擎
    谷歌的诀窍:如何取消验证码
    Ruby on Rails创始人DHH谈如何进行混合移动APP开发
    SequoiaDB 架构指南
  • 原文地址:https://www.cnblogs.com/fengyouheng/p/10266812.html
Copyright © 2011-2022 走看看