  • Hadoop MapReduce programming example (preliminary cleaning and filtering of system logs)

    When I first started with Hadoop, I assumed I had to install a Hadoop cluster before I could learn MapReduce programming. That is not actually necessary. Of course, if you have the machines available, installing and configuring a cluster yourself is the best way to understand how it works, but today I will show you how to write and debug MapReduce code without installing Hadoop at all.
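
    This works because, by default, a program that only has the Hadoop client jars on its classpath (and no cluster configuration files) runs the job in-process with the local job runner against the local filesystem. As a minimal sketch, the two properties involved already default to these values in that situation and are set explicitly here only for illustration:

    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "file:///");            // read input and write output on the local filesystem
    conf.set("mapreduce.framework.name", "local");   // run map and reduce tasks in-process (LocalJobRunner)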

    Before we start, let's go over how MapReduce works:

    A Hadoop cluster is made up of two kinds of nodes: DataNodes, which store the data blocks themselves, and a NameNode, which stores the metadata about that data. When a MapReduce job starts, the data is first read from the cluster's file system through the InputFormat and split according to the configured split size (by default one block, 128 MB). A RecordReader (RR) then iterates over each split and hands its records to the map function one at a time (line by line for text input); the map function processes each record according to your code and emits key-value pairs. Between map and reduce, three things happen: the Combiner (an optional map-side pre-aggregation, essentially a reduce that runs on the map side), the Partitioner (which distributes the map output keys across the reducers), and shuffle & sort (which groups and sorts the map output by key and transfers each partition to the reducer chosen by the partitioner). Finally, the reduce function processes each group according to your code and writes out the result, again as key-value pairs.
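
    To make these roles concrete, here is a sketch of how they are wired into a job driver (TokenizerMapper and IntSumReducer are the classes from the code later in this post; HashPartitioner is already Hadoop's default partitioner and is set explicitly only to show where a custom one would plug in):

    // import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
    Job job = Job.getInstance(conf);
    job.setMapperClass(TokenizerMapper.class);       // map-side logic
    job.setCombinerClass(IntSumReducer.class);       // optional map-side pre-aggregation ("reduce on the map side")
    job.setPartitionerClass(HashPartitioner.class);  // decides which reducer receives each key
    job.setReducerClass(IntSumReducer.class);        // reduce-side logic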

    Note:

    The format of the map-phase input key-value pairs is determined by the input format. With the default TextInputFormat, each line is processed as one record: the key is the offset of the line from the start of the file, and the value is the text of the line (see the sketch after these notes).
    The format of the map output key-value pairs must match the format of the reduce input key-value pairs.
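
    A minimal sketch of the first note (LineOffsetMapper is a hypothetical mapper used only to show the types; the real mapper later in this post declares its input key as Object, which also works):

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class LineOffsetMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
      private static final IntWritable ONE = new IntWritable(1);

      @Override
      protected void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        // key   = byte offset of this line from the start of the file
        // value = the text of this line
        context.write(value, ONE);   // the (Text, IntWritable) output must match the reducer's input types
      }
    }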

    To see how this fits together end to end, here is the WordCount processing flow:
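
    As a rough illustration, suppose the input consists of the two lines "hello world" and "hello hadoop":

    map:      (hello, 1) (world, 1)   and   (hello, 1) (hadoop, 1)
    shuffle:  hadoop -> [1]    hello -> [1, 1]    world -> [1]
    reduce:   (hadoop, 1) (hello, 2) (world, 1)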

    Now let's start writing our local MR program.

    First we need to download a Hadoop release from the official site (this article uses Hadoop 2.6.0; there is nothing to install, we only need the jars inside the package).

    Download link: https://archive.apache.org/dist/hadoop/common/ (the most-downloaded release is fine; pick whichever version you like).

    Here is the MR code:

    package loganalysis;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;

    public class WordCount {

      public static class TokenizerMapper
           extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        private String imei = "";
        private String areacode = "";
        private String responsedata = "";
        private String requesttime = "";
        private String requestip = "";

        // With the default TextInputFormat each record is one log line: the key is the byte
        // offset of the line in the file, the value is the line text. The map output key-value
        // types must match the reduce input key-value types.
        public void map(Object key, Text value, Context context)
                        throws IOException, InterruptedException {
          String line = value.toString();

          // Positions of the field names in the log line (the search starts after the timestamp).
          int areai   = line.indexOf("areacode", 21);
          int imeii   = line.indexOf("imei", 21);
          int redatai = line.indexOf("responsedata", 21);
          int retimei = line.indexOf("requesttime", 21);
          int reipi   = line.indexOf("requestip", 21);

          // areacode":"<value>"  ->  skip the 11 characters of  areacode":"  and cut at the closing quote
          if (areai == -1) {
            areacode = "";
          } else {
            areacode = line.substring(areai + 11);
            int len2 = areacode.indexOf("\"");
            areacode = (len2 <= 1) ? "" : areacode.substring(0, len2);
          }

          // imei is inside the requestinfo JSON, whose quotes are escaped (imei\":\"<value>\"),
          // hence the offset of 9 and the cut at the backslash before the closing quote
          if (imeii == -1) {
            imei = "";
          } else {
            imei = line.substring(imeii + 9);
            int len2 = imei.indexOf("\\");
            imei = (len2 <= 1) ? "" : imei.substring(0, len2);
          }

          // responsedata":"<value>"
          if (redatai == -1) {
            responsedata = "";
          } else {
            responsedata = line.substring(redatai + 15);
            int len2 = responsedata.indexOf("\"");
            responsedata = (len2 <= 1) ? "" : responsedata.substring(0, len2);
          }

          // requesttime":"<value>"
          if (retimei == -1) {
            requesttime = "";
          } else {
            requesttime = line.substring(retimei + 14);
            int len2 = requesttime.indexOf("\"");
            requesttime = (len2 <= 1) ? "" : requesttime.substring(0, len2);
          }

          // requestip":"<value>"
          if (reipi == -1) {
            requestip = "";
          } else {
            requestip = line.substring(reipi + 12);
            int len2 = requestip.indexOf("\"");
            requestip = (len2 <= 1) ? "" : requestip.substring(0, len2);
          }

          // Only emit a record when every field was extracted: the fields are joined into one
          // key and counted with a value of 1.
          if (!imei.isEmpty() && !areacode.isEmpty() && !responsedata.isEmpty()
              && !requesttime.isEmpty() && !requestip.isEmpty()) {
            String wd = imei + " " + areacode + " " + responsedata + " " + requesttime + " " + requestip;
            // wd = "areacode|" + areacode + "|imei|" + imei + "|responsedata|" + responsedata
            //    + "|requesttime|" + requesttime + "|requestip|" + requestip;
            word.set(wd);
            context.write(word, one);
          }
        }
      }

      public static class IntSumReducer
           extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        // Sums the 1s emitted by the mapper, i.e. counts how often each cleaned record occurred.
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                           throws IOException, InterruptedException {
          int sum = 0;
          for (IntWritable val : values) {
            sum += val.get();
          }
          result.set(sum);
          context.write(key, result);
        }
      }

      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // On a cluster the paths would normally come from the command line:
        // String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        String[] otherArgs = new String[]{"/Users/mac/tmp/inputmr", "/Users/mac/tmp/output1"};

        if (otherArgs.length != 2) {
          System.err.println("Usage: wordcount <in> <out>");
          System.exit(2);
        }

        // Job job = new Job(conf, "word count");   // deprecated constructor
        Job job = Job.getInstance(conf);

        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }

    Apart from JDK 1.7, all of the jars used above come from under the share directory of the Hadoop package.

    If you are not sure which jars you need, just add all of the jars under share/hadoop to the project.
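
    For reference, in the 2.6.0 package those jars are spread across subdirectories such as share/hadoop/common, share/hadoop/hdfs, share/hadoop/mapreduce and share/hadoop/yarn (plus the lib folders under each); the exact layout may differ slightly between releases.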

    Note: my machine is a Mac Pro; if you are on a Windows machine, the paths need to be adjusted and prefixed with "file:///", for example:

     String[] otherArgs = new String[]{"file:///D:/tmpinput", "file:///D:/tmpoutput"};
    The core of this program is in the map function: it extracts the key fields from each system log line, concatenates them into a single string that is emitted as the key, and sets the value to 1 so that counting later is easy. Since this is only an example, it extracts just a few fields; in practice there would be more.
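
    For example, assuming all five fields are found (and the escaping in your raw file matches the offsets used in map()), the fifth sample log line below would produce a map output pair roughly like:

    key   : A000004853168C 广西南宁市 无查询结果 2016-04-18 16:00:00 219.159.72.3
    value : 1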

    Here are the test system log lines:

    2016-04-18 16:00:00 {"areacode":"浙江省丽水市","countAll":0,"countCorrect":0,"datatime":"4134362","logid":"201604181600001184409476","requestinfo":"{"sign":"4","timestamp":"1460966390499","remark":"4","subjectPro":"123456","interfaceUserName":"12345678900987654321","channelno":"100","imei":"12345678900987654321","subjectNum":"13989589062","imsi":"12345678900987654321","queryNum":"13989589062"}","requestip":"36.16.128.234","requesttime":"2016-04-18 16:00:00","requesttype":"0","responsecode":"010005","responsedata":"无查询结果"}
    2016-04-18 16:00:00 {"areacode":"宁夏银川市","countAll":0,"countCorrect":0,"datatime":"4715990","logid":"201604181600001858043208","requestinfo":"{"sign":"4","timestamp":"1460966400120","remark":"4","subjectPro":"123456","interfaceUserName":"12345678900987654321","channelno":"1210","imei":"A0000044ABFD25","subjectNum":"15379681917","imsi":"460036951451601","queryNum":""}","requestip":"115.168.93.87","requesttime":"2016-04-18 16:00:00","requesttype":"0","responsecode":"010005","responsedata":"无查询结果","userAgent":"ZTE-Me/Mobile"}
    2016-04-18 16:00:00 {"areacode":"黑龙江省哈尔滨市","countAll":0,"countCorrect":0,"datatime":"5369561","logid":"201604181600001068429609","requestinfo":"{"interfaceUserName":"12345678900987654321","queryNum":"","timestamp":"1460966400139","sign":"4","imsi":"460030301212545","imei":"35460207765269","subjectNum":"55588237","subjectPro":"123456","remark":"4","channelno":"2100"}","requestip":"42.184.41.180","requesttime":"2016-04-18 16:00:00","requesttype":"0","responsecode":"010005","responsedata":"无查询结果"}
    2016-04-18 16:00:00 {"areacode":"浙江省丽水市","countAll":0,"countCorrect":0,"datatime":"4003096","logid":"201604181600001648238807","requestinfo":"{"sign":"4","timestamp":"1460966391025","remark":"4","subjectPro":"123456","interfaceUserName":"12345678900987654321","channelno":"100","imei":"12345678900987654321","subjectNum":"13989589062","imsi":"12345678900987654321","queryNum":"13989589062"}","requestip":"36.16.128.234","requesttime":"2016-04-18 16:00:00","requesttype":"0","responsecode":"010005","responsedata":"无查询结果"}
    2016-04-18 16:00:00 {"areacode":"广西南宁市","countAll":0,"countCorrect":0,"datatime":"4047993","logid":"201604181600001570024205","requestinfo":"{"sign":"4","timestamp":"1460966382871","remark":"4","subjectPro":"123456","interfaceUserName":"12345678900987654321","channelno":"1006","imei":"A000004853168C","subjectNum":"07765232589","imsi":"460031210400007","queryNum":"13317810717"}","requestip":"219.159.72.3","requesttime":"2016-04-18 16:00:00","requesttype":"0","responsecode":"010005","responsedata":"无查询结果"}
    2016-04-18 16:00:00 {"areacode":"海南省五指山市","countAll":0,"countCorrect":0,"datatime":"5164117","logid":"201604181600001227842048","requestinfo":"{"sign":"4","timestamp":"1460966399159","remark":"4","subjectPro":"123456","interfaceUserName":"12345678900987654321","channelno":"1017","imei":"A000005543AFB7","subjectNum":"089836329061","imsi":"460036380954376","queryNum":"13389875751"}","requestip":"140.240.171.71","requesttime":"2016-04-18 16:00:00","requesttype":"0","responsecode":"010005","responsedata":"无查询结果"}
    2016-04-18 16:00:00 {"areacode":"山西省","countAll":0,"countCorrect":0,"datatime":"14075772","logid":"201604181600001284030648","requestinfo":"{"sign":"4","timestamp":"1460966400332","remark":"4","subjectPro":"123456","interfaceUserName":"12345678900987654321","channelno":"1006","imei":"A000004FE0218A","subjectNum":"03514043633","imsi":"460037471517070","queryNum":""}","requestip":"1.68.5.227","requesttime":"2016-04-18 16:00:00","requesttype":"0","responsecode":"010005","responsedata":"无查询结果"}
    2016-04-18 16:00:00 {"areacode":"四川省","countAll":0,"countCorrect":0,"datatime":"6270982","logid":"201604181600001173504863","requestinfo":"{"sign":"4","timestamp":"1460966398896","remark":"4","subjectPro":"123456","interfaceUserName":"12345678900987654321","channelno":"100","imei":"12345678900987654321","subjectNum":"13666231300","imsi":"12345678900987654321","queryNum":"13666231300"}","requestip":"182.144.66.97","requesttime":"2016-04-18 16:00:00","requesttype":"0","responsecode":"010005","responsedata":"无查询结果"}
    2016-04-18 16:00:00 {"areacode":"浙江省","countAll":0,"countCorrect":0,"datatime":"4198522","logid":"201604181600001390637240","requestinfo":"{"sign":"4","timestamp":"1460966399464","remark":"4","subjectPro":"123456","interfaceUserName":"12345678900987654321","channelno":"100","imei":"12345678900987654321","subjectNum":"05533876327","imsi":"12345678900987654321","queryNum":"05533876327"}","requestip":"36.23.9.49","requesttime":"2016-04-18 16:00:00","requesttype":"0","responsecode":"000000","responsedata":"操作成功"}
    2016-04-18 16:00:00 {"areacode":"江苏省连云港市","countAll":0,"countCorrect":0,"datatime":"4408097","logid":"201604181600001249944032","requestinfo":"{"sign":"4","timestamp":"1460966395908","remark":"4","subjectPro":"123456","interfaceUserName":"12345678900987654321","channelno":"100","imei":"12345678900987654321","subjectNum":"18361451463","imsi":"12345678900987654321","queryNum":"18361451463"}","requestip":"58.223.4.210","requesttime":"2016-04-18 16:00:00","requesttype":"0","responsecode":"010005","responsedata":"无查询结果"}


    Finally, here is a screenshot of the run results:
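
    With the default TextOutputFormat, the result lands in the output directory as part-r-00000 style files, one line per distinct key: the concatenated fields, a tab, then the count. Schematically:

    <imei> <areacode> <responsedata> <requesttime> <requestip><TAB><count>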

  • Original article: https://www.cnblogs.com/gaochsh/p/7803090.html