Storm作为当前最流行的实时计算框架,自Twitter将其开源后就一直备受关注。由于其具有先天的稳定性以及便捷性,目前被许多大公司所采用,国外像雅虎、雅虎日本、Twitter、OOYALA、Spotify,国内像京东、腾讯、阿里等都使用Storm来完成大量实时计算来为用户提供优质服务。目前官方的最新发布版本是0.10.0。
本文将对官网的Storm手册进行翻译,由于本人英语能力有限,翻译难免有些不妥之处,望大家指正。点击此处阅读官网英文原版。
-------------------------------------------------------------------------------------------------------------
帮助手册
在本手册中,你将了解如何创建Storm的Topology和如何将它们部署到Storm的集群中。本文主要使用java作为演示语言,但是为了说明Storm支持多语言编程,一些示例将用Python编写。
前言
本手册使用storm-starter项目中的例子。建议在你的项目中复制这些示例并且在这个示例的基础上修改来满足你的需求。阅读设置Storm开发环境和创建Storm项目两篇文章来设置你的机器。
Storm集群
一个Storm集群和一个Hadoop集群非常像。不同的是Handoop上运行的是"MapReduce"任务,而在Storm运行的是"Topology"拓扑图(比较抽象,这里暂且翻译成拓扑图)。"MapReduce"和"Topology"是很不一样的 —— 一个主要的不同点是:"MapReduce"最终会完成,而Topology会一直执行下去(除非你手动将其kill掉)。
在Storm集群中有两种节点:master节点和worker节点。master节点运行一个叫"Nimbus"的守护进行,他非常像Hadoop中的"JobTracker"。Nimbus负责向集群分发代码,分配任务,并且监测主机是否出现故障。
每一个worker节点运行一个叫"Supervisor"的守护进程。它监听它所在机器上已经分配的任务,并且在必要时启动或者停止Nimbus已经分配给它的任务。每个worker进程执行一个Topology的子集;一个Topology由许多worker组成。
所有Nimbus和Supervisor之间的协调或者调度工作都由Zookeeper集群来完成。此外,Nimbus和Supervisor守护进程被设计成快速失败和无状态的;所有的状态被保存在Zookeeper集群或者本地磁盘上。这就意味着你可以使用"kill -9"来终止Nimbus和Supervisor,就像什么也没发生一样,Storm会继续正常运行。这种设计使得Storm集群非常稳定。下面是Storm的大体结构图:
Topology(拓扑图)
要在Storm上进行实时计算,你需要创建Topology。一个Topology就是一个图的计算。Topology中的每个节点都包含了逻辑处理,并且指明了节点之间如何传递数据。
运行一个topology是非常简单的。首先,你需要将你的代码和你代码所依赖的jar打成一个包。然后,运行下面的命令
storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2
上面运行了backtype.storm.MyTopology这个类,并且附带两个参数。这个类的主要功能是定义Topology并且将其提交到Nimbus,storm jar连接Nimbus并上传jar。
由于Topology被定义成Thrifts结构,而且Nimbus是一个Thrift服务,你可以用任何编程语言创建与提交Topology。上面的例子是最简单的并且是基于jvm语言的。阅读集群上运行Topology一文来获取更多的关于启动和停止Topology的信息。
Stream(流)
Stream是Storm的核心抽象概念,Stream是一个无界的元组序列。Storm通过一个分布式并且可靠的方式将原始流转换到新流。例如,你可能将推文流转换成一个热门话题。
Storm中完成流转的角色分别是"Spout"和"Bolt"。在你应用程序中,你需要实现"Spout"和"Bolt"提供的接口来完成业务逻辑。
Spout是Stream的来源。例如,Spout可能从Kestrel队列读取数据并且以流的形式将他们发射出去,或者Spout通过Twitter API获取数据来发射一个推文流。
Bolt消费任意数量的输入流,进行一些处理,然后以新流的形式发射它们。复杂的流转,比如通过计算将推文转换成热门话题,需要许多步骤,因此需要许多blot。Bolt可以做很多事情,过滤元组、流聚合、流连接、和数据库对话等等。
一个Spout和Bolt的网络被打包成一个Topology,它是你提交到Storm集群执行的最高层次的抽象。一个Topology是一个流转图,它的每个节点是一个Spout或者是一个Bolt。图中的边线表示Bolt订阅了哪些流。当Spout或者Bolt向流发射元组,它将向所有订阅它的Bolt发射这些流。下图展示了一个Topology模型:
Topology中节点之间的链接表明了元组如何被传递。例如,Spout A和Bolt B之间有一个链接,Spout A和Bolt C之间有一个链接,Bolt和Bolt C之间有一个链接。那么每次Spout A发射一个元组,它都会将此元组发射到Bolt B和Bolt C。同样所有Bolt B的输出也会发射到Bolt C。
Topology中的每个节点都是并行执行的。你可以指定每个节点有多少个线程并行执行,然后Storm会产生相应数量的线程交给集群去执行。
Topology会永远执行下去,知道你kill掉它。Storm会自动重新分配失败的任务。此外,即便机器故障或者消息丢失,Storm也会保证数据不会丢失。
Data model(数据模型)
Storm使用元组作为它的数据模型。一个元组可以使一个list,一个Object或者任何类型。此外,Storm支持所有的原始类型,S字符串、字节数组等作为它的元祖值。如果你想使用其他类型的对象,只需实现一个serializer接口即可。
Topology中的每个节点都必须声明一个它要发设的元组的类型。例如,下面的Bolt发射了"double"和"triple"两个元组
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
public class DoubleAndTripleBolt extends BaseRichBolt { private OutputCollectorBase _collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) { _collector = collector; } @Override public void execute(Tuple input) { int val = input.getInteger( 0 ); _collector.emit(input, new Values(val* 2 , val* 3 )); _collector.ack(input); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare( new Fields( "double" , "triple" )); } } |
declareOutputFields方法为组件声明了["double","triple"]两个输出域。Bolt的详细功能将在后面章节中阐述。
A simple topology(Topology简单示例)
光说不练假把式,让我们来通过storm-starter中的ExclamationTopology实例来说明一个Topology如何编写。
1
2
3
4
|
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout( "word" , new TestWordSpout(), 10 ); builder.setBolt( "exclaim1" , new ExclamationBolt(), 3 ).shuffleGrouping( "word" ); builder.setBolt( "exclaim2" , new ExclamationBolt(), 2 ).shuffleGrouping( "exclaim1" ); |
上面的Topology示例包括一个Spout和两个Blot。Spout发射word,然后每个Bolt在其后追加一个"!!!"字符串。节点间的流转流程是:Spout首先发射到第一个Bolt,然后第一个Bolt再发射到第二个Bolt。如果Spout发射出的元组为["bob"]和["john"],那么经过第二个Bolt后将发射["bob!!!!!!"]和["john!!!!!!"]。
代码中指定节点用setSpout和setBolt方法。这两个方法都指定了三个入参,他们分别是:用户自己指定的id、用户的业务处理逻辑类和指定的并行运行的数量。在上面的例子中,Spout指定id为"words",Bolt分别指定了"exclaim1"和"exclaim2"作为id。
逻辑处理类分别实现了IRichSpout(Spout处理类实现)接口和IRichBolt(Bolt处理类实现)接口。
最后一个参数指定了节点并行运行的数量,是可选的。它表示集群中有多少个线程执行处理逻辑。如果不指定该参数,Storm默认会分配一个线程来执行。
setBolt返回一个InputDeclarer对象。组件"exclaim1"使用shuffleGrouping来表明它会接收"words"组件发射出来的所有元组,同样组件"exclaim2"使用shuffleGrouping来表明它会接收"exclaim1"发射出来的所有元组。"shuffleGrouping"表示元组会随机的从输入任务分发到Bolt任务。组件与组件之间有很多种方法实现数据分组,后面的章节将会详细阐述。
如果你想让组件"exclaim2"既读"words"组件发射出的元组又读"exclaim1"发射出的元组,你可以像下面这样写:
1
2
3
|
builder.setBolt( "exclaim2" , new ExclamationBolt(), 5 ) .shuffleGrouping( "words" ) .shuffleGrouping( "exclaim1" ); |
正如你上面看到的,可以链式的为Bolt指定多个输入源。
我们来详细看下这个Topology中的Spout和Bolt的实现。Spout向Topology中发射新消息。这个Topology中的TestWordSpout每隔100ms就会从从列表["nathan", "mike", "jackson", "golda", "bertels"]中随机取出一个字符串发射出去。TestWordSpout中的nextTuple()方法实现如下:
1
2
3
4
5
6
7
|
public void nextTuple() { Utils.sleep( 100 ); final String[] words = new String[] { "nathan" , "mike" , "jackson" , "golda" , "bertels" }; final Random rand = new Random(); final String word = words[rand.nextInt(words.length)]; _collector.emit( new Values(word)); } |
如你所见,实现是非常简单的。
ExclamationBolt向输入源追加"!!!"字符串。我们来看下ExclamationBolt的整个实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
public static class ExclamationBolt extends BaseRichBolt { OutputCollector _collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { _collector = collector; } @Override public void execute(Tuple tuple) { _collector.emit(tuple, new Values(tuple.getString( 0 ) + "!!!" )); _collector.ack(tuple); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare( new Fields( "word" )); } } |
prepare方法为Bolt提供了一个OutputCollector用来发射元组。元组可以在任何时候被发射出去--prepare、execute或者cleanup方法都可以,设置可以在另外一个异步线程中发射。在上面的示例中,prepare方法只是简单的保存了OutputCollector实例以备在后面的execute方法中使用。
execute方法从多个Bolt输入源中接收一个元组。ExckamationBolt获取元组的第一个字段,并且追加"!!!"字符串后将其发射出去。如果你要实现一个订阅了多个输入源的Bolt,可以使用Tuple的getSourceComponent方法便可以知道这个元组来自哪个组件。
execute中还可以做一些其他的事情,即将元组作为emit的第一个参数,并且在最后一行发送确认信息(ack)。这些是Storm API的一部分,以此来保证数据不丢失,后面章节会详解。
当Bolt停止运行并且需要清理我们打开的资源的时候,cleanup方法会被调用。但是,不保证集群会执行此方法:例如,如果运行任务的机器挂掉,那么没有任何办法来调用cleanup。仅在本地模式(在一个进程中模拟Storm集群)运行并且想kill掉Topology来节省资源时,建议你使用此方法。
declareOutputFields方法定义了ExclamationBolt发射的元组名为"word"。
getComponentConfiguration方法允许你指定组件的运行条件。后续章节会有Configuration的阐述。
cleanup和getComponentConfiguration两个方法在Bolt的实现中不是经常被用到。你可以使用提供了默认实现的基本类来简单的定义Bolt。如果实现BaseRichBolt类ExclamationBolt会更简洁,如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
public static class ExclamationBolt extends BaseRichBolt { OutputCollector _collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { _collector = collector; } @Override public void execute(Tuple tuple) { _collector.emit(tuple, new Values(tuple.getString( 0 ) + "!!!" )); _collector.ack(tuple); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare( new Fields( "word" )); } } |
本地模式运行ExclamationTopology
本小节我们来看一下如何在本地模式运行ExclamationTopology。
Storm有两种运行模式:本地模式和集群模式。在本地模式下,Storm用线程模拟worker节点。本地模式对开发和测试Topology来说是非常有帮助的。当你运行storm-starter项目中的Topology示例的时候,它们在本地模式运行,并且你可以看见每个组件发射的信息。点击Local mode阅读更多的在本地模式下运行Topology的信息。
在分布式模式下,Storm运行在机器集群上。当你向master提交一个Topology的同时,你也需要提交此Topology的代码。master会分发你的代码并且分配worker运行你提交的Topology。如果worker挂掉,master会重新分配其他的worker。
点击在集群上运行Topology一文来阅读更多的关于如何在集群模式下运行Topology的信息。
下面是在本地模式下运行ExclamationTopology的代码:
1
2
3
4
5
6
7
8
9
|
Config conf = new Config(); conf.setDebug( true ); conf.setNumWorkers( 2 ); LocalCluster cluster = new LocalCluster(); cluster.submitTopology( "test" , conf, builder.createTopology()); Utils.sleep( 10000 ); cluster.killTopology( "test" ); cluster.shutdown(); |
首先,代码通过创建LocalCluster对象来指定伪集群。向伪集群提交Topology和向分布式集群提交Topology一样。调用submitTopology方法向LocalCluster提交Topology,第一个参数指定Topology的名称;第二个参数指定配置;第三个参数指定要提交的Topology。
名称用于辨别Topology以便你在后面kill掉它。一个Topology会一直运行下去直到你kill掉它。
configuration被用来设置Topology运行的各种参数。下面提到的两个参数会经常用到:
1、TOPOLOGY_WORKERS(使用setNumWorkers方法设置)参数指定集群非配多少个进程来运行Topology。每个Topology中的组件都会以多线程的方式运行。分配给组件的线程数量通过setBolt和setSpout的方法设置。这些线程存在于工作进程中。每个工作进程包括了若干组件的若干线程。例如,在你的组件中指定了300个线程,在配置中指定了50个工作进程,那么每个工作进程将执行6个线程,每个线程都有可能属于不同的组件。你可以通过调整Topology中各个组件的并行性,和工作进程的线程数来调优Storm。
2、TOPOLOGY_DEBUG(使用setDebug来设置),当它设置为true的时候,就告诉Storm记录每个组件的每个信息。这在本地测试Topology时非常有用,然而当运行在生产环境的集群中时,你可能希望关闭日志。
你可以设置很多Topology的参数,点击the Javadoc for Config来查看详细信息。
如果想了解如果设置开发环境来在本地模式下运行Topology(例如Eclipse),查看Creating a new Storm project。
Stream groupings(流分组)
一个流分组阐述了Topology如何在两个组件间发送元组。记住,Spout和Bolt以任务的形式在集群中并行执行,像下面图中展示的那样:
当Bolt A的任务发送给Bolt B,那么Bolt的任务将会发给谁呢?
"stream grouping"通过告诉Storm如何在任务集合间发送元组回答了这个问题。在我们深入了解各种stream grouping钱,我们先看一下storm-starter项目中的另一个Topology。WordCountTopology通过Spout读取句子,然后WordCountBolt再每个单词输出之前输出它的次数:
1
2
3
4
5
6
7
|
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout( "sentences" , new RandomSentenceSpout(), 5 ); builder.setBolt( "split" , new SplitSentence(), 8 ) .shuffleGrouping( "sentences" ); builder.setBolt( "count" , new WordCount(), 12 ) .fieldsGrouping( "split" , new Fields( "word" )); |
SplitSentence发送它收到的每个句子的每个单词的元组,WordCount在内存中保存单词和次数的Map映射。每次WordCount接收到一个单词,它都会更新单词的数量。
Storm有一些不同的流分组。其中最简单的分组叫做"shuffle grouping",它会发射元组到一个随机的任务。WordCountTopology中使用了"shuffle grouping" 来从RandomSentenceSpout向SplitSentence发射元组。元组将均匀的分发到所有SplitSentence的任务中。
一个更有趣的分组叫做"fields grouping"。"fields grouping"被用于SplitSentence和WordCount之间。它有一个关键的作用是保证相同的单词总是被分发到同一个任务中。否则,不止一个任务会得到相同的单词,它们将发射一个错误的值,因为它们得到的是不完整的信息。"fields grouping"允许你通过字段的子集进行分组。这会导致和子集相等的值会去到同一个任务。由于WordCount通过在字段"word"使用"fields grouping" 来订阅SplitSentence的输出流,所以相同的单词会由同一个任务执行并且Bolt生产出正确的输出。
"fields grouping"是实现流连接、流聚合以及其他更多的情况的基础。"fields grouping"是通过取模哈希来实现的。
还有一些其他的stream grouping,你可以阅读更多关于stream grouping的概念。