  • MR Weibo content recommendation (TF-IDF)

     
     
    
    
    
     First iteration
    package com.laoxiao.mr.weibo;

    import java.io.StringReader;

    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.wltea.analyzer.core.IKSegmenter;
    import org.wltea.analyzer.core.Lexeme;

    /**
     * First MR job: computes TF (term frequency per microblog) and N (the total number of microblogs).
     * @author root
     */
    public class firstMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        protected void map(LongWritable key, Text value, Context context)
                throws java.io.IOException, InterruptedException {
            String[] temp = StringUtils.split(value.toString(), "\t");
            if (temp.length >= 2) {
                String id = temp[0].trim();
                String str = temp[1].trim();
                StringReader sr = new StringReader(str);
                IKSegmenter ikSegmenter = new IKSegmenter(sr, true);
                Lexeme word = null;
                while ((word = ikSegmenter.next()) != null) {
                    String w = word.getLexemeText();
                    // key: term_weiboId, value: 1 -> summed into TF by the reducer
                    context.write(new Text(w + "_" + id), new IntWritable(1));
                }
                // one "count" record per microblog -> summed into N
                context.write(new Text("count"), new IntWritable(1));
            } else {
                System.out.println("value is error:" + value.toString());
            }
        }
    }
    package com.laoxiao.mr.weibo;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class firstReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        protected void reduce(Text arg0, java.lang.Iterable<IntWritable> arg1, Context arg2)
                throws java.io.IOException, InterruptedException {
            int sum = 0;
            for (IntWritable i : arg1) {
                sum += i.get();
            }
            // term_weiboId -> TF, plus the special key "count" -> N
            arg2.write(arg0, new IntWritable(sum));
        }
    }
    package com.laoxiao.mr.weibo;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

    public class firstRepartition extends HashPartitioner<Text, IntWritable> {

        @Override
        public int getPartition(Text key, IntWritable value, int numReduceTasks) {
            if (key.toString().equals("count")) {
                // route the microblog-count key to the last reducer (part-r-00003)
                return 3;
            } else {
                return super.getPartition(key, value, numReduceTasks - 1);
            }
        }
    }
    package com.laoxiao.mr.weibo;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class firstJob {

        public static void main(String[] args) {
            Configuration config = new Configuration();
            config.set("fs.defaultFS", "hdfs://node1:8020");
            config.set("yarn.resourcemanager.hostname", "node1");
            try {
                FileSystem fs = FileSystem.get(config);
                Job job = Job.getInstance(config);
                job.setJarByClass(firstJob.class);
                job.setJobName("weibo1");

                job.setMapperClass(firstMapper.class);
                job.setReducerClass(firstReducer.class);
                job.setMapOutputKeyClass(Text.class);
                job.setMapOutputValueClass(IntWritable.class);
                job.setPartitionerClass(firstRepartition.class);
                //job.setCombinerClass(firstReducer.class);
                job.setNumReduceTasks(4);

                FileInputFormat.addInputPath(job, new Path("/root/input/data/weibo.txt"));

                Path path = new Path("/usr/output/weibo1");
                if (fs.exists(path)) {
                    fs.delete(path, true);
                }
                FileOutputFormat.setOutputPath(job, path);

                boolean f = job.waitForCompletion(true);
                if (f) {
                    System.out.println("first job run finished!!");
                }

            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
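
    The first mapper relies on IK Analyzer's smart segmentation to turn each microblog's text into terms. Below is a minimal standalone sketch of that tokenization step, using the same IKSegmenter API as firstMapper; the class name SegmenterDemo and the sample sentence are hypothetical, for illustration only, and are not part of the original project.

    package com.laoxiao.mr.weibo;

    import java.io.IOException;
    import java.io.StringReader;

    import org.wltea.analyzer.core.IKSegmenter;
    import org.wltea.analyzer.core.Lexeme;

    // Hypothetical local check: prints the terms IK Analyzer produces for one microblog,
    // i.e. the strings that firstMapper concatenates with "_" + weibo id as map output keys.
    public class SegmenterDemo {

        public static void main(String[] args) throws IOException {
            String content = "今天天气不错，出去跑步了"; // made-up sample microblog text
            IKSegmenter seg = new IKSegmenter(new StringReader(content), true); // true = smart mode, as in firstMapper
            Lexeme word;
            while ((word = seg.next()) != null) {
                System.out.println(word.getLexemeText());
            }
        }
    }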

    Second iteration

      package com.laoxiao.mr.weibo;

      import java.io.IOException;

      import org.apache.hadoop.io.IntWritable;
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Mapper;
      import org.apache.hadoop.mapreduce.lib.input.FileSplit;

      // Second MR job: computes DF, i.e. the number of microblogs in which each term appears.
      public class secondMapper extends Mapper<LongWritable, Text, Text, IntWritable>{

          protected void map(LongWritable key, Text value, Context context)
                  throws IOException, InterruptedException {

              // get the input split handled by this mapper task
              FileSplit fs = (FileSplit) context.getInputSplit();

              // skip part-r-00003, which only holds the "count" (total microblog) record
              if (!fs.getPath().getName().contains("part-r-00003")) {

                  String[] v = value.toString().trim().split("\t");
                  if (v.length >= 2) {
                      String[] ss = v[0].split("_");
                      if (ss.length >= 2) {
                          String w = ss[0];
                          // one (term, 1) record per (term, microblog) pair
                          context.write(new Text(w), new IntWritable(1));
                      }
                  } else {
                      System.out.println(value.toString() + "-------------");
                  }
              }

          }
      }
      package com.laoxiao.mr.weibo;

      import org.apache.hadoop.io.IntWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Reducer;

      public class secondReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

          protected void reduce(Text arg0, java.lang.Iterable<IntWritable> arg1, Context context) 
                  throws java.io.IOException, InterruptedException {
              int sum=0;
              for (IntWritable i : arg1) {
                  // every value is 1 (one distinct microblog), so counting values yields DF
                  sum+=1;
              }
              context.write(arg0, new IntWritable(sum));
          }
      }
      package com.laoxiao.mr.weibo;


      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.FileSystem;
      import org.apache.hadoop.fs.Path;
      import org.apache.hadoop.io.IntWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Job;
      import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
      import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


      public class secondJob {

          public static void main(String[] args) {
              Configuration config=new Configuration();
              config.set("fs.defaultFS", "hdfs://node1:8020");
              config.set("yarn.resourcemanager.hostname", "node1");
              try {
                  FileSystem fs =FileSystem.get(config);
                  Job job=Job.getInstance(config);
                  job.setJarByClass(secondJob.class);
                  job.setJobName("weibo2");

                  job.setMapperClass(secondMapper.class);
                  job.setReducerClass(secondReducer.class);
                  job.setMapOutputKeyClass(Text.class);
                  job.setMapOutputValueClass(IntWritable.class);
                  //job.setPartitionerClass(firstRepartition.class);
                  //job.setCombinerClass(firstReducer.class);
                  //job.setNumReduceTasks(4);

                  // input is the output of the first job
                  FileInputFormat.addInputPath(job, new Path("/usr/output/weibo1"));

                  Path path =new Path("/usr/output/weibo2");
                  if(fs.exists(path)){
                      fs.delete(path, true);
                  }
                  FileOutputFormat.setOutputPath(job,path);

                  boolean f= job.waitForCompletion(true);
                  if(f){
                      System.out.println("second job run finished!!");
                  }

              } catch (Exception e) {
                  e.printStackTrace();
              }
          }
      }
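
    The commented-out job.setCombinerClass line above hints at a map-side optimization for the DF job. A possible combiner is sketched below for illustration only (secondCombiner is a hypothetical class, not part of the original pipeline): it pre-sums the 1s emitted by secondMapper. Note that if it were enabled, secondReducer would have to accumulate sum += i.get() instead of counting values with sum += 1, because the combined values would no longer all be 1.

      package com.laoxiao.mr.weibo;

      import java.io.IOException;

      import org.apache.hadoop.io.IntWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Reducer;

      // Hypothetical combiner for the DF job: partially sums the (term, 1) records on the map side.
      // Enable with job.setCombinerClass(secondCombiner.class) only after changing secondReducer
      // to sum i.get() rather than count values.
      public class secondCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

          protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                  throws IOException, InterruptedException {
              int sum = 0;
              for (IntWritable i : values) {
                  sum += i.get();
              }
              context.write(key, new IntWritable(sum));
          }
      }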

    Third iteration

    package com.laoxiao.mr.weibo;
    
    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.io.StringReader;
    import java.net.URI;
    import java.text.NumberFormat;
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.wltea.analyzer.core.IKSegmenter;
    import org.wltea.analyzer.core.Lexeme;
    
    /**
     * Final MR job: computes the TF-IDF weight of each term in each microblog.
     * @author root
     */
    public class LastMapper extends Mapper<LongWritable, Text, Text, Text> {
        // holds the total number of microblogs (N), stored under the key "count"
        public static Map<String, Integer> cmap = null;
        // holds the DF of each term
        public static Map<String, Integer> df = null;
    
        // runs once before map(): loads N and DF from the distributed-cache files
        protected void setup(Context context) throws IOException,
                InterruptedException {
            System.out.println("******************");
            if (cmap == null || cmap.size() == 0 || df == null || df.size() == 0) {
    
                URI[] ss = context.getCacheFiles();
                if (ss != null) {
                    for (int i = 0; i < ss.length; i++) {
                        URI uri = ss[i];
                        if (uri.getPath().endsWith("part-r-00003")) {// total microblog count (N)
                            Path path =new Path(uri.getPath());
    //                        FileSystem fs =FileSystem.get(context.getConfiguration());
    //                        fs.open(path);
                            // cache files are symlinked into the task's working directory, so the bare file name can be opened directly
                            BufferedReader br = new BufferedReader(new FileReader(path.getName()));
                            String line = br.readLine();
                            if (line.startsWith("count")) {
                                String[] ls = line.split("\t");
                                cmap = new HashMap<String, Integer>();
                                cmap.put(ls[0], Integer.parseInt(ls[1].trim()));
                            }
                            br.close();
                        } else if (uri.getPath().endsWith("part-r-00000")) {// DF of each term
                            df = new HashMap<String, Integer>();
                            Path path =new Path(uri.getPath());
                            BufferedReader br = new BufferedReader(new FileReader(path.getName()));
                            String line;
                            while ((line = br.readLine()) != null) {
                                String[] ls = line.split("\t");
                                df.put(ls[0], Integer.parseInt(ls[1].trim()));
                            }
                            br.close();
                        }
                    }
                }
            }
        }
    
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            FileSplit fs = (FileSplit) context.getInputSplit();
    //        System.out.println("--------------------");
            if (!fs.getPath().getName().contains("part-r-00003")) {
                
                String[] v = value.toString().trim().split("\t");
                if (v.length >= 2) {
                    int tf =Integer.parseInt(v[1].trim());// TF value
                    String[] ss = v[0].split("_");
                    if (ss.length >= 2) {
                        String w = ss[0];
                        String id=ss[1];
                        
                        // cast to double so that N/df is not truncated by integer division
                        double s = tf * Math.log((double) cmap.get("count") / df.get(w));
                        NumberFormat nf =NumberFormat.getInstance();
                        nf.setMaximumFractionDigits(5);
                        context.write(new Text(id), new Text(w+":"+nf.format(s)));
                    }
                } else {
                    System.out.println(value.toString() + "-------------");
                }
            }
        }
    }
    package com.laoxiao.mr.weibo;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class LastReduce extends Reducer<Text, Text, Text, Text>{
        
        protected void reduce(Text key, Iterable<Text> arg1,
                Context context)
                throws IOException, InterruptedException {
            
            // concatenate all term:weight pairs of this microblog, separated by tabs
            StringBuffer sb =new StringBuffer();
            
            for( Text i :arg1 ){
                sb.append(i.toString()+"\t");
            }
            
            context.write(key, new Text(sb.toString()));
        }
    
    }
    package com.laoxiao.mr.weibo;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TextInputFormat;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    
    public class LastJob {
    
        public static void main(String[] args) {
            Configuration config =new Configuration();
            config.set("fs.defaultFS", "hdfs://node1:8020");
            config.set("yarn.resourcemanager.hostname", "node1");
            //config.set("mapred.jar", "C:\Users\Administrator\Desktop\weibo3.jar");
            try {
                FileSystem fs =FileSystem.get(config);
                //JobConf job =new JobConf(config);
                Job job =Job.getInstance(config);
                job.setJarByClass(LastJob.class);
                job.setJobName("weibo3");
                
    //            DistributedCache.addCacheFile(uri, conf);
                //2.5
            // load the total number of microblogs (N) into memory via the distributed cache
            job.addCacheFile(new Path("/usr/output/weibo1/part-r-00003").toUri());
            // load the DF output into memory via the distributed cache
            job.addCacheFile(new Path("/usr/output/weibo2/part-r-00000").toUri());
                
                
                
                
                
            // set the output key/value types (these also serve as the map output types here)
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(Text.class);
    //            job.setMapperClass();
                job.setMapperClass(LastMapper.class);
                job.setReducerClass(LastReduce.class);
                
            // HDFS directory the job reads its input from: the TF output of the first job
                FileInputFormat.addInputPath(job, new Path("/usr/output/weibo1"));
                Path outpath =new Path("/usr/output/weibo3");
                if(fs.exists(outpath)){
                    fs.delete(outpath, true);
                }
                FileOutputFormat.setOutputPath(job,outpath );
                
                boolean f= job.waitForCompletion(true);
                if(f){
                    System.out.println("执行job成功");
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
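
    For reference, the weight that LastMapper writes for a term t in a microblog d is the standard TF-IDF score (Math.log is the natural logarithm):

    w_{t,d} = \mathrm{tf}_{t,d} \times \ln\!\left(\frac{N}{\mathrm{df}_t}\right)

    A worked example with made-up numbers: for N = 1000 microblogs, df_t = 10 and tf_{t,d} = 3, the weight is 3 × ln(1000/10) = 3 × ln(100) ≈ 13.81551, which NumberFormat then prints with at most five fraction digits.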
  • Original article: https://www.cnblogs.com/scl1314/p/7513956.html