zoukankan      html  css  js  c++  java
  • MapReduce经典入门小案例

    /**
     * Word count: driver that configures and submits the word-count job.
     *
     * <p>The job reads {@code hdfs://localhost:9000/input/words.txt}, maps it with
     * {@code WCMapper}, reduces (and combines) it with {@code WCReducer}, and writes the
     * result under {@code hdfs://localhost:9000/output}.
     *
     * @author fengmingyue
     */
    public class WordCount {
        /**
         * Builds the job, submits it and blocks until it finishes.
         *
         * <p>The JVM exits with status 0 when the job succeeds and 1 when it fails, so that
         * shell scripts and schedulers can detect a failed run.
         *
         * @param args command-line arguments (not used)
         * @throws Exception if the job cannot be configured, submitted or monitored
         */
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            job.setJarByClass(WordCount.class);

            // Map side.
            job.setMapperClass(WCMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);
            FileInputFormat.setInputPaths(job, new Path("hdfs://localhost:9000/input/words.txt"));

            // Reduce side.
            job.setReducerClass(WCReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
            FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/output"));

            /*
             * The combiner's output becomes the reducer's input, and a combiner is optional
             * ("pluggable"), so adding one must never change the final result.
             * Only use a combiner when the reducer's input key/value types are exactly the
             * same as its output key/value types and partial aggregation does not affect the
             * outcome, e.g. for sums or maxima.
             */
            job.setCombinerClass(WCReducer.class);

            // Propagate the job outcome instead of silently discarding it.
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
    /**
     * Adds up all the counts received for one word and emits the word with its total.
     */
    class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        /**
         * Sums {@code values} and writes a single {@code (key, total)} pair.
         *
         * @param key the word
         * @param values the counts collected for that word
         * @param context the task context the total is written to
         * @throws IOException if writing the output fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            long total = 0L;
            for (LongWritable partial : values) {
                total = total + partial.get();
            }
            LongWritable sum = new LongWritable(total);
            context.write(key, sum);
        }
    }
    /**
     * Splits each input line into words and emits {@code (word, 1)} for every word.
     */
    class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        /** The constant count written for every word occurrence. */
        private static final LongWritable ONE = new LongWritable(1);

        /** Reused output key; it is overwritten before each write. */
        private final Text word = new Text();

        /**
         * Tokenizes one line on runs of whitespace and writes one pair per token.
         *
         * <p>Empty tokens are skipped, so blank lines and leading or repeated whitespace do
         * not produce an empty-string "word" in the output.
         *
         * @param key the input key supplied by the input format (not used)
         * @param value the text of the line
         * @param context the task context the pairs are written to
         * @throws IOException if writing the output fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            for (String token : line.split("\\s+")) {
                // split() yields a leading "" when the line is blank or starts with whitespace.
                if (token.isEmpty()) {
                    continue;
                }
                word.set(token);
                context.write(word, ONE);
            }
        }
    }
    /**
     * 输入:
     *      hello tom
            hello tom2
            hello tom3
            hello tom4
            hello tom5
       输出:
            hello   5
            tom     1
            tom2    1
            tom3    1
            tom4    1
            tom5    1
     */
    /**
     * Traffic statistics (the output is spread over several files).
     *
     * <p>Sums the upstream and downstream traffic per phone number and uses a custom
     * partitioner so that numbers with different prefixes end up in different result files.
     *
     * @author fengmingyue
     */
    public class DataCount {
        /**
         * Builds the job, submits it and blocks until it finishes.
         *
         * <p>The JVM exits with status 0 when the job succeeds and 1 when it fails.
         *
         * @param args command-line arguments (not used)
         * @throws Exception if the job cannot be configured, submitted or monitored
         */
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            job.setJarByClass(DataCount.class);

            job.setMapperClass(DCMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(DataBean.class);
            FileInputFormat.setInputPaths(job, new Path("hdfs://localhost:9000/input/flowData.txt"));

            job.setReducerClass(DCReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(DataBean.class);
            FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/output"));

            job.setPartitionerClass(ServiceProviderPartitioner.class);
            /*
             * Set the number of reducers. There is one result file per reducer: if the
             * partitioner produces fewer partitions than there are reducers, the surplus
             * files are empty; if it produces more partitions than there are reducers, the
             * job fails.
             */
            job.setNumReduceTasks(4);

            // Propagate the job outcome instead of silently discarding it.
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }

        /**
         * Parses one tab-separated record and emits {@code (phone number, traffic bean)}.
         */
        public static class DCMapper extends Mapper<LongWritable, Text, Text, DataBean> {
            /**
             * Reads the phone number from column 1 and the upstream/downstream values from
             * columns 8 and 9 (zero-based) of the tab-separated line.
             *
             * @param key the input key supplied by the input format (not used)
             * @param value the text of the record
             * @param context the task context the pair is written to
             * @throws IOException if writing the output fails
             * @throws InterruptedException if the task is interrupted while writing
             */
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String[] fields = value.toString().split("\t");
                String tel = fields[1];
                long up = Long.parseLong(fields[8]);
                long down = Long.parseLong(fields[9]);
                context.write(new Text(tel), new DataBean(tel, up, down));
            }
        }

        /**
         * Adds up the upstream and downstream traffic of all records of one phone number.
         */
        public static class DCReducer extends Reducer<Text, DataBean, Text, DataBean> {
            /**
             * Writes one bean holding the summed traffic for {@code key}.
             *
             * @param key the phone number
             * @param values the traffic beans collected for that number
             * @param context the task context the total is written to
             * @throws IOException if writing the output fails
             * @throws InterruptedException if the task is interrupted while writing
             */
            protected void reduce(Text key, Iterable<DataBean> values, Context context)
                    throws IOException, InterruptedException {
                long upSum = 0;
                long downSum = 0;
                for (DataBean record : values) {
                    upSum += record.getUpPayLoad();
                    downSum += record.getDownPayLoad();
                }
                // The phone number is already the output key, so the bean's tel stays empty.
                context.write(key, new DataBean("", upSum, downSum));
            }
        }

        /**
         * Routes phone numbers to partitions by their three-character prefix.
         *
         * <p>Prefixes 139, 138 and 159 map to partitions 1, 2 and 3; everything else goes to
         * partition 0. The returned index does not depend on the configured number of
         * partitions, so the job must run with at least four reduce tasks.
         */
        public static class ServiceProviderPartitioner extends Partitioner<Text, DataBean> {
            /** Length of the prefix that is looked up in {@link #providerMap}. */
            private static final int PREFIX_LENGTH = 3;

            private static final Map<String, Integer> providerMap = new HashMap<String, Integer>();
            static {
                providerMap.put("139", 1);
                providerMap.put("138", 2);
                providerMap.put("159", 3);
            }

            /**
             * Returns the partition for {@code key}.
             *
             * @param key the phone number
             * @param value the traffic bean (not used)
             * @param number the number of partitions (not used)
             * @return 1, 2 or 3 for a known prefix, otherwise 0
             */
            public int getPartition(Text key, DataBean value, int number) {
                String telNo = key.toString();
                // A key shorter than the prefix cannot match any provider; without this
                // guard substring() would throw StringIndexOutOfBoundsException.
                if (telNo.length() < PREFIX_LENGTH) {
                    return 0;
                }
                Integer partition = providerMap.get(telNo.substring(0, PREFIX_LENGTH));
                return partition == null ? 0 : partition;
            }
        }
    }
    /**
     * Writable value holding the traffic figures of one phone number: upstream, downstream
     * and their total.
     */
    class DataBean implements Writable {
        private String tel;
        private long upPayLoad;
        private long downPayLoad;
        private long totalPayLoad;

        /** Creates an empty bean; the fields are filled by {@link #readFields} or the setters. */
        public DataBean() {}

        /**
         * Creates a bean and derives the total from the two traffic values.
         *
         * @param tel the phone number
         * @param upPayLoad the upstream traffic
         * @param downPayLoad the downstream traffic
         */
        public DataBean(String tel, long upPayLoad, long downPayLoad) {
            this.tel = tel;
            this.upPayLoad = upPayLoad;
            this.downPayLoad = downPayLoad;
            this.totalPayLoad = upPayLoad + downPayLoad;
        }

        /**
         * Returns the upstream, downstream and total traffic separated by tabs; the phone
         * number is not included.
         */
        public String toString() {
            return this.upPayLoad + "\t" + this.downPayLoad + "\t" + this.totalPayLoad;
        }

        /**
         * Serializes the fields. Note: {@link #readFields} must read the same types in the
         * same order.
         *
         * <p>An unset (null) phone number is written as the empty string, because
         * {@code writeUTF} does not accept null.
         *
         * @param out the stream to write to
         * @throws IOException if writing fails
         */
        public void write(DataOutput out) throws IOException {
            out.writeUTF(tel == null ? "" : tel);
            out.writeLong(upPayLoad);
            out.writeLong(downPayLoad);
            out.writeLong(totalPayLoad);
        }

        /**
         * Restores the fields in the order in which {@link #write} stored them.
         *
         * @param in the stream to read from
         * @throws IOException if reading fails
         */
        public void readFields(DataInput in) throws IOException {
            this.tel = in.readUTF();
            this.upPayLoad = in.readLong();
            this.downPayLoad = in.readLong();
            this.totalPayLoad = in.readLong();
        }

        public String getTel() {
            return tel;
        }

        public void setTel(String tel) {
            this.tel = tel;
        }

        public long getUpPayLoad() {
            return upPayLoad;
        }

        /** Sets the upstream traffic; the stored total is not recomputed. */
        public void setUpPayLoad(long upPayLoad) {
            this.upPayLoad = upPayLoad;
        }

        public long getDownPayLoad() {
            return downPayLoad;
        }

        /** Sets the downstream traffic; the stored total is not recomputed. */
        public void setDownPayLoad(long downPayLoad) {
            this.downPayLoad = downPayLoad;
        }

        public long getTotalPayLoad() {
            return totalPayLoad;
        }

        public void setTotalPayLoad(long totalPayLoad) {
            this.totalPayLoad = totalPayLoad;
        }
    }
    /**
     * 输入:
     *  1363157985066     13726230503    00-FD-07-A4-72-B8:CMCC    120.196.100.82    i02.c.aliimg.com        24    27    2481    24681    200
        1363157995052     13826544101    5C-0E-8B-C7-F1-E0:CMCC    120.197.40.4            4    0    264    0    200
        1363157991076     13926435656    20-10-7A-28-CC-0A:CMCC    120.196.100.99            2    4    132    1512    200
        1363154400022     13926251106    5C-0E-8B-8B-B1-50:CMCC    120.197.40.4            4    0    240    0    200
        1363157993044     18211575961    94-71-AC-CD-E6-18:CMCC-EASY    120.196.100.99    iface.qiyi.com    视频网站    15    12    1527    2106    200
        1363157995074     84138413    5C-0E-8B-8C-E8-20:7DaysInn    120.197.40.4    122.72.52.12        20    16    4116    1432    200
        1363157993055     13560439658    C4-17-FE-BA-DE-D9:CMCC    120.196.100.99            18    15    1116    954    200
        1363157995033     15920133257    5C-0E-8B-C7-BA-20:CMCC    120.197.40.4    sug.so.360.cn    信息安全    20    20    3156    2936    200
        1363157983019    13719199419    68-A1-B7-03-07-B1:CMCC-EASY    120.196.100.82            4    0    240    0    200
        1363157984041     13660577991    5C-0E-8B-92-5C-20:CMCC-EASY    120.197.40.4    s19.cnzz.com    站点统计    24    9    6960    690    200
        1363157973098     15013685858    5C-0E-8B-C7-F7-90:CMCC    120.197.40.4    rank.ie.sogou.com    搜索引擎    28    27    3659    3538    200
        1363157986029     15989002119    E8-99-C4-4E-93-E0:CMCC-EASY    120.196.100.99    www.umeng.com    站点统计    3    3    1938    180    200
        1363157992093     13560439658    C4-17-FE-BA-DE-D9:CMCC    120.196.100.99            15    9    918    4938    200
        1363157986041     13480253104    5C-0E-8B-C7-FC-80:CMCC-EASY    120.197.40.4            3    3    180    180    200
        1363157984040     13602846565    5C-0E-8B-8B-B6-00:CMCC    120.197.40.4    2052.flash2-http.qq.com    综合门户    15    12    1938    2910    200
        1363157995093     13922314466    00-FD-07-A2-EC-BA:CMCC    120.196.100.82    img.qfc.cn        12    12    3008    3720    200
        1363157982040     13502468823    5C-0A-5B-6A-0B-D4:CMCC-EASY    120.196.100.99    y0.ifengimg.com    综合门户    57    102    7335    110349    200
        1363157986072     18320173382    84-25-DB-4F-10-1A:CMCC-EASY    120.196.100.99    input.shouji.sogou.com    搜索引擎    21    18    9531    2412    200
        1363157990043     13925057413    00-1F-64-E1-E6-9A:CMCC    120.196.100.55    t3.baidu.com    搜索引擎    69    63    11058    48243    200
        1363157988072     13760778710    00-FD-07-A4-7B-08:CMCC    120.196.100.82            2    2    120    120    200
        1363157985066     13726238888    00-FD-07-A4-72-B8:CMCC    120.196.100.82    i02.c.aliimg.com        24    27    2481    24681    200
        1363157993055     13560436666    C4-17-FE-BA-DE-D9:CMCC    120.196.100.99            18    15    1116    954    200
       输出:
       part-r-00000:
            13480253104    180    180    360
            13502468823    7335    110349    117684
            13560436666    1116    954    2070
            13560439658    2034    5892    7926
            13602846565    1938    2910    4848
            13660577991    6960    690    7650
            13719199419    240    0    240
            13726230503    2481    24681    27162
            13726238888    2481    24681    27162
            13760778710    120    120    240
            15013685858    3659    3538    7197
            18211575961    1527    2106    3633
            18320173382    9531    2412    11943
            84138413    4116    1432    5548
       part-r-00001:
            13922314466    3008    3720    6728
            13925057413    11058    48243    59301
            13926251106    240    0    240
            13926435656    132    1512    1644
       part-r-00002:
            13826544101    264    0    264
       part-r-00003:
            15920133257    3156    2936    6092
            15989002119    1938    180    2118
     */
    /**
     * First sum, then sort: this is the summing step.
     *
     * <p>Adds up income and expenses per account from the tab-separated trade records.
     *
     * @author fengmingyue
     */
    public class SumStep {
        /**
         * Builds the job, submits it and blocks until it finishes.
         *
         * <p>The JVM exits with status 0 when the job succeeds and 1 when it fails.
         *
         * @param args command-line arguments (not used)
         * @throws Exception if the job cannot be configured, submitted or monitored
         */
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            job.setJarByClass(SumStep.class);

            job.setMapperClass(SumMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(InfoBean.class);
            FileInputFormat.setInputPaths(job, new Path("hdfs://localhost:9000/input/trade_info.txt"));

            job.setReducerClass(SumReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(InfoBean.class);
            FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/output"));

            // Propagate the job outcome instead of silently discarding it.
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }

        /**
         * Parses one trade record and emits {@code (account, bean)}.
         */
        public static class SumMapper extends Mapper<LongWritable, Text, Text, InfoBean> {
            // Output objects are reused across calls and overwritten before each write.
            private InfoBean bean = new InfoBean();
            private Text k = new Text();

            /**
             * Reads the account, income and expenses from columns 0, 1 and 2 of the
             * tab-separated line.
             *
             * @param key the input key supplied by the input format (not used)
             * @param value the text of the record
             * @param context the task context the pair is written to
             * @throws IOException if writing the output fails
             * @throws InterruptedException if the task is interrupted while writing
             */
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String[] fields = value.toString().split("\t");
                String account = fields[0];
                double income = Double.parseDouble(fields[1]);
                double expenses = Double.parseDouble(fields[2]);
                k.set(account);
                bean.set(account, income, expenses);
                context.write(k, bean);
            }
        }

        /**
         * Adds up income and expenses of all records of one account.
         */
        public static class SumReducer extends Reducer<Text, InfoBean, Text, InfoBean> {
            /** Reused output value; it is overwritten before each write. */
            private InfoBean bean = new InfoBean();

            /**
             * Writes one bean holding the summed income and expenses for {@code key}.
             *
             * @param key the account
             * @param v2s the beans collected for that account
             * @param context the task context the total is written to
             * @throws IOException if writing the output fails
             * @throws InterruptedException if the task is interrupted while writing
             */
            protected void reduce(Text key, Iterable<InfoBean> v2s, Context context)
                    throws IOException, InterruptedException {
                double incomeSum = 0;
                double expensesSum = 0;
                // The loop variable is deliberately not called "bean" so that it does not
                // shadow the output field of the same name.
                for (InfoBean record : v2s) {
                    incomeSum += record.getIncome();
                    expensesSum += record.getExpenses();
                }
                // The account is already the output key, so the bean's account stays empty.
                bean.set("", incomeSum, expensesSum);
                context.write(key, bean);
            }
        }
    }
    /**
     * Writable, sortable record of one account: income, expenses and the surplus derived
     * from them.
     *
     * <p>Ordering: higher income first; for equal income, lower expenses first; remaining
     * ties are broken by account name. Two beans compare as equal only when all three
     * values match. When a bean is used as a map output key, keys that compare as equal
     * are grouped into a single reduce call, so a reducer that must keep exact duplicates
     * has to emit one record per value.
     */
    class InfoBean implements WritableComparable<InfoBean> {
        private String account;
        private double income;
        private double expenses;
        private double surplus;

        /**
         * Sets all fields at once and derives the surplus as {@code income - expenses}.
         *
         * @param account the account name
         * @param income the income
         * @param expenses the expenses
         */
        public void set(String account, double income, double expenses) {
            this.account = account;
            this.income = income;
            this.expenses = expenses;
            this.surplus = income - expenses;
        }

        /**
         * Returns income, expenses and surplus separated by tabs; the account is not
         * included.
         */
        public String toString() {
            return this.income + "\t" + this.expenses + "\t" + this.surplus;
        }

        /**
         * Serializes the fields; {@link #readFields} must read the same types in the same
         * order.
         *
         * <p>An unset (null) account is written as the empty string, because
         * {@code writeUTF} does not accept null.
         *
         * @param out the stream to write to
         * @throws IOException if writing fails
         */
        public void write(DataOutput out) throws IOException {
            out.writeUTF(nullToEmpty(account));
            out.writeDouble(income);
            out.writeDouble(expenses);
            out.writeDouble(surplus);
        }

        /**
         * Restores the fields in the order in which {@link #write} stored them.
         *
         * @param in the stream to read from
         * @throws IOException if reading fails
         */
        public void readFields(DataInput in) throws IOException {
            this.account = in.readUTF();
            this.income = in.readDouble();
            this.expenses = in.readDouble();
            this.surplus = in.readDouble();
        }

        /**
         * Compares by income descending, then expenses ascending, then account name.
         *
         * <p>Returns 0 for beans with identical values, as the {@code Comparable} contract
         * requires. {@code Double.compare} is used so that the ordering is total even for
         * NaN and signed zeros.
         *
         * @param o the bean to compare with
         * @return a negative number, zero or a positive number if this bean sorts before,
         *         equal to or after {@code o}
         */
        public int compareTo(InfoBean o) {
            // Arguments swapped on purpose: the larger income sorts first.
            int byIncome = Double.compare(o.income, this.income);
            if (byIncome != 0) {
                return byIncome;
            }
            int byExpenses = Double.compare(this.expenses, o.expenses);
            if (byExpenses != 0) {
                return byExpenses;
            }
            return nullToEmpty(this.account).compareTo(nullToEmpty(o.account));
        }

        /** Two beans are equal exactly when {@link #compareTo} returns 0. */
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof InfoBean)) {
                return false;
            }
            return compareTo((InfoBean) obj) == 0;
        }

        /** Hash over the same three values that {@link #equals} compares. */
        public int hashCode() {
            long incomeBits = Double.doubleToLongBits(income);
            long expensesBits = Double.doubleToLongBits(expenses);
            int hash = nullToEmpty(account).hashCode();
            hash = 31 * hash + (int) (incomeBits ^ (incomeBits >>> 32));
            hash = 31 * hash + (int) (expensesBits ^ (expensesBits >>> 32));
            return hash;
        }

        /** Treats an unset account like the empty string. */
        private static String nullToEmpty(String s) {
            return s == null ? "" : s;
        }

        public String getAccount() {
            return account;
        }

        public void setAccount(String account) {
            this.account = account;
        }

        public double getIncome() {
            return income;
        }

        /** Sets the income; the stored surplus is not recomputed. */
        public void setIncome(double income) {
            this.income = income;
        }

        public double getExpenses() {
            return expenses;
        }

        /** Sets the expenses; the stored surplus is not recomputed. */
        public void setExpenses(double expenses) {
            this.expenses = expenses;
        }

        public double getSurplus() {
            return surplus;
        }

        public void setSurplus(double surplus) {
            this.surplus = surplus;
        }
    }
    /**
     * 输入:
     *      zhangsan@163.com    6000    0    2014-02-20
            lisi@163.com    2000    0    2014-02-20
            lisi@163.com    0    100    2014-02-20
            zhangsan@163.com    3000    0    2014-02-20
            wangwu@126.com    9000    0    2014-02-20
            wangwu@126.com    0    200        2014-02-20
       输出:
            lisi@163.com    2000.0    100.0    1900.0
            wangwu@126.com    9000.0    200.0    8800.0
            zhangsan@163.com    9000.0    0.0    9000.0
     */
    /**
     * Sorting step: orders the per-account records by using the whole {@code InfoBean} as
     * the map output key, so that the framework's key sort does the work.
     */
    public class SortStep {
        /**
         * Builds the job, submits it and blocks until it finishes.
         *
         * <p>The JVM exits with status 0 when the job succeeds and 1 when it fails.
         *
         * @param args command-line arguments (not used)
         * @throws IOException if the job cannot be configured or submitted
         * @throws ClassNotFoundException if a job class cannot be loaded
         * @throws InterruptedException if waiting for the job is interrupted
         */
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            job.setJarByClass(SortStep.class);

            job.setMapperClass(SortMapper.class);
            job.setMapOutputKeyClass(InfoBean.class);
            job.setMapOutputValueClass(NullWritable.class);
            FileInputFormat.setInputPaths(job, new Path("hdfs://localhost:9000/input/trade_info2.txt"));

            job.setReducerClass(SortReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(InfoBean.class);
            FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/output"));

            // Propagate the job outcome instead of silently discarding it.
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }

        /**
         * Parses one record and emits the bean as the key with an empty value.
         *
         * <p>Sorting in the map and reduce phases compares k2 (the map output key) only;
         * v2 (the map output value) takes no part in it. To have v2 sorted as well, k2 and
         * v2 must be combined into a new class that is then used as k2. Grouping compares
         * k2 as well.
         */
        public static class SortMapper extends Mapper<LongWritable, Text, InfoBean, NullWritable> {
            /** Reused output key; it is overwritten before each write. */
            private InfoBean bean = new InfoBean();

            /**
             * Reads the account, income and expenses from columns 0, 1 and 2 of the
             * tab-separated line.
             *
             * @param key the input key supplied by the input format (not used)
             * @param value the text of the record
             * @param context the task context the pair is written to
             * @throws IOException if writing the output fails
             * @throws InterruptedException if the task is interrupted while writing
             */
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String[] fields = value.toString().split("\t");
                String account = fields[0];
                double income = Double.parseDouble(fields[1]);
                double expenses = Double.parseDouble(fields[2]);
                bean.set(account, income, expenses);
                context.write(bean, NullWritable.get());
            }
        }

        /**
         * Turns the sorted beans back into {@code (account, bean)} output records.
         */
        public static class SortReducer extends Reducer<InfoBean, NullWritable, Text, InfoBean> {
            /** Reused output key; it is overwritten before each write. */
            private Text k = new Text();

            /**
             * Writes one output record per incoming value.
             *
             * <p>Emitting inside the loop keeps every input record even when several keys
             * compare as equal and are therefore handed to one call. The framework updates
             * the key object while the values are iterated, so the account is read inside
             * the loop.
             *
             * @param bean the current key of the group
             * @param v2s one placeholder value per record in the group
             * @param context the task context the records are written to
             * @throws IOException if writing the output fails
             * @throws InterruptedException if the task is interrupted while writing
             */
            protected void reduce(InfoBean bean, Iterable<NullWritable> v2s, Context context)
                    throws IOException, InterruptedException {
                for (NullWritable ignored : v2s) {
                    k.set(bean.getAccount());
                    context.write(k, bean);
                }
            }
        }
    }
    /**
     * 
       输入:
            lisi@163.com    2000.0    100.0    1900.0
            wangwu@126.com    9000.0    200.0    8800.0
            zhangsan@163.com    9000.0    0.0    9000.0
       输出:
            zhangsan@163.com    9000.0    0.0    9000.0
            wangwu@126.com    9000.0    200.0    8800.0
            lisi@163.com    2000.0    100.0    1900.0
     */
    /**
     * Inverted index (how many times each keyword occurs in each document).
     *
     * <p>All three stages use the same record shape: the key is the word and each value is
     * {@code file->count}. The combiner therefore only pre-aggregates and the result is the
     * same whether it runs zero, one or several times.
     *
     * @author fengmingyue
     */
    public class InverseIndex {
        /** Separates the file name from its count inside a value. */
        private static final String SEPARATOR = "->";

        /**
         * Builds the job, submits it and blocks until it finishes.
         *
         * <p>The JVM exits with status 0 when the job succeeds and 1 when it fails.
         *
         * @param args command-line arguments (not used)
         * @throws Exception if the job cannot be configured, submitted or monitored
         */
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            job.setJarByClass(InverseIndex.class);

            job.setMapperClass(IndexMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            // Passing only the directory, new Path("hdfs://localhost:9000/input2/") or
            // new Path("hdfs://localhost:9000/input2"), works as well.
            FileInputFormat.setInputPaths(job, new Path("hdfs://localhost:9000/input2/a.txt"),new Path("hdfs://localhost:9000/input2/b.txt"));

            job.setReducerClass(IndexReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/output"));

            job.setCombinerClass(IndexCombiner.class);

            // Propagate the job outcome instead of silently discarding it.
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }

        /**
         * Adds up {@code file->count} values into one total per file, ordered by file name.
         *
         * @param values the values received for one word
         * @return the total count per file name
         * @throws IOException if a value does not contain the separator
         */
        private static java.util.Map<String, Long> sumPerFile(Iterable<Text> values)
                throws IOException {
            java.util.Map<String, Long> totals = new java.util.TreeMap<String, Long>();
            for (Text t : values) {
                String entry = t.toString();
                // The last separator is used so that file names containing "->" still parse.
                int sep = entry.lastIndexOf(SEPARATOR);
                if (sep < 0) {
                    throw new IOException("Malformed index entry: " + entry);
                }
                String file = entry.substring(0, sep);
                long count = Long.parseLong(entry.substring(sep + SEPARATOR.length()));
                Long soFar = totals.get(file);
                totals.put(file, soFar == null ? count : soFar + count);
            }
            return totals;
        }

        /**
         * Emits {@code (word, file->1)} for every word of the line.
         */
        public static class IndexMapper extends Mapper<LongWritable, Text, Text, Text>{
            // Output objects are reused across calls and overwritten before each write.
            private Text k = new Text();
            private Text v = new Text();

            /**
             * Splits the line on single spaces and tags every non-empty word with the name
             * of the file the current split belongs to.
             *
             * @param key the input key supplied by the input format (not used)
             * @param value the text of the line
             * @param context the task context the pairs are written to
             * @throws IOException if writing the output fails
             * @throws InterruptedException if the task is interrupted while writing
             */
            protected void map(LongWritable key, Text value,
                    Mapper<LongWritable, Text, Text, Text>.Context context)
                    throws IOException, InterruptedException {
                FileSplit inputSplit = (FileSplit) context.getInputSplit();
                String name = inputSplit.getPath().getName();
                v.set(name + SEPARATOR + 1);
                for (String word : value.toString().split(" ")) {
                    // Blank lines and repeated spaces yield empty tokens; they are not words.
                    if (word.isEmpty()) {
                        continue;
                    }
                    k.set(word);
                    context.write(k, v);
                }
            }
        }

        /**
         * Pre-aggregates the counts per file without changing the key or the value format.
         */
        public static class IndexCombiner extends Reducer<Text, Text, Text, Text>{
            /** Reused output value; it is overwritten before each write. */
            private Text v = new Text();

            /**
             * Writes one {@code (word, file->sum)} pair per file seen for {@code key}.
             *
             * @param key the word
             * @param values the {@code file->count} values collected for that word
             * @param context the task context the pairs are written to
             * @throws IOException if a value is malformed or writing the output fails
             * @throws InterruptedException if the task is interrupted while writing
             */
            protected void reduce(Text key, Iterable<Text> values,
                    Reducer<Text, Text, Text, Text>.Context context)
                    throws IOException, InterruptedException {
                for (java.util.Map.Entry<String, Long> e : sumPerFile(values).entrySet()) {
                    v.set(e.getKey() + SEPARATOR + e.getValue());
                    context.write(key, v);
                }
            }
        }

        /**
         * Produces the final index line of a word: all {@code file->count} entries, each
         * followed by a space.
         */
        public static class IndexReducer extends Reducer<Text, Text, Text, Text>{
            /** Reused output value; it is overwritten before each write. */
            private Text v = new Text();

            /**
             * Sums the counts per file once more (the combiner may not have run, or may have
             * run on partial data) and writes a single line for {@code key}.
             *
             * @param key the word
             * @param values the {@code file->count} values collected for that word
             * @param context the task context the line is written to
             * @throws IOException if a value is malformed or writing the output fails
             * @throws InterruptedException if the task is interrupted while writing
             */
            protected void reduce(Text key, Iterable<Text> values,
                    Reducer<Text, Text, Text, Text>.Context context)
                    throws IOException, InterruptedException {
                StringBuilder line = new StringBuilder();
                for (java.util.Map.Entry<String, Long> e : sumPerFile(values).entrySet()) {
                    line.append(e.getKey()).append(SEPARATOR).append(e.getValue()).append(' ');
                }
                v.set(line.toString());
                context.write(key, v);
            }
        }
    }
    /**
     * 输入:
     *     a.txt:  hello tom
                   hello jerry
                   hello kitty
                   hello world
                   hello tom
          b.txt:   hello jerry
                   hello tom
                   hello world
       输出:
               hello    b.txt->3 a.txt->5 
               jerry    a.txt->1 b.txt->1 
               kitty    a.txt->1 
               tom      a.txt->2 b.txt->1 
               world    b.txt->1 a.txt->1 
     */ 
  • 相关阅读:
    20201029-1 每周例行报告
    20201022-1 每周例行报告
    2020年秋软件工程“领跑衫”获奖感言
    20201015-3 每周例行报告
    20201207-总结
    20201126-1 每周例行报告
    20201120-1 每周例行报告
    20201112-1 每周例行报告
    20201105-1 每周例行报告
    20201022-1 每周例行报告
  • 原文地址:https://www.cnblogs.com/fengmingyue/p/6354490.html
Copyright © 2011-2022 走看看