一:介绍
1.说明
Storm里面引入DRPC主要是利用storm的实时计算能力来并行化CPU intensive的计算。DRPC的storm topology以函数的参数流作为输入,而把这些函数调用的返回值作为topology的输出流。
2.工作机制
Distributed RPC是由一个“DRPC Server”协调的(storm自带了一个实现)。DRPC服务器的协调流程如下:
1) 接收一个RPC请求。
2) 发送请求到storm topology
3) 从storm topology接收结果。
4) 把结果发回给等待的客户端。从客户端的角度来看,一个DRPC调用跟一个普通的RPC调用没有任何区别。
3.工作流程
客户端给DRPC服务器发送要执行的方法的名字,以及这个方法的参数。
实现了这个函数的topology使用DRPCSpout从DRPC服务器接收函数调用流。
每个函数调用都被DRPC服务器标记了一个唯一的id。随后topology计算结果,位于topology最后的一个名为ReturnResults的bolt会连接到DRPC服务器,并把这个调用的结果发送给DRPC服务器(通过那个唯一的id标识)。DRPC服务器用这个唯一id与等待中的客户端进行匹配,唤醒该客户端并把结果发送给它。
二:本地DRPC
1.主驱动类
1 package com.jun.tridentWithHbase; 2 3 import backtype.storm.Config; 4 import backtype.storm.LocalCluster; 5 import backtype.storm.LocalDRPC; 6 import backtype.storm.StormSubmitter; 7 import backtype.storm.generated.AlreadyAliveException; 8 import backtype.storm.generated.InvalidTopologyException; 9 import backtype.storm.tuple.Fields; 10 import backtype.storm.tuple.Values; 11 import org.apache.storm.hbase.trident.state.HBaseMapState; 12 import storm.trident.Stream; 13 import storm.trident.TridentState; 14 import storm.trident.TridentTopology; 15 import storm.trident.operation.builtin.Count; 16 import storm.trident.operation.builtin.MapGet; 17 import storm.trident.operation.builtin.Sum; 18 import storm.trident.state.OpaqueValue; 19 import storm.trident.state.StateFactory; 20 import storm.trident.testing.FixedBatchSpout; 21 import storm.trident.testing.MemoryMapState; 22 23 public class TridentDemo { 24 public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException { 25 TridentTopology tridentTopology=new TridentTopology(); 26 //模拟数据 27 Fields field=new Fields("log","flag"); 28 FixedBatchSpout spout=new FixedBatchSpout(field,5, 29 new Values("168.214.187.214 - - [1481953616092] "GET /view.php HTTP/1.1" 200 0 "http://cn.bing.com/search?q=spark mllib" "Mozilla/5.0 (Windows NT 6.1; rv:2.0.1) Gecko/20100101 Firefox/4.0.1" "-"","A"), 30 new Values("168.187.202.202 - - [1481953537038] "GET /IBEIfeng.gif?order_id=1063&orderTime=1481953537038&memberId=4000012340500607&productInfos=10005-2099.48-B-1|10004-1886.62-A-2|10001-961.99-A-1&orderAmt=6834.70 HTTP/1.1" 200 0 "-" "Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2;Tident/6.0)" "-"","A"), 31 new Values("61.30.167.187 - - [1481953539039] "GET /IBEIfeng.gif?order_id=1064&orderTime=1481953539039&memberId=4000930409959999&productInfos=10007-3329.13-B-1|10009-2607.71-B-1|10002-390.62-A-1|10006-411.00-B-2&orderAmt=7149.46 HTTP/1.1" 200 0 "-" "Mozilla/5.0 (Linux; Android 4.2.1; Galaxy Nexus 
Build/JOP40D) AppleWebKit/535.19 (KHTML, like Gecko) Chrome/18.0.1025.166 Mobile Safari/535.19" "-"","A"), 32 new Values("30.29.132.190 - - [1481953544042] "GET /IBEIfeng.gif?order_id=1065&orderTime=1481953544043&memberId=1234568970080798&productInfos=10005-2099.48-B-1|10001-3242.40-C-2|10006-411.00-B-1&orderAmt=8995.28 HTTP/1.1" 200 0 "-" "Mozilla/5.0 (iPhone; CPU iPhone OS 7_)_3 like Mac OS X) AppleWebKit/537.51.1 (KHTML, like Gecko) Version/7.0 Mobile/11B511 Safari/9537.53" "-"","B"), 33 new Values("222.190.187.201 - - [1481953578068] "GET /IBEIfeng.gif?order_id=1066&orderTime=1481953578068&memberId=3488586887970809&productInfos=10005-2099.48-B-1|10001-2774.16-C-2&orderAmt=7647.80 HTTP/1.1" 200 0 "-" "Mozilla/5.0 (Windows NT 6.1; rv:2.0.1) Gecko/20100101 Firefox/4.0.1" "-"","B"), 34 new Values("72.202.43.53 - - [1481953579069] "GET /IBEIfeng.gif?order_id=1067&orderTime=1481953579069&memberId=2084859896989877&productInfos=10007-3329.13-B-1|10001-961.99-A-2&orderAmt=5253.10 HTTP/1.1" 200 0 "-" "Mozilla/5.0 (Linux; Android 4.2.1; Galaxy Nexus Build/JOP40D) AppleWebKit/535.19 (KHTML, like Gecko) Chrome/18.0.1025.166 Mobile Safari/535.19" "-"","B") 35 ); 36 //多次循环 37 spout.setCycle(true); 38 //流处理 39 Stream stream=tridentTopology.newStream("orderAnalyse",spout) 40 //过滤 41 .each(new Fields("log"),new ValidLogFilter()) 42 //解析 43 .each(new Fields("log"), new LogParserFunction(),new Fields("orderId","orderTime","orderAmtStr","memberId")) 44 //投影 45 .project(new Fields("orderId","orderTime","orderAmtStr","memberId")) 46 //时间解析 47 .each(new Fields("orderTime"),new DateTransFormerFunction(),new Fields("day","hour","minter")) 48 ; 49 //分流 50 //1.基于minter统计订单数量,分组统计 51 TridentState state=stream.groupBy(new Fields("minter")) 52 //全局聚合,使用内存存储状态信息 53 .persistentAggregate(new MemoryMapState.Factory(),new Count(),new Fields("orderNumByMinter")); 54 // state.newValuesStream().each(new Fields("minter","orderNumByMinter"),new PrintFilter()); 55 56 //2.另一个流,基于分钟的订单金额,局部聚合 57 Stream 
partitionStream=stream.each(new Fields("orderAmtStr"),new TransforAmtToDoubleFunction(),new Fields("orderAmt")) 58 .groupBy(new Fields("minter")) 59 //局部聚合 60 .chainedAgg() //聚合链 61 .partitionAggregate(new Fields("orderAmt"),new LocalSum(),new Fields("orderAmtSumOfLocal")) 62 .chainEnd(); //聚合链 63 64 //做一次全局聚合 65 TridentState partitionState=partitionStream.groupBy(new Fields("minter")) 66 //全局聚合 67 .persistentAggregate(new MemoryMapState.Factory(),new Fields("orderAmtSumOfLocal"),new Sum(),new Fields("totalOrderAmt")); 68 partitionState.newValuesStream().each(new Fields("minter","totalOrderAmt"),new PrintFilter()); 69 70 //提交 71 Config config=new Config(); 72 if(args==null || args.length<=0){ 73 //应该是构建一个DRPC的服务器 74 LocalDRPC localDRPC=new LocalDRPC(); 75 tridentTopology.newDRPCStream("orderDataServer",localDRPC) 76 //参数处理 77 .each(new Fields("args"),new RequestParamsParserFunction(),new Fields("date")) 78 //查询,重要的参数是上面的partitionState 79 .stateQuery(partitionState,new Fields("date"),new MapGet(),new Fields("totalAmtByMinter")) 80 //投影 81 .project(new Fields("date","totalAmtByMinter")); 82 //提交任务 83 LocalCluster localCluster=new LocalCluster(); 84 localCluster.submitTopology("tridentDemo",config,tridentTopology.build()); 85 //获取值 86 String jsonResult=localDRPC.execute("orderDataServer","201612171345 201612171345"); 87 System.out.println("***"+jsonResult+"***"); 88 89 }else { 90 config.setNumWorkers(2); 91 StormSubmitter.submitTopology(args[0],config,tridentTopology.build()); 92 } 93 } 94 }
2.请求参数处理类
1 package com.jun.tridentWithHbase; 2 3 import backtype.storm.tuple.Values; 4 import storm.trident.operation.Function; 5 import storm.trident.operation.TridentCollector; 6 import storm.trident.operation.TridentOperationContext; 7 import storm.trident.tuple.TridentTuple; 8 9 import java.util.Map; 10 11 public class RequestParamsParserFunction implements Function { 12 @Override 13 public void execute(TridentTuple tridentTuple, TridentCollector tridentCollector) { 14 String parameters=tridentTuple.getStringByField("args"); 15 String[] params=parameters.split(" "); 16 for (String param:params){ 17 tridentCollector.emit(new Values(param)); 18 } 19 } 20 21 @Override 22 public void prepare(Map map, TridentOperationContext tridentOperationContext) { 23 24 } 25 26 @Override 27 public void cleanup() { 28 29 } 30 }
3.效果
三:集群模式的DRPC
1.主驱动类
1 config.setNumWorkers(2); 2 //集群上构建DRPC服务器 3 tridentTopology.newDRPCStream("orderDataServer") 4 //参数处理 5 .each(new Fields("args"),new RequestParamsParserFunction(),new Fields("date")) 6 //查询,重要的参数是上面的partitionState 7 .stateQuery(partitionState,new Fields("date"),new MapGet(),new Fields("totalAmtByMinter")) 8 //投影 9 .project(new Fields("date","totalAmtByMinter")); 10 StormSubmitter.submitTopology(args[0],config,tridentTopology.build());
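2.配置DRPC服务和端口
在storm.yaml中配置DRPC服务器的地址和端口(3772是drpc.port的默认值,也是下文客户端连接的端口),例如:
drpc.servers:
  - "linux-hadoop01.ibeifeng.com"
drpc.port: 3772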
3.启动storm
4.启动DRPC进程
在drpc.servers参数所指定的服务器上启动DRPC进程。
命令:nohup bin/storm drpc >>/dev/null 2>&1 &
查看端口是否正常打开:netstat -tlnup | grep 3772
5.将jar包提交到集群上
6.编写客户端
1 package com.jun.tridentWithKafka; 2 3 import backtype.storm.generated.DRPCExecutionException; 4 import backtype.storm.utils.DRPCClient; 5 import org.apache.thrift7.TException; 6 7 public class DrpcClientDemo { 8 public static void main(String[] args) { 9 DRPCClient drpcClient=new DRPCClient("linux-hadoop01.ibeifeng.com",3772); 10 try { 11 String jsonResult=drpcClient.execute("orderDataServer","201612171345 201612171345"); 12 System.out.println("==="+jsonResult+"==="); 13 } catch (TException e) { 14 e.printStackTrace(); 15 } catch (DRPCExecutionException e) { 16 e.printStackTrace(); 17 } 18 } 19 }