  • Learning Storm book notes 8: Log Processing With Storm

    Books that come with code are a pleasure to read: finish a chapter, run the demo, and it feels great!

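    Scenario analysis

    The goal is to analyze and aggregate Apache access logs:
    for example the user's source IP, the country or region it comes from, the OS and browser in use, and statistics such as popular search keywords.
    A log entry looks like this: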

    24.25.135.19 - - [1-01-2011:06:20:31 -0500] "GET / HTTP/1.1" 200 864 "http://www.adeveloper.com/resource.html" "Mozilla/5.0 (Windows; U; Windows NT 5.1; hu-HU; rv:1.7.12) Gecko/20050919 Firefox/1.0.7"
    

    So that the keyword extracted later is not empty, name=qq is added to the referrer:

    180.183.50.208 - - [1-01-2011:06:20:31 -0500] "GET / HTTP/1.1" 200 864 "http://www.adeveloper.com/resource.html?name=qq" "Mozilla/5.0 (Windows; U; Windows NT 5.1; hu-HU; rv:1.7.12) Gecko/20050919 Firefox/1.0.7"
    

    The following attributes are involved:

    ip: source IP, 180.183.50.208

    dateTime: access time, 2011:06:20:31

    request: request type, GET / HTTP/1.1

    response: response status, 200

    bytesSent: 864

    referrer: http://www.adeveloper.com/resource.html

    useragent: Mozilla/5.0xxxxx.....

    country

    browser: Firefox

    os: Windows

    keyword: qq

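    Main flow

    The book uses the pipeline file->kafka-->storm-->mysql.

    Here Kafka is not fed from the file directly; the data comes in through Flume instead, so the classic chain is used:

    flume-->kafka-->storm-->mysql

    The environment requires zookeeper, kafka, mysql, flume and storm, although running the demo in local mode does not need a Storm installation at all.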

    apache-flume-1.4.0-bin.tar.gz
    kafka_2.8.0-0.8.0.tar.gz 
    zookeeper-3.4.5-cdh4.4.0.tar.gz
    Storm 
    

    First, create the database table:

    create table apachelog(
          id INT NOT NULL AUTO_INCREMENT,
          ip VARCHAR(100) NOT NULL,
          dateTime VARCHAR(200) NOT NULL,
          request VARCHAR(100) NOT NULL,
          response VARCHAR(200) NOT NULL,
          bytesSent VARCHAR(200) NOT NULL,
          referrer VARCHAR(500) NOT NULL,
          useragent VARCHAR(500) NOT NULL,
          country VARCHAR(200) NOT NULL,
          browser VARCHAR(200) NOT NULL,
          os VARCHAR(200) NOT NULL,
          keyword VARCHAR(200) NOT NULL,
          PRIMARY KEY (id)
    );
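
    As a rough illustration of how rows could end up in this table, the sketch below builds a parameterized INSERT statement for the eleven non-id columns. The class and method names (ApacheLogInsertSketch, buildInsertSql) are invented for this example and are not taken from the book's PersistenceBolt; a real bolt would presumably bind the values through a java.sql.PreparedStatement.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, for illustration only (not part of the book's code).
class ApacheLogInsertSketch {

    // Column order follows the CREATE TABLE statement; id is AUTO_INCREMENT and is left out.
    static final List<String> COLUMNS = Arrays.asList(
            "ip", "dateTime", "request", "response", "bytesSent",
            "referrer", "useragent", "country", "browser", "os", "keyword");

    // Builds "INSERT INTO apachelog (...) VALUES (?, ?, ...)" with one placeholder per column.
    static String buildInsertSql() {
        String cols = String.join(", ", COLUMNS);
        String marks = String.join(", ", Collections.nCopies(COLUMNS.size(), "?"));
        return "INSERT INTO apachelog (" + cols + ") VALUES (" + marks + ")";
    }

    public static void main(String[] args) {
        System.out.println(buildInsertSql());
    }
}
```

    Using placeholders instead of string concatenation matters here because referrer and useragent come straight from client requests and may contain quotes.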
    

    Next, the Flume Kafka producer configuration:
    cat conf/producer2.properties

    #agent section
    producer.sources = s
    producer.channels = c
    producer.sinks = r
    
    #source section
    producer.sources.s.type = exec
    producer.sources.s.command = tail -F /data/apache.log
    producer.sources.s.channels = c
    
    # Each sink's type must be defined
    producer.sinks.r.type = org.apache.flume.plugins.KafkaSink
    producer.sinks.r.metadata.broker.list=127.0.0.1:9092
    producer.sinks.r.partition.key=0
    producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition
    producer.sinks.r.serializer.class=kafka.serializer.StringEncoder
    producer.sinks.r.request.required.acks=0
    producer.sinks.r.max.message.size=1000000
    producer.sinks.r.producer.type=sync
    producer.sinks.r.custom.encoding=UTF-8
    producer.sinks.r.custom.topic.name=kafkaToptic
    
    #Specify the channel the sink should use
    producer.sinks.r.channel = c
    
    # Each channel's type is defined.
    producer.channels.c.type = memory
    producer.channels.c.capacity = 1000
    

    For test data, see apache_test.log in the kafka-producer project; ?name=qq is appended to the referrer on purpose so that keyword has some data.

    echo '202.27.9.1 - - [2-01-2011:06:20:31 -0500] "GET / HTTP/1.1" 200 864 "http://www.adeveloper.com/resource.html?name=qq" "Mozilla/5.0 (Windows; U; Windows NT 5.1; hu-HU; rv:1.7.12) Gecko/20050919 Firefox/1.0.7"'>>/data/apache.log
    

    With the data ready,
    start zookeeper, then start kafka:

    cd kafka_2.8.0-0.8.0
    bin/kafka-server-start.sh config/server.properties 
    

    Start flume:

    bin/flume-ng agent --conf conf  --conf-file conf/producer2.properties  --name producer -Dflume.root.logger=INFO,console
    

    Once flume is running, you can generate some data yourself:

    echo '202.27.9.1 - - [2-01-2011:06:20:31 -0500] "GET / HTTP/1.1" 200 864 "http://www.adeveloper.com/resource.html?name=qq" "Mozilla/5.0 (Windows; U; Windows NT 5.1; hu-HU; rv:1.7.12) Gecko/20050919 Firefox/1.0.7"'>>/data/apache.log
    

    That completes the data preparation. Next comes the topology.
    Main class: LogProcessingTopology

    import storm.kafka.KafkaSpout;
    import storm.kafka.SpoutConfig;
    import storm.kafka.StringScheme;
    import storm.kafka.ZkHosts;
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.StormSubmitter;
    import backtype.storm.generated.AlreadyAliveException;
    import backtype.storm.generated.InvalidTopologyException;
    import backtype.storm.spout.SchemeAsMultiScheme;
    import backtype.storm.topology.TopologyBuilder;
    
    public class LogProcessingTopology {
    	public static void main(String[] args) throws Exception {
    
    		ZkHosts zkHosts = new ZkHosts("192.168.137.10:2181");
    		SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, "kafkaToptic", "","id");
    		
    		kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    		// Always start consuming from the beginning of the topic.
    		kafkaConfig.forceFromStart = true;
    		TopologyBuilder builder = new TopologyBuilder();
    
    		builder.setSpout("KafkaSpout", new KafkaSpout(kafkaConfig), 1);
    
    		builder.setBolt("LogSplitter", new ApacheLogSplitterBolt(), 1)
    				.globalGrouping("KafkaSpout");
    		builder.setBolt("IpToCountry",new UserInformationGetterBolt(args[0]), 1)
    				.globalGrouping("LogSplitter");
    		builder.setBolt("Keyword", new KeyWordIdentifierBolt(), 1)
    				.globalGrouping("IpToCountry");
    		builder.setBolt("PersistenceBolt",new PersistenceBolt(args[1], args[2], args[3], args[4]),
    				1).globalGrouping("Keyword");
    		if (args.length == 6) {
    			// Run the topology on remote cluster.
    			Config conf = new Config();
    			conf.setNumWorkers(4);
    			try {
    				// args[1]..args[4] are already passed to PersistenceBolt; the sixth argument names the topology.
    				StormSubmitter.submitTopology(args[5], conf,
    						builder.createTopology());
    			} catch (AlreadyAliveException alreadyAliveException) {
    				System.out.println(alreadyAliveException);
    			} catch (InvalidTopologyException invalidTopologyException) {
    				System.out.println(invalidTopologyException);
    			}
    		} else {
    			// in local mode.
    			LocalCluster cluster = new LocalCluster();
    			Config conf = new Config();
    			cluster.submitTopology("KafkaToplogy", conf,builder.createTopology());
    			try {
    				System.out.println("**********************Waiting to consume from kafka");
    				Thread.sleep(10000);
    			} catch (Exception exception) {
    				System.out.println("******************Thread interrupted exception : "+ exception);
    			}
    			cluster.killTopology("KafkaToplogy");
    
    			cluster.shutdown();
    
    		}
    
    	}
    }
    

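    The topology consists of KafkaSpout plus ApacheLogSplitterBolt, UserInformationGetterBolt, KeyWordIdentifierBolt and PersistenceBolt. The class names pretty much say what each one does: KafkaSpout reads data from Kafka, ApacheLogSplitterBolt splits the log line, UserInformationGetterBolt derives user information, KeyWordIdentifierBolt extracts the keyword (hot search term), and PersistenceBolt, the last one, writes the data to MySQL.

    Now let's look at how each class is implemented.

    ApacheLogSplitterBolt uses a regular expression to pull the pieces we want out of the log line; the real work is done in ApacheLogSplitter.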
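
    To picture the data flow without a running cluster, here is a plain-Java sketch (no Storm dependency) in which each stage is a method that adds fields to a map, mirroring the LogSplitter -> IpToCountry -> Keyword chain. The class BoltChainSketch and all stage bodies are placeholders made up for this example; they only show how the tuple grows from 7 to 10 to 11 fields and do no real lookups.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-ins for the bolts; values other than ip are placeholders.
class BoltChainSketch {

    static final String[] SPLIT_FIELDS = {
        "ip", "dateTime", "request", "response", "bytesSent", "referrer", "useragent"};

    // Stand-in for ApacheLogSplitterBolt: emits the seven base fields.
    static Map<String, String> split(String line) {
        Map<String, String> fields = new LinkedHashMap<>();
        for (String name : SPLIT_FIELDS) {
            fields.put(name, "?"); // placeholder value
        }
        fields.put("ip", line.split(" ")[0]); // the ip is the first space-separated token
        return fields;
    }

    // Stand-in for UserInformationGetterBolt: appends country, browser and os.
    static Map<String, String> enrich(Map<String, String> in) {
        Map<String, String> out = new LinkedHashMap<>(in);
        out.put("country", "NA");
        out.put("browser", "?");
        out.put("os", "?");
        return out;
    }

    // Stand-in for KeyWordIdentifierBolt: appends keyword.
    static Map<String, String> keyword(Map<String, String> in) {
        Map<String, String> out = new LinkedHashMap<>(in);
        out.put("keyword", "NA");
        return out;
    }

    // The three stages applied in topology order; persistence would consume the result.
    static Map<String, String> process(String line) {
        return keyword(enrich(split(line)));
    }

    public static void main(String[] args) {
        System.out.println(process("24.25.135.19 - - rest-of-line").keySet());
    }
}
```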

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    
    import org.apache.commons.lang.StringUtils;
    
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    
    public class ApacheLogSplitterBolt extends BaseBasicBolt {
    
    	private static final long serialVersionUID = 1L;
    	private static final ApacheLogSplitter apacheLogSplitter = new ApacheLogSplitter();
    	private static final List<String> LOG_ELEMENTS = new ArrayList<String>();
    	static {
    		LOG_ELEMENTS.add("ip");
    		LOG_ELEMENTS.add("dateTime");
    		LOG_ELEMENTS.add("request");
    		LOG_ELEMENTS.add("response");
    		LOG_ELEMENTS.add("bytesSent");
    		LOG_ELEMENTS.add("referrer");
    		LOG_ELEMENTS.add("useragent");
    	}
    
    	public void execute(Tuple input, BasicOutputCollector collector) {
    		String log = input.getString(0);
    
    		if (StringUtils.isBlank(log)||log.equals("xxxx")) {
    			return;
    		}
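    		Map<String, Object> logMap = apacheLogSplitter.logSplitter(log);
    		// logSplitter returns an empty map for lines it cannot parse; skip them
    		// so that null fields never reach the downstream bolts.
    		if (logMap.isEmpty()) {
    			return;
    		}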
    		List<Object> logdata = new ArrayList<Object>();
    		for (String element : LOG_ELEMENTS) {
    			logdata.add(logMap.get(element));
    		}
    		collector.emit(logdata);
    
    	}
    
    	public void declareOutputFields(OutputFieldsDeclarer declarer) {
    		declarer.declare(new Fields("ip", "dateTime", "request", "response","bytesSent", "referrer", "useragent"));
    	}
    }
    

    The ApacheLogSplitter class does the actual log splitting.

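    import java.util.HashMap;
    import java.util.Map;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    
    public class ApacheLogSplitter {
    
    	public Map<String, Object> logSplitter(String apacheLog) {
    
    		String logEntryLine = apacheLog;
    		// Backslashes and inner double quotes must be escaped inside a Java string literal.
    		String logEntryPattern = "^([\\d.]+) (\\S+) (\\S+) \\[([\\w-:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\d+) \"([^\"]+)\" \"([^\"]+)\"";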
    
    		Pattern p = Pattern.compile(logEntryPattern);
    		Matcher matcher = p.matcher(logEntryLine);
    		Map<String, Object> logMap = new HashMap<String, Object>();
    		if (!matcher.matches() || 9 != matcher.groupCount()) {
    			System.err.println("Bad log entry (or problem with RE?):");
    			System.err.println(logEntryLine);
    			return logMap;
    		}
    		// set the ip, dateTime, request, etc into map.
    		logMap.put("ip", matcher.group(1));
    		logMap.put("dateTime", matcher.group(4));
    		logMap.put("request", matcher.group(5));
    		logMap.put("response", matcher.group(6));
    		logMap.put("bytesSent", matcher.group(7));
    		logMap.put("referrer", matcher.group(8));
    		System.out.println("#######"+matcher.group(8));
    		logMap.put("useragent", matcher.group(9));
    		return logMap;
    	}
    }
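
    To check what each capture group holds, the standalone sketch below runs a pattern of the same structure against the sample line from the top of this post. LogPatternDemo and parse are names invented for this example; the only deliberate difference from the pattern above is that the hyphen in the date character class is moved to the end ([\w:/-]), which is an assumption made here to keep the class unambiguous.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical demo class, not part of the book's code.
class LogPatternDemo {

    static final Pattern LOG_PATTERN = Pattern.compile(
            "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/-]+\\s[+\\-]\\d{4})\\] "
            + "\"(.+?)\" (\\d{3}) (\\d+) \"([^\"]+)\" \"([^\"]+)\"");

    static final String SAMPLE =
            "180.183.50.208 - - [1-01-2011:06:20:31 -0500] \"GET / HTTP/1.1\" 200 864 "
            + "\"http://www.adeveloper.com/resource.html?name=qq\" "
            + "\"Mozilla/5.0 (Windows; U; Windows NT 5.1; hu-HU; rv:1.7.12) Gecko/20050919 Firefox/1.0.7\"";

    // Returns the nine captured groups, or null when the line does not match.
    static String[] parse(String line) {
        Matcher m = LOG_PATTERN.matcher(line);
        if (!m.matches()) {
            return null;
        }
        String[] groups = new String[m.groupCount()];
        for (int i = 1; i <= m.groupCount(); i++) {
            groups[i - 1] = m.group(i);
        }
        return groups;
    }

    public static void main(String[] args) {
        String[] groups = parse(SAMPLE);
        for (int i = 0; i < groups.length; i++) {
            System.out.println("group " + (i + 1) + ": " + groups[i]);
        }
    }
}
```

    Note that group 4, which becomes dateTime, captures the whole bracketed timestamp including the day and the timezone offset (1-01-2011:06:20:31 -0500), and that groups 2 and 3 (the two "-" fields) are matched but never stored.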
    

    UserInformationGetterBolt does quite a lot: it maps the ip to a country and identifies the os and the browser.

    package com.learningstorm.stormlogprocessing;
    
    import java.util.Map;
    
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    
    /**
     * This class use the IpToCountryConverter and UserAgentTools class to calculate
     * the country, os and browser from log line.
     * 
     */
    public class UserInformationGetterBolt extends BaseRichBolt {
    
    	private static final long serialVersionUID = 1L;
    	private IpToCountryConverter ipToCountryConverter = null;
    	private UserAgentTools userAgentTools = null;
    	public OutputCollector collector;
    	private String pathTOGeoLiteCityFile;
    
    	public UserInformationGetterBolt(String pathTOGeoLiteCityFile) {
    		this.pathTOGeoLiteCityFile = pathTOGeoLiteCityFile;
    	}
    
    	public void declareOutputFields(OutputFieldsDeclarer declarer) {
    		declarer.declare(new Fields("ip", "dateTime", "request", "response",
    				"bytesSent", "referrer", "useragent", "country", "browser",
    				"os"));
    	}
    
    	public void prepare(Map stormConf, TopologyContext context,
    			OutputCollector collector) {
    		this.collector = collector;
    		this.ipToCountryConverter = new IpToCountryConverter(this.pathTOGeoLiteCityFile);
    		this.userAgentTools = new UserAgentTools();
    
    	}
    
    	public void execute(Tuple input) {
    
    		String ip = input.getStringByField("ip").toString();
    		
    		// calculate the country from ip
    		Object country = ipToCountryConverter.ipToCountry(ip);
    		// calculate the browser from useragent.
    		Object browser = userAgentTools.getBrowser(input.getStringByField(
    				"useragent").toString())[1];
    		// calculate the os from useragent.
    		Object os = userAgentTools.getOS(input.getStringByField("useragent")
    				.toString())[1];
    		collector.emit(new Values(input.getString(0), input.getString(1), input
    				.getString(2), input.getString(3), input.getString(4), input
    				.getString(5), input.getString(6), country, browser, os));
    
    	}
    }
    

    The ip-to-country lookup uses LookupService from the GeoIP package.

    package com.learningstorm.stormlogprocessing;
    
    import com.maxmind.geoip.Location;
    import com.maxmind.geoip.LookupService;
    
    /**
     * This class contains logic to calculate the country name from IP address
     * 
     */
    public class IpToCountryConverter {
    
    	private static LookupService cl = null;
    
    	/**
    	 * A parameterised constructor which takes the location of the
    	 * GeoLiteCity.dat file as input.
    	 * 
    	 * @param pathTOGeoLiteCityFile
    	 */
    	public IpToCountryConverter(String pathTOGeoLiteCityFile) {
    		try {
    			cl = new LookupService(pathTOGeoLiteCityFile,
    					LookupService.GEOIP_MEMORY_CACHE);
    		} catch (Exception e) {
    			throw new RuntimeException(
    					"Error occurred while initializing IpToCountryConverter class : "+e.getMessage());
    		}
    	}
    
    	/**
    	 * This method takes an ip address as input and converts it into a country name.
    	 * 
    	 * @param ip
    	 * @return
    	 */
    	public String ipToCountry(String ip) {
    		Location location = cl.getLocation(ip);
    		if (location == null) {
    			return "NA";
    		}
    		if (location.countryName == null) {
    			return "NA";
    		}
    		return location.countryName;
    	}
    }
    

    The user-agent parsing for browser and OS is long and tedious; read through it at your own pace!

    package com.learningstorm.stormlogprocessing;
    
    public class UserAgentTools {
        
    	  public  String getFirstVersionNumber(String a_userAgent, int a_position, int numDigits) {
    	    String ver = getVersionNumber(a_userAgent, a_position);
    	    if (ver==null) return "";
    	    int i = 0;
    	    String res="";
    	    while (i<ver.length() && i<numDigits) {
    	      res+=String.valueOf(ver.charAt(i));
    	      i++;
    	    }
    	    return res;
    	  }
    	  public  String getVersionNumber(String a_userAgent, int a_position) {
    	      if (a_position<0) return "";
    	      StringBuffer res = new StringBuffer();
    	      int status = 0;
    	      
    	      while (a_position < a_userAgent.length()) {
    	          char c = a_userAgent.charAt(a_position);
    	          switch (status) {
    	            case 0: // No valid digits encountered yet
    	              if (c == ' ' || c=='/') break;
    	              if (c == ';' || c==')') return "";
    	              status = 1;
    	              // intentional fall-through: the current character starts the version number
    	            case 1: // Version number in progress
    	              if (c == ';' || c=='/' || c==')' || c=='(' || c=='[') return res.toString().trim();
    	              if (c == ' ') status = 2;
    	              res.append(c);
    	              break;
    	            case 2: // Space encountered - Might need to end the parsing
    	              if ((Character.isLetter(c) && 
    	                   Character.isLowerCase(c)) ||
    	                  Character.isDigit(c)) {
    	                  res.append(c);
    	                  status=1;
    	              } else
    	                  return res.toString().trim();
    	              break;
    	          }
    	          a_position++;
    	      }
    	      return res.toString().trim();
    	  }
    
    	  public  String[]getArray(String a, String b, String c) {
    	    String[]res = new String[3];
    	    res[0]=a;
    	    res[1]=b;
    	    res[2]=c;
    	    return res;
    	  }
    
    	  public  String[] getBotName(String userAgent) {
    	    userAgent = userAgent.toLowerCase();
    	    int pos=0;
    	    String res=null;
    	    if ((pos=userAgent.indexOf("help.yahoo.com/"))>-1) {
    	        res= "Yahoo";
    	        pos+=7;
    	    } else
    	    if ((pos=userAgent.indexOf("google/"))>-1) {
    	        res= "Google";
    	        pos+=7;
    	    } else
    	    if ((pos=userAgent.indexOf("msnbot/"))>-1) {
    	        res= "MSNBot";
    	        pos+=7;
    	    } else
    	    if ((pos=userAgent.indexOf("googlebot/"))>-1) {
    	        res= "Google";
    	        pos+=10;
    	    } else
    	    if ((pos=userAgent.indexOf("webcrawler/"))>-1) {
    	        res= "WebCrawler";
    	        pos+=11;
    	    } else
    	    // The following two bots don't have any version number in their User-Agent strings.
    	    if ((pos=userAgent.indexOf("inktomi"))>-1) {
    	        res= "Inktomi";
    	        pos=-1;
    	    } else
    	    if ((pos=userAgent.indexOf("teoma"))>-1) {
    	        res= "Teoma";
    	        pos=-1;
    	    }
    	    if (res==null) return null;
    	    return getArray(res,res,res + getVersionNumber(userAgent,pos));
    	  }
    	  
    	  
    	  public  String[] getOS(String userAgent) {
    	    if (getBotName(userAgent)!=null) return getArray("Bot","Bot","Bot");
    	    String[]res = null;
    	    int pos;
    	    if ((pos=userAgent.indexOf("Windows-NT"))>-1) {
    	        res = getArray("Win","WinNT","Win"+getVersionNumber(userAgent,pos+8));
    	    } else
    	    if (userAgent.indexOf("Windows NT")>-1) {
    	        // The different versions of Windows NT are decoded in the verbosity level 2
    	        // ie: Windows NT 5.1 = Windows XP
    	        if ((pos=userAgent.indexOf("Windows NT 5.1"))>-1) {
    	            res = getArray("Win","WinXP","Win"+getVersionNumber(userAgent,pos+7));
    	        } else
    	        if ((pos=userAgent.indexOf("Windows NT 6.0"))>-1) {
    	            res = getArray("Win","Vista","Vista"+getVersionNumber(userAgent,pos+7));
    	        } else
    	        if ((pos=userAgent.indexOf("Windows NT 6.1"))>-1) {
    	            res = getArray("Win","Seven","Seven "+getVersionNumber(userAgent,pos+7));
    	        } else
    	        if ((pos=userAgent.indexOf("Windows NT 5.0"))>-1) {
    	            res = getArray("Win","Win2000","Win"+getVersionNumber(userAgent,pos+7));
    	        } else
    	        if ((pos=userAgent.indexOf("Windows NT 5.2"))>-1) {
    	            res = getArray("Win","Win2003","Win"+getVersionNumber(userAgent,pos+7));
    	        } else
    	        if ((pos=userAgent.indexOf("Windows NT 4.0"))>-1) {
    	            res = getArray("Win","WinNT4","Win"+getVersionNumber(userAgent,pos+7));
    	        } else
    	        if ((pos=userAgent.indexOf("Windows NT)"))>-1) {
    	            res = getArray("Win","WinNT","WinNT");
    	        } else
    	        if ((pos=userAgent.indexOf("Windows NT;"))>-1) {
    	            res = getArray("Win","WinNT","WinNT");
    	        } else
    	        res = getArray("Win","<B>WinNT?</B>","<B>WinNT?</B>");
    	    } else
    	    if (userAgent.indexOf("Win")>-1) {
    	        if (userAgent.indexOf("Windows")>-1) {
    	            if ((pos=userAgent.indexOf("Windows 98"))>-1) {
    	                res = getArray("Win","Win98","Win"+getVersionNumber(userAgent,pos+7));
    	            } else
    	            if ((pos=userAgent.indexOf("Windows_98"))>-1) {
    	                res = getArray("Win","Win98","Win"+getVersionNumber(userAgent,pos+8));
    	            } else
    	            if ((pos=userAgent.indexOf("Windows 2000"))>-1) {
    	                res = getArray("Win","Win2000","Win"+getVersionNumber(userAgent,pos+7));
    	            } else
    	            if ((pos=userAgent.indexOf("Windows 95"))>-1) {
    	                res = getArray("Win","Win95","Win"+getVersionNumber(userAgent,pos+7));
    	            } else
    	            if ((pos=userAgent.indexOf("Windows 9x"))>-1) {
    	                res = getArray("Win","Win9x","Win"+getVersionNumber(userAgent,pos+7));
    	            } else
    	            if ((pos=userAgent.indexOf("Windows ME"))>-1) {
    	                res = getArray("Win","WinME","Win"+getVersionNumber(userAgent,pos+7));
    	            } else
    	            if ((pos=userAgent.indexOf("Windows CE;"))>-1) {
    	                res = getArray("Win","WinCE","WinCE");
    	            } else
    	            if ((pos=userAgent.indexOf("Windows 3.1"))>-1) {
    	                res = getArray("Win","Win31","Win"+getVersionNumber(userAgent,pos+7));
    	            }
    
    	        }
    	        if (res == null) {
    	            if ((pos=userAgent.indexOf("Win98"))>-1) {
    	                res = getArray("Win","Win98","Win"+getVersionNumber(userAgent,pos+3));
    	            } else
    	            if ((pos=userAgent.indexOf("Win31"))>-1) {
    	                res = getArray("Win","Win31","Win"+getVersionNumber(userAgent,pos+3));
    	            } else
    	            if ((pos=userAgent.indexOf("Win95"))>-1) {
    	                res = getArray("Win","Win95","Win"+getVersionNumber(userAgent,pos+3));
    	            } else
    	            if ((pos=userAgent.indexOf("Win 9x"))>-1) {
    	                res = getArray("Win","Win9x","Win"+getVersionNumber(userAgent,pos+3));
    	            } else
    	            if ((pos=userAgent.indexOf("WinNT4.0"))>-1) {
    	                res = getArray("Win","WinNT4","Win"+getVersionNumber(userAgent,pos+3));
    	            } else
    	            if ((pos=userAgent.indexOf("WinNT"))>-1) {
    	                res = getArray("Win","WinNT","Win"+getVersionNumber(userAgent,pos+3));
    	            }
    	        }
    	        if (res == null) {
    	            if ((pos=userAgent.indexOf("Windows"))>-1) {
    	              res = getArray("Win","<B>Win?</B>","<B>Win?"+getVersionNumber(userAgent,pos+7)+"</B>");
    	            } else
    	            if ((pos=userAgent.indexOf("Win"))>-1) {
    	              res = getArray("Win","<B>Win?</B>","<B>Win?"+getVersionNumber(userAgent,pos+3)+"</B>");
    	            } else
    	              res = getArray("Win","<B>Win?</B>","<B>Win?</B>");
    	        }
    	    } else
    	    if ((pos=userAgent.indexOf("Mac OS X"))>-1) {
    	        if ((userAgent.indexOf("iPhone"))>-1) {
    	            pos = userAgent.indexOf("iPhone OS");
    	            if ((userAgent.indexOf("iPod"))>-1) {
    	                res = getArray("iOS","iOS-iPod","iOS-iPod "+((pos<0)?"":getVersionNumber(userAgent,pos+9)));
    	            } else {
    	                res = getArray("iOS","iOS-iPhone","iOS-iPhone "+((pos<0)?"":getVersionNumber(userAgent,pos+9)));
    	            }
    	        } else
    	        if ((userAgent.indexOf("iPad"))>-1) {
    	            pos = userAgent.indexOf("CPU OS");
    	            res = getArray("iOS","iOS-iPad","iOS-iPad "+((pos<0)?"":getVersionNumber(userAgent,pos+6)));
    	        } else
    	            res = getArray("Mac","MacOSX","MacOS "+getVersionNumber(userAgent,pos+8));
    	    } else
    	    if ((pos=userAgent.indexOf("Android"))>-1) {
    	        res = getArray("Linux","Android","Android "+getVersionNumber(userAgent,pos+8));
    	    } else
    	    if ((pos=userAgent.indexOf("Mac_PowerPC"))>-1) {
    	        res = getArray("Mac","MacPPC","MacOS "+getVersionNumber(userAgent,pos+3));
    	    } else
    	    if ((pos=userAgent.indexOf("Macintosh"))>-1) {
    	        if (userAgent.indexOf("PPC")>-1)
    	            res = getArray("Mac","MacPPC","Mac PPC");
    	        else
    	            res = getArray("Mac?","Mac?","MacOS?");
    	    } else
    	    if ((pos=userAgent.indexOf("FreeBSD"))>-1) {
    	        res = getArray("*BSD","*BSD FreeBSD","FreeBSD "+getVersionNumber(userAgent,pos+7));
    	    } else
    	    if ((pos=userAgent.indexOf("OpenBSD"))>-1) {
    	        res = getArray("*BSD","*BSD OpenBSD","OpenBSD "+getVersionNumber(userAgent,pos+7));
    	    } else
    	    if ((pos=userAgent.indexOf("Linux"))>-1) {
    	        String detail = "Linux "+getVersionNumber(userAgent,pos+5);
    	        String med = "Linux";
    	        if ((pos=userAgent.indexOf("Ubuntu/"))>-1) {
    	            detail = "Ubuntu "+getVersionNumber(userAgent,pos+7);
    	            med+=" Ubuntu";
    	        }
    	        res = getArray("Linux",med,detail);
    	    } else
    	    if ((pos=userAgent.indexOf("CentOS"))>-1) {
    	        res = getArray("Linux","Linux CentOS","CentOS");
    	    } else
    	    if ((pos=userAgent.indexOf("NetBSD"))>-1) {
    	        res = getArray("*BSD","*BSD NetBSD","NetBSD "+getVersionNumber(userAgent,pos+6));
    	    } else
    	    if ((pos=userAgent.indexOf("Unix"))>-1) {
    	        res = getArray("Linux","Linux","Linux "+getVersionNumber(userAgent,pos+4));
    	    } else
    	    if ((pos=userAgent.indexOf("SunOS"))>-1) {
    	        res = getArray("Unix","SunOS","SunOS"+getVersionNumber(userAgent,pos+5));
    	    } else
    	    if ((pos=userAgent.indexOf("IRIX"))>-1) {
    	        res = getArray("Unix","IRIX","IRIX"+getVersionNumber(userAgent,pos+4));
    	    } else
    	    if ((pos=userAgent.indexOf("SonyEricsson"))>-1) {
    	        res = getArray("SonyEricsson","SonyEricsson","SonyEricsson"+getVersionNumber(userAgent,pos+12));
    	    } else
    	    if ((pos=userAgent.indexOf("Nokia"))>-1) {
    	        res = getArray("Nokia","Nokia","Nokia"+getVersionNumber(userAgent,pos+5));
    	    } else
    	    if ((pos=userAgent.indexOf("BlackBerry"))>-1) {
    	        res = getArray("BlackBerry","BlackBerry","BlackBerry"+getVersionNumber(userAgent,pos+10));
    	    } else
    	    if ((pos=userAgent.indexOf("SymbianOS"))>-1) {
    	        res = getArray("SymbianOS","SymbianOS","SymbianOS"+getVersionNumber(userAgent,pos+10));
    	    } else
    	    if ((pos=userAgent.indexOf("BeOS"))>-1) {
    	        res = getArray("BeOS","BeOS","BeOS");
    	    } else
    	    if ((pos=userAgent.indexOf("Nintendo Wii"))>-1) {
    	        res = getArray("Nintendo Wii","Nintendo Wii","Nintendo Wii"+getVersionNumber(userAgent,pos+10));
    	    } else
    	    if ((pos=userAgent.indexOf("J2ME/MIDP"))>-1) {
    	        res = getArray("Java","J2ME","J2ME/MIDP");
    	    } else
    	    res = getArray("<b>?</b>","<b>?</b>","<b>?</b>");
    	    return res;
    	  }
    
    	  
    	  public  String []getBrowser(String userAgent) {
    	    String []botName;
    	    if ((botName=getBotName(userAgent))!=null) return botName;
    	    String[]res = null;
    	    int pos;
    	    if ((pos=userAgent.indexOf("Lotus-Notes/"))>-1) {
    	        res = getArray("LotusNotes","LotusNotes","LotusNotes"+getVersionNumber(userAgent,pos+12));
    	    } else
    	    if ((pos=userAgent.indexOf("Opera"))>-1) {
    	        String ver = getVersionNumber(userAgent,pos+5);
    	        res = getArray("Opera","Opera"+getFirstVersionNumber(userAgent,pos+5,1),"Opera"+ver);
    	        if ((pos=userAgent.indexOf("Opera Mini/"))>-1) {
    	            String ver2 = getVersionNumber(userAgent,pos+11);
    	            res = getArray("Opera","Opera Mini","Opera Mini "+ver2);
    	        } else
    	        if ((pos=userAgent.indexOf("Opera Mobi/"))>-1) {
    	            String ver2 = getVersionNumber(userAgent,pos+11);
    	            res = getArray("Opera","Opera Mobi","Opera Mobi "+ver2);
    	        }
    	    } else
    	    if (userAgent.indexOf("MSIE")>-1) {
    	        if ((pos=userAgent.indexOf("MSIE 6.0"))>-1) {
    	            res = getArray("MSIE","MSIE6","MSIE"+getVersionNumber(userAgent,pos+4));
    	        } else
    	        if ((pos=userAgent.indexOf("MSIE 5.0"))>-1) {
    	            res = getArray("MSIE","MSIE5","MSIE"+getVersionNumber(userAgent,pos+4));
    	        } else
    	        if ((pos=userAgent.indexOf("MSIE 5.5"))>-1) {
    	            res = getArray("MSIE","MSIE5.5","MSIE"+getVersionNumber(userAgent,pos+4));
    	        } else
    	        if ((pos=userAgent.indexOf("MSIE 5."))>-1) {
    	            res = getArray("MSIE","MSIE5.x","MSIE"+getVersionNumber(userAgent,pos+4));
    	        } else
    	        if ((pos=userAgent.indexOf("MSIE 4"))>-1) {
    	            res = getArray("MSIE","MSIE4","MSIE"+getVersionNumber(userAgent,pos+4));
    	        } else
    	        if ((pos=userAgent.indexOf("MSIE 7"))>-1 && userAgent.indexOf("Trident/4.0")<0) {
    	            res = getArray("MSIE","MSIE7","MSIE"+getVersionNumber(userAgent,pos+4));
    	        } else
    	        if ((pos=userAgent.indexOf("MSIE 8"))>-1 || userAgent.indexOf("Trident/4.0")>-1) {
    	            res = getArray("MSIE","MSIE8","MSIE"+getVersionNumber(userAgent,pos+4));
    	        } else
    	        if ((pos=userAgent.indexOf("MSIE 9"))>-1 || userAgent.indexOf("Trident/5.0")>-1) {
    	            res = getArray("MSIE","MSIE9","MSIE"+getVersionNumber(userAgent,pos+4));
    	        } else
    	        res = getArray("MSIE","<B>MSIE?</B>","<B>MSIE?"+getVersionNumber(userAgent,userAgent.indexOf("MSIE")+4)+"</B>");
    	    } else
    	    if ((pos=userAgent.indexOf("Gecko/"))>-1) {
    	        res = getArray("Gecko","Gecko","Gecko"+getFirstVersionNumber(userAgent,pos+5,4));
    	        if ((pos=userAgent.indexOf("Camino/"))>-1) {
    	            res[1]+="(Camino)";
    	            res[2]+="(Camino"+getVersionNumber(userAgent,pos+7)+")";
    	        } else
    	        if ((pos=userAgent.indexOf("Chimera/"))>-1) {
    	            res[1]+="(Chimera)";
    	            res[2]+="(Chimera"+getVersionNumber(userAgent,pos+8)+")";
    	        } else
    	        if ((pos=userAgent.indexOf("Firebird/"))>-1) {
    	            res[1]+="(Firebird)";
    	            res[2]+="(Firebird"+getVersionNumber(userAgent,pos+9)+")";
    	        } else
    	        if ((pos=userAgent.indexOf("Phoenix/"))>-1) {
    	            res[1]+="(Phoenix)";
    	            res[2]+="(Phoenix"+getVersionNumber(userAgent,pos+8)+")";
    	        } else
    	        if ((pos=userAgent.indexOf("Galeon/"))>-1) {
    	            res[1]+="(Galeon)";
    	            res[2]+="(Galeon"+getVersionNumber(userAgent,pos+7)+")";
    	        } else
    	        if ((pos=userAgent.indexOf("Firefox/"))>-1) {
    	            res[1]+="(Firefox)";
    	            res[2]+="(Firefox"+getVersionNumber(userAgent,pos+8)+")";
    	        } else
    	        if ((pos=userAgent.indexOf("Netscape/"))>-1) {
    	            if ((pos=userAgent.indexOf("Netscape/6"))>-1) {
    	                res[1]+="(NS6)";
    	                res[2]+="(NS"+getVersionNumber(userAgent,pos+9)+")";
    	            } else
    	            if ((pos=userAgent.indexOf("Netscape/7"))>-1) {
    	                res[1]+="(NS7)";
    	                res[2]+="(NS"+getVersionNumber(userAgent,pos+9)+")";
    	            } else 
    	            if ((pos=userAgent.indexOf("Netscape/8"))>-1) {
    	                res[1]+="(NS8)";
    	                res[2]+="(NS"+getVersionNumber(userAgent,pos+9)+")";
    	            } else 
    	            if ((pos=userAgent.indexOf("Netscape/9"))>-1) {
    	                res[1]+="(NS9)";
    	                res[2]+="(NS"+getVersionNumber(userAgent,pos+9)+")";
    	            } else {
    	                res[1]+="(NS?)";
    	                res[2]+="(NS?"+getVersionNumber(userAgent,userAgent.indexOf("Netscape/")+9)+")";
    	            }
    	        }
    	    } else
    	    if ((pos=userAgent.indexOf("Netscape/"))>-1) {
    	        if ((pos=userAgent.indexOf("Netscape/4"))>-1) {
    	            res = getArray("NS","NS4","NS"+getVersionNumber(userAgent,pos+9));
    	        } else
    	            res = getArray("NS","NS?","NS?"+getVersionNumber(userAgent,pos+9));
    	    } else
    	    if ((pos=userAgent.indexOf("Chrome/"))>-1) {
    	        res = getArray("KHTML","KHTML(Chrome)","KHTML(Chrome"+getVersionNumber(userAgent,pos+6)+")");
    	    } else
    	    if ((pos=userAgent.indexOf("Safari/"))>-1) {
    	        res = getArray("KHTML","KHTML(Safari)","KHTML(Safari"+getVersionNumber(userAgent,pos+6)+")");
    	    } else
    	    if ((pos=userAgent.indexOf("Konqueror/"))>-1) {
    	        res = getArray("KHTML","KHTML(Konqueror)","KHTML(Konqueror"+getVersionNumber(userAgent,pos+9)+")");
    	    } else
    	    if ((pos=userAgent.indexOf("KHTML"))>-1) {
    	        res = getArray("KHTML","KHTML?","KHTML?("+getVersionNumber(userAgent,pos+5)+")");
    	    } else
    	    if ((pos=userAgent.indexOf("NetFront"))>-1) {
    	        res = getArray("NetFront","NetFront","NetFront "+getVersionNumber(userAgent,pos+8));
    	    } else
    	    if ((pos=userAgent.indexOf("BlackBerry"))>-1) {
    	        pos=userAgent.indexOf("/",pos+2);
    	        res = getArray("BlackBerry","BlackBerry","BlackBerry"+getVersionNumber(userAgent,pos+1));
    	    } else
    	    // We will interpret Mozilla/4.x as Netscape Communicator if and only if x
    	    // is not 0 or 5
    	    if (userAgent.indexOf("Mozilla/4.")==0 &&
    	        userAgent.indexOf("Mozilla/4.0")<0 &&
    	        userAgent.indexOf("Mozilla/4.5 ")<0) {
    	        res = getArray("Communicator","Communicator","Communicator"+getVersionNumber(userAgent,pos+8));
    	    } else
    	    return getArray("<B>?</B>","<B>?</B>","<B>?</B>");
    	    return res;
    	  }
    	}  
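
The order of these checks matters: a Chrome user agent also contains `Safari/`, so `Chrome/` has to be tested first, as the chain above does. Below is a minimal sketch of the same first-match-wins idea. `BrowserOrderSketch`, its marker table and the Chrome user-agent string are assumptions made up for illustration; they are not part of the parser from the book.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, heavily simplified detector; not the parser class shown above.
class BrowserOrderSketch {

    // LinkedHashMap keeps insertion order, so more specific markers go first.
    private static final Map<String, String> MARKERS = new LinkedHashMap<>();
    static {
        MARKERS.put("Firefox/", "Gecko(Firefox)");
        MARKERS.put("Chrome/", "KHTML(Chrome)");   // must come before Safari/
        MARKERS.put("Safari/", "KHTML(Safari)");
        MARKERS.put("Konqueror/", "KHTML(Konqueror)");
    }

    // Returns the label of the first marker found in the user agent, or "?".
    static String detect(String userAgent) {
        for (Map.Entry<String, String> entry : MARKERS.entrySet()) {
            if (userAgent.contains(entry.getKey())) {
                return entry.getValue();
            }
        }
        return "?";
    }

    public static void main(String[] args) {
        // User agent taken from the sample log line in this post.
        String firefox = "Mozilla/5.0 (Windows; U; Windows NT 5.1; hu-HU; rv:1.7.12) "
                + "Gecko/20050919 Firefox/1.0.7";
        // Illustrative Chrome user agent (assumed sample, not from the log).
        String chrome = "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 "
                + "(KHTML, like Gecko) Chrome/30.0.1599.101 Safari/537.36";
        System.out.println(detect(firefox)); // prints Gecko(Firefox)
        System.out.println(detect(chrome));  // prints KHTML(Chrome)
    }
}
```

If `Safari/` were registered before `Chrome/`, the second call would report Safari instead.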
    

    There is also a bolt that extracts the keyword:

    package com.learningstorm.stormlogprocessing;
    
    import java.util.Map;
    
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    
    /**
     * This class uses the KeywordGenerator class to extract the search keyword
     * from the referrer URL.
     */
    public class KeyWordIdentifierBolt extends BaseRichBolt {
    
    	private static final long serialVersionUID = 1L;
    	private KeywordGenerator keywordGenerator = null;
    	public OutputCollector collector;
    
    	public KeyWordIdentifierBolt() {
    
    	}
    
    	public void declareOutputFields(OutputFieldsDeclarer declarer) {
    		declarer.declare(new Fields("ip", "dateTime", "request", "response",
    				"bytesSent", "referrer", "useragent", "country", "browser",
    				"os", "keyword"));
    	}
    
    	public void prepare(Map stormConf, TopologyContext context,
    			OutputCollector collector) {
    		this.collector = collector;
    		this.keywordGenerator = new KeywordGenerator();
    
    	}
    
    	public void execute(Tuple input) {

    		String referrer = input.getStringByField("referrer");
    		// Call KeywordGenerator.getKeyword(String referrer) to extract
    		// the search keyword.
    		String keyword = keywordGenerator.getKeyword(referrer);
    		// Emit all the fields emitted by the previous bolt, plus the keyword.
    		collector.emit(new Values(input.getString(0), input.getString(1), input
    				.getString(2), input.getString(3), input.getString(4), input
    				.getString(5), input.getString(6), input.getString(7), input
    				.getString(8), input.getString(9), keyword));

    	}
    }
    

    The real work here is again done by KeywordGenerator, which matches parameters in the referrer URL (name, p, or query, in that order):

    
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    
    public class KeywordGenerator {
    	public String getKeyword(String referer) {
    
    		String[] temp;
    		Pattern pat = Pattern.compile("[?&#]name=([^&]+)");
    		Matcher m = pat.matcher(referer);
    		if (m.find()) {
    			String searchTerm = null;
    			searchTerm = m.group(1);
    			temp = searchTerm.split("\\+");
    			searchTerm = temp[0];
    			for (int i = 1; i < temp.length; i++) {
    				searchTerm = searchTerm + " " + temp[i];
    			}
    			return searchTerm;
    		} else {
    			pat = Pattern.compile("[?&#]p=([^&]+)");
    			m = pat.matcher(referer);
    			if (m.find()) {
    				String searchTerm = null;
    				searchTerm = m.group(1);
    				temp = searchTerm.split("\\+");
    				searchTerm = temp[0];
    				for (int i = 1; i < temp.length; i++) {
    					searchTerm = searchTerm + " " + temp[i];
    				}
    				return searchTerm;
    			} else {
    				//
    				pat = Pattern.compile("[?&#]query=([^&]+)");
    				m = pat.matcher(referer);
    				if (m.find()) {
    					String searchTerm = null;
    					searchTerm = m.group(1);
    					temp = searchTerm.split("\\+");
    					searchTerm = temp[0];
    					for (int i = 1; i < temp.length; i++) {
    						searchTerm = searchTerm + " " + temp[i];
    					}
    					return searchTerm;
    				}  else {
    						return "NA";
    					}
    				}
    		}
    	}
    	
    }
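
The three branches above differ only in the parameter name, so the same lookup can be sketched as a loop. This is a minimal sketch rather than the book's code: `KeywordSketch` and `extract` are hypothetical names, and `replace('+', ' ')` stands in for the split-and-join, so a trailing `+` is handled slightly differently.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration; not the KeywordGenerator class above.
class KeywordSketch {

    // Parameter names are tried in the same priority order as the original code.
    private static final String[] KEYS = {"name", "p", "query"};

    // Returns the value of the first matching parameter, or "NA" if none is present.
    static String extract(String referrer) {
        for (String key : KEYS) {
            Matcher m = Pattern.compile("[?&#]" + key + "=([^&]+)").matcher(referrer);
            if (m.find()) {
                // '+' encodes a space in a query string.
                return m.group(1).replace('+', ' ');
            }
        }
        return "NA";
    }

    public static void main(String[] args) {
        // Referrer from the modified sample log line in this post.
        System.out.println(extract("http://www.adeveloper.com/resource.html?name=qq")); // prints qq
        // Referrer from the unmodified sample log line: no keyword parameter.
        System.out.println(extract("http://www.adeveloper.com/resource.html"));         // prints NA
    }
}
```

This is why the second sample log line, with `?name=qq` appended to the referrer, yields a non-empty keyword.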
    

    Finally comes PersistenceBolt, which writes the final results to MySQL:

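    package com.learningstorm.stormlogprocessing;

    import java.util.Map;

    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.IBasicBolt;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Tuple;

    /**
     * Terminal bolt: hands each fully enriched tuple to MySQLDump for insertion.
     */
    public class PersistenceBolt implements IBasicBolt {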
    
    	private MySQLDump mySQLDump = null;
    	private static final long serialVersionUID = 1L;
    	private String database;
    	private String user;
    	private String ip;
    	private String password;
    
    	public PersistenceBolt(String ip, String database, String user,
    			String password) {
    		this.ip = ip;
    		this.database = database;
    		this.user = user;
    		this.password = password;
    	}
    
    	public void declareOutputFields(OutputFieldsDeclarer declarer) {
    	}
    
    	public Map<String, Object> getComponentConfiguration() {
    		return null;
    	}
    
    	public void prepare(Map stormConf, TopologyContext context) {
    		mySQLDump = new MySQLDump(ip, database, user, password);
    	}
    
    	public void execute(Tuple input, BasicOutputCollector collector) {
    		System.out.println("Input tuple : " + input);
    		mySQLDump.persistRecord(input);
    	}
    
    	public void cleanup() {
    		mySQLDump.close();
    	}
    
    }
    

    The actual database operations:

    package com.learningstorm.stormlogprocessing;
    
    import java.sql.Connection;
    import java.sql.PreparedStatement;
    
    import backtype.storm.tuple.Tuple;
    /**
     * This class contains logic to persist record into MySQL database.
     * 
     */
    public class MySQLDump {
    	private final String database;
    	private final String user;
    	private final String ip;
    	private final String password;
    	private final Connection connect;
    	private PreparedStatement preparedStatement = null;

    	public MySQLDump(String ip, String database, String user, String password) {
    		this.ip = ip;
    		this.database = database;
    		this.user = user;
    		this.password = password;
    		// Open the connection with the constructor arguments rather than
    		// hard-coded values, so the settings given to PersistenceBolt are used.
    		this.connect = MySQLConnection.getMySQLConnection(ip, database, user, password);
    	}

    	public void persistRecord(Tuple tuple) {
    		try {
    
    			preparedStatement = connect
    					.prepareStatement("insert into  apachelog values (default,?, ?, ?, ?, ? , ?, ?, ?,?,?,?)");
    
    			preparedStatement.setString(1, tuple.getStringByField("ip"));
    			preparedStatement.setString(2, tuple.getStringByField("dateTime"));
    			preparedStatement.setString(3, tuple.getStringByField("request"));
    			preparedStatement.setString(4, tuple.getStringByField("response"));
    			preparedStatement.setString(5, tuple.getStringByField("bytesSent"));
    			preparedStatement.setString(6, tuple.getStringByField("referrer"));
    			preparedStatement.setString(7, tuple.getStringByField("useragent"));
    			preparedStatement.setString(8, tuple.getStringByField("country"));
    			preparedStatement.setString(9, tuple.getStringByField("browser"));
    			preparedStatement.setString(10, tuple.getStringByField("os"));
    			preparedStatement.setString(11, tuple.getStringByField("keyword"));
    			
    			preparedStatement.executeUpdate();
    
    		} catch (Exception e) {
    			// Keep the original exception as the cause so the failure can be diagnosed.
    			throw new RuntimeException(
    					"Error occurred while persisting records in mysql", e);
    		} finally {
    			if (preparedStatement != null) {
    				try {
    					preparedStatement.close();
    				} catch (Exception exception) {
    					System.out
    							.println("Error occurred while closing PreparedStatement : ");
    				}
    			}
    		}
    
    	}
    	
    	public void close() {
    		try {
    		connect.close();
    		}catch(Exception exception) {
    			System.out.println("Error occurred while closing the connection");
    		}
    	}
    	
    	
    }
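
The insert statement above relies on the column order of the apachelog table (`values (default, ?, ...)`). Naming the columns explicitly makes that dependency visible. Below is a minimal sketch of building such a statement; `InsertSqlSketch` and `buildInsert` are hypothetical names, and only the table and column names come from the schema created earlier in this post.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper for illustration; not part of the book's MySQLDump class.
class InsertSqlSketch {

    // Columns of the apachelog table, without the auto-increment id.
    static final List<String> COLUMNS = Arrays.asList(
            "ip", "dateTime", "request", "response", "bytesSent", "referrer",
            "useragent", "country", "browser", "os", "keyword");

    // Builds "insert into <table> (c1, c2, ...) values (?, ?, ...)".
    static String buildInsert(String table, List<String> columns) {
        String names = String.join(", ", columns);
        String marks = String.join(", ", Collections.nCopies(columns.size(), "?"));
        return "insert into " + table + " (" + names + ") values (" + marks + ")";
    }

    public static void main(String[] args) {
        // The result can be passed to Connection.prepareStatement(...).
        System.out.println(buildInsert("apachelog", COLUMNS));
    }
}
```

The placeholders are still bound by position, so the `setString` calls must follow the same order as `COLUMNS`.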
    

    The database connection class, MySQLConnection:

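    package com.learningstorm.stormlogprocessing;

    import java.sql.Connection;
    import java.sql.DriverManager;

    public class MySQLConnection {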
    
    	private static Connection connect = null;
    
    	public static Connection getMySQLConnection(String ip, String database, String user, String password) {
    		try {
    			Class.forName("com.mysql.jdbc.Driver");
    			String url ="jdbc:mysql://"+ip+"/"+database+"?"
    					+ "user="+user+"&password="+password+"";
    			connect = DriverManager
    					.getConnection(url);
    			return connect;
    		} catch (Exception e) {
    			throw new RuntimeException("Error occurred while get mysql connection : " +e.getMessage());
    		}
    	}
    }
    

    After running the producer and the topology, the database contains the following results:

    mysql> select browser,count(*) from apachelog group by browser;
    +----------------+----------+
    | browser        | count(*) |
    +----------------+----------+
    | Gecko(Firefox) |       66 |
    +----------------+----------+
    1 row in set (0.00 sec)
    
    mysql> select os,count(*) from apachelog group by os;
    +-------+----------+
    | os    | count(*) |
    +-------+----------+
    | WinXP |       66 |
    +-------+----------+
    1 row in set (0.00 sec)
    
    mysql> select country,count(*) from apachelog group by country;
    +---------+----------+
    | country | count(*) |
    +---------+----------+
    | India   |       66 |
    +---------+----------+
    1 row in set (0.01 sec)
    
    mysql> 
    
  • Original article: https://www.cnblogs.com/donganwangshi/p/4156697.html