  • Classroom exercise: MapReduce

    Map class:

    package com.lq.testt;
    
    import java.io.IOException;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    
    public class Map extends Mapper<LongWritable, Text, Text, Writable>
    {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException 
        {
            String line = value.toString();
            //split the line into its comma-separated fields
            String[] fields = StringUtils.split(line, ",");
    
            //extract the fields we need
            String ip = fields[0]+"."+fields[1]+"."+fields[2]+"."+fields[3];
            String article = fields[7]+"/"+fields[8];
            String time = fields[4];
            int traffic = Integer.parseInt(fields[6]);
            String video = fields[7]+"/"+fields[8];
    
            //package the record as a key/value pair and emit it
            context.write(new Text(ip), new Writable(ip, time, traffic, article, video));
        }
    }
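
    For a quick local check of the mapper's field extraction, here is a minimal standalone sketch; the sample line is hypothetical and only illustrates the assumed layout (four IP octets first, time at index 4, traffic at index 6, resource type and id at indices 7 and 8):

    //standalone sketch, not part of the job; plain String.split stands in for StringUtils.split
    public class ParseSketch
    {
        public static void main(String[] args)
        {
            String line = "10,9,8,7,2019-11-12 10:00:00,GET,2345,video,170";  //hypothetical log line
            String[] fields = line.split(",");
            String ip = fields[0] + "." + fields[1] + "." + fields[2] + "." + fields[3];
            String time = fields[4];
            int traffic = Integer.parseInt(fields[6]);
            String resource = fields[7] + "/" + fields[8];
            System.out.println(ip + "\t" + time + "\t" + traffic + "\t" + resource);
            //prints: 10.9.8.7    2019-11-12 10:00:00    2345    video/170
        }
    }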

    Reduce class:

    package com.lq.testt;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class Reduce extends Reducer<Text, Writable, Text, Writable>
    {
        @Override
        protected void reduce(Text key, Iterable<Writable> values, Context context)
                throws IOException, InterruptedException 
        {
            String time = "";     //start from "" rather than null, or the output would begin with "null"
            int traffic = 0;
            String article = "";
            String video = null;
            for (Writable writable : values)
            {
                time += writable.getTime();         //concatenate the visit times for this key
                traffic += writable.getTraffic();   //sum the traffic
                article += writable.getArticle();   //concatenate the articles
                video = writable.getVideo();        //keep the last video seen
            }
            context.write(key, new Writable(key.toString(), time, traffic, article, video));
        }
    }
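
    String += inside that loop recopies the accumulated string on every iteration, which gets slow when one key carries many values. A minimal sketch of the same loop with StringBuilder, assuming it replaces the body of reduce() above (aggregation semantics unchanged):

    StringBuilder time = new StringBuilder();
    StringBuilder article = new StringBuilder();
    int traffic = 0;
    String video = null;
    for (Writable writable : values)
    {
        time.append(writable.getTime());        //append without recopying the whole string
        traffic += writable.getTraffic();
        article.append(writable.getArticle());
        video = writable.getVideo();
    }
    //time.toString() and article.toString() then go into the output Writable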

    Writable class:

    package com.lq.testt;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    public class Writable implements org.apache.hadoop.io.Writable
    {
        private String ip;
        private String time;
        private int traffic;
        private String article;
        private String video;
    
        public Writable()
        {
            super();
        }
    
        public Writable(String ip, String time, int traffic, String article, String video)
        {
            super();
            this.ip = ip;
            this.time = time;
            this.traffic = traffic;
            this.article = article;
            this.video = video;
        }
    
        @Override
        public void write(DataOutput out) throws IOException
        {
            //serialization order must match readFields() exactly
            out.writeInt(this.traffic);
            out.writeUTF(this.ip);
            out.writeUTF(this.time);
            out.writeUTF(this.article);
            out.writeUTF(this.video);
        }
    
        @Override
        public void readFields(DataInput in) throws IOException
        {
            //deserialize in the same order the fields were written
            this.traffic = in.readInt();
            this.ip = in.readUTF();
            this.time = in.readUTF();
            this.article = in.readUTF();
            this.video = in.readUTF();
        }
    
        public String getIp() {
            return ip;
        }
        public void setIp(String ip) {
            this.ip = ip;
        }
        public String getTime() {
            return time;
        }
        public void setTime(String time) {
            this.time = time;
        }
        public int getTraffic() {
            return traffic;
        }
        public void setTraffic(int traffic) {
            this.traffic = traffic;
        }
        public String getArticle() {
            return article;
        }
        public void setArticle(String article) {
            this.article = article;
        }
        public String getVideo() {
            return video;
        }
        public void setVideo(String video) {
            this.video = video;
        }
    }
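
    The driver below uses the default TextOutputFormat, which renders each value by calling toString(); without an override the output file shows the bare object hash (something like com.lq.testt.Writable@1a2b3c). A minimal toString() sketch that could be added to this class (the tab-separated field order is an assumption):

    @Override
    public String toString()
    {
        //render the record as tab-separated fields for TextOutputFormat
        return ip + "\t" + time + "\t" + traffic + "\t" + article + "\t" + video;
    }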

    Runner class:

    package com.lq.testt;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    
    public class Runner 
    {
       public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException 
       {
           Configuration conf = new Configuration();
           Job job = Job.getInstance(conf);    //create the job that ties together the mapper, the reducer, the input and the output
           job.setJarByClass(Runner.class);    //set the driver class; Hadoop runs the job from a jar
           job.setMapperClass(Map.class);      //set the mapper class
           job.setReducerClass(Reduce.class);  //set the reducer class
           job.setMapOutputKeyClass(Text.class);    //set the map output types
           job.setMapOutputValueClass(Writable.class);
           job.setOutputKeyClass(Text.class);       //set the reduce output types
           job.setOutputValueClass(Writable.class);
           FileInputFormat.setInputPaths(job, new Path(args[0]));   //input path to analyze; args[0] is passed on the command line
           FileOutputFormat.setOutputPath(job, new Path(args[1]));  //output path for the results; it must not exist yet
    //        job.submit();    //submits without printing progress logs
           job.waitForCompletion(true);    //true prints the execution log
       }
    }
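
    A typical way to launch the job, assuming the project is packaged as testt.jar and the input file already sits in HDFS (both paths here are hypothetical, and the output directory must not exist yet):

    hadoop jar testt.jar com.lq.testt.Runner /data/result.txt /data/output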