zoukankan      html  css  js  c++  java
  • Storm实时计算:流操作入门编程实践

    转自:http://shiyanjun.cn/archives/977.html

    Storm实时计算:流操作入门编程实践

    Storm是一个分布式是实时计算系统,它设计了一种对流和计算的抽象,概念比较简单,实际编程开发起来相对容易。下面,简单介绍编程实践过程中需要理解的Storm中的几个概念:

    • Topology

    Storm中Topology的概念类似于Hadoop中的MapReduce Job,是一个用来编排、容纳一组计算逻辑组件(Spout、Bolt)的对象(Hadoop MapReduce中一个Job包含一组Map Task、Reduce Task),这一组计算组件可以按照DAG图的方式编排起来(通过选择Stream Groupings来控制数据流分发流向),从而组合成一个计算逻辑更加负责的对象,那就是Topology。一个Topology运行以后就不能停止,它会无限地运行下去,除非手动干预(显式执行bin/storm kill )或意外故障(如停机、整个Storm集群挂掉)让它终止。

    • Spout

    Storm中Spout是一个Topology的消息生产的源头,Spout应该是一个持续不断生产消息的组件,例如,它可以是一个Socket Server在监听外部Client连接并发送消息,可以是一个消息队列(MQ)的消费者、可以是用来接收Flume Agent的Sink所发送消息的服务,等等。Spout生产的消息在Storm中被抽象为Tuple,在整个Topology的多个计算组件之间都是根据需要抽象构建的Tuple消息来进行连接,从而形成流。

    • Bolt

    Storm中消息的处理逻辑被封装到Bolt组件中,任何处理逻辑都可以在Bolt里面执行,处理过程和普通计算应用程序没什么区别,只是需要根据Storm的计算语义来合理设置一下组件之间消息流的声明、分发、连接即可。Bolt可以接收来自一个或多个Spout的Tuple消息,也可以来自多个其它Bolt的Tuple消息,也可能是Spout和其它Bolt组合发送的Tuple消息。

    • Stream Grouping

    Storm中用来定义各个计算组件(Spout、Bolt)之间流的连接、分组、分发关系。Storm定义了如下7种分发策略:Shuffle Grouping(随机分组)、Fields Grouping(按字段分组)、All Grouping(广播分组)、Global Grouping(全局分组)、Non Grouping(不分组)、Direct Grouping(直接分组)、Local or Shuffle Grouping(本地/随机分组),各种策略的具体含义可以参考Storm官方文档、比较容易理解。

    下面,作为入门实践,我们简单介绍几种开发中常用的流操作处理方式的实现:

    Storm组件简单串行

    这种方式是最简单最直观的,只要我们将Storm的组件(Spout、Bolt)串行起来即可实现,只需要了解编写这些组件的基本方法即可。在实际应用中,如果我们需要从某一个数据源连续地接收消息,然后顺序地处理每一个请求,就可以使用这种串行方式来处理。如果说处理单元的逻辑非常复杂,那么就需要处理逻辑进行分离,属于同一类操作的逻辑封装到一个处理组件中,做到各个组件之间弱耦合(除了定义Field的schema外,只通过发送消息来连接各个组件)。
    下面,我实现一个简单的WordCount的例子,各个组件之间的连接方式,如下图所示:
    wordcount-topology
    ProduceRecordSpout类是一个Spout组件,用来产生消息,我们这里模拟发送一些英文句子,实际应用中可以指定任何数据源,如数据库、消息中间件、Socket连接、RPC调用等等。ProduceRecordSpout类代码如下所示:

    01 public static class ProduceRecordSpout extends BaseRichSpout {
    02  
    03      private static final long serialVersionUID = 1L;
    04      private static final Log LOG = LogFactory.getLog(ProduceRecordSpout.class);
    05      private SpoutOutputCollector collector;
    06      private Random random;
    07      private String[] records;
    08      
    09      public ProduceRecordSpout(String[] records) {
    10           this.records = records;
    11      }
    12      
    13      @Override
    14      public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    15           this.collector = collector;   
    16           random = new Random();
    17      }
    18  
    19      @Override
    20      public void nextTuple() {
    21           Utils.sleep(500);
    22           String record = records[random.nextInt(records.length)];
    23           List<Object> values = new Values(record);
    24           collector.emit(values, values);
    25           LOG.info("Record emitted: record=" + record);
    26      }
    27  
    28      @Override
    29      public void declareOutputFields(OutputFieldsDeclarer declarer) {
    30           declarer.declare(new Fields("record"));        
    31      }
    32 }

    构造一个ProduceRecordSpout对象时,传入一个字符串数组,然后随机地选择其中一个句子,emit到下游(Downstream)的WordSplitterBolt组件,只声明了一个Field,WordSplitterBolt组件可以根据声明的Field,接收到emit的消息,WordSplitterBolt类代码实现如下所示:

    01 public static class WordSplitterBolt extends BaseRichBolt {
    02  
    03      private static final long serialVersionUID = 1L;
    04      private static final Log LOG = LogFactory.getLog(WordSplitterBolt.class);
    05      private OutputCollector collector;
    06      
    07      @Override
    08      public void prepare(Map stormConf, TopologyContext context,
    09                OutputCollector collector) {
    10           this.collector = collector;             
    11      }
    12  
    13      @Override
    14      public void execute(Tuple input) {
    15           String record = input.getString(0);
    16           if(record != null && !record.trim().isEmpty()) {
    17                for(String word : record.split("\s+")) {
    18                     collector.emit(input, new Values(word, 1));
    19                     LOG.info("Emitted: word=" + word);
    20                     collector.ack(input);
    21                }
    22           }
    23      }
    24  
    25      @Override
    26      public void declareOutputFields(OutputFieldsDeclarer declarer) {
    27           declarer.declare(new Fields("word""count"));        
    28      }
    29      
    30 }

    在execute方法中,传入的参数是一个Tuple,该Tuple就包含了上游(Upstream)组件ProduceRecordSpout所emit的数据,直接取出数据进行处理。上面代码中,我们将取出的数据,按照空格进行的split,得到一个一个的单词,然后在emit到下一个组件,声明的输出schema为2个Field:word和count,当然这里面count的值都为1。
    进行统计词频的组件为WordCounterBolt,实现代码如下所示:

    01 public static class WordCounterBolt extends BaseRichBolt {
    02  
    03      private static final long serialVersionUID = 1L;
    04      private static final Log LOG = LogFactory.getLog(WordCounterBolt.class);
    05      private OutputCollector collector;
    06      private final Map<String, AtomicInteger> counterMap = Maps.newHashMap();
    07      
    08      @Override
    09      public void prepare(Map stormConf, TopologyContext context,
    10                OutputCollector collector) {
    11           this.collector = collector;             
    12      }
    13  
    14      @Override
    15      public void execute(Tuple input) {
    16           String word = input.getString(0);
    17           int count = input.getIntegerByField("count"); // 通过Field名称取出对应字段的数据
    18           AtomicInteger ai = counterMap.get(word);
    19           if(ai == null) {
    20                ai = new AtomicInteger(0);
    21                counterMap.put(word, ai);
    22           }
    23           ai.addAndGet(count);
    24           LOG.info("DEBUG: word=" + word + ", count=" + ai.get());
    25           collector.ack(input);
    26      }
    27  
    28      @Override
    29      public void declareOutputFields(OutputFieldsDeclarer declarer) {          
    30      }
    31      
    32      @Override
    33      public void cleanup() {
    34           // print count results
    35           LOG.info("Word count results:");
    36           for(Entry<String, AtomicInteger> entry : counterMap.entrySet()) {
    37                LOG.info(" word=" + entry.getKey() + ", count=" + entry.getValue().get());
    38           }
    39      }
    40  
    41 }

    上面代码通过一个Map来对每个单词出现的频率进行累加计数,比较简单。因为该组件是Topology的最后一个组件,所以不需要在declareOutputFields方法中声明Field的Schema,而是在cleanup方法中输出最终的结果,只有在该组件结束任务退出时才会调用cleanup方法输出。
    最后,需要基于上面的3个组件来创建一个Topology实例,提交到Storm集群去运行,配置代码如下所示:

    01 public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
    02      // configure & build topology
    03      TopologyBuilder builder = new TopologyBuilder();
    04      String[] records = new String[] {
    05                "A Storm cluster is superficially similar to a Hadoop cluster",
    06                "All coordination between Nimbus and the Supervisors is done through a Zookeeper cluster",
    07                "The core abstraction in Storm is the stream"
    08      };
    09      builder
    10           .setSpout("spout-producer"new ProduceRecordSpout(records), 1)
    11           .setNumTasks(3);
    12      builder
    13           .setBolt("bolt-splitter"new WordSplitterBolt(), 2)
    14           .shuffleGrouping("spout-producer")
    15           .setNumTasks(2);
    16      builder.setBolt("bolt-counter"new WordCounterBolt(), 1)
    17           .fieldsGrouping("bolt-splitter"new Fields("word"))
    18           .setNumTasks(2);
    19      
    20      // submit topology
    21      Config conf = new Config();
    22      String name = WordCountTopology.class.getSimpleName();
    23      if (args != null && args.length > 0) {
    24           String nimbus = args[0];
    25           conf.put(Config.NIMBUS_HOST, nimbus);
    26           conf.setNumWorkers(2);
    27           StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
    28      else {
    29           LocalCluster cluster = new LocalCluster();
    30           cluster.submitTopology(name, conf, builder.createTopology());
    31           Thread.sleep(60000);
    32           cluster.shutdown();
    33      }
    34 }

    上面通过TopologyBuilder来配置组成一个Topology的多个组件(Spout或Bolt),然后通过调用createTopology()方法创建一个Topology实例。上面方法中,对应着2种运行模式:如果没有传递任何参数,则是使用LocalCluster来运行,适合本地调试代码;如果传递一个Topology名称作为参数,则是在真实的Storm集群上运行,需要对实现的Topology代码进行编译打包,通过StormSubmitter提交到集群上作为服务运行。

    Storm组合多种流操作

    Storm支持流聚合操作,将多个组件emit的数据,汇聚到同一个处理组件来统一处理,可以实现对多个Spout组件通过流聚合到一个Bolt组件(Sout到Bolt的多对一、多对多操作),也可以实现对多个Bolt通过流聚合到另一个Bolt组件(Bolt到Bolt的多对一、多对多操作)。实际,这里面有两种主要的操作,一种是类似工作流中的fork,另一种是类似工作流中的join。下面,我们实现一个例子来演示如何使用,实时流处理逻辑如下图所示:
    multiple-streams-topology
    上图所描述的实时流处理流程,我们期望能够按照如下流程进行处理:

    • 存在3类数据:数字字符串(NUM)、字母字符串(STR)、特殊符号字符串(SIG)
    • 每个ProduceRecordSpout负责处理上面提到的3类数据
    • 所有数据都是字符串,字符串中含有空格,3种类型的ProduceRecordSpout所emit的数据都需要被相同的逻辑处理:根据空格来拆分字符串
    • 一个用来分发单词的组件DistributeWordByTypeBolt能够接收到所有的单词(包含类型信息),统一将每类单词分别分发到指定的一个用来存储数据的组件
    • SaveDataBolt用来存储处理过的单词,对于不同类型单词具有不同的存储逻辑,需要设置3类SaveDataBolt

    将Spout分为3类,每一个Spout发射不同类型的字符串,这里定义了一个Type常量类来区分这三种类型:

    1 interface Type {
    2      String NUMBER = "NUMBER";
    3      String STRING = "STRING";
    4      String SIGN = "SIGN";
    5 }

    首先看一下,我们实现的Topology是如何进行创建的,代码如下所示:

    01 public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
    02  
    03      // configure & build topology
    04      TopologyBuilder builder = new TopologyBuilder();
    05      
    06      // configure 3 spouts
    07      builder.setSpout("spout-number"new ProduceRecordSpout(Type.NUMBER, new String[] {"111 222 333""80966 31"}), 1);
    08      builder.setSpout("spout-string"new ProduceRecordSpout(Type.STRING, new String[] {"abc ddd fasko""hello the word"}), 1);
    09      builder.setSpout("spout-sign"new ProduceRecordSpout(Type.SIGN, new String[] {"++ -*% *** @@""{+-} ^#######"}), 1);
    10      
    11      // configure splitter bolt
    12      builder.setBolt("bolt-splitter"new SplitRecordBolt(), 2)
    13           .shuffleGrouping("spout-number")
    14           .shuffleGrouping("spout-string")
    15           .shuffleGrouping("spout-sign");
    16      
    17      // configure distributor bolt
    18      builder.setBolt("bolt-distributor"new DistributeWordByTypeBolt(), 6)
    19           .fieldsGrouping("bolt-splitter"new Fields("type"));
    20      
    21      // configure 3 saver bolts
    22      builder.setBolt("bolt-number-saver"new SaveDataBolt(Type.NUMBER), 3)
    23           .shuffleGrouping("bolt-distributor""stream-number-saver");
    24      builder.setBolt("bolt-string-saver"new SaveDataBolt(Type.STRING), 3)
    25           .shuffleGrouping("bolt-distributor""stream-string-saver");
    26      builder.setBolt("bolt-sign-saver"new SaveDataBolt(Type.SIGN), 3)
    27           .shuffleGrouping("bolt-distributor""stream-sign-saver");
    28      
    29      // submit topology
    30      Config conf = new Config();
    31      String name = MultiStreamsWordDistributionTopology.class.getSimpleName();
    32      if (args != null && args.length > 0) {
    33           String nimbus = args[0];
    34           conf.put(Config.NIMBUS_HOST, nimbus);
    35           conf.setNumWorkers(3);
    36           StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
    37      else {
    38           LocalCluster cluster = new LocalCluster();
    39           cluster.submitTopology(name, conf, builder.createTopology());
    40           Thread.sleep(60 60 1000);
    41           cluster.shutdown();
    42      }
    43 }

    一个SplitRecordBolt组件从3个不同类型的ProduceRecordSpout接收数据,这是一个多Spout流聚合。SplitRecordBolt将处理后的数据发送给DistributeWordByTypeBolt组件,然后根据收到的数据的类型进行一个分发处理,这里用了fieldsGrouping操作,也就是SplitRecordBolt发送的数据会按照类型发送到不同的DistributeWordByTypeBolt任务(Task),每个Task收到的一定是同一个类型的数据,如果直接使用shuffleGrouping操作也没有问题,只不过每个Task可能收到任何类型的数据,在DistributeWordByTypeBolt内部进行流向控制。DistributeWordByTypeBolt组件中定义了多个stream,根据类型来分组发送给不同类型的SaveDataBolt组件。
    下面看每个组件的实现:

    • ProduceRecordSpout组件

    通过我们定义的一个ProduceRecordSpout类,可以创建3个不同的ProduceRecordSpout实例,每个实例负责生产特定类型的数据,实现代码如下所示:

    01 public static class ProduceRecordSpout extends BaseRichSpout {
    02  
    03           private static final long serialVersionUID = 1L;
    04           private static final Log LOG = LogFactory.getLog(ProduceRecordSpout.class);
    05           private SpoutOutputCollector collector;
    06           private Random rand;
    07           private String[] recordLines;
    08           private String type;
    09           
    10           public ProduceRecordSpout(String type, String[] lines) {
    11                this.type = type;
    12                recordLines = lines;
    13           }
    14           
    15           @Override
    16           public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    17                this.collector = collector;   
    18                rand = new Random();
    19           }
    20  
    21  
    22           @Override
    23           public void nextTuple() {
    24                Utils.sleep(500);
    25                String record = recordLines[rand.nextInt(recordLines.length)];
    26                List<Object> values = new Values(type, record);
    27                collector.emit(values, values);
    28                LOG.info("Record emitted: type=" + type + ", record=" + record);
    29           }
    30  
    31           @Override
    32           public void declareOutputFields(OutputFieldsDeclarer declarer) {
    33                declarer.declare(new Fields("type""record"));        
    34           }
    35      }

    这比较简单,根据传递的参数来创建上图中的3个Spout实例。

    • SplitRecordBolt组件

    由于前面3个ProduceRecordSpout产生的数据,在开始时的处理逻辑是相同的,所以可以将3个ProduceRecordSpout聚合到一个包含通用逻辑的SplitRecordBolt组件,实现如下所示:

    01 public static class SplitRecordBolt extends BaseRichBolt {
    02  
    03      private static final long serialVersionUID = 1L;
    04      private static final Log LOG = LogFactory.getLog(SplitRecordBolt.class);
    05      private OutputCollector collector;
    06      
    07      @Override
    08      public void prepare(Map stormConf, TopologyContext context,
    09                OutputCollector collector) {
    10           this.collector = collector;   
    11      }
    12  
    13      @Override
    14      public void execute(Tuple input) {
    15           String type = input.getString(0);
    16           String line = input.getString(1);
    17           if(line != null && !line.trim().isEmpty()) {
    18                for(String word  : line.split("\s+")) {
    19                     collector.emit(input, new Values(type, word));
    20                     LOG.info("Word emitted: type=" + type + ", word=" + word);
    21                     // ack tuple
    22                     collector.ack(input);
    23                }
    24           }
    25      }
    26  
    27      @Override
    28      public void declareOutputFields(OutputFieldsDeclarer declarer) {
    29           declarer.declare(new Fields("type""word"));
    30      }
    31 }

    无论接收到的Tuple是什么类型(STRING、NUMBER、SIGN)的数据,都进行split,然后在emit的时候,仍然将类型信息传递给下一个Bolt组件。

    • DistributeWordByTypeBolt组件

    DistributeWordByTypeBolt组件只是用来分发Tuple,通过定义Stream,将接收到的Tuple发送到指定的下游Bolt组件进行处理。通过SplitRecordBolt组件emit的Tuple包含了类型信息,所以在DistributeWordByTypeBolt中根据类型来进行分发,代码实现如下:

    01 public static class DistributeWordByTypeBolt extends BaseRichBolt {
    02  
    03      private static final long serialVersionUID = 1L;
    04      private static final Log LOG = LogFactory.getLog(DistributeWordByTypeBolt.class);
    05      private OutputCollector collector;
    06      
    07      @Override
    08      public void prepare(Map stormConf, TopologyContext context,
    09                OutputCollector collector) {
    10           this.collector = collector;   
    11           Map<GlobalStreamId, Grouping> sources = context.getThisSources();
    12           LOG.info("sources==> " + sources);
    13      }
    14  
    15      @Override
    16      public void execute(Tuple input) {
    17           String type = input.getString(0);
    18           String word = input.getString(1);
    19           switch(type) {
    20                case Type.NUMBER:
    21                     emit("stream-number-saver", type, input, word);
    22                     break;
    23                case Type.STRING:
    24                     emit("stream-string-saver", type, input, word);
    25                     break;
    26                case Type.SIGN:
    27                     emit("stream-sign-saver", type, input, word);
    28                     break;
    29                default:
    30                     // if unknown type, record is discarded.
    31                     // as needed, you can define a bolt to subscribe the stream 'stream-discarder'.
    32                     emit("stream-discarder", type, input, word);
    33           }
    34           // ack tuple
    35           collector.ack(input);
    36      }
    37      
    38      private void emit(String streamId, String type, Tuple input, String word) {
    39           collector.emit(streamId, input, new Values(type, word));
    40           LOG.info("Distribution, typed word emitted: type=" + type + ", word=" + word);
    41      }
    42  
    43      @Override
    44      public void declareOutputFields(OutputFieldsDeclarer declarer) {
    45           declarer.declareStream("stream-number-saver"new Fields("type""word"));
    46           declarer.declareStream("stream-string-saver"new Fields("type""word"));
    47           declarer.declareStream("stream-sign-saver"new Fields("type""word"));
    48           declarer.declareStream("stream-discarder"new Fields("type""word"));
    49      }
    50 }

    实际上,下游的3个Bolt组件(SaveDataBolt)在订阅该流组件(DistributeWordByTypeBolt)的时候,方式相同,只是分发的逻辑交由DistributeWordByTypeBolt来统一控制。
    我们在配置该Bolt组件时,使用了fieldsGrouping分组方式,实际每个DistributeWordByTypeBolt只会收到同一种类型的Tuple,这里也可以使用shuffleGrouping分组方式,这种分组方式会有不同类型的Tuple被emit到同一个DistributeWordByTypeBolt组件上。
    另外,该Bolt组件中我们还定义了一个名称为stream-discarder的stream,在Topology中并没有满足该stream的条件,可以根据实际情况选择是否实现它。

    • SaveDataBolt组件

    最后这个Bolt用来模拟保存处理过的数据内容,代码如下:

    01 public static class SaveDataBolt extends BaseRichBolt {
    02  
    03      private static final long serialVersionUID = 1L;
    04      private static final Log LOG = LogFactory.getLog(SaveDataBolt.class);
    05      private OutputCollector collector;
    06      
    07      private String type;
    08      
    09      public SaveDataBolt(String type) {
    10           this.type = type;
    11      }
    12      
    13      @Override
    14      public void prepare(Map stormConf, TopologyContext context,
    15                OutputCollector collector) {
    16           this.collector = collector;   
    17      }
    18  
    19      @Override
    20      public void execute(Tuple input) {
    21           // just print the received tuple for being waited to persist
    22           LOG.info("[" + type + "] " +
    23                     "SourceComponent=" + input.getSourceComponent() +
    24                     ", SourceStreamId=" + input.getSourceStreamId() +
    25                     ", type=" + input.getString(0) +
    26                     ", value=" + input.getString(1));
    27      }
    28  
    29      @Override
    30      public void declareOutputFields(OutputFieldsDeclarer declarer) {
    31           // do nothing        
    32      }
    33      
    34 }

    在实际应用中,你可能需要将处理过的数据保存到数据库中,就可以在该Bolt中实现存储数据的逻辑。

    总结

    Storm中最核心的计算组件的抽象就是Spout、Bolt,以及Stream Grouping,其它高级的功能,像Trident、DRPC,他们或者基于这些基础组件以及Streaming Grouping分发策略来实现的,屏蔽了底层的分发计算处理逻辑以更高层的编程抽象面向开发者,减轻了开发人员对底层复杂机制的处理;或者是为了方便使用Storm计算服务而增加的计算机制衍生物,如批量事务处理、RPC等。

    参考链接

    Creative Commons License

    本文基于署名-非商业性使用-相同方式共享 4.0许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系

  • 相关阅读:
    tcp传送报文
    整理下本周工作中遇到的疑问;uid/euid/suid;docker镜像管理
    网络隔离
    ubuntu 只有客人会话登录(第一次深刻感受文件权限的威力 )
    ubuntu 只有客人会话登录(第一次深刻感受文件权限的威力)
    使用gdb查看栈帧的情况,有ebp
    使用gdb查看栈帧的情况, 没有ebp
    再看perf是如何通过dwarf处理栈帧的
    dwarf是如何处理栈帧的?
    数据库设计的误区—>CHAR与VARCHAR
  • 原文地址:https://www.cnblogs.com/wq3435/p/7390999.html
Copyright © 2011-2022 走看看