  • DRPC implementation in Trident

    I: Introduction

    1. Overview

      DRPC was introduced into Storm mainly to exploit Storm's real-time computation capability to parallelize CPU-intensive computations. A DRPC Storm topology takes a stream of function arguments as input and emits the return values of those function calls as the topology's output stream.

    2. How it works

      Distributed RPC is coordinated by a "DRPC server" (Storm ships with an implementation). The DRPC server:

      1) Receives an RPC request.

      2) Sends the request to the Storm topology.

      3) Receives the result from the Storm topology.

      4) Sends the result back to the waiting client. From the client's point of view, a DRPC call is indistinguishable from an ordinary RPC call.

    3. Workflow

      

      The client sends the DRPC server the name of the function to execute and the arguments for that function.

      The topology that implements the function uses a DRPCSpout to receive the stream of function calls from the DRPC server.

      The DRPC server tags each function call with a unique id. The topology then computes the result, and a final bolt called ReturnResults connects back to the DRPC server and hands over the result for that call, identified by the unique id. The DRPC server uses the id to match the result with the waiting client, wakes the client up, and delivers the result.
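      In Trident, newDRPCStream wires up the DRPCSpout and the ReturnResults bolt for you. Below is a minimal self-contained sketch of the pattern (the "echo" function name and the uppercasing step are illustrative only, not part of the order-analysis example; the old backtype.storm/storm.trident packages used throughout this post are assumed):

    package com.jun.tridentWithHbase;

    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.LocalDRPC;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;
    import storm.trident.TridentTopology;
    import storm.trident.operation.BaseFunction;
    import storm.trident.operation.TridentCollector;
    import storm.trident.tuple.TridentTuple;

    public class DrpcEchoDemo {
        public static void main(String[] args) {
            TridentTopology topology = new TridentTopology();
            LocalDRPC drpc = new LocalDRPC();
            // Each DRPC request arrives as a single tuple carrying an "args" field
            topology.newDRPCStream("echo", drpc)
                    .each(new Fields("args"), new BaseFunction() {
                        @Override
                        public void execute(TridentTuple tuple, TridentCollector collector) {
                            // Compute the "result" field; ReturnResults sends it back
                            collector.emit(new Values(tuple.getString(0).toUpperCase()));
                        }
                    }, new Fields("result"))
                    .project(new Fields("result"));
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("echoDemo", new Config(), topology.build());
            // Blocks until the topology answers; prints something like [["HELLO DRPC"]]
            System.out.println(drpc.execute("echo", "hello drpc"));
            drpc.shutdown();
            cluster.shutdown();
        }
    }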

    II: Local DRPC

    1. Main driver class

    package com.jun.tridentWithHbase;

    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.LocalDRPC;
    import backtype.storm.StormSubmitter;
    import backtype.storm.generated.AlreadyAliveException;
    import backtype.storm.generated.InvalidTopologyException;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;
    import org.apache.storm.hbase.trident.state.HBaseMapState;
    import storm.trident.Stream;
    import storm.trident.TridentState;
    import storm.trident.TridentTopology;
    import storm.trident.operation.builtin.Count;
    import storm.trident.operation.builtin.MapGet;
    import storm.trident.operation.builtin.Sum;
    import storm.trident.state.OpaqueValue;
    import storm.trident.state.StateFactory;
    import storm.trident.testing.FixedBatchSpout;
    import storm.trident.testing.MemoryMapState;

    public class TridentDemo {
        public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
            TridentTopology tridentTopology = new TridentTopology();
            // Simulated data (the quotes inside the log lines must be escaped in Java)
            Fields field = new Fields("log", "flag");
            FixedBatchSpout spout = new FixedBatchSpout(field, 5,
                new Values("168.214.187.214 - - [1481953616092] \"GET /view.php HTTP/1.1\" 200 0 \"http://cn.bing.com/search?q=spark mllib\" \"Mozilla/5.0 (Windows NT 6.1; rv:2.0.1) Gecko/20100101 Firefox/4.0.1\" \"-\"", "A"),
                new Values("168.187.202.202 - - [1481953537038] \"GET /IBEIfeng.gif?order_id=1063&orderTime=1481953537038&memberId=4000012340500607&productInfos=10005-2099.48-B-1|10004-1886.62-A-2|10001-961.99-A-1&orderAmt=6834.70 HTTP/1.1\" 200 0 \"-\" \"Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2;Tident/6.0)\" \"-\"", "A"),
                new Values("61.30.167.187 - - [1481953539039] \"GET /IBEIfeng.gif?order_id=1064&orderTime=1481953539039&memberId=4000930409959999&productInfos=10007-3329.13-B-1|10009-2607.71-B-1|10002-390.62-A-1|10006-411.00-B-2&orderAmt=7149.46 HTTP/1.1\" 200 0 \"-\" \"Mozilla/5.0 (Linux; Android 4.2.1; Galaxy Nexus Build/JOP40D) AppleWebKit/535.19 (KHTML, like Gecko) Chrome/18.0.1025.166 Mobile Safari/535.19\" \"-\"", "A"),
                new Values("30.29.132.190 - - [1481953544042] \"GET /IBEIfeng.gif?order_id=1065&orderTime=1481953544043&memberId=1234568970080798&productInfos=10005-2099.48-B-1|10001-3242.40-C-2|10006-411.00-B-1&orderAmt=8995.28 HTTP/1.1\" 200 0 \"-\" \"Mozilla/5.0 (iPhone; CPU iPhone OS 7_)_3 like Mac OS X) AppleWebKit/537.51.1 (KHTML, like Gecko) Version/7.0 Mobile/11B511 Safari/9537.53\" \"-\"", "B"),
                new Values("222.190.187.201 - - [1481953578068] \"GET /IBEIfeng.gif?order_id=1066&orderTime=1481953578068&memberId=3488586887970809&productInfos=10005-2099.48-B-1|10001-2774.16-C-2&orderAmt=7647.80 HTTP/1.1\" 200 0 \"-\" \"Mozilla/5.0 (Windows NT 6.1; rv:2.0.1) Gecko/20100101 Firefox/4.0.1\" \"-\"", "B"),
                new Values("72.202.43.53 - - [1481953579069] \"GET /IBEIfeng.gif?order_id=1067&orderTime=1481953579069&memberId=2084859896989877&productInfos=10007-3329.13-B-1|10001-961.99-A-2&orderAmt=5253.10 HTTP/1.1\" 200 0 \"-\" \"Mozilla/5.0 (Linux; Android 4.2.1; Galaxy Nexus Build/JOP40D) AppleWebKit/535.19 (KHTML, like Gecko) Chrome/18.0.1025.166 Mobile Safari/535.19\" \"-\"", "B")
            );
            // Replay the batches in a cycle
            spout.setCycle(true);
            // Stream processing
            Stream stream = tridentTopology.newStream("orderAnalyse", spout)
                    // Filter out invalid log lines
                    .each(new Fields("log"), new ValidLogFilter())
                    // Parse the log line into individual fields
                    .each(new Fields("log"), new LogParserFunction(), new Fields("orderId", "orderTime", "orderAmtStr", "memberId"))
                    // Keep only the parsed fields
                    .project(new Fields("orderId", "orderTime", "orderAmtStr", "memberId"))
                    // Derive day/hour/minute from the timestamp (the minute key is named "minter" throughout)
                    .each(new Fields("orderTime"), new DateTransFormerFunction(), new Fields("day", "hour", "minter"));

            // Branch 1: order count per minute, grouped aggregation
            TridentState state = stream.groupBy(new Fields("minter"))
                    // Global aggregation, keeping the state in memory
                    .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("orderNumByMinter"));
    //        state.newValuesStream().each(new Fields("minter", "orderNumByMinter"), new PrintFilter());

            // Branch 2: per-minute order amount, aggregated locally per partition first
            Stream partitionStream = stream.each(new Fields("orderAmtStr"), new TransforAmtToDoubleFunction(), new Fields("orderAmt"))
                    .groupBy(new Fields("minter"))
                    // Partition-local aggregation
                    .chainedAgg()    // start of the aggregation chain
                    .partitionAggregate(new Fields("orderAmt"), new LocalSum(), new Fields("orderAmtSumOfLocal"))
                    .chainEnd();     // end of the aggregation chain

            // Then one global aggregation over the partial sums
            TridentState partitionState = partitionStream.groupBy(new Fields("minter"))
                    // Global aggregation
                    .persistentAggregate(new MemoryMapState.Factory(), new Fields("orderAmtSumOfLocal"), new Sum(), new Fields("totalOrderAmt"));
            partitionState.newValuesStream().each(new Fields("minter", "totalOrderAmt"), new PrintFilter());

            // Submit
            Config config = new Config();
            if (args == null || args.length <= 0) {
                // Build a local (in-process) DRPC server
                LocalDRPC localDRPC = new LocalDRPC();
                tridentTopology.newDRPCStream("orderDataServer", localDRPC)
                        // Parse the request arguments
                        .each(new Fields("args"), new RequestParamsParserFunction(), new Fields("date"))
                        // Query the state; the key argument is partitionState from above
                        .stateQuery(partitionState, new Fields("date"), new MapGet(), new Fields("totalAmtByMinter"))
                        // Project to the fields returned to the client
                        .project(new Fields("date", "totalAmtByMinter"));
                // Submit the topology
                LocalCluster localCluster = new LocalCluster();
                localCluster.submitTopology("tridentDemo", config, tridentTopology.build());
                // Issue a DRPC call and fetch the result
                String jsonResult = localDRPC.execute("orderDataServer", "201612171345 201612171345");
                System.out.println("***" + jsonResult + "***");

            } else {
                config.setNumWorkers(2);
                StormSubmitter.submitTopology(args[0], config, tridentTopology.build());
            }
        }
    }
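      ValidLogFilter, LogParserFunction, DateTransFormerFunction, TransforAmtToDoubleFunction, LocalSum, and PrintFilter are user-defined classes whose sources are not included in the post. To illustrate the partition-aggregation step, here is one plausible sketch of LocalSum as a Trident BaseAggregator that sums "orderAmt" within each partition (an assumption about the author's code, not a copy of it):

    package com.jun.tridentWithHbase;

    import backtype.storm.tuple.Values;
    import storm.trident.operation.BaseAggregator;
    import storm.trident.operation.TridentCollector;
    import storm.trident.tuple.TridentTuple;

    public class LocalSum extends BaseAggregator<LocalSum.State> {
        // Per-batch, per-partition running total
        static class State {
            double sum = 0.0;
        }

        @Override
        public State init(Object batchId, TridentCollector collector) {
            return new State();
        }

        @Override
        public void aggregate(State state, TridentTuple tuple, TridentCollector collector) {
            // Accumulate the amount of every tuple seen in this partition
            state.sum += tuple.getDoubleByField("orderAmt");
        }

        @Override
        public void complete(State state, TridentCollector collector) {
            // Emit one partial sum per partition; the global Sum combines them
            collector.emit(new Values(state.sum));
        }
    }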

    2. Request-parameter parser class

    package com.jun.tridentWithHbase;

    import backtype.storm.tuple.Values;
    import storm.trident.operation.Function;
    import storm.trident.operation.TridentCollector;
    import storm.trident.operation.TridentOperationContext;
    import storm.trident.tuple.TridentTuple;

    import java.util.Map;

    public class RequestParamsParserFunction implements Function {
        @Override
        public void execute(TridentTuple tridentTuple, TridentCollector tridentCollector) {
            // The raw DRPC arguments arrive as a single space-separated string
            String parameters = tridentTuple.getStringByField("args");
            String[] params = parameters.split(" ");
            // Emit one tuple per requested key so each can be queried separately
            for (String param : params) {
                tridentCollector.emit(new Values(param));
            }
        }

        @Override
        public void prepare(Map map, TridentOperationContext tridentOperationContext) {
        }

        @Override
        public void cleanup() {
        }
    }
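      Because the arguments string is split on spaces, a single DRPC call can query several minutes at once: a request such as "201612171345 201612171346" fans out into one "date" tuple per token, and the stateQuery/MapGet step in the driver looks each tuple up independently in partitionState.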

    3. Result

      (The original post shows the DRPC query output as a screenshot.)

    III: DRPC in cluster mode

    1. Main driver class

    config.setNumWorkers(2);
    // On the cluster, the DRPC servers are external processes,
    // so newDRPCStream takes only the function name
    tridentTopology.newDRPCStream("orderDataServer")
            // Parse the request arguments
            .each(new Fields("args"), new RequestParamsParserFunction(), new Fields("date"))
            // Query the state; the key argument is partitionState from above
            .stateQuery(partitionState, new Fields("date"), new MapGet(), new Fields("totalAmtByMinter"))
            // Project to the fields returned to the client
            .project(new Fields("date", "totalAmtByMinter"));
    StormSubmitter.submitTopology(args[0], config, tridentTopology.build());
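      This replaces the else branch of the local driver: the DRPC stream is declared the same way, but with no LocalDRPC instance, so the topology registers the function with the cluster's external DRPC servers, and the job is submitted through StormSubmitter.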

    2. Configure the DRPC servers and port

      
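      The original post shows this step as a screenshot of storm.yaml. A typical fragment (the hostname is taken from the client code below; 3772 is Storm's default DRPC port) would look like:

    drpc.servers:
      - "linux-hadoop01.ibeifeng.com"
    drpc.port: 3772

      Every node needs this setting so that workers can locate the DRPC servers.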

    3. Start Storm

    4. Start the DRPC process

      Run this on the server(s) listed in the drpc.servers setting.

      Command: nohup bin/storm drpc >>/dev/null 2>&1 &

      Check that the port is listening: netstat -tlnup | grep 3772

    5. Submit the jar to the cluster

    6. Write the client

    package com.jun.tridentWithKafka;

    import backtype.storm.generated.DRPCExecutionException;
    import backtype.storm.utils.DRPCClient;
    import org.apache.thrift7.TException;

    public class DrpcClientDemo {
        public static void main(String[] args) {
            // Connect to the DRPC server configured in storm.yaml
            DRPCClient drpcClient = new DRPCClient("linux-hadoop01.ibeifeng.com", 3772);
            try {
                // Blocks until the topology returns the result for this request
                String jsonResult = drpcClient.execute("orderDataServer", "201612171345 201612171345");
                System.out.println("===" + jsonResult + "===");
            } catch (TException e) {
                e.printStackTrace();
            } catch (DRPCExecutionException e) {
                e.printStackTrace();
            }
        }
    }
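      A note on the imports: org.apache.thrift7.TException comes from Storm's shaded copy of Thrift (DRPC is exposed as a Thrift service), while DRPCExecutionException is thrown when the request fails on the server side, for example when it times out before the topology produces a result.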

      
