zoukankan      html  css  js  c++  java
  • Hadoop日志文件分析系统

                                                                         Hadoop日志分析系统

    项目需求:

       需要统计一下线上日志中某些信息每天出现的频率,举个简单的例子,统计线上每天的请求总数和异常请求数。线上大概几十台

    服务器,每台服务器大概每天产生4到5G左右的日志,假设有30台,每台5G的,一天产生的日志总量为150G。

    处理方案:

       方案1:传统的处理方式,写个JAVA日志分析代码,部署到每台服务器进行处理,这种方式部署起来耗时费力,又不好维护。

       方案2:采用Hadoop分布式处理,日志分析是Hadoop集群系统的拿手好戏。150G每天的日志也算是比较大的数据量了,搭个简

    单的Hadoop集群来处理这些日志是再好不过的了。

    Hadoop集群的搭建:

          参见这两篇文章:http://www.cnblogs.com/cstar/archive/2012/12/16/2820209.html

    http://www.cnblogs.com/cstar/archive/2012/12/16/2820220.html

    我们这里的集群就采用了两台机器,配置每台8核,32G内存,500G磁盘空间。

    日志准备工作:

         由于日志分散在各个服务器,所以我们先需要将所有的日志拷贝到我们的集群系统当中,这个可以通过linux服务器下rsync或者scp

    服务来执行。这里我们通过scp服务来拷贝,由于都是内网的机器,所以拷贝几个G的日志可以很快就完成。下面是拷贝日志的脚本,脚本

    还是有一些需要注意的地方,我们只需要拷贝前一天的数据,实际保存的数据可能是好几天的,所以我们只要把我们需要的这一天的数据

    SCP过去就可以了。

    #!/bin/sh
    workdir=/home/myproj/bin/log/
    files=`ls $workdir`
    pre1date=`date  +"%Y%m%d" -d  "-1 days"`
    pre1date1=`date  +"%Y-%m-%d" -d  "-1 days"`
    curdate=`date  +"%Y%m%d"`
    hostname=`uname -n`
    echo $pre1date $curdate
    uploadpath="/home/hadoop/hadoop/mytest/log/"$pre1date1"/"$hostname
    echo $uploadpath
    cd $workdir
    mintime=240000
    secondmintime=0
    for file in $files;do
      filedate=`stat $file | grep Modify| awk '{print $2}' |sed -e 's/-//g'`
      filetime=`stat $file | grep Modify| awk '{print $3}' |cut -d"." -f1 | sed -e 's/://g'| sed 's/^0+//'`
      if [ $filedate -eq $curdate ]; then
       if [ $filetime -lt $mintime ]; then
            secondmintime=$mintime
    	mintime=$filetime
       fi
      fi
    done
    echo "mintime:"$mintime
    step=1000
    mintime=`expr $mintime + $step`
    echo "mintime+1000:"$mintime
    for file in $files;do
      filedate=`stat $file | grep Modify| awk '{print $2}' |sed -e 's/-//g'`
      filetime=`stat $file | grep Modify| awk '{print $3}' |cut -d"." -f1 | sed -e 's/://g'| sed 's/^0+//'`
      filename=`echo $file | cut -c 1-8`
      startchars="info.log"
      #echo $filename
      if [ $filename == $startchars ]; then
        if [ $filedate -eq $pre1date ]; then
         scp -rp $file dir@antix2:$uploadpath
         #echo $file
        elif [ $filedate -eq $curdate ]; then
          if [ $filetime -lt $mintime ]; then
            scp -rp $file dir@antix2:$uploadpath
            #echo $file
          fi
        fi
      fi
      #echo $filedate $filetime
    done
    

    MapReduce代码

       接下来就是编写MapReduce的代码了。使用Eclipse环境来编写,需要安装hadoop插件,我们hadoop机器采用的是1.1.1版本,所以插

    件使用hadoop-eclipse-plugin-1.1.1.jar,将插件拷贝到eclipse的plugins目录下就可以了。然后新建一个MapReduce项目:

     

    工程新建好了然后我们就可以编写我们的MapReduce代码了。

    import java.io.IOException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    
    public class LogAnalysis {
    
        public static class LogMapper 
        extends Mapper<LongWritable, Text, Text, IntWritable>{
    
            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();
            private Text hourWord = new Text();
            public void map(LongWritable key, Text value, Context context
                    ) throws IOException, InterruptedException {
                String line = value.toString(); 
                SimpleDateFormat formatter2 = new SimpleDateFormat("yy-MM-dd");
                java.util.Date d1 =new Date();
                d1.setTime(System.currentTimeMillis()-1*24*3600*1000);
                String strDate =formatter2.format(d1);
                if(line.contains(strDate)){
                     String[] strArr = line.split(",");
                     int len = strArr[0].length();
                     String time = strArr[0].substring(1,len-1);
                   
                     String[] timeArr = time.split(":");
                     String strHour = timeArr[0];
                     String hour = strHour.substring(strHour.length()-2,strHour.length());
                     String hourKey = "";
                    if(line.contains("StartASocket")){
                        word.set("SocketCount");
                        context.write(word, one);
                        hourKey = "SocketCount:" + hour;
                        hourWord.set(hourKey);
                        context.write(hourWord, one);
                        word.clear();
                        hourWord.clear();
                    }
                    if(line.contains("SocketException")){
                        word.set("SocketExceptionCount");
                        context.write(word, one);
                        hourKey = "SocketExceptionCount:" + hour;
                        hourWord.set(hourKey);
                        context.write(hourWord, one);
                        word.clear();
                        hourWord.clear();
                    }
                              
    } }
    public static class LogReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static int run(String[] args) throws Exception{ Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: loganalysis <in> <out>"); System.exit(2); } FileSystem fileSys = FileSystem.get(conf); String inputPath = "input/" + args[0]; fileSys.copyFromLocalFile(new Path(args[0]), new Path(inputPath));//将本地文件系统的文件拷贝到HDFS中 Job job = new Job(conf, "loganalysis"); job.setJarByClass(LogAnalysis.class); job.setMapperClass(LogMapper.class); job.setCombinerClass(LogReducer.class); job.setReducerClass(LogReducer.class); // 设置输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(inputPath)); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); Date startTime = new Date(); System.out.println("Job started: " + startTime); int ret = job.waitForCompletion(true)? 0 : 1; fileSys.copyToLocalFile(new Path(otherArgs[1]), new Path(otherArgs[1])); fileSys.delete(new Path(inputPath), true); fileSys.delete(new Path(otherArgs[1]), true); Date end_time = new Date(); System.out.println("Job ended: " + end_time); System.out.println("The job took " + (end_time.getTime() - startTime.getTime()) /1000 + " seconds."); return ret; } public static void main(String[] args) { try { int ret = run(args); System.exit(ret); } catch (Exception e) { e.printStackTrace(); System.out.println(e.getMessage()); } } }

    部署到Hadoop集群:

           代码完成后测试没有问题后,部署到集群当中去执行,我们有几十台服务器,所以每台的服务器的日志当成一个任务来执行。

    workdir="/home/hadoop/hadoop/mytest"
    cd $workdir
    pre1date=`date  +"%Y-%m-%d" -d  "-1 days"`
    servers=(mach1 mach2 mach3 )
    for i in ${servers[@]};do
      inputPath="log/"$pre1date"/"$i
      outputPath="output/log/"$pre1date"/"$i
      echo $inputPath $outputPath
      echo "start job "$i" date:"`date`
      hadoop jar  LogAnalysis.jar loganalysis $inputPath $outputPath
      echo "end job "$i" date:"`date`
    done
  • 相关阅读:
    java 变量的定义 类型转换 基本的数据类型
    Java中的String,StringBuilder,StringBuffer三者的区别?
    Linux配置 ElasticSearch
    Linux 配置 SVN and ideal 配置SVN的客户端 ?
    mysql5.7多实例安装
    MySQL高可用架构之MySQL5.7组复制MGR
    二进制安装MySQL5.6 MySQL5.7
    MySQL主从复制之半同步模式
    MySQL主从复制之异步模式
    基于GTID模式MySQL主从复制
  • 原文地址:https://www.cnblogs.com/cstar/p/3189084.html
Copyright © 2011-2022 走看看