zoukankan      html  css  js  c++  java
  • Trident中的解析包含的函数操作与投影操作

    一:函数操作

    1.介绍

      Tuple本身是不可变的

      Function不会修改tuple中已有的字段,只是在原有tuple的基础上追加新的字段

    2.说明

      如果原来的字段是log,flag

      经过Function之后,tuple可以访问这些字段:log,flag,orderId,orderTime,orderAmtStr,memberId

    3.先写驱动类

      增加了解析

      然后再将解析的日志进行打印

     1 package com.jun.trident;
     2 
     3 import backtype.storm.Config;
     4 import backtype.storm.LocalCluster;
     5 import backtype.storm.StormSubmitter;
     6 import backtype.storm.generated.AlreadyAliveException;
     7 import backtype.storm.generated.InvalidTopologyException;
     8 import backtype.storm.tuple.Fields;
     9 import backtype.storm.tuple.Values;
    10 import storm.trident.TridentTopology;
    11 import storm.trident.operation.Function;
    12 import storm.trident.operation.TridentCollector;
    13 import storm.trident.operation.TridentOperationContext;
    14 import storm.trident.testing.FixedBatchSpout;
    15 import storm.trident.tuple.TridentTuple;
    16 
    17 import java.util.Map;
    18 
    19 public class TridentDemo {
    20     public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
    21         TridentTopology tridentTopology=new TridentTopology();
    22         //模拟数据
    23         Fields field=new Fields("log","flag");
    24         FixedBatchSpout spout=new FixedBatchSpout(field,5,
    25             new Values("168.214.187.214 - - [1481953616092] "GET /view.php HTTP/1.1" 200 0 "http://cn.bing.com/search?q=spark mllib" "Mozilla/5.0 (Windows NT 6.1; rv:2.0.1) Gecko/20100101 Firefox/4.0.1" "-"","A"),
    26             new Values("168.187.202.202 - - [1481953537038] "GET /IBEIfeng.gif?order_id=1063&orderTime=1481953537038&memberId=4000012340500607&productInfos=10005-2099.48-B-1|10004-1886.62-A-2|10001-961.99-A-1&orderAmt=6834.70 HTTP/1.1" 200 0 "-" "Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2;Tident/6.0)" "-"","A"),
    27             new Values("61.30.167.187 - - [1481953539039] "GET /IBEIfeng.gif?order_id=1064&orderTime=1481953539039&memberId=4000930409959999&productInfos=10007-3329.13-B-1|10009-2607.71-B-1|10002-390.62-A-1|10006-411.00-B-2&orderAmt=7149.46 HTTP/1.1" 200 0 "-" "Mozilla/5.0 (Linux; Android 4.2.1; Galaxy Nexus Build/JOP40D) AppleWebKit/535.19 (KHTML, like Gecko) Chrome/18.0.1025.166 Mobile Safari/535.19" "-"","A"),
    28             new Values("30.29.132.190 - - [1481953544042] "GET /IBEIfeng.gif?order_id=1065&orderTime=1481953544043&memberId=1234568970080798&productInfos=10005-2099.48-B-1|10001-3242.40-C-2|10006-411.00-B-1&orderAmt=8995.28 HTTP/1.1" 200 0 "-" "Mozilla/5.0 (iPhone; CPU iPhone OS 7_)_3 like Mac OS X) AppleWebKit/537.51.1 (KHTML, like Gecko) Version/7.0 Mobile/11B511 Safari/9537.53" "-"","B"),
    29             new Values("222.190.187.201 - - [1481953578068] "GET /IBEIfeng.gif?order_id=1066&orderTime=1481953578068&memberId=3488586887970809&productInfos=10005-2099.48-B-1|10001-2774.16-C-2&orderAmt=7647.80 HTTP/1.1" 200 0 "-" "Mozilla/5.0 (Windows NT 6.1; rv:2.0.1) Gecko/20100101 Firefox/4.0.1" "-"","B"),
    30             new Values("72.202.43.53 - - [1481953579069] "GET /IBEIfeng.gif?order_id=1067&orderTime=1481953579069&memberId=2084859896989877&productInfos=10007-3329.13-B-1|10001-961.99-A-2&orderAmt=5253.10 HTTP/1.1" 200 0 "-" "Mozilla/5.0 (Linux; Android 4.2.1; Galaxy Nexus Build/JOP40D) AppleWebKit/535.19 (KHTML, like Gecko) Chrome/18.0.1025.166 Mobile Safari/535.19" "-"","B")
    31         );
    32         //多次循环
    33         spout.setCycle(true);
    34         //提交
    35         Config config=new Config();
    36         tridentTopology.newStream("orderAnalyse",spout)
    37                 //过滤
    38             .each(new Fields("log"),new ValidLogFilter())
    39                 //解析
    40             .each(new Fields("log"), new LogParserFunction(),new Fields("orderId","orderTime","orderAmtStr","memberId"))
    41                 //不添加log了,日志太长,不方便看控制台的现象
    42             .each(new Fields("flag","orderId","orderTime","orderAmtStr","memberId"),new PrintFilter());
    43         if(args==null || args.length<=0){
    44             LocalCluster localCluster=new LocalCluster();
    45             localCluster.submitTopology("tridentDemo",config,tridentTopology.build());
    46         }else {
    47             config.setNumWorkers(2);
    48             StormSubmitter.submitTopology(args[0],config,tridentTopology.build());
    49         }
    50     }
    51 }

    4.解析方法类

     1 package com.jun.trident;
     2 
     3 import backtype.storm.tuple.Values;
     4 import storm.trident.operation.Function;
     5 import storm.trident.operation.TridentCollector;
     6 import storm.trident.operation.TridentOperationContext;
     7 import storm.trident.tuple.TridentTuple;
     8 
     9 import java.util.HashMap;
    10 import java.util.Map;
    11 
    12 public class LogParserFunction implements Function {
    13     @Override
    14     public void execute(TridentTuple tridentTuple, TridentCollector tridentCollector) {
    15         //function对传进来的tuple子集
    16         String log=tridentTuple.getStringByField("log");
    17         //解析
    18         int splitIndex=log.indexOf("IBEIfeng.gif")+13;
    19         String orderInfo=log.substring(splitIndex).split(" ")[0];
    20         String[] kvs=orderInfo.split("\&");
    21         Map<String,String> orderInfos=new HashMap<>();
    22         for(String kv:kvs){
    23             String[] keyValues=kv.split("=");
    24             if(keyValues.length==2){
    25                 orderInfos.put(keyValues[0],keyValues[1]);
    26             }
    27         }
    28         String orderId=getValue(orderInfos,"order_id","");//注意这个在日志中是这个字段
    29         String orderTime=getValue(orderInfos,"orderTime","");
    30         String orderAmtStr=getValue(orderInfos,"orderAmt","");
    31         String memberId=getValue(orderInfos,"memberId","");
    32         tridentCollector.emit(new Values(orderId,orderTime,orderAmtStr,memberId));
    33     }
    34 
    35     @Override
    36     public void prepare(Map map, TridentOperationContext tridentOperationContext) {
    37 
    38     }
    39 
    40     @Override
    41     public void cleanup() {
    42 
    43     }
    44 
    45     public String getValue(Map<String,String> map,String key,String defaultValue){
    46         if(map.containsKey(key)){
    47             return map.get(key);
    48         }else{
    49             return defaultValue;
    50         }
    51     }
    52 }

    5.效果

      

    二:投影操作

    1.说明

      可以对tuple进行裁剪操作。

    2.驱动类

      先投影

      然后打印

     1 package com.jun.trident;
     2 
     3 import backtype.storm.Config;
     4 import backtype.storm.LocalCluster;
     5 import backtype.storm.StormSubmitter;
     6 import backtype.storm.generated.AlreadyAliveException;
     7 import backtype.storm.generated.InvalidTopologyException;
     8 import backtype.storm.tuple.Fields;
     9 import backtype.storm.tuple.Values;
    10 import storm.trident.TridentTopology;
    11 import storm.trident.operation.Function;
    12 import storm.trident.operation.TridentCollector;
    13 import storm.trident.operation.TridentOperationContext;
    14 import storm.trident.testing.FixedBatchSpout;
    15 import storm.trident.tuple.TridentTuple;
    16 
    17 import java.util.Map;
    18 
    19 public class TridentDemo {
    20     public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
    21         TridentTopology tridentTopology=new TridentTopology();
    22         //模拟数据
    23         Fields field=new Fields("log","flag");
    24         FixedBatchSpout spout=new FixedBatchSpout(field,5,
    25             new Values("168.214.187.214 - - [1481953616092] "GET /view.php HTTP/1.1" 200 0 "http://cn.bing.com/search?q=spark mllib" "Mozilla/5.0 (Windows NT 6.1; rv:2.0.1) Gecko/20100101 Firefox/4.0.1" "-"","A"),
    26             new Values("168.187.202.202 - - [1481953537038] "GET /IBEIfeng.gif?order_id=1063&orderTime=1481953537038&memberId=4000012340500607&productInfos=10005-2099.48-B-1|10004-1886.62-A-2|10001-961.99-A-1&orderAmt=6834.70 HTTP/1.1" 200 0 "-" "Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2;Tident/6.0)" "-"","A"),
    27             new Values("61.30.167.187 - - [1481953539039] "GET /IBEIfeng.gif?order_id=1064&orderTime=1481953539039&memberId=4000930409959999&productInfos=10007-3329.13-B-1|10009-2607.71-B-1|10002-390.62-A-1|10006-411.00-B-2&orderAmt=7149.46 HTTP/1.1" 200 0 "-" "Mozilla/5.0 (Linux; Android 4.2.1; Galaxy Nexus Build/JOP40D) AppleWebKit/535.19 (KHTML, like Gecko) Chrome/18.0.1025.166 Mobile Safari/535.19" "-"","A"),
    28             new Values("30.29.132.190 - - [1481953544042] "GET /IBEIfeng.gif?order_id=1065&orderTime=1481953544043&memberId=1234568970080798&productInfos=10005-2099.48-B-1|10001-3242.40-C-2|10006-411.00-B-1&orderAmt=8995.28 HTTP/1.1" 200 0 "-" "Mozilla/5.0 (iPhone; CPU iPhone OS 7_)_3 like Mac OS X) AppleWebKit/537.51.1 (KHTML, like Gecko) Version/7.0 Mobile/11B511 Safari/9537.53" "-"","B"),
    29             new Values("222.190.187.201 - - [1481953578068] "GET /IBEIfeng.gif?order_id=1066&orderTime=1481953578068&memberId=3488586887970809&productInfos=10005-2099.48-B-1|10001-2774.16-C-2&orderAmt=7647.80 HTTP/1.1" 200 0 "-" "Mozilla/5.0 (Windows NT 6.1; rv:2.0.1) Gecko/20100101 Firefox/4.0.1" "-"","B"),
    30             new Values("72.202.43.53 - - [1481953579069] "GET /IBEIfeng.gif?order_id=1067&orderTime=1481953579069&memberId=2084859896989877&productInfos=10007-3329.13-B-1|10001-961.99-A-2&orderAmt=5253.10 HTTP/1.1" 200 0 "-" "Mozilla/5.0 (Linux; Android 4.2.1; Galaxy Nexus Build/JOP40D) AppleWebKit/535.19 (KHTML, like Gecko) Chrome/18.0.1025.166 Mobile Safari/535.19" "-"","B")
    31         );
    32         //多次循环
    33         spout.setCycle(true);
    34         //提交
    35         Config config=new Config();
    36         tridentTopology.newStream("orderAnalyse",spout)
    37                 //过滤
    38             .each(new Fields("log"),new ValidLogFilter())
    39                 //解析
    40             .each(new Fields("log"), new LogParserFunction(),new Fields("orderId","orderTime","orderAmtStr","memberId"))
    41                 //投影
    42             .project(new Fields("orderId","orderTime","orderAmtStr","memberId"))
    43                 .each(new Fields("orderId","orderTime","orderAmtStr","memberId"),new PrintFilter())
    44 
    45         ;
    46         if(args==null || args.length<=0){
    47             LocalCluster localCluster=new LocalCluster();
    48             localCluster.submitTopology("tridentDemo",config,tridentTopology.build());
    49         }else {
    50             config.setNumWorkers(2);
    51             StormSubmitter.submitTopology(args[0],config,tridentTopology.build());
    52         }
    53     }
    54 }

    3.效果

      

    三:解析

    1.说明

      这个部分在解析之后,进入分流阶段。涉及到全局分流与局部分流,现在先使用全局分流,进行打印验证。

    2.聚合操作

      聚合之后,tuple中的部分字段被删除(只保留分组字段),同时增加聚合结果字段

    3.驱动类

     1 package com.jun.trident;
     2 
     3 import backtype.storm.Config;
     4 import backtype.storm.LocalCluster;
     5 import backtype.storm.StormSubmitter;
     6 import backtype.storm.generated.AlreadyAliveException;
     7 import backtype.storm.generated.InvalidTopologyException;
     8 import backtype.storm.tuple.Fields;
     9 import backtype.storm.tuple.Values;
    10 import storm.trident.Stream;
    11 import storm.trident.TridentState;
    12 import storm.trident.TridentTopology;
    13 import storm.trident.operation.Function;
    14 import storm.trident.operation.TridentCollector;
    15 import storm.trident.operation.TridentOperationContext;
    16 import storm.trident.operation.builtin.Count;
    17 import storm.trident.testing.FixedBatchSpout;
    18 import storm.trident.testing.MemoryMapState;
    19 import storm.trident.tuple.TridentTuple;
    20 
    21 import java.util.Map;
    22 
    23 public class TridentDemo {
    24     public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
    25         TridentTopology tridentTopology=new TridentTopology();
    26         //模拟数据
    27         Fields field=new Fields("log","flag");
    28         FixedBatchSpout spout=new FixedBatchSpout(field,5,
    29             new Values("168.214.187.214 - - [1481953616092] "GET /view.php HTTP/1.1" 200 0 "http://cn.bing.com/search?q=spark mllib" "Mozilla/5.0 (Windows NT 6.1; rv:2.0.1) Gecko/20100101 Firefox/4.0.1" "-"","A"),
    30             new Values("168.187.202.202 - - [1481953537038] "GET /IBEIfeng.gif?order_id=1063&orderTime=1481953537038&memberId=4000012340500607&productInfos=10005-2099.48-B-1|10004-1886.62-A-2|10001-961.99-A-1&orderAmt=6834.70 HTTP/1.1" 200 0 "-" "Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2;Tident/6.0)" "-"","A"),
    31             new Values("61.30.167.187 - - [1481953539039] "GET /IBEIfeng.gif?order_id=1064&orderTime=1481953539039&memberId=4000930409959999&productInfos=10007-3329.13-B-1|10009-2607.71-B-1|10002-390.62-A-1|10006-411.00-B-2&orderAmt=7149.46 HTTP/1.1" 200 0 "-" "Mozilla/5.0 (Linux; Android 4.2.1; Galaxy Nexus Build/JOP40D) AppleWebKit/535.19 (KHTML, like Gecko) Chrome/18.0.1025.166 Mobile Safari/535.19" "-"","A"),
    32             new Values("30.29.132.190 - - [1481953544042] "GET /IBEIfeng.gif?order_id=1065&orderTime=1481953544043&memberId=1234568970080798&productInfos=10005-2099.48-B-1|10001-3242.40-C-2|10006-411.00-B-1&orderAmt=8995.28 HTTP/1.1" 200 0 "-" "Mozilla/5.0 (iPhone; CPU iPhone OS 7_)_3 like Mac OS X) AppleWebKit/537.51.1 (KHTML, like Gecko) Version/7.0 Mobile/11B511 Safari/9537.53" "-"","B"),
    33             new Values("222.190.187.201 - - [1481953578068] "GET /IBEIfeng.gif?order_id=1066&orderTime=1481953578068&memberId=3488586887970809&productInfos=10005-2099.48-B-1|10001-2774.16-C-2&orderAmt=7647.80 HTTP/1.1" 200 0 "-" "Mozilla/5.0 (Windows NT 6.1; rv:2.0.1) Gecko/20100101 Firefox/4.0.1" "-"","B"),
    34             new Values("72.202.43.53 - - [1481953579069] "GET /IBEIfeng.gif?order_id=1067&orderTime=1481953579069&memberId=2084859896989877&productInfos=10007-3329.13-B-1|10001-961.99-A-2&orderAmt=5253.10 HTTP/1.1" 200 0 "-" "Mozilla/5.0 (Linux; Android 4.2.1; Galaxy Nexus Build/JOP40D) AppleWebKit/535.19 (KHTML, like Gecko) Chrome/18.0.1025.166 Mobile Safari/535.19" "-"","B")
    35         );
    36         //多次循环
    37         spout.setCycle(true);
    38         //流处理
    39         Stream stream=tridentTopology.newStream("orderAnalyse",spout)
    40                 //过滤
    41             .each(new Fields("log"),new ValidLogFilter())
    42                 //解析
    43             .each(new Fields("log"), new LogParserFunction(),new Fields("orderId","orderTime","orderAmtStr","memberId"))
    44                 //投影
    45             .project(new Fields("orderId","orderTime","orderAmtStr","memberId"))
    46                 //时间解析
    47             .each(new Fields("orderTime"),new DateTransFormerFunction(),new Fields("day","hour","minter"))
    48          ;
    49         //分流
    50         //1.基于minter统计订单数量,分组统计
    51         TridentState state=stream.groupBy(new Fields("minter"))
    52                 //全局聚合,使用内存存储状态信息
    53                 .persistentAggregate(new MemoryMapState.Factory(),new Count(),new Fields("orderNumByMinter"));
    54         state.newValuesStream().each(new Fields("minter","orderNumByMinter"),new PrintFilter());
    55         
    56         //提交
    57         Config config=new Config();
    58         if(args==null || args.length<=0){
    59             LocalCluster localCluster=new LocalCluster();
    60             localCluster.submitTopology("tridentDemo",config,tridentTopology.build());
    61         }else {
    62             config.setNumWorkers(2);
    63             StormSubmitter.submitTopology(args[0],config,tridentTopology.build());
    64         }
    65     }
    66 }

    4.时间解析方法

     1 package com.jun.trident;
     2 
     3 
     4 import backtype.storm.tuple.Values;
     5 import org.slf4j.Logger;
     6 import org.slf4j.LoggerFactory;
     7 
     8 import storm.trident.operation.Function;
     9 import storm.trident.operation.TridentCollector;
    10 import storm.trident.operation.TridentOperationContext;
    11 import storm.trident.tuple.TridentTuple;
    12 
    13 import java.text.DateFormat;
    14 import java.text.SimpleDateFormat;
    15 import java.util.Date;
    16 import java.util.Map;
    17 
    18 public class DateTransFormerFunction implements Function {
    19     private static final Logger logger = LoggerFactory.getLogger(DateTransFormerFunction.class);
    20     @Override
    21     public void execute(TridentTuple tridentTuple, TridentCollector tridentCollector) {
    22         String orderTime=tridentTuple.getStringByField("orderTime");
    23         // 处理时间
    24         try {
    25             long timestamp = Long.parseLong(orderTime);
    26             Date date = new Date();
    27             date.setTime(timestamp);
    28             DateFormat df = new SimpleDateFormat("yyyyMMddHHmm");
    29             String dateStr = df.format(date);
    30             String day = dateStr.substring(0,8);
    31             String hour = dateStr.substring(0,10);
    32             String minute = dateStr ;
    33             //发射
    34             tridentCollector.emit(new Values(day,hour,minute));
    35         }catch (Exception e){
    36             logger.error("日期解析出错"+orderTime);
    37         }
    38     }
    39 
    40     @Override
    41     public void prepare(Map map, TridentOperationContext tridentOperationContext) {
    42 
    43     }
    44 
    45     @Override
    46     public void cleanup() {
    47 
    48     }
    49 }

    5.效果

      

  • 相关阅读:
    Python-炫酷二维码
    Dictionary 序列化与反序列化
    获取数据库所有表名与字段名
    LinQ To Object 基本用法
    使用jq操作脚本生成元素的事件
    表单验证如何让select设置为必选
    js实现复制功能兼容ios
    微信小程序使用函数防抖解决重复点击消耗性能问题
    electronr进行签名与公证
    使用electron在mac升级签名后进行升级出现“QRLUpdaterErrorDomain”的错误
  • 原文地址:https://www.cnblogs.com/juncaoit/p/9162637.html
Copyright © 2011-2022 走看看