  • Cleaning network packet data with MapReduce

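    The cleaned data can be loaded directly into Hive or processed by further MapReduce programs to compute statistics on network traffic. The example implemented here is a simple one: counting HTTP GET requests.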

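    First MapReduce job: extract each packet's timestamp and hexadecimal dump and put them on a single line. (A packet spans several lines, while MapReduce handles input as independent key-value pairs, one line each, so this needs special handling and is the part worth paying attention to.)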

    Two main problems came up:

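      A captured packet consists of a timestamp, a one-line summary of its headers, and a detailed hex dump of the packet. The goal was to put each packet's timestamp and its hex dump (which spans many lines) onto a single line, in order. In plain Java, reading the file line by line, this is easy to do.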
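For comparison, the sequential version is short. This is a minimal sketch, assuming tcpdump -X style text in which each packet starts with a summary line beginning with a timestamp and each hex line starts with an offset such as 0x0000:. The class PacketMerger and the method mergePackets are hypothetical names, and the sample lines are made up. It keeps the hex lines in order simply because it reads them in order, which is exactly what MapReduce does not guarantee.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class PacketMerger {
    // A packet's summary line starts with a timestamp such as 11:08:56.149361.
    private static final Pattern TIME =
            Pattern.compile("^(([01][0-9]|2[0-3]):[0-5][0-9]:[0-5][0-9]\\.[0-9]{6})");
    // A hex line: offset, two spaces, 4-digit groups, then the ASCII column.
    private static final Pattern HEX =
            Pattern.compile("^\\s*0x[0-9a-fA-F]{4}:  ((?:[0-9a-fA-F]{4} )+)");

    /** Returns one "time hex hex ..." string per packet, in input order. */
    static List<String> mergePackets(List<String> lines) {
        List<String> packets = new ArrayList<>();
        StringBuilder current = null;
        for (String line : lines) {
            Matcher t = TIME.matcher(line);
            if (t.find()) {
                // A new timestamp closes the previous packet.
                if (current != null) {
                    packets.add(current.toString().trim());
                }
                current = new StringBuilder(t.group(1)).append(' ');
                continue;
            }
            Matcher h = HEX.matcher(line);
            if (current != null && h.find()) {
                // Reading sequentially keeps the hex lines in their original order.
                current.append(h.group(1));
            }
        }
        if (current != null) {
            packets.add(current.toString().trim());
        }
        return packets;
    }

    public static void main(String[] args) {
        List<String> sample = List.of(
                "11:08:56.149361 IP 10.0.0.1.54321 > 10.0.0.2.80: Flags [P.], length 4",
                "\t0x0000:  4500 002c 0001 0000 4006 0000 0a00 0001  E..,....@.......",
                "\t0x0010:  0a00 0002 d431 0050 0000 0000 0000 0000  .....1.P........",
                "11:08:57.000001 IP 10.0.0.2.80 > 10.0.0.1.54321: Flags [.], length 0",
                "\t0x0000:  4500 0028 0002 0000 4006 0000 0a00 0002  E..(....@.......");
        mergePackets(sample).forEach(System.out::println);
    }
}
```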

    Because MapReduce works on independent key-value pairs, I first came up with two approaches:

    (1) Give every line of a packet the same key as its timestamp line

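    However, a map call sees only one line at a time, and reduce only brings together lines that share a key. The shuffle-and-sort phase between map and reduce sorts records by key only, so the values that share a key reach the reducer in no guaranteed order (the order depends on how map outputs are sorted, spilled, fetched and merged).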

    (2) Give every line its own, increasing key

    Then no two lines share a key, so nothing can be merged onto one line.

    The final solution:

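    (3) As in (1), give every line of a packet the same key as its timestamp line, but also prefix each hex line with an increasing id before merging them onto one line. The pieces still arrive out of order, but since each one carries its own id, they can simply be sorted again afterwards. Neat!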
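A minimal sketch of the re-sorting step in plain Java, assuming the "id:N   hex" layout that the first job's mapper produces; HexReorder and reorder are hypothetical names. The ids have to be compared as numbers: compared as strings, "10" sorts before "9". A more conventional Hadoop alternative is a secondary sort (a composite key plus a grouping comparator), which makes the framework deliver the values already ordered; that is a different technique from the one used in this post.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class HexReorder {
    // One tagged piece: "id:<n>", whitespace, then 4-digit hex groups.
    private static final Pattern PIECE =
            Pattern.compile("id:([0-9]+)\\s+((?:[0-9a-fA-F]{4} ?)+)");

    private static final class Piece {
        final int id;
        final String hex;

        Piece(int id, String hex) {
            this.id = id;
            this.hex = hex;
        }
    }

    /** Rebuilds a packet's hex dump from pieces that arrived in arbitrary order. */
    static String reorder(String mergedLine) {
        List<Piece> pieces = new ArrayList<>();
        Matcher m = PIECE.matcher(mergedLine);
        while (m.find()) {
            pieces.add(new Piece(Integer.parseInt(m.group(1)), m.group(2).trim()));
        }
        // Compare ids as integers; as strings, "10" would sort before "9".
        pieces.sort(Comparator.comparingInt((Piece p) -> p.id));
        StringBuilder sb = new StringBuilder();
        for (Piece p : pieces) {
            if (sb.length() > 0) {
                sb.append(' ');
            }
            sb.append(p.hex);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Pieces in an order a reducer might deliver them.
        System.out.println(reorder("id:10   0a00 0002 id:9   4500 002c time: 11:08:56.149361 "));
    }
}
```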

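    Second MapReduce job: sort each packet's hex lines by their ids. It complements the first job and completes the cleaning; from here on, the hex value at any position in a packet can be extracted and analyzed.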
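Once a packet is one continuous hex string, reading any field is plain offset arithmetic: with the spaces removed, byte n sits at hex digits 2n and 2n+1. A minimal sketch, assuming the dump starts at the IPv4 header (tcpdump -X omits the link-level header); HexOffset and byteAt are hypothetical names.

```java
final class HexOffset {
    /**
     * Returns the byte at a zero-based offset in a space-separated dump of
     * 4-digit hex groups such as "4500 002c ...", or -1 if the dump is too short.
     */
    static int byteAt(String hexDump, int offset) {
        String digits = hexDump.replace(" ", "");
        int start = offset * 2; // two hex digits per byte
        if (offset < 0 || start + 2 > digits.length()) {
            return -1;
        }
        return Integer.parseInt(digits.substring(start, start + 2), 16);
    }

    public static void main(String[] args) {
        String dump = "4500 002c 0001 0000 4006 0000 0a00 0001 0a00 0002";
        // Offset 9 is the IPv4 protocol field (6 = TCP); offset 8 is the TTL.
        System.out.println("protocol=" + byteAt(dump, 9) + " ttl=" + byteAt(dump, 8));
    }
}
```

For instance, the protocol field at offset 9 tells TCP (6) apart from UDP (17) before looking any deeper into the packet.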

    Third MapReduce job: count the HTTP GET requests that were sent.
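The third job checks a fixed position: the 21st hex group, i.e. bytes 40 and 41, for 0x4745 (ASCII "GE"). That is the start of the TCP payload only when the IPv4 and TCP headers are both exactly 20 bytes; with TCP options (12 bytes is common, e.g. two NOPs plus the timestamp option), byte 40 falls inside the options. Below is a sketch of a variant, not the method used in this post, that reads the IPv4 header length (IHL) and the TCP data offset from the packet itself and then looks for "GET ". HttpGetDetector and its methods are hypothetical names, and the input is assumed to be a valid space-separated hex dump starting at the IPv4 header.

```java
final class HttpGetDetector {
    private static final int[] GET = {0x47, 0x45, 0x54, 0x20}; // ASCII "GET "

    /** Parses a dump such as "4500 003a ..." (hex digits and spaces only) into bytes. */
    static int[] parse(String hexDump) {
        String digits = hexDump.replace(" ", "");
        int[] bytes = new int[digits.length() / 2];
        for (int i = 0; i < bytes.length; i++) {
            bytes[i] = Integer.parseInt(digits.substring(2 * i, 2 * i + 2), 16);
        }
        return bytes;
    }

    /** True if the dump is an IPv4/TCP packet whose payload starts with "GET ". */
    static boolean isHttpGet(String hexDump) {
        int[] b = parse(hexDump);
        if (b.length < 20 || (b[0] >> 4) != 4 || b[9] != 6) {
            return false; // too short, not IPv4, or not TCP
        }
        int ipHeaderLen = (b[0] & 0x0f) * 4;               // IHL is in 32-bit words
        if (b.length < ipHeaderLen + 20) {
            return false;
        }
        int tcpHeaderLen = (b[ipHeaderLen + 12] >> 4) * 4;  // TCP data offset, also in words
        int payload = ipHeaderLen + tcpHeaderLen;
        if (b.length < payload + GET.length) {
            return false;
        }
        for (int i = 0; i < GET.length; i++) {
            if (b[payload + i] != GET[i]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        String ip = "4500 003a 0001 4000 4006 0000 0a00 0001 0a00 0002 ";
        // TCP header with data offset 8 (32 bytes): 20 fixed bytes + 12 bytes of options.
        String tcp = "d431 0050 0000 0001 0000 0000 8018 7210 0000 0000 "
                + "0101 080a 0000 0001 0000 0000 ";
        System.out.println(isHttpGet(ip + tcp + "4745 5420 2f78")); // payload "GET /x"
    }
}
```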

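    import java.io.IOException;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;

    // Job 1. The classes below are nested in the job's driver class (not shown).
    // Every line of a packet is emitted under the same key: the packet's sequence number.
    public static class TokenizerMapper
            extends Mapper<Object, Text, IntWritable, Text> {
        // Timestamp such as 11:08:56.149361 (hours 00-23, literal dot).
        private static final Pattern TIME = Pattern.compile(
                "([01][0-9]|2[0-3]):([0-5][0-9]):([0-5][0-9])\\.[0-9]{6}");
        // tcpdump -X hex line, e.g. "\t0x0000:  4500 003c ...  E..<". group(1) is
        // " 4500 003c ... " with one leading space, which keeps the "id:N   hex"
        // layout (three spaces) that job 2 parses.
        private static final Pattern HEX = Pattern.compile(
                "^\\s*0x[0-9a-fA-F]{4}: ( ([0-9a-fA-F]{4} )+)");

        // Per-task counters: the scheme assumes one map task reads the whole
        // capture in order (e.g. a file small enough to form a single split).
        private int id = 1;     // current packet number, used as the output key
        private int hexId = 1;  // running hex-line number, used by job 2 to restore order
        private final IntWritable outKey = new IntWritable();
        private final Text outValue = new Text();

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            Matcher matchTime = TIME.matcher(line);
            if (matchTime.find()) {
                id++; // a timestamp line starts a new packet
                outKey.set(id);
                outValue.set("time: " + matchTime.group() + " ");
                context.write(outKey, outValue);
                return; // do not mistake numbers in the summary line for hex
            }
            Matcher matchHex = HEX.matcher(line);
            if (matchHex.find()) {
                hexId++;
                outKey.set(id);
                outValue.set("id:" + hexId + "  " + matchHex.group(1));
                context.write(outKey, outValue);
            }
        }
    }

    public static class IntSumReducer
            extends Reducer<IntWritable, Text, IntWritable, Text> {
        private final Text result = new Text();

        @Override
        public void reduce(IntWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // The pieces arrive in no guaranteed order; the "id:" tags let job 2 re-sort them.
            StringBuilder sum = new StringBuilder();
            for (Text val : values) {
                sum.append(val.toString());
            }
            result.set(sum.toString());
            context.write(key, result);
        }
    }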
    

      

    import java.io.IOException;
    import java.util.TreeMap;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;

    // Job 2: put each packet's hex lines back in order, keyed by timestamp.
    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, Text> {
        private static final Pattern TIME = Pattern.compile(
                "time: (([01][0-9]|2[0-3]):[0-5][0-9]:[0-5][0-9]\\.[0-9]{6})");
        // One tagged hex line from job 1, e.g. "id:3   4500 003c ... 0a63 ".
        private static final Pattern HEX_LINE = Pattern.compile(
                "id:([0-9]+)   ((?:[A-Za-z0-9]{4} )+)");
        private final Text outKey = new Text();
        private final Text outValue = new Text();

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            Matcher matchTime = TIME.matcher(line);
            if (!matchTime.find()) {
                return; // no timestamp, so not a complete packet record
            }
            // Key is the full timestamp, e.g. "11:08:56.149361" (all six fractional digits).
            outKey.set(matchTime.group(1));

            // A TreeMap<Integer, String> keeps the hex lines sorted by numeric id.
            TreeMap<Integer, String> hexById = new TreeMap<>();
            Matcher matchHex = HEX_LINE.matcher(line);
            while (matchHex.find()) {
                hexById.put(Integer.parseInt(matchHex.group(1)), matchHex.group(2));
            }
            StringBuilder buffer = new StringBuilder();
            for (String lineHex : hexById.values()) {
                buffer.append(lineHex);
            }
            outValue.set(buffer.toString());
            context.write(outKey, outValue);
        }
    }

    public static class IntSumReducer
            extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Identity reduce: packets come out sorted by timestamp.
            for (Text val : values) {
                context.write(key, val);
            }
        }
    }


    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;

    // Job 3: count packets whose payload starts with 0x4745 ("GE"), i.e. HTTP GET requests.
    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private static final int GROUP_WIDTH = 5; // "xxxx " = 4 hex digits + 1 space
        // Group 20 = bytes 40-41: the first payload bytes only if the IPv4 and
        // TCP headers are both 20 bytes long (no IP or TCP options).
        private static final int PAYLOAD_GROUP = 20;
        private final Text word = new Text("sumGet");

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            // Job 2 writes "time<TAB>hex"; split on the tab rather than assume a fixed key width.
            int tab = line.indexOf('\t');
            String hex = (tab >= 0) ? line.substring(tab + 1) : line;
            int start = PAYLOAD_GROUP * GROUP_WIDTH;
            if (hex.length() >= start + 4) {
                String firstBytes = hex.substring(start, start + 4);
                // "4745" = "GE"; "4854" ("HT") would match HTTP responses instead.
                if (firstBytes.equalsIgnoreCase("4745")) {
                    context.write(word, ONE);
                }
            }
        }
    }

    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
    

      

  • Original post: https://www.cnblogs.com/kxdblog/p/4209068.html