  • 21-hadoop-weibo ad push

    1, TF-IDF

    Compute how important each word is within each user's posts.

    This takes three MapReduce jobs: the first computes TF and N, the second computes DF, and the third plugs these values into the formula to produce the final weights.
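
    For reference, the TF-IDF weight that the third job evaluates is

        weight(w, d) = TF(w, d) * log(N / DF(w))

    where TF(w, d) is how often word w appears in post d, N is the total number of posts, and DF(w) is the number of posts that contain w.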

    1, First job

    package com.wenbronk.weibo;
    
    import java.io.IOException;
    import java.io.StringReader;
    import java.util.Arrays;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.wltea.analyzer.core.IKSegmenter;
    import org.wltea.analyzer.core.Lexeme;
    
    /**
     * First mapper: computes TF and N.
     * 
     * @author root
     *
     */
    public class FirstMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    
        /**
         * TF: how often a word appears within one post; N: total number of posts.
         * Input arrives one line per post.
         */
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
    
            String[] values = value.toString().trim().split("\t");
    
            if (values.length >= 2) {
                String id = values[0].trim();
                String content = values[1].trim();
                
                // tokenize the content with the IK Analyzer
                StringReader stringReader = new StringReader(content);
                IKSegmenter ikSegmenter = new IKSegmenter(stringReader, true);
                Lexeme word = null;
                while ((word = ikSegmenter.next()) != null ) {
                    String w = word.getLexemeText();
                    context.write(new Text(w + "_" + id), new IntWritable(1));
                }
                context.write(new Text("count"), new IntWritable(1));
            } else {
                System.out.println(Arrays.toString(values) + "---");
            }
    
        }
    
    }
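
    Assuming input lines of the form "id TAB content" (the sample post and tokens below are made up), a line such as

        3823890201582094	今天的豆浆很好喝, 还想喝豆浆

    makes FirstMapper emit one (word_id, 1) pair per token, e.g. (豆浆_3823890201582094, 1) twice, plus a single (count, 1) record per post. The reducer below sums these into per-post term frequencies and the post total N.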

    reduce

    package com.wenbronk.weibo;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    /**
     * Sums the counts: per-post term frequencies (TF) and the post total (N).
     * @author root
     *
     */
    public class FirstReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    
        @Override
        protected void reduce(Text arg0, Iterable<IntWritable> arg1,
                Reducer<Text, IntWritable, Text, IntWritable>.Context arg2) throws IOException, InterruptedException {
            
            int sum = 0;
            for (IntWritable intWritable : arg1) {
                sum += intWritable.get();
            }
            if (arg0.equals(new Text("count"))) {
                // the aggregated post total N; logged for inspection
                System.err.println(arg0.toString() + "---");
            }
            arg2.write(arg0, new IntWritable(sum));
        }
        
    }

    partition

    package com.wenbronk.weibo;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
    
    /**
     * Decides the partition: 4 partitions are planned, one for N ("count") and three for TF.
     * @author root
     *
     */
    public class FirstPartition extends HashPartitioner<Text, IntWritable>{
    
        @Override
        public int getPartition(Text key, IntWritable value, int numReduceTasks) {
            if (key.equals(new Text("count"))) {
                // the global post count gets the last partition to itself (part-r-00003)
                return 3;
            } else {
                // hash every other key over the remaining numReduceTasks - 1 partitions
                return super.getPartition(key, value, numReduceTasks - 1);
            }
        }
        
    }
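
    A quick sanity check of the partition logic, as a hypothetical standalone snippet (FirstPartitionDemo is not part of the original series; it only needs the Hadoop client jars on the classpath):

    package com.wenbronk.weibo;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    
    // hypothetical demo class, not part of the original series
    public class FirstPartitionDemo {
        public static void main(String[] args) {
            FirstPartition p = new FirstPartition();
            // the "count" key always lands in the last of the 4 partitions (part-r-00003)
            System.out.println(p.getPartition(new Text("count"), new IntWritable(1), 4)); // prints 3
            // every other key is hashed over the remaining 3 partitions, i.e. 0, 1 or 2
            System.out.println(p.getPartition(new Text("豆浆_3823890201582094"), new IntWritable(1), 4));
        }
    }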

    mainJob

    package com.wenbronk.weibo;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class FirstJob {
    
        public static void main(String[] args) {
            Configuration config = new Configuration();
            config.set("fs.defaults", "hdfs://192.168.208.106:8020");
            config.set("yarn.resourcemanager.hostname", "192.168.208.106");
    //        config.set("maper.jar", "E:\sxt\target\weibo1.jar");
            
            try {
                
                Job job = Job.getInstance(config);
                job.setJarByClass(FirstJob.class);
                job.setJobName("first");
                
                job.setPartitionerClass(FirstPartition.class);
                job.setMapperClass(FirstMapper.class);
                job.setNumReduceTasks(4);
                job.setCombinerClass(FirstReducer.class);
                job.setReducerClass(FirstReducer.class);
    
                job.setMapOutputKeyClass(Text.class);
                job.setMapOutputValueClass(IntWritable.class);
                
                FileInputFormat.addInputPath(job, new Path("E:\\sxt\\1-MapReduce\\data\\weibo2.txt"));
                
                FileSystem fileSystem = FileSystem.get(config);
                
                Path outPath = new Path("E:\\sxt\\1-MapReduce\\data\\weibo1");
                if (fileSystem.exists(outPath)) {
                    fileSystem.delete(outPath, true);
                }
                FileOutputFormat.setOutputPath(job, outPath);
                
                boolean waitForCompletion = job.waitForCompletion(true);
                if (waitForCompletion) {
                    System.out.println("first success");
                }
                
            }catch (Exception e) {
                e.printStackTrace();
            }
            
        }
        
    }
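
    With 4 reduce tasks, job 1 leaves four files under weibo1 (the sample lines below are hypothetical): part-r-00000 through part-r-00002 hold the "word_id TAB tf" records, and part-r-00003 holds the single post total.

        part-r-00000:    豆浆_3823890201582094	3
        part-r-00003:    count	1065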

    2, Second job

    package com.wenbronk.weibo;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
    /**
     * Computes DF: the number of posts in which each word appears.
     *
     */
    public class SecondMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            
            // get the input split this map task is reading
            FileSplit inputSplit = (FileSplit) context.getInputSplit();
            
            // part-r-00003 holds only the post total N, so it is skipped here
            if (!inputSplit.getPath().getName().contains("part-r-00003")) {
                
                String[] values = value.toString().trim().split("\t");
                
                if (values.length >= 2) {
                    String[] split = values[0].trim().split("_");
                    if (split.length >= 2) {
                        // split[0] is the word from the "word_id" key; one 1 per (word, post) pair sums to DF
                        String word = split[0];
                        context.write(new Text(word), new IntWritable(1));
                    }
                }
            }else {
                System.out.println(value.toString() + "----");
            }
            
            
        }
        
    }

    reduce

    package com.wenbronk.weibo;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    /**
     * Sums the 1s into the DF value for each word.
     * @author root
     *
     */
    public class SecondReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    
        @Override
        protected void reduce(Text arg0, Iterable<IntWritable> arg1,
                Reducer<Text, IntWritable, Text, IntWritable>.Context arg2) throws IOException, InterruptedException {
            
            int sum = 0;
            for (IntWritable intWritable : arg1) {
                sum += intWritable.get();
            }
            arg2.write(arg0, new IntWritable(sum));
        }
        
    }

    mainJob

    package com.wenbronk.weibo;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class SecondJob {
    
        public static void main(String[] args) {
            Configuration config = new Configuration();
            config.set("fs.default", "hdfs://192.168.208.106:8020");
            config.set("yarn.resourcemanager.hostname", "192.168.208.106");
            
            try {
                
                Job job = Job.getInstance(config);
                job.setJarByClass(SecondJob.class);
                job.setJobName("second");
                
                job.setMapperClass(SecondMapper.class);
                job.setCombinerClass(SecondReducer.class);
                job.setReducerClass(SecondReducer.class);
    
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(IntWritable.class);
                
                FileInputFormat.addInputPath(job, new Path("E:\\sxt\\1-MapReduce\\data\\weibo1"));
                
                FileSystem fileSystem = FileSystem.get(config);
                Path outPath = new Path("E:\\sxt\\1-MapReduce\\data\\weibo2");
                if (fileSystem.exists(outPath)) {
                    fileSystem.delete(outPath, true);
                }
                FileOutputFormat.setOutputPath(job, outPath);
                
                boolean f = job.waitForCompletion(true);
                if (f) {
                    System.out.println("job2 success");
                }
                
            }catch(Exception e) {
                e.printStackTrace();
            }
            
        }
        
    }
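
    Job 2 runs with the default single reducer, so weibo2/part-r-00000 ends up with one "word TAB df" line per word (sample values are hypothetical):

        豆浆	5
        今天	132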

    3, Third job

    package com.wenbronk.weibo;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.URI;
    import java.text.NumberFormat;
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
    public class ThirdMapper extends Mapper<LongWritable, Text, Text, Text>{
    
            // holds the total number of posts (N); small data preloaded into memory
            public static Map<String, Integer> cmap = null;
            // holds the DF value of each word
            public static Map<String, Integer> df = null;
            
            // runs once per map task, before map(); preloads the cached files into the maps
            @Override
            protected void setup(Context context)
                    throws IOException, InterruptedException {
                
                System.out.println("*****");
                if (cmap == null || cmap.size() == 0 || df == null || df.size() == 0) {
                    URI[] cacheFiles = context.getCacheFiles();
                    if (cacheFiles != null) {
                        for (URI uri : cacheFiles) {
                            if (uri.getPath().endsWith("part-r-00003")) {
                                Path path = new Path(uri.getPath());
                                // open the cached file from HDFS
                                Configuration configuration = context.getConfiguration();
                                FileSystem fs = FileSystem.get(configuration);
                                FSDataInputStream open = fs.open(path);
                                BufferedReader reader = new BufferedReader(new InputStreamReader(open));
                                
    //                            BufferedReader reader = new BufferedReader(new FileReader(path.getName()));
                                String line = reader.readLine();
                                if (line.startsWith("count")) {
                                    String[] split = line.split("\t");
                                    cmap = new HashMap<>();
                                    cmap.put(split[0], Integer.parseInt(split[1].trim()));
                                }
                                reader.close();
                            }else if (uri.getPath().endsWith("part-r-00000")) {
                                df = new HashMap<>();
                                Path path = new Path(uri.getPath());
                                
                                // open the cached file from HDFS
                                Configuration configuration = context.getConfiguration();
                                FileSystem fs = FileSystem.get(configuration);
                                FSDataInputStream open = fs.open(path);
                                BufferedReader reader = new BufferedReader(new InputStreamReader(open));
    //                            BufferedReader reader = new BufferedReader(new FileReader(path.getName()));
                                
                                String line = null;
                                while ((line = reader.readLine()) != null) {
                                    String[] ls = line.split("\t");
                                    df.put(ls[0], Integer.parseInt(ls[1].trim()));
                                }
                                reader.close();
                            }
                        }
                    }
                }
            }
            
            @Override
            protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
                    throws IOException, InterruptedException {
                // get the input split
                FileSplit inputSplit = (FileSplit) context.getInputSplit();
                
                if (!inputSplit.getPath().getName().contains("part-r-00003")) {
                    String[] values = value.toString().trim().split("\t");
                    
                    if (values.length >= 2) {
                        
                        int tf = Integer.parseInt(values[1].trim());
                        
                        String[] ss = values[0].split("_");
                        if (ss.length >= 2) {
                            String word = ss[0];
                            String id = ss[1];
                            
                            // TF-IDF formula: tf * log(N / df); cast to double so the division is real-valued
                            double s = tf * Math.log((double) cmap.get("count") / df.get(word));
                            NumberFormat format = NumberFormat.getInstance();
                            // keep at most 5 decimal places
                            format.setMaximumFractionDigits(5);
                            
                            context.write(new Text(id), new Text(word + ": " + format.format(s)));
                        }else {
                            System.out.println(value.toString() + "------");
                        }
                    }
                }
            }
    }

    reduce

    package com.wenbronk.weibo;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class ThirdReducer extends Reducer<Text, Text, Text, Text>{
    
        @Override
        protected void reduce(Text arg0, Iterable<Text> arg1, Reducer<Text, Text, Text, Text>.Context arg2)
                throws IOException, InterruptedException {
            
            // join all "word: score" pairs of one user id into a single tab-separated line
            StringBuilder sb = new StringBuilder();
            for (Text text : arg1) {
                sb.append(text.toString()).append("\t");
            }
            arg2.write(arg0, new Text(sb.toString()));
        }
        
    }

    mainJob

    package com.wenbronk.weibo;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    
    public class ThirdJob {
    
        public static void main(String[] args) {
            
            Configuration config = new Configuration();
            config.set("fs.defaults", "hdfs://192.168.208.106:8020");
            config.set("yarn.resourcemanager.hostname", "192.168.208.106");
            try {
                Job job = Job.getInstance(config); 
                job.setJarByClass(ThirdJob.class);
                job.setJobName("third");
    //            job.setInputFormatClass(KeyValueTextInputFormat.class);
                
                // load the post total (N) from job 1 into the distributed cache
                job.addCacheFile(new Path("E:\\sxt\\1-MapReduce\\data\\weibo1\\part-r-00003").toUri());
                // load the DF values from job 2 into the distributed cache
                job.addCacheFile(new Path("E:\\sxt\\1-MapReduce\\data\\weibo2\\part-r-00000").toUri());
                
                job.setMapperClass(ThirdMapper.class);
                job.setReducerClass(ThirdReducer.class);
                
                job.setMapOutputKeyClass(Text.class);
                job.setMapOutputValueClass(Text.class);
                
                FileSystem fs = FileSystem.get(config);
                FileInputFormat.addInputPath(job, new Path("E:\\sxt\\1-MapReduce\\data\\weibo1"));
                Path path = new Path("E:\\sxt\\1-MapReduce\\data\\weibo3");
                if (fs.exists(path)) {
                    fs.delete(path, true);
                }
                FileOutputFormat.setOutputPath(job, path);
                
                boolean waitForCompletion = job.waitForCompletion(true);
                if(waitForCompletion) {
                    System.out.println("执行job成功");
                }
            }catch (Exception e) {
                e.printStackTrace();
            }
        }
        
    }
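
    The final output under weibo3 has one line per user id, listing every word of that user together with its TF-IDF weight (the scores below are made up):

        3823890201582094	豆浆: 4.89123	今天: 0.97234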

     This series follows the 尚学堂 (Shangxuetang) video course.

  • Original post: https://www.cnblogs.com/wenbronk/p/7309515.html