1.hadoop有master与slave,Storm与之对应的节点是什么?
2.Storm控制节点上面运行一个后台程序被称之为什么?
3.Supervisor的作用是什么?
4.Topology与Worker之间的关系是什么?
5.Nimbus和Supervisor之间的所有协调工作有master来完成,还是Zookeeper集群完成?
6.storm稳定的原因是什么?
7.如何运行Topology?
strom jar all-your-code.jar backtype.storm.MyTopology arg1 arg2
8.spout是什么?
9.bolt是什么?
10.Topology由两部分组成?
11.stream grouping有几种?
Storm对于实时计算的的意义相当于Hadoop对于批处理的意义。Hadoop为我们提供了Map和Reduce原语,使我们对数据进行批处理变的非常的简单和优美。同样,Storm也对数据的实时计算提供了简单Spout和Bolt原语。
Storm适用的场景:
1、流数据处理:Storm可以用来用来处理源源不断的消息,并将处理之后的结果保存到持久化介质中。
2、分布式RPC:由于Storm的处理组件都是分布式的,而且处理延迟都极低,所以可以Storm可以做为一个通用的分布式RPC框架来使用。
1、准备工作
2、一个Storm集群的基本组件
![](http://www.aboutyun.com/data/attachment/forum/201404/15/225641mt3v1okkkrkkp3rk.jpg)
![](http://www.aboutyun.com/data/attachment/forum/201404/15/225641plns69adsbclllak.jpg)
Nimbus和Supervisor之间的所有协调工作都是通过一个Zookeeper集群来完成。并且,nimbus进程和supervisor都是快
速失败(fail-fast)和无状态的。所有的状态要么在Zookeeper里面, 要么在本地磁盘上。这也就意味着你可以用kill
-9来杀死nimbus和supervisor进程, 然后再重启它们,它们可以继续工作,
就好像什么都没有发生过似的。这个设计使得storm不可思议的稳定。
3、Topologies
- strom jar all-your-code.jar backtype.storm.MyTopology arg1 arg2
</ol>
</div>
复制代码
-based语言提交的最简单的方法, 看一下文章: 在生产集群上运行topology去看看怎么启动以及停止topologies。
4、Stream
Bolt处理输入的Stream,并产生新的输出Stream。Bolt可以执行过滤、函数操作、Join、操作数据库等任何操作。Bolt是一个被动的
角色,其接口中有一个execute(Tuple input)方法,在接收到消息之后会调用此函数,用户可以在此方法中执行自己的处理逻辑。
5、数据模型(Data Model)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | publicclassDoubleAndTripleBoltimplementsIRichBolt { privateOutputCollectorBase _collector; @Override publicvoidprepare(Map conf, TopologyContext context, OutputCollectorBase collector) { _collector = collector; } @Override publicvoidexecute(Tuple input) { intval = input.getInteger( 0 ); _collector.emit(input,newValues(val* 2 , val* 3 )); _collector.ack(input); } @Override publicvoidcleanup() { } @Override publicvoiddeclareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(newFields( "double" , "triple" )); } } |
1 2 3 4 5 6 | TopologyBuilder builder =newTopologyBuilder(); builder.setSpout( 1 ,newTestWordSpout(), 10 ); builder.setBolt( 2 ,newExclamationBolt(), 3 ) .shuffleGrouping( 1 ); builder.setBolt( 3 ,newExclamationBolt(), 2 ) .shuffleGrouping( 2 ); |
1 2 3 | builder.setBolt( 3 ,newExclamationBolt(), 5 ) .shuffleGrouping( 1 ) .shuffleGrouping( 2 ); |
1 2 3 4 5 6 7 8 | publicvoidnextTuple() { Utils.sleep( 100 ); finalString[] words =newString[] { "nathan" , "mike" , "jackson" , "golda" , "bertels" }; finalRandom rand =newRandom(); finalString word = words[rand.nextInt(words.length)]; _collector.emit(newValues(word)); } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | publicstaticclassExclamationBoltimplementsIRichBolt { OutputCollector _collector; publicvoidprepare(Map conf, TopologyContext context, OutputCollector collector) { _collector = collector; } publicvoidexecute(Tuple tuple) { _collector.emit(tuple,newValues(tuple.getString( 0 ) + "!!!" )); _collector.ack(tuple); } publicvoidcleanup() { } publicvoiddeclareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(newFields( "word" )); } } |
让我们看看怎么以local mode运行ExclamationToplogy。
1 2 3 4 5 6 7 8 9 | Config conf =newConfig(); conf.setDebug( true ); conf.setNumWorkers( 2 ); LocalCluster cluster =newLocalCluster(); cluster.submitTopology( "test" , conf, builder.createTopology()); Utils.sleep( 10000 ); cluster.killTopology( "test" ); cluster.shutdown(); |
- TOPOLOGY_WORKERS(setNumWorkers) 定义你希望集群分配多少个工作进程给你来执行这个topology. topology里面的每个组件会被需要线程来执行。每个组件到底用多少个线程是通过setBolt和setSpout来指定的。这些线程都运行在工作进 程里面. 每一个工作进程包含一些节点的一些工作线程。比如, 如果你指定300个线程,60个进程, 那么每个工作进程里面要执行6个线程, 而这6个线程可能属于不同的组件(Spout, Bolt)。你可以通过调整每个组件的并行度以及这些线程所在的进程数量来调整topology的性能。
-
TOPOLOGY_DEBUG(setDebug), 当它被设置成true的话, storm会记录下每个组件所发射的每条消息。这在本地环境调试topology很有用, 但是在线上这么做的话会影响性能的。
</ul>
Worker processes(进程)
Executors (threads)(线程)
Tasks
![](http://www.aboutyun.com/data/attachment/forum/201404/15/225645ltl3s3llemtumcls.jpg)
![](http://www.aboutyun.com/data/attachment/forum/201404/15/225646ypt59zh9cothkgqv.jpg)
![](http://www.aboutyun.com/data/attachment/forum/201404/15/225647ly1jgghugy2vhomh.jpg)
![](http://www.aboutyun.com/data/attachment/forum/201404/15/225647ftl6c4tt6fv69t46.jpg)
7、流分组策略(Stream grouping)
1 2 3 4 5 6 7 | TopologyBuilder builder =newTopologyBuilder(); builder.setSpout( 1 ,newRandomSentenceSpout(), 5 ); builder.setBolt( 2 ,newSplitSentence(), 8 ) .shuffleGrouping( 1 ); builder.setBolt( 3 ,newWordCount(), 12 ) .fieldsGrouping( 2 ,newFields( "word" )); |
- 最简单的grouping是shuffle grouping, 它随机发给任何一个task。上面例子里面RandomSentenceSpout和SplitSentence之间用的就是shuffle grouping, shuffle grouping对各个task的tuple分配的比较均匀。
-
一种更有趣的grouping是fields grouping, SplitSentence和WordCount之间使用的就是fields
grouping, 这种grouping机制保证相同field值的tuple会去同一个task,
这对于WordCount来说非常关键,如果同一个单词不去同一个task, 那么统计出来的单词次数就不对了。
</ul>
l ShuffleGrouping:随机选择一个Task来发送。
l FiledGrouping:根据Tuple中Fields来做一致性hash,相同hash值的Tuple被发送到相同的Task。
l AllGrouping:广播发送,将每一个Tuple发送到所有的Task。
l GlobalGrouping:所有的Tuple会被发送到某个Bolt中的id最小的那个Task。
l NoneGrouping:不关心Tuple发送给哪个Task来处理,等价于ShuffleGrouping。
l DirectGrouping:直接将Tuple发送到指定的Task来处理。
8、使用别的语言来定义Bolt
1 2 3 4 5 6 7 8 9 | publicstaticclassSplitSentenceextendsShellBoltimplementsIRichBolt { publicSplitSentence() { super ( "python" , "splitsentence.py" ); } publicvoiddeclareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(newFields( "word" )); } } |
1 2 3 4 5 6 7 8 9 | importstorm classSplitSentenceBolt(storm.BasicBolt): defprocess(self, tup): words=tup.values[ 0 ].split( " " ) forwordinwords: storm.emit([word]) SplitSentenceBolt().run() |
9、可靠的消息处理
![](http://www.aboutyun.com/data/attachment/forum/201404/15/225649o59b9g665zwawafw.jpg)
Storm中的每一个Topology中都包含有一个Acker组件。Acker组件的任务就是跟踪从Spout中流出的每一个messageId所绑定
的Tuple树中的所有Tuple的处理情况。如果在用户设置的最大超时时间内这些Tuple没有被完全处理,那么Acker会告诉Spout该消息处理
失败,相反则会告知Spout该消息处理成功。
当Spout发射完某个MessageId对应的源Tuple之后,它会告诉Acker自己发射的RootId以及生成的那些源Tuple的Id。而当
Bolt处理完一个输入Tuple并产生出新的Tuple时,也会告知Acker自己处理的输入Tuple的Id以及新生成的那些Tuple的Id。
Acker只需要对这些Id进行异或运算,就能判断出该RootId对应的消息单元是否成功处理完成了。
原文:http://www.aboutyun.com/thread-7394-1-1.html
</div>
http请求的全过程
前端知识
jmeter连接MySQL数据库、sqlserver数据库
jmeter 生成随机字符串、获取当前时间
jmeter跨线程组传参
fiddler抓不到iis网站包的问题
jmeter登录数据库-- 通过windows身份验证方法(DNS)
ant安装(Windows系统)
tomcat安装(Windows系统)