  • Storm Stream Processing Project Case Study

    1. Project structure

      

    ====================== The program needs to be debugged step by step ======================

    Part 1: Step one, the KafkaSpout and the driver class

    1. Services running at this point

      

    2. Main driver class

     package com.jun.it2;

     import backtype.storm.Config;
     import backtype.storm.LocalCluster;
     import backtype.storm.StormSubmitter;
     import backtype.storm.generated.AlreadyAliveException;
     import backtype.storm.generated.InvalidTopologyException;
     import backtype.storm.generated.StormTopology;
     import backtype.storm.spout.SchemeAsMultiScheme;
     import backtype.storm.topology.IRichSpout;
     import backtype.storm.topology.TopologyBuilder;
     import storm.kafka.*;

     import java.util.UUID;

     public class WebLogStatictis {
         /**
          * Main entry point
          * @param args
          */
         public static void main(String[] args) {
             WebLogStatictis webLogStatictis = new WebLogStatictis();
             StormTopology stormTopology = webLogStatictis.createTopology();
             Config config = new Config();
             // Cluster or local mode
             //config.setNumAckers(4);
             if (args == null || args.length == 0) {
                 // Run locally
                 LocalCluster localCluster = new LocalCluster();
                 localCluster.submitTopology("webloganalyse", config, stormTopology);
             } else {
                 // Submit to the cluster
                 config.setNumWorkers(4); // Number of worker processes for this topology
                 try {
                     StormSubmitter.submitTopology(args[0], config, stormTopology);
                 } catch (AlreadyAliveException e) {
                     e.printStackTrace();
                 } catch (InvalidTopologyException e) {
                     e.printStackTrace();
                 }
             }
         }

         /**
          * Build a KafkaSpout
          * @return
          */
         private IRichSpout generateSpout() {
             BrokerHosts hosts = new ZkHosts("linux-hadoop01.ibeifeng.com:2181");
             String topic = "nginxlog";
             String zkRoot = "/" + topic;
             String id = UUID.randomUUID().toString();
             SpoutConfig spoutConf = new SpoutConfig(hosts, topic, zkRoot, id);
             spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme()); // decode messages as strings
             spoutConf.forceFromStart = true;
             KafkaSpout kafkaSpout = new KafkaSpout(spoutConf);
             return kafkaSpout;
         }

         public StormTopology createTopology() {
             TopologyBuilder topologyBuilder = new TopologyBuilder();
             // Set the spout
             topologyBuilder.setSpout(WebLogConstants.KAFKA_SPOUT_ID, generateSpout());
             // Set the parser bolt, fed by the KafkaSpout
             topologyBuilder.setBolt(WebLogConstants.WEB_LOG_PARSER_BOLT, new WebLogParserBolt()).shuffleGrouping(WebLogConstants.KAFKA_SPOUT_ID);

             return topologyBuilder.createTopology();
         }

     }
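      For reference, a sketch of how the packaged jar could be submitted in cluster mode (the jar name weblog-analyse.jar is a hypothetical placeholder; args[0] becomes the topology name):

       bin/storm jar weblog-analyse.jar com.jun.it2.WebLogStatictis webloganalyse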

    3.WebLogParserBolt

      This version simply prints each tuple emitted by the KafkaSpout, to verify that the data coming from Kafka is correct.

     package com.jun.it2;

     import backtype.storm.task.OutputCollector;
     import backtype.storm.task.TopologyContext;
     import backtype.storm.topology.IRichBolt;
     import backtype.storm.topology.OutputFieldsDeclarer;
     import backtype.storm.tuple.Tuple;

     import java.util.Map;

     public class WebLogParserBolt implements IRichBolt {
         @Override
         public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {

         }

         @Override
         public void execute(Tuple tuple) {
             // "str" is the field name declared by the StringScheme used in the KafkaSpout
             String webLog = tuple.getStringByField("str");
             System.out.println(webLog);
         }

         @Override
         public void cleanup() {

         }

         @Override
         public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

         }

         @Override
         public Map<String, Object> getComponentConfiguration() {
             return null;
         }
     }

    4. Run Main

      It first consumes the data already present in the topic (forceFromStart is set to true).

      

    5. Start the Kafka console producer

       bin/kafka-console-producer.sh --topic nginxlog --broker-list linux-hadoop01.ibeifeng.com:9092

      

    6. Paste data into the Kafka producer console (a sample line follows)
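      For example, a log line in the format the parser bolt (step two) expects could look like the line below. Note that, per the parsing code further down, the value inside the square brackets is a millisecond timestamp rather than the default nginx time format; all field values here are made up for illustration:

       192.168.1.10 - - [1528369200000] "GET /index.html HTTP/1.1" 200 1024 "http://www.ibeifeng.com" "Mozilla/5.0 (Windows NT 10.0)" "-"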

      

    7. Console output of the Main program

      

    Part 2: Step two, parsing the log

    1.WebLogParserBolt

      To verify this step on its own, remove two parts and re-enable one comment:

        Remove the stream splitting.

        Remove the emits.

        Uncomment the print statements.

    2. Result

      This can be verified simply by running the Main function.

      

    3.WebLogParserBolt

     package com.jun.it2;

     import backtype.storm.task.OutputCollector;
     import backtype.storm.task.TopologyContext;
     import backtype.storm.topology.IRichBolt;
     import backtype.storm.topology.OutputFieldsDeclarer;
     import backtype.storm.tuple.Fields;
     import backtype.storm.tuple.Tuple;
     import backtype.storm.tuple.Values;

     import java.text.DateFormat;
     import java.text.SimpleDateFormat;
     import java.util.Date;
     import java.util.Map;
     import java.util.regex.Matcher;
     import java.util.regex.Pattern;

     import static com.jun.it2.WebLogConstants.*;

     public class WebLogParserBolt implements IRichBolt {
         private Pattern pattern;

         private OutputCollector outputCollector;

         @Override
         public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
             pattern = Pattern.compile("([^ ]*) [^ ]* [^ ]* \\[([\\d+]*)\\] \"[^ ]* ([^ ]*) [^ ]*\" \\d{3} \\d+ \"([^\"]*)\" \"([^\"]*)\" \"[^ ]*\"");
             this.outputCollector = outputCollector;
         }

         @Override
         public void execute(Tuple tuple) {
             String webLog = tuple.getStringByField("str");
             if (webLog != null && !"".equals(webLog)) { // skip null or empty lines

                 Matcher matcher = pattern.matcher(webLog);
                 if (matcher.find()) {
                     // Extract the fields of interest
                     String ip = matcher.group(1);
                     String serverTimeStr = matcher.group(2);

                     // Convert the millisecond timestamp into day/hour/minute keys
                     long timestamp = Long.parseLong(serverTimeStr);
                     Date date = new Date();
                     date.setTime(timestamp);

                     DateFormat df = new SimpleDateFormat("yyyyMMddHHmm");
                     String dateStr = df.format(date);
                     String day = dateStr.substring(0, 8);
                     String hour = dateStr.substring(0, 10);
                     String minute = dateStr;

                     String requestUrl = matcher.group(3);
                     String httpRefer = matcher.group(4);
                     String userAgent = matcher.group(5);

                     // Uncomment to verify that the parsing is correct
     //                System.err.println(webLog);
     //                System.err.println(
     //                        "ip=" + ip
     //                        + ", serverTimeStr=" + serverTimeStr
     //                        + ", requestUrl=" + requestUrl
     //                        + ", httpRefer=" + httpRefer
     //                        + ", userAgent=" + userAgent
     //                );

                     // Split into separate streams
                     this.outputCollector.emit(WebLogConstants.IP_COUNT_STREAM, tuple, new Values(day, hour, minute, ip));
                     this.outputCollector.emit(WebLogConstants.URL_PARSER_STREAM, tuple, new Values(day, hour, minute, requestUrl));
                     this.outputCollector.emit(WebLogConstants.HTTPREFER_PARSER_STREAM, tuple, new Values(day, hour, minute, httpRefer));
                     this.outputCollector.emit(WebLogConstants.USERAGENT_PARSER_STREAM, tuple, new Values(day, hour, minute, userAgent));
                 }
             }
             this.outputCollector.ack(tuple);
         }

         @Override
         public void cleanup() {

         }

         @Override
         public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
             outputFieldsDeclarer.declareStream(WebLogConstants.IP_COUNT_STREAM, new Fields(DAY, HOUR, MINUTE, IP));
             outputFieldsDeclarer.declareStream(WebLogConstants.URL_PARSER_STREAM, new Fields(DAY, HOUR, MINUTE, REQUEST_URL));
             outputFieldsDeclarer.declareStream(WebLogConstants.HTTPREFER_PARSER_STREAM, new Fields(DAY, HOUR, MINUTE, HTTP_REFER));
             outputFieldsDeclarer.declareStream(WebLogConstants.USERAGENT_PARSER_STREAM, new Fields(DAY, HOUR, MINUTE, USERAGENT));
         }

         @Override
         public Map<String, Object> getComponentConfiguration() {
             return null;
         }
     }
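      To check the regular expression in isolation (as the commented-out prints suggest), a minimal standalone sketch is shown below. The class name WebLogParserCheck and the sample line are hypothetical additions for illustration only:

     package com.jun.it2;

     import java.util.regex.Matcher;
     import java.util.regex.Pattern;

     // Standalone check of the parsing regex; the sample line and its values are made up.
     public class WebLogParserCheck {
         public static void main(String[] args) {
             Pattern pattern = Pattern.compile("([^ ]*) [^ ]* [^ ]* \\[([\\d+]*)\\] \"[^ ]* ([^ ]*) [^ ]*\" \\d{3} \\d+ \"([^\"]*)\" \"([^\"]*)\" \"[^ ]*\"");
             String sample = "192.168.1.10 - - [1528369200000] \"GET /index.html HTTP/1.1\" 200 1024 \"http://www.ibeifeng.com\" \"Mozilla/5.0 (Windows NT 10.0)\" \"-\"";
             Matcher matcher = pattern.matcher(sample);
             if (matcher.find()) {
                 System.out.println("ip=" + matcher.group(1));          // 192.168.1.10
                 System.out.println("serverTime=" + matcher.group(2));  // 1528369200000
                 System.out.println("requestUrl=" + matcher.group(3));  // /index.html
                 System.out.println("httpRefer=" + matcher.group(4));   // http://www.ibeifeng.com
                 System.out.println("userAgent=" + matcher.group(5));   // Mozilla/5.0 (Windows NT 10.0)
             } else {
                 System.out.println("no match");
             }
         }
     }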

    Part 3: Step three, a generic counter bolt

    1.CountKpiBolt

     package com.jun.it2;

     import backtype.storm.task.OutputCollector;
     import backtype.storm.task.TopologyContext;
     import backtype.storm.topology.IRichBolt;
     import backtype.storm.topology.OutputFieldsDeclarer;
     import backtype.storm.tuple.Fields;
     import backtype.storm.tuple.Tuple;
     import backtype.storm.tuple.Values;

     import java.util.HashMap;
     import java.util.Iterator;
     import java.util.Map;

     public class CountKpiBolt implements IRichBolt {

         private String kpiType;

         private Map<String, Integer> kpiCounts;

         private String currentDay = "";

         private OutputCollector outputCollector;

         public CountKpiBolt(String kpiType) {
             this.kpiType = kpiType;
         }

         @Override
         public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
             this.kpiCounts = new HashMap<>();
             this.outputCollector = outputCollector;
         }

         @Override
         public void execute(Tuple tuple) {
             String day = tuple.getStringByField("day");
             String hour = tuple.getStringByField("hour");
             String minute = tuple.getStringByField("minute");
             String kpi = tuple.getString(3);
             // Combine the date with the KPI value
             String kpiByDay = day + "_" + kpi;
             String kpiByHour = hour + "_" + kpi;
             String kpiByMinute = minute + "_" + kpi;
             // Keep the counters in an in-memory map
             int kpiCountByDay = 0;
             int kpiCountByHour = 0;
             int kpiCountByMinute = 0;
             if (kpiCounts.containsKey(kpiByDay)) {
                 kpiCountByDay = kpiCounts.get(kpiByDay);
             }
             if (kpiCounts.containsKey(kpiByHour)) {
                 kpiCountByHour = kpiCounts.get(kpiByHour);
             }
             if (kpiCounts.containsKey(kpiByMinute)) {
                 kpiCountByMinute = kpiCounts.get(kpiByMinute);
             }
             kpiCountByDay++;
             kpiCountByHour++;
             kpiCountByMinute++;
             kpiCounts.put(kpiByDay, kpiCountByDay);
             kpiCounts.put(kpiByHour, kpiCountByHour);
             kpiCounts.put(kpiByMinute, kpiCountByMinute);
             // Clear the previous day's counters once the day rolls over
             if (!currentDay.equals(day)) {
                 // A new day has started
                 Iterator<Map.Entry<String, Integer>> iter = kpiCounts.entrySet().iterator();
                 while (iter.hasNext()) {
                     Map.Entry<String, Integer> entry = iter.next();
                     if (entry.getKey().startsWith(currentDay)) {
                         iter.remove();
                     }
                 }
             }
             currentDay = day;
             // Emit tuples with two fields: the time/KPI key and its count
             this.outputCollector.emit(tuple, new Values(kpiType + "_" + kpiByDay, kpiCountByDay));
             this.outputCollector.emit(tuple, new Values(kpiType + "_" + kpiByHour, kpiCountByHour));
             this.outputCollector.emit(tuple, new Values(kpiType + "_" + kpiByMinute, kpiCountByMinute));
             this.outputCollector.ack(tuple);
         }

         @Override
         public void cleanup() {

         }

         @Override
         public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
             outputFieldsDeclarer.declare(new Fields(WebLogConstants.SERVERTIME_KPI, WebLogConstants.KPI_COUNTS));
         }

         @Override
         public Map<String, Object> getComponentConfiguration() {
             return null;
         }
     }
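      As a worked example (sample values are hypothetical): assuming the IP counter instance (kpiType = "I") receives a tuple with day 20180607, hour 2018060712, minute 201806071230 and ip 192.168.1.10, it increments three in-memory counters keyed 20180607_192.168.1.10, 2018060712_192.168.1.10 and 201806071230_192.168.1.10, and emits three two-field tuples such as ("I_20180607_192.168.1.10", 3) for the downstream SaveBolt.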

    2. SaveBolt.java

      This version only prints the incoming tuples.

     package com.jun.it2;

     import backtype.storm.task.OutputCollector;
     import backtype.storm.task.TopologyContext;
     import backtype.storm.topology.IRichBolt;
     import backtype.storm.topology.OutputFieldsDeclarer;
     import backtype.storm.tuple.Tuple;

     import java.util.Map;

     public class SaveBolt implements IRichBolt {

         @Override
         public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {

         }

         @Override
         public void execute(Tuple tuple) {
             String serverTimeAndKpi = tuple.getStringByField(WebLogConstants.SERVERTIME_KPI);
             Integer kpiCounts = tuple.getIntegerByField(WebLogConstants.KPI_COUNTS);
             System.err.println("serverTimeAndKpi=" + serverTimeAndKpi + ", kpiCounts=" + kpiCounts);
         }

         @Override
         public void cleanup() {

         }

         @Override
         public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

         }

         @Override
         public Map<String, Object> getComponentConfiguration() {
             return null;
         }
     }

    3. Result

      

    Part 4: Saving to HBase

    1. SaveBolt.java

     package com.jun.it2;

     import backtype.storm.task.OutputCollector;
     import backtype.storm.task.TopologyContext;
     import backtype.storm.topology.IRichBolt;
     import backtype.storm.topology.OutputFieldsDeclarer;
     import backtype.storm.tuple.Tuple;
     import org.apache.hadoop.conf.Configuration;
     import org.apache.hadoop.hbase.HBaseConfiguration;
     import org.apache.hadoop.hbase.client.HTable;
     import org.apache.hadoop.hbase.client.Put;
     import org.apache.hadoop.hbase.util.Bytes;

     import java.io.IOException;
     import java.util.Map;

     import static com.jun.it2.WebLogConstants.HBASE_TABLENAME;

     public class SaveBolt implements IRichBolt {
         private HTable table;

         private OutputCollector outputCollector;

         @Override
         public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
             // hbase-site.xml on the classpath supplies the connection settings
             Configuration configuration = HBaseConfiguration.create();
             try {
                 table = new HTable(configuration, HBASE_TABLENAME);
             } catch (IOException e) {
                 e.printStackTrace();
                 throw new RuntimeException(e);
             }

             this.outputCollector = outputCollector;
         }

         @Override
         public void execute(Tuple tuple) {
             String serverTimeAndKpi = tuple.getStringByField(WebLogConstants.SERVERTIME_KPI);
             Integer kpiCounts = tuple.getIntegerByField(WebLogConstants.KPI_COUNTS);
     //        System.err.println("serverTimeAndKpi=" + serverTimeAndKpi + ", kpiCounts=" + kpiCounts);
             if (serverTimeAndKpi != null && kpiCounts != null) {
                 // Row key is the time_kpi string; the column qualifier is the KPI type prefix
                 Put put = new Put(Bytes.toBytes(serverTimeAndKpi));
                 String columnQualifier = serverTimeAndKpi.split("_")[0];
                 put.add(Bytes.toBytes(WebLogConstants.COLUMN_FAMILY),
                         Bytes.toBytes(columnQualifier), Bytes.toBytes("" + kpiCounts));

                 try {
                     table.put(put);
                 } catch (IOException e) {
                     throw new RuntimeException(e);
                 }
             }
             this.outputCollector.ack(tuple);
         }

         @Override
         public void cleanup() {
             if (table != null) {
                 try {
                     table.close();
                 } catch (IOException e) {
                     e.printStackTrace();
                 }
             }
         }

         @Override
         public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

         }

         @Override
         public Map<String, Object> getComponentConfiguration() {
             return null;
         }
     }

    2. Services currently running

      

    3. Create the table in HBase (a shell sketch follows)
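      A minimal HBase shell sketch, using the table name and column family defined in WebLogConstants (HBASE_TABLENAME = "weblogstatictis", COLUMN_FAMILY = "info"):

       bin/hbase shell
       create 'weblogstatictis', 'info'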

      

    4. Run the program

      The following error appears:

     ERROR org.apache.hadoop.util.Shell - Failed to locate the winutils binary in the hadoop binary path
     java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
         at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:355) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
         at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:370) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
         at org.apache.hadoop.util.Shell.<clinit>(Shell.java:363) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
         at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
         at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:104) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
         at org.apache.hadoop.security.Groups.<init>(Groups.java:86) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
         at org.apache.hadoop.security.Groups.<init>(Groups.java:66) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
         at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:280) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
         at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:269) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
         at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:246) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
         at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:775) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
         at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:760) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
         at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:633) [hadoop-common-2.5.0-cdh5.3.6.jar:na]
         at org.apache.hadoop.hbase.security.User$SecureHadoopUser.<init>(User.java:260) [hbase-common-0.98.6-cdh5.3.6.jar:na]
         at org.apache.hadoop.hbase.security.User$SecureHadoopUser.<init>(User.java:256) [hbase-common-0.98.6-cdh5.3.6.jar:na]
         at org.apache.hadoop.hbase.security.User.getCurrent(User.java:160) [hbase-common-0.98.6-cdh5.3.6.jar:na]
         at org.apache.hadoop.hbase.security.UserProvider.getCurrent(UserProvider.java:89) [hbase-common-0.98.6-cdh5.3.6.jar:na]
         at org.apache.hadoop.hbase.client.HConnectionKey.<init>(HConnectionKey.java:70) [hbase-client-0.98.6-cdh5.3.6.jar:na]
         at org.apache.hadoop.hbase.client.HConnectionManager.getConnection(HConnectionManager.java:267) [hbase-client-0.98.6-cdh5.3.6.jar:na]
         at org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:199) [hbase-client-0.98.6-cdh5.3.6.jar:na]
         at org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:161) [hbase-client-0.98.6-cdh5.3.6.jar:na]
         at com.jun.it2.SaveBolt.prepare(SaveBolt.java:27) [classes/:na]
         at backtype.storm.daemon.executor$fn__3439$fn__3451.invoke(executor.clj:699) [storm-core-0.9.6.jar:0.9.6]
         at backtype.storm.util$async_loop$fn__460.invoke(util.clj:461) [storm-core-0.9.6.jar:0.9.6]
         at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
         at java.lang.Thread.run(Thread.java:748) [na:1.8.0_144]

    5. Solution found online

      1. Download the Windows build of winutils

        On GitHub someone provides a Windows build of winutils; the project is https://github.com/srccodes/hadoop-common-2.2.0-bin. Download the project's zip (the file is named hadoop-common-2.2.0-bin-master.zip) and extract it to any directory.

      2. Configure the environment variables

        Add a user variable HADOOP_HOME whose value is the directory the zip was extracted to, then append %HADOOP_HOME%\bin to the system Path variable.

        Run the program again and it executes normally.
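      Alternatively, the same effect can be achieved in code by setting the hadoop.home.dir system property before the HBase client is created, for example at the top of main(). This is only a sketch; the path below is a hypothetical placeholder for wherever the winutils package was extracted:

         // Hypothetical local path to the extracted hadoop-common-2.2.0-bin-master directory
         System.setProperty("hadoop.home.dir", "D:\\hadoop-common-2.2.0-bin-master");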

    6. Screenshot

      

    7. Add the configuration files (a sketch follows)

      This is required when running on Windows.
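      HBaseConfiguration.create() loads hbase-site.xml from the classpath (for example src/main/resources). A minimal sketch, assuming HBase uses the ZooKeeper instance mentioned above; the exact values must match your cluster:

       <configuration>
           <property>
               <name>hbase.zookeeper.quorum</name>
               <value>linux-hadoop01.ibeifeng.com</value>
           </property>
           <property>
               <name>hbase.zookeeper.property.clientPort</name>
               <value>2181</value>
           </property>
       </configuration>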

      

    8. Final result (an HBase check is sketched below)
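      To spot-check the data written to HBase, a hedged shell example (row keys follow the kpiType_time_kpi format emitted by CountKpiBolt):

       scan 'weblogstatictis', {LIMIT => 10}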

      

    Part 5: PS, the complete program

    1. Main driver class

     package com.jun.it2;

     import backtype.storm.Config;
     import backtype.storm.LocalCluster;
     import backtype.storm.StormSubmitter;
     import backtype.storm.generated.AlreadyAliveException;
     import backtype.storm.generated.InvalidTopologyException;
     import backtype.storm.generated.StormTopology;
     import backtype.storm.spout.SchemeAsMultiScheme;
     import backtype.storm.topology.IRichSpout;
     import backtype.storm.topology.TopologyBuilder;
     import backtype.storm.tuple.Fields;
     import storm.kafka.*;

     import java.io.IOException;
     import java.util.UUID;

     public class WebLogStatictis {

         /**
          * Main entry point
          * @param args
          */
         public static void main(String[] args) throws IOException {
             WebLogStatictis webLogStatictis = new WebLogStatictis();
             StormTopology stormTopology = webLogStatictis.createTopology();
             Config config = new Config();
             // Cluster or local mode
             //config.setNumAckers(4);
             if (args == null || args.length == 0) {
                 // Run locally
                 LocalCluster localCluster = new LocalCluster();
                 localCluster.submitTopology("webloganalyse2", config, stormTopology);
             } else {
                 // Submit to the cluster
                 config.setNumWorkers(4); // Number of worker processes for this topology
                 try {
                     StormSubmitter.submitTopology(args[0], config, stormTopology);
                 } catch (AlreadyAliveException e) {
                     e.printStackTrace();
                 } catch (InvalidTopologyException e) {
                     e.printStackTrace();
                 }
             }
         }

         /**
          * Build a KafkaSpout
          * @return
          */
         private IRichSpout generateSpout() {
             BrokerHosts hosts = new ZkHosts("linux-hadoop01.ibeifeng.com:2181");
             String topic = "nginxlog";
             String zkRoot = "/" + topic;
             String id = UUID.randomUUID().toString();
             SpoutConfig spoutConf = new SpoutConfig(hosts, topic, zkRoot, id);
             spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme()); // decode messages as strings
             spoutConf.forceFromStart = true;
             KafkaSpout kafkaSpout = new KafkaSpout(spoutConf);
             return kafkaSpout;
         }

         public StormTopology createTopology() {
             TopologyBuilder topologyBuilder = new TopologyBuilder();
             // Set the spout
             topologyBuilder.setSpout(WebLogConstants.KAFKA_SPOUT_ID, generateSpout());
             // Set the WebLogParserBolt
             topologyBuilder.setBolt(WebLogConstants.WEB_LOG_PARSER_BOLT, new WebLogParserBolt()).shuffleGrouping(WebLogConstants.KAFKA_SPOUT_ID);
             // Set the CountKpiBolt: fieldsGrouping takes the upstream component id,
             // the stream id, and the grouping fields
             topologyBuilder.setBolt(WebLogConstants.COUNT_IP_BOLT, new CountKpiBolt(WebLogConstants.IP_KPI))
                     .fieldsGrouping(WebLogConstants.WEB_LOG_PARSER_BOLT, WebLogConstants.IP_COUNT_STREAM, new Fields(WebLogConstants.IP));
             // Set the SaveBolt: persists the aggregated counts
             topologyBuilder.setBolt(WebLogConstants.SAVE_BOLT, new SaveBolt(), 3)
                     .shuffleGrouping(WebLogConstants.COUNT_IP_BOLT)
             ;
             return topologyBuilder.createTopology();
         }

     }
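      The topology above only wires the IP stream into a counter. The other streams declared by WebLogParserBolt could be wired following the same pattern; the sketch below shows the URL branch as an example, intended to sit inside createTopology() (the bolt id "countUrlBolt" is a hypothetical addition, not part of the original constants):

         // Hypothetical extension: count request URLs the same way the IP stream is counted
         topologyBuilder.setBolt("countUrlBolt", new CountKpiBolt(WebLogConstants.URL_KPI))
                 .fieldsGrouping(WebLogConstants.WEB_LOG_PARSER_BOLT,
                         WebLogConstants.URL_PARSER_STREAM,
                         new Fields(WebLogConstants.REQUEST_URL));
         // Route its output to the same SaveBolt
         topologyBuilder.setBolt(WebLogConstants.SAVE_BOLT, new SaveBolt(), 3)
                 .shuffleGrouping(WebLogConstants.COUNT_IP_BOLT)
                 .shuffleGrouping("countUrlBolt");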

    2. Constants class

     package com.jun.it2;

     public class WebLogConstants {
         // Spout and bolt IDs
         public static String KAFKA_SPOUT_ID = "kafkaSpoutId";
         public static final String WEB_LOG_PARSER_BOLT = "webLogParserBolt";
         public static final String COUNT_IP_BOLT = "countIpBolt";
         public static final String COUNT_BROWSER_BOLT = "countBrowserBolt";
         public static final String COUNT_OS_BOLT = "countOsBolt";
         public static final String USER_AGENT_PARSER_BOLT = "userAgentParserBolt";
         public static final String SAVE_BOLT = "saveBolt";

         // Stream IDs
         public static final String IP_COUNT_STREAM = "ipCountStream";
         public static final String URL_PARSER_STREAM = "urlParserStream";
         public static final String HTTPREFER_PARSER_STREAM = "httpReferParserStream";
         public static final String USERAGENT_PARSER_STREAM = "userAgentParserStream";
         public static final String BROWSER_COUNT_STREAM = "browserCountStream";
         public static final String OS_COUNT_STREAM = "osCountStream";

         // Tuple field names
         public static final String DAY = "day";
         public static final String HOUR = "hour";
         public static final String MINUTE = "minute";
         public static final String IP = "ip";
         public static final String REQUEST_URL = "requestUrl";
         public static final String HTTP_REFER = "httpRefer";
         public static final String USERAGENT = "userAgent";
         public static final String BROWSER = "browser";
         public static final String OS = "os";
         public static final String SERVERTIME_KPI = "serverTimeAndKpi";
         public static final String KPI_COUNTS = "kpiCounts";

         // KPI types
         public static final String IP_KPI = "I";
         public static final String URL_KPI = "U";
         public static final String BROWSER_KPI = "B";
         public static final String OS_KPI = "O";

         // HBase
         public static final String HBASE_TABLENAME = "weblogstatictis";
         public static final String COLUMN_FAMILY = "info";
     }

    3. Parser class

     package com.jun.it2;

     import backtype.storm.task.OutputCollector;
     import backtype.storm.task.TopologyContext;
     import backtype.storm.topology.IRichBolt;
     import backtype.storm.topology.OutputFieldsDeclarer;
     import backtype.storm.tuple.Fields;
     import backtype.storm.tuple.Tuple;
     import backtype.storm.tuple.Values;

     import java.text.DateFormat;
     import java.text.SimpleDateFormat;
     import java.util.Date;
     import java.util.Map;
     import java.util.regex.Matcher;
     import java.util.regex.Pattern;

     import static com.jun.it2.WebLogConstants.*;

     public class WebLogParserBolt implements IRichBolt {
         private Pattern pattern;

         private OutputCollector outputCollector;

         @Override
         public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
             pattern = Pattern.compile("([^ ]*) [^ ]* [^ ]* \\[([\\d+]*)\\] \"[^ ]* ([^ ]*) [^ ]*\" \\d{3} \\d+ \"([^\"]*)\" \"([^\"]*)\" \"[^ ]*\"");
             this.outputCollector = outputCollector;
         }

         @Override
         public void execute(Tuple tuple) {
             String webLog = tuple.getStringByField("str");
             if (webLog != null && !"".equals(webLog)) { // skip null or empty lines

                 Matcher matcher = pattern.matcher(webLog);
                 if (matcher.find()) {
                     // Extract the fields of interest
                     String ip = matcher.group(1);
                     String serverTimeStr = matcher.group(2);

                     // Convert the millisecond timestamp into day/hour/minute keys
                     long timestamp = Long.parseLong(serverTimeStr);
                     Date date = new Date();
                     date.setTime(timestamp);

                     DateFormat df = new SimpleDateFormat("yyyyMMddHHmm");
                     String dateStr = df.format(date);
                     String day = dateStr.substring(0, 8);
                     String hour = dateStr.substring(0, 10);
                     String minute = dateStr;

                     String requestUrl = matcher.group(3);
                     String httpRefer = matcher.group(4);
                     String userAgent = matcher.group(5);

                     // Uncomment to verify that the parsing is correct
     //                System.err.println(webLog);
     //                System.err.println(
     //                        "ip=" + ip
     //                        + ", serverTimeStr=" + serverTimeStr
     //                        + ", requestUrl=" + requestUrl
     //                        + ", httpRefer=" + httpRefer
     //                        + ", userAgent=" + userAgent
     //                );

                     // Split into separate streams
                     this.outputCollector.emit(WebLogConstants.IP_COUNT_STREAM, tuple, new Values(day, hour, minute, ip));
                     this.outputCollector.emit(WebLogConstants.URL_PARSER_STREAM, tuple, new Values(day, hour, minute, requestUrl));
                     this.outputCollector.emit(WebLogConstants.HTTPREFER_PARSER_STREAM, tuple, new Values(day, hour, minute, httpRefer));
                     this.outputCollector.emit(WebLogConstants.USERAGENT_PARSER_STREAM, tuple, new Values(day, hour, minute, userAgent));
                 }
             }
             this.outputCollector.ack(tuple);
         }

         @Override
         public void cleanup() {

         }

         @Override
         public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
             outputFieldsDeclarer.declareStream(WebLogConstants.IP_COUNT_STREAM, new Fields(DAY, HOUR, MINUTE, IP));
             outputFieldsDeclarer.declareStream(WebLogConstants.URL_PARSER_STREAM, new Fields(DAY, HOUR, MINUTE, REQUEST_URL));
             outputFieldsDeclarer.declareStream(WebLogConstants.HTTPREFER_PARSER_STREAM, new Fields(DAY, HOUR, MINUTE, HTTP_REFER));
             outputFieldsDeclarer.declareStream(WebLogConstants.USERAGENT_PARSER_STREAM, new Fields(DAY, HOUR, MINUTE, USERAGENT));
         }

         @Override
         public Map<String, Object> getComponentConfiguration() {
             return null;
         }
     }

    4. Counting class

     package com.jun.it2;

     import backtype.storm.task.OutputCollector;
     import backtype.storm.task.TopologyContext;
     import backtype.storm.topology.IRichBolt;
     import backtype.storm.topology.OutputFieldsDeclarer;
     import backtype.storm.tuple.Fields;
     import backtype.storm.tuple.Tuple;
     import backtype.storm.tuple.Values;

     import java.util.HashMap;
     import java.util.Iterator;
     import java.util.Map;

     public class CountKpiBolt implements IRichBolt {

         private String kpiType;

         private Map<String, Integer> kpiCounts;

         private String currentDay = "";

         private OutputCollector outputCollector;

         public CountKpiBolt(String kpiType) {
             this.kpiType = kpiType;
         }

         @Override
         public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
             this.kpiCounts = new HashMap<>();
             this.outputCollector = outputCollector;
         }

         @Override
         public void execute(Tuple tuple) {
             String day = tuple.getStringByField("day");
             String hour = tuple.getStringByField("hour");
             String minute = tuple.getStringByField("minute");
             String kpi = tuple.getString(3);
             // Combine the date with the KPI value
             String kpiByDay = day + "_" + kpi;
             String kpiByHour = hour + "_" + kpi;
             String kpiByMinute = minute + "_" + kpi;
             // Keep the counters in an in-memory map
             int kpiCountByDay = 0;
             int kpiCountByHour = 0;
             int kpiCountByMinute = 0;
             if (kpiCounts.containsKey(kpiByDay)) {
                 kpiCountByDay = kpiCounts.get(kpiByDay);
             }
             if (kpiCounts.containsKey(kpiByHour)) {
                 kpiCountByHour = kpiCounts.get(kpiByHour);
             }
             if (kpiCounts.containsKey(kpiByMinute)) {
                 kpiCountByMinute = kpiCounts.get(kpiByMinute);
             }
             kpiCountByDay++;
             kpiCountByHour++;
             kpiCountByMinute++;
             kpiCounts.put(kpiByDay, kpiCountByDay);
             kpiCounts.put(kpiByHour, kpiCountByHour);
             kpiCounts.put(kpiByMinute, kpiCountByMinute);
             // Clear the previous day's counters once the day rolls over
             if (!currentDay.equals(day)) {
                 // A new day has started
                 Iterator<Map.Entry<String, Integer>> iter = kpiCounts.entrySet().iterator();
                 while (iter.hasNext()) {
                     Map.Entry<String, Integer> entry = iter.next();
                     if (entry.getKey().startsWith(currentDay)) {
                         iter.remove();
                     }
                 }
             }
             currentDay = day;
             // Emit tuples with two fields: the time/KPI key and its count
             this.outputCollector.emit(tuple, new Values(kpiType + "_" + kpiByDay, kpiCountByDay));
             this.outputCollector.emit(tuple, new Values(kpiType + "_" + kpiByHour, kpiCountByHour));
             this.outputCollector.emit(tuple, new Values(kpiType + "_" + kpiByMinute, kpiCountByMinute));
             this.outputCollector.ack(tuple);
         }

         @Override
         public void cleanup() {

         }

         @Override
         public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
             outputFieldsDeclarer.declare(new Fields(WebLogConstants.SERVERTIME_KPI, WebLogConstants.KPI_COUNTS));
         }

         @Override
         public Map<String, Object> getComponentConfiguration() {
             return null;
         }
     }

    5. Save class

     package com.jun.it2;

     import backtype.storm.task.OutputCollector;
     import backtype.storm.task.TopologyContext;
     import backtype.storm.topology.IRichBolt;
     import backtype.storm.topology.OutputFieldsDeclarer;
     import backtype.storm.tuple.Tuple;
     import org.apache.hadoop.conf.Configuration;
     import org.apache.hadoop.hbase.HBaseConfiguration;
     import org.apache.hadoop.hbase.client.HTable;
     import org.apache.hadoop.hbase.client.Put;
     import org.apache.hadoop.hbase.util.Bytes;

     import java.io.IOException;
     import java.util.Map;

     import static com.jun.it2.WebLogConstants.HBASE_TABLENAME;

     public class SaveBolt implements IRichBolt {
         private HTable table;

         private OutputCollector outputCollector;

         @Override
         public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
             // hbase-site.xml on the classpath supplies the connection settings
             Configuration configuration = HBaseConfiguration.create();
             try {
                 table = new HTable(configuration, HBASE_TABLENAME);
             } catch (IOException e) {
                 e.printStackTrace();
                 throw new RuntimeException(e);
             }

             this.outputCollector = outputCollector;
         }

         @Override
         public void execute(Tuple tuple) {
             String serverTimeAndKpi = tuple.getStringByField(WebLogConstants.SERVERTIME_KPI);
             Integer kpiCounts = tuple.getIntegerByField(WebLogConstants.KPI_COUNTS);
             System.err.println("serverTimeAndKpi=" + serverTimeAndKpi + ", kpiCounts=" + kpiCounts);
             if (serverTimeAndKpi != null && kpiCounts != null) {
                 // Row key is the time_kpi string; the column qualifier is the KPI type prefix
                 Put put = new Put(Bytes.toBytes(serverTimeAndKpi));
                 String columnQualifier = serverTimeAndKpi.split("_")[0];
                 put.add(Bytes.toBytes(WebLogConstants.COLUMN_FAMILY),
                         Bytes.toBytes(columnQualifier), Bytes.toBytes("" + kpiCounts));

                 try {
                     table.put(put);
                 } catch (IOException e) {
                     throw new RuntimeException(e);
                 }
             }
             this.outputCollector.ack(tuple);
         }

         @Override
         public void cleanup() {
             if (table != null) {
                 try {
                     table.close();
                 } catch (IOException e) {
                     e.printStackTrace();
                 }
             }
         }

         @Override
         public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

         }

         @Override
         public Map<String, Object> getComponentConfiguration() {
             return null;
         }
     }

      

      

  • Original post: https://www.cnblogs.com/juncaoit/p/9148100.html