  • 21-hadoop - Weibo ad push with TF-IDF

    1, TF-IDF

    Compute, for each weibo post, how important each term is to that post (its TF-IDF weight).

    This takes three MapReduce jobs: the first computes TF (term frequency within each weibo) and N (the total number of weibos), the second computes DF (the number of weibos each term appears in), and the third plugs TF, N and DF into the TF-IDF formula to produce the final weights.
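
    The weight the third job finally computes is the standard TF-IDF value: weight(term, weibo) = TF * log(N / DF), where TF is how often the term occurs in that weibo, N is the total number of weibos, and DF is the number of weibos containing the term. A minimal Java sketch of just the formula (the class name and the numbers in the example are made up for illustration):

    public class TfIdfFormula {
        // weight = tf * ln(N / df); natural log, matching Math.log used in the third mapper
        public static double weight(int tf, long totalWeibos, int df) {
            return tf * Math.log((double) totalWeibos / df);
        }
    
        public static void main(String[] args) {
            // hypothetical numbers: tf = 2 in one weibo, N = 1000 weibos, the term appears in df = 50 of them
            System.out.println(weight(2, 1000, 50)); // ≈ 5.99 (2 * ln(20))
        }
    }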

    1, The first job

    package com.wenbronk.weibo;
    
    import java.io.IOException;
    import java.io.StringReader;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.wltea.analyzer.core.IKSegmenter;
    import org.wltea.analyzer.core.Lexeme;
    
    /**
     * First mapper: computes TF and N
     * 
     * @author root
     *
     */
    public class FirstMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    
        /**
         * TF: how often a term occurs within one weibo; N: the total number of weibos.
         * Input arrives one line at a time.
         */
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
    
            String[] values = value.toString().trim().split("\t");
    
            if (values.length >= 2) {
                String id = values[0].trim();
                String content = values[1].trim();
                
                // tokenize the content with the IK Analyzer
                StringReader stringReader = new StringReader(content);
                IKSegmenter ikSegmenter = new IKSegmenter(stringReader, true);
                Lexeme word = null;
                while ((word = ikSegmenter.next()) != null ) {
                    String w = word.getLexemeText();
                    context.write(new Text(w + "_" + id), new IntWritable(1));
                }
                context.write(new Text("count"), new IntWritable(1));
            }else {
                System.out.println(values.toString() + "---");
            }
    
        }
    
    }
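
    To see what this mapper emits, here is a minimal standalone sketch of the same tokenize-and-emit logic outside MapReduce (it assumes the IK Analyzer jar used above is on the classpath; the class name and the input line are made up):

    package com.wenbronk.weibo;
    
    import java.io.StringReader;
    
    import org.wltea.analyzer.core.IKSegmenter;
    import org.wltea.analyzer.core.Lexeme;
    
    public class FirstMapperSketch {
        public static void main(String[] args) throws Exception {
            // hypothetical input line: "<weibo id>\t<content>"
            String line = "3823890210294392\t今天的豆浆和油条真好吃";
            String[] values = line.trim().split("\t");
            String id = values[0].trim();
    
            IKSegmenter ikSegmenter = new IKSegmenter(new StringReader(values[1].trim()), true);
            Lexeme word;
            while ((word = ikSegmenter.next()) != null) {
                // same key layout the mapper writes: "<term>_<weibo id>" -> 1
                System.out.println(word.getLexemeText() + "_" + id + "\t1");
            }
            // plus one "count" marker per weibo, later summed into N
            System.out.println("count\t1");
        }
    }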

    reduce

    package com.wenbronk.weibo;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    /**
     * Sums the counts: produces TF per "term_id" key and N for the "count" key
     * @author root
     *
     */
    public class FirstReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    
        @Override
        protected void reduce(Text arg0, Iterable<IntWritable> arg1,
                Reducer<Text, IntWritable, Text, IntWritable>.Context arg2) throws IOException, InterruptedException {
            
            int sum = 0;
            for (IntWritable intWritable : arg1) {
                sum += intWritable.get();
            }
            if (arg0.equals(new Text("count"))) {
                System.err.println(arg0.toString() + "---");
            }
            arg2.write(arg0, new IntWritable(sum));
        }
        
    }

    partition

    package com.wenbronk.weibo;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
    
    /**
     * Decides the partition: 4 reduce tasks in total, one for N (the "count" record) and three for TF
     * @author root
     *
     */
    public class FirstPartition extends HashPartitioner<Text, IntWritable>{
    
        @Override
        public int getPartition(Text key, IntWritable value, int numReduceTasks) {
            if (key.equals(new Text("count"))) {
                return 3;
            }else {
                return super.getPartition(key, value, numReduceTasks - 1);
            }
            
        }
        
    }
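
    With 4 reduce tasks this sends the "count" key to partition 3 (file part-r-00003) and hashes every "term_id" key over partitions 0-2, which is why the later jobs can tell the N record apart from the TF records by file name. A small sketch of the routing (the class name is made up; it assumes the Hadoop client jars are on the classpath):

    package com.wenbronk.weibo;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    
    public class FirstPartitionSketch {
        public static void main(String[] args) {
            FirstPartition partition = new FirstPartition();
            IntWritable one = new IntWritable(1);
    
            // the N marker always goes to the dedicated partition
            System.out.println(partition.getPartition(new Text("count"), one, 4));                  // 3
            // term keys are spread over partitions 0..2 by hash
            System.out.println(partition.getPartition(new Text("豆浆_3823890210294392"), one, 4));  // 0, 1 or 2
        }
    }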

    mainJob

    package com.wenbronk.weibo;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class FirstJob {
    
        public static void main(String[] args) {
            Configuration config = new Configuration();
            config.set("fs.defaults", "hdfs://192.168.208.106:8020");
            config.set("yarn.resourcemanager.hostname", "192.168.208.106");
    //        config.set("maper.jar", "E:\sxt\target\weibo1.jar");
            
            try {
                
                Job job = Job.getInstance(config);
                job.setJarByClass(FirstJob.class);
                job.setJobName("first");
                
                job.setPartitionerClass(FirstPartition.class);
                job.setMapperClass(FirstMapper.class);
                job.setNumReduceTasks(4);
                job.setCombinerClass(FirstReducer.class);
                job.setReducerClass(FirstReducer.class);
    
                job.setMapOutputKeyClass(Text.class);
                job.setMapOutputValueClass(IntWritable.class);
                
            FileInputFormat.addInputPath(job, new Path("E:\\sxt\\1-MapReduce\\data\\weibo2.txt"));
                
                FileSystem fileSystem = FileSystem.get(config);
                
            Path outPath = new Path("E:\\sxt\\1-MapReduce\\data\\weibo1");
                if (fileSystem.exists(outPath)) {
                fileSystem.delete(outPath, true); // recursive delete of the old output dir
                }
                FileOutputFormat.setOutputPath(job, outPath);
                
                boolean waitForCompletion = job.waitForCompletion(true);
                if (waitForCompletion) {
                    System.out.println("first success");
                }
                
            }catch (Exception e) {
                e.printStackTrace();
            }
            
        }
        
    }
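
    If the job succeeds, the weibo1 output directory should contain four files, one per reduce task, roughly like this (the values below are made up):

    part-r-00000 ~ part-r-00002    "term_id <tab> TF" lines, e.g.  豆浆_3823890210294392    2
    part-r-00003                   the single N record,      e.g.  count    1065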

    2, The second job

    package com.wenbronk.weibo;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
    /**
     * Computes DF: for each term, the number of weibos it appears in
     *
     */
    public class SecondMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            
            // get the input split this map task is reading
            FileSplit inputSplit = (FileSplit) context.getInputSplit();
            
            // skip part-r-00003, which only holds the "count" (N) record
            if (!inputSplit.getPath().getName().contains("part-r-00003")) {
                
                String[] values = value.toString().trim().split("\t");
                
                if (values.length >= 2) {
                    String[] split = values[0].trim().split("_");
                    if (split.length >= 2) {
                        // the key from job 1 is "term_weiboId"; for DF only the term is needed
                        String word = split[0];
                        context.write(new Text(word), new IntWritable(1));
                    }
                }
            }else {
                System.out.println(value.toString() + "----");
            }
            
            
        }
        
    }

    reduce

    package com.wenbronk.weibo;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    /**
     * Sums the 1s per term to produce DF
     * @author root
     *
     */
    public class SecondReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    
        @Override
        protected void reduce(Text arg0, Iterable<IntWritable> arg1,
                Reducer<Text, IntWritable, Text, IntWritable>.Context arg2) throws IOException, InterruptedException {
            
            int sum = 0;
            for (IntWritable intWritable : arg1) {
                sum += intWritable.get();
            }
            arg2.write(new Text(arg0), new IntWritable(sum));
        }
        
    }
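
    Because job 1 already collapsed each (term, weibo) pair into a single line, every line the second mapper reads means "this term occurred in one more weibo", so the reducer's sum is exactly DF. A made-up example of three job-1 lines and the resulting DF record:

    豆浆_3823890210294392    2
    豆浆_3823890201582094    1
    豆浆_3823890285529671    3
        ->  豆浆    3    (the TF values are ignored; only the number of lines per term matters)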

    mainJob

    package com.wenbronk.weibo;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class SecondJob {
    
        public static void main(String[] args) {
            Configuration config = new Configuration();
            config.set("fs.default", "hdfs://192.168.208.106:8020");
            config.set("yarn.resourcemanager.hostname", "192.168.208.106");
            
            try {
                
                Job job = Job.getInstance(config);
                job.setJarByClass(SecondJob.class);
                job.setJobName("second");
                
                job.setMapperClass(SecondMapper.class);
                job.setCombinerClass(SecondReducer.class);
                job.setReducerClass(SecondReducer.class);
    
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(IntWritable.class);
                
            FileInputFormat.addInputPath(job, new Path("E:\\sxt\\1-MapReduce\\data\\weibo1"));
                
                FileSystem fileSystem = FileSystem.get(config);
            Path outPath = new Path("E:\\sxt\\1-MapReduce\\data\\weibo2");
                if (fileSystem.exists(outPath)) {
                fileSystem.delete(outPath, true);
                }
                FileOutputFormat.setOutputPath(job, outPath);
                
                boolean f = job.waitForCompletion(true);
                if (f) {
                    System.out.println("job2 success");
                }
                
            }catch(Exception e) {
                e.printStackTrace();
            }
            
        }
        
    }

    3, The third job

    package com.wenbronk.weibo;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.URI;
    import java.text.NumberFormat;
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
    public class ThirdMapper extends Mapper<LongWritable, Text, Text, Text>{
    
        // total number of weibos (N); this small data set is preloaded into memory
        public static Map<String, Integer> cmap = null;
        // DF per term
        public static Map<String, Integer> df = null;
        
        // runs once per map task, before map(): preload the distributed-cache files into the maps above
        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
                
                System.out.println("*****");
                if (cmap == null || cmap.size() == 0 || df == null || df.size() == 0) {
                    URI[] cacheFiles = context.getCacheFiles();
                    if (cacheFiles != null) {
                        for (URI uri : cacheFiles) {
                            if (uri.getPath().endsWith("part-r-00003")) {
                                Path path = new Path(uri.getPath());
                            // open the cached count file from HDFS
                                Configuration configuration = context.getConfiguration();
                                FileSystem fs = FileSystem.get(configuration);
                                FSDataInputStream open = fs.open(path);
                                BufferedReader reader = new BufferedReader(new InputStreamReader(open));
                                
    //                            BufferedReader reader = new BufferedReader(new FileReader(path.getName()));
                                String line = reader.readLine();
                                if (line.startsWith("count")) {
                                String[] split = line.split("\t");
                                    cmap = new HashMap<>();
                                    cmap.put(split[0], Integer.parseInt(split[1].trim()));
                                }
                                reader.close();
                            }else if (uri.getPath().endsWith("part-r-00000")) {
                                df = new HashMap<>();
                                Path path = new Path(uri.getPath());
                                
                            // open the cached DF file from HDFS
                                Configuration configuration = context.getConfiguration();
                                FileSystem fs = FileSystem.get(configuration);
                                FSDataInputStream open = fs.open(path);
                                BufferedReader reader = new BufferedReader(new InputStreamReader(open));
    //                            BufferedReader reader = new BufferedReader(new FileReader(path.getName()));
                                
                                String line = null;
                                while ((line = reader.readLine()) != null) {
                                String[] ls = line.split("\t");
                                    df.put(ls[0], Integer.parseInt(ls[1].trim()));
                                }
                                reader.close();
                            }
                        }
                    }
                }
            }
            
            @Override
            protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
                    throws IOException, InterruptedException {
            // get the input split for this map task
                FileSplit inputSplit = (FileSplit) context.getInputSplit();
                
                if (!inputSplit.getPath().getName().contains("part-r-00003")) {
                String[] values = value.toString().trim().split("\t");
                    
                    if (values.length >= 2) {
                        
                        int tf = Integer.parseInt(values[1].trim());
                        
                        String[] ss = values[0].split("_");
                        if (ss.length >= 2) {
                            String word = ss[0];
                            String id = ss[1];
                            
                        // TF-IDF formula: tf * log(N / df)
                        double s = tf * Math.log((double) cmap.get("count") / df.get(word));
                        NumberFormat format = NumberFormat.getInstance();
                        // keep at most 5 decimal places
                        format.setMaximumFractionDigits(5);
                            
                            context.write(new Text(id), new Text(word + ": " + format.format(s)));
                        }else {
                            System.out.println(value.toString() + "------");
                        }
                    }
                }
            }
    }

    reduce

    package com.wenbronk.weibo;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class ThirdReducer extends Reducer<Text, Text, Text, Text>{
    
        @Override
        protected void reduce(Text arg0, Iterable<Text> arg1, Reducer<Text, Text, Text, Text>.Context arg2)
                throws IOException, InterruptedException {
            
            StringBuilder sb = new StringBuilder();
            for (Text text : arg1) {
                sb.append(text.toString()).append("\t");
            }
            arg2.write(arg0, new Text(sb.toString()));
        }
        
    }

    mainJob

    package com.wenbronk.weibo;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    
    public class ThirdJob {
    
        public static void main(String[] args) {
            
            Configuration config = new Configuration();
            config.set("fs.defaults", "hdfs://192.168.208.106:8020");
            config.set("yarn.resourcemanager.hostname", "192.168.208.106");
            try {
                Job job = Job.getInstance(config); 
                job.setJarByClass(ThirdJob.class);
                job.setJobName("third");
    //            job.setInputFormatClass(KeyValueTextInputFormat.class);
                
            // load the total weibo count (N) via the distributed cache
            job.addCacheFile(new Path("E:\\sxt\\1-MapReduce\\data\\weibo1\\part-r-00003").toUri());
            // load the DF table via the distributed cache
            job.addCacheFile(new Path("E:\\sxt\\1-MapReduce\\data\\weibo2\\part-r-00000").toUri());
                
                job.setMapperClass(ThirdMapper.class);
                job.setReducerClass(ThirdReducer.class);
                
                job.setMapOutputKeyClass(Text.class);
                job.setMapOutputValueClass(Text.class);
                
                FileSystem fs = FileSystem.get(config);
            FileInputFormat.addInputPath(job, new Path("E:\\sxt\\1-MapReduce\\data\\weibo1"));
            Path path = new Path("E:\\sxt\\1-MapReduce\\data\\weibo3");
                if (fs.exists(path)) {
                fs.delete(path, true);
                }
                FileOutputFormat.setOutputPath(job, path);
                
                boolean waitForCompletion = job.waitForCompletion(true);
                if(waitForCompletion) {
                    System.out.println("执行job成功");
                }
            }catch (Exception e) {
                e.printStackTrace();
            }
        }
        
    }
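
    After all three jobs have run, weibo3/part-r-00000 should hold one line per weibo id listing its terms with their TF-IDF weights, roughly like this (the values are made up):

    3823890210294392    豆浆: 2.30258    油条: 1.60943    今天: 0.22314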

     This series follows the 尚学堂 (sxt) video course.

  • Original post: https://www.cnblogs.com/wenbronk/p/7309515.html