zoukankan      html  css  js  c++  java
  • 【原】Storm分布式RPC

    5. Storm高级篇

    序列化

    分布式RPC

    High level overview

    LinearDRPCTopologyBuilder

    Local mode DRPC

    Remote mode DRPC

    更复杂的例子

    Non-linear DRPC topologies

    LinearDRPCTopologyBuilder如何起作用

    Advanced



    分布式RPC

    分布式 RPC(DRPC)的设计目标是充分利用 Storm 的计算能力实现高密度的并行实时计算。Storm 接收若干个函数参数作为输入流,然后通过 DRPC 输出这些函数调用的结果。严格来说,DRPC 并不能算作是 Storm 的一个特性,因为它只是一种基于 Storm 原语 (Stream、Spout、Bolt、Topology) 实现的计算模式。虽然可以将 DRPC 从 Storm 中打包出来作为一个独立的库,但是与 Storm 集成在一起显然更有用。

    High level overview

    分布式RPC是通过“DRPC server”协调处理的(Storm用一个包来实现该功能)。DRPC server 负责接收 RPC 请求,并将该请求发送到 Storm 中运行的 Topology,等待接收 Topology 发送的处理结果,并将该结果返回给发送请求的客户端。因此,从客户端的角度来说,DPRC 与普通的 RPC 调用并没有什么区别。例如,以下是一个使用参数 “http://twitter.com” 调用 “reach” 函数计算结果的例子:

    DRPCClient client = new DRPCClient("drpc-host", 3772);
    String result = client.execute("reach", "http://twitter.com");
    

    分布式RPC工作流示意图如下所示:

    客户端通过向 DRPC 服务器发送待执行函数的名称以及该函数的参数来获取处理结果。实现该函数的拓扑使用一个DRPCSpout 从 DRPC server中接收一个函数调用流,DRPC Server会为每个函数调用都标记了一个唯一的 id,随后拓扑会执行函数来计算结果,并在拓扑的最后使用一个名为 ReturnResults 的 bolt 连接到 DRPC Server,根据函数调用的 id 来将函数调用的结果返回。

    LinearDRPCTopologyBuilder

    Storm中提供了名为LinearDRPCTopologyBuilder 的topology builder,它几乎自动完成了DRPC的所有步骤,如下所示:
    1.设置spout
    2.向DRPC server返回运行结果。
    3.给bolts提供了聚集元组的功能。
    让我们一起看一下简单的例子,该例子是DRPC topology的一个实现并返回结果为输入附加字符串“!”。

    public static class ExclaimBolt extends BaseBasicBolt {
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String input = tuple.getString(1);
            collector.emit(new Values(tuple.getValue(0), input + "!"));
        }
    
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "result"));
        }}
    public static void main(String[] args) throws Exception {
        LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
        builder.addBolt(new ExclaimBolt(), 3);
        // ...}
    

    正如你所见,当创建LinearDRPCTopologyBuilder 时,你需要让topology知道 DRPC函数的名字。单个DRPC server负责很多函数的协调处理,且这些函数的功能不同。你声明的第一个bolt将接受一个2元组,第一个域是请求id,第二个域是请求参数。LinearDRPCTopologyBuilder 中最后一个bolt会输出形式为[id,result]的2元组输出流。最后,所有中间结果的元组的第一个域必须包括请求id。
    在本例子中,ExclaimBolt 只是简单地给第二个域附加字符串“!”。LinearDRPCTopologyBuilder 继续和DRPC server通信并将结果返回。

    Local mode DRPC

    DRPC可以以本地模式运行,下面以本地模式运行的例子:

    LocalDRPC drpc = new LocalDRPC();
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));
    System.out.println("Results for 'hello':" + drpc.execute("exclamation", "hello"));
    cluster.shutdown();
    drpc.shutdown();
    

    首先你会创建一个 LocalDPRC 对象,该对象会在进程中模拟一个 DRPC 服务器,就像LocalCluster 在进程中模拟 Storm 集群的功能一样。然后,创建LocalCluster以本地模式运行topology 。LinearDRPCTopologyBuilder 有独立的方法用于创建本地topologies 和远程topologies 。在本地模式下,LocalDRPC 对象没有绑定任何端口所以topology需要知道正在和它进行通信的对象,这是方法createLocalTopology 接受LocalDRPC 对象作为输入参数的原因。
    在启动拓扑后,你可以使用 execute 方法来完成 DRPC 调用。

    Remote mode DRPC

    在一个真实的集群中使用 DRPC 有以下三个步骤:
    1.启动 DRPC Server;
    2.配置 DRPC Server的地址;
    3.将 DRPC topologies 提交到集群运行。
    可以像 Nimbus、Supervisor 那样使用 storm 命令来启动 DRPC Serve,如下:
    bin/storm drpc
    接下来,你需要在集群上配置 DRPC Server的地址。这是为了让 DRPCSpout 获取从哪里触发函数调用的方法。可以通过编辑 storm.yaml 或者添加拓扑配置的方式实现配置。配置 storm.yaml 的方式类似于下面这样:

    drpc.servers:
      - "drpc1.foo.com"
      - "drpc2.foo.com"
    

    最后,你可以像其他拓扑一样使用 StormSubmitter 来启动拓扑。以下是使用远程模式构造拓扑的一个例子:

    StormSubmitter.submitTopology("exclamation-drpc", conf, builder.createRemoteTopology());
    

    createRemoteTopology 方法是用来创建集群模式下运行的topologies 。

    更复杂的例子

    上面描述的exclamation DRPC是为了说明DRPC的简单例子。下面让我们共同学习一下更复杂的例子-Storm集群并行计算的DRPC函数调用,该例子是计算Twitter上URL的访问。
    URL访问是指不同的人在Twitter上发的推文,你需要完成如下计算:
    1.获取所有tweeted了该URL的所有人。
    2.获取所有关注了1中的所有人。
    3.2中所有人的set集合。
    4.统计3中set集合的个数。
    一次计算可能涉及上百次的数据库调用和数以千万计的关注记录,这计算规模确实很大。正如你所见,实现Storm函数是非常简单的。在单台机器上,计算需要1分钟;但在集群中,即使最难计算的URL访问也只需数秒。
    一个简单的访问topology 可以在storm-starter中找到。下面是定义访问topology 的具体步骤:

    LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach");
    builder.addBolt(new GetTweeters(), 3);
    builder.addBolt(new GetFollowers(), 12)
            .shuffleGrouping();builder.addBolt(new PartialUniquer(), 6)
            .fieldsGrouping(new Fields("id", "follower"));
    builder.addBolt(new CountAggregator(), 2)
            .fieldsGrouping(new Fields("id"));
    

    topology 的执行按照以下四步骤:
    1.GetTweeters 得到tweeted了URL的用户。它将[id,url]格式的输入流转换为[id,tweeter]格式的输出流。每个url将映射为多个tweeter元组。
    2.GetFollowers得到tweeters的关注者。它将[id,tweeter]格式的输入流转换为[id,follower]格式的输出流。这些任务中,可能有重复的元组,因为一些人可能关注了多个人都tweeted了同样的URL。
    3.PartialUniquer根据关注者id分组。这会导致同样的关注者在同样的任务中处理,所以每个PartialUniquer 的任务将会接受多个互补的关注者集合。一旦PartialUniquer 接受了所有的关注者元组,它将会输出关注者子集合元素的个数。
    4.最后,CountAggregator 从PartialUniquer 接受部分count值然后累加完成整个计算,并返回结果。
    下面看PartialUniquer bolt的代码实现:

    public class PartialUniquer extends BaseBatchBolt {
        BatchOutputCollector _collector;
        Object _id;
        Set<String> _followers = new HashSet<String>();
    
        @Override
        public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
            _collector = collector;
            _id = id;
        }
    
        @Override
        public void execute(Tuple tuple) {
            _followers.add(tuple.getString(1));
        }
    
        @Override
        public void finishBatch() {
            _collector.emit(new Values(_id, _followers.size()));
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "partial-count"));
        }}
    

    PartialUniquer 通过继承BaseBatchBolt 间接实现了IBatchBolt 接口。batch bolt提供了第一个类API,用于将一批元组作为一个具体单元进行处理。对于每个请求id,会创建一个新的batch bolt实例,在适当时候Storm也会清理这些实例。
    当PartialUniquer 在execute方法中接受了一个关注者元组时,会将该元组添加到请求id的set集合中。
    当任务中的一批元组处理完成后会调用batch bolts的finishBatch 方法,该方法输出关注者id集合元素个数形式的一元组。在后台,CoordinatedBolt 用来监测何时给定bolt已经接受到了请求id所有的元组,它用direct stream来管理。
    topology 中剩下的部分都能很容易明白,正如你看到的,每个访问计算步骤都是并行完成的,并且定义DRPC topology是非常简单的。

    Non-linear DRPC topologies

    LinearDRPCTopologyBuilder 只处理线性DRPC的topology,它的计算是一个序列步骤。不难想象功能需求将需要更复杂的topology ,它可能涉及到bolts的分支与整合。

    LinearDRPCTopologyBuilder如何起作用

    DRPCSpout 发送[args,return-info]元组。return-info是DRPC server的主机名、端口号和生成的id。
    topology的构造参数包括:
    DRPCSpout
    PrepareRequest (生成请求id和创建返回信息stream和参数stream)
    CoordinatedBolt wrappers and direct groupings
    JoinResult (返回信息进行join操作)
    ReturnResult (连接到DRPC server并返回结果)
    LinearDRPCTopologyBuilder 是一个很好的例子

    Advanced

    KeyedFairBolt 同时处理多个请求。
    如何直接使用CoordinatedBolt。

  • 相关阅读:
    mysql小结
    微信小程序中使用iconfont图标
    微信小程序添加底部自定义导航栏(tabBar)
    微信小程序实现简单计算器
    使用koa2做一个简单的图片上传web
    Promise 对象
    css常用的布局属性
    微信小程序知识点总结(持续更新中)
    Cannot read property 'data' of undefined;at api showModal success callback function
    Asp.net Core WebApi使用Swagger
  • 原文地址:https://www.cnblogs.com/yourarebest/p/6011490.html
Copyright © 2011-2022 走看看