1.storm概述
应用于实时的流式计算,结合消息队列和数据库进行使用。
Spouts:拓扑的消息源
Bolts:拓扑的处理逻辑单元,每个bolt可以在集群当中多实例的并发执行
tuple:消息元组,数据传递的封装形式
streams:流,不同的消息所经过的路径是不一样的
stream groupings:流的分组策略
Shuffle Grouping——随机分组,随机派发stream里面的tuple
Fields Grouping——按字段分组,具有同样字段的tuple会被分到相同的Bolts里的一个task,而不同的字段则会被分配到不同的bolts里的task
All Grouping——广播发送,对于每一个tuple,所有的bolts都会收到。
Global Grouping——全局分组,这个tuple被分配到storm中的一个bolt的其中一个task,再具体一点就是分配给id值最低的那个task
Non Grouping——不分组,和shuffle Grouping不同的是,Storm会把这个bolt放到这个bolt的订阅者同一个线程里面去执行
直接分组——
Tasks:任务处理单元
Executor:工作线程
Workers:工作进程
Configuration:topology的配置
运行结构:supervisor------>多个worker进程------>多个executor线程------>多个task实例
默认一个supervisor对应4个工作槽位
supervisor负责对worker进行负载均衡调度,当supervisor进程被终止后,worker会继续运行
Nimbus进程被终止后,新任务无法提交,但是原来的任务仍然可以继续运行。
2.集群的物理结构及安装
Nimbus负责集群的协调管理
supervisor负责具体的运算。
1)安装一个zookeeper集群
2)上传storm的安装包,解压
3)修改配置文件storm.yaml
#所使用的zookeeper集群主机 storm.zookeeper.servers: - "weekend05" - "weekend06" - "weekend07"
#nimbus所在的主机名 nimbus.host: "weekend05"
supervisor.slots.ports -6701 -6702 -6703 -6704 -6705
启动storm 在nimbus主机上 nohup ./storm nimbus 1>/dev/null 2>&1 &
nohup ./storm ui 1>/dev/null 2>&1 &
在supervisor主机上 nohup ./storm supervisor 1>/dev/null 2>&1 &
storm的深入学习: 分布式共享锁的实现 事务topology的实现机制及开发模式 在具体场景中的跟其他框架的整合(flume/activeMQ/kafka(分布式的消息队列系统) /redis/hbase/mysql cluster)
3.Storm与Hadoop的对比
1)Topology与MapReduce一个关键的区别是,一个MapReduce job最终会结束,而一个topology永远会运行。除非手动杀死进程
2)Nimbus 与 ResourManager 在Storm的集群里面有两种节点: 控制节点(master node)和工作节点(worker node)。控制节点上面运行一个叫Nimbus后台程序,它的作用类似Hadoop里面的JobTracker。Nimbus负责在集群里面分发代码,分配计算任务给机器, 并且监控状态。
3)Supervisor (worker进程)与NodeManager(YarnChild) 每一个工作节点上面运行一个叫做Supervisor的节点。Supervisor会监听分配给它那台机器的工作,根据需要启动/关闭工作进程。每一个工作进程执行一个topology的一个子集;一个运行的topology由运行在很多机器上的很多工作进程组成。
4.Nimbus和Supervisor
Nimbus和Supervisor之间的所有协调工作都是通过Zookeeper集群完成。
Nimbus进程和Supervisor进程都是快速失败(fail-fast)和无状态的。所有的状态要么在zookeeper里面, 要么在本地磁盘上。
这也就意味着你可以用kill -9来杀死Nimbus和Supervisor进程, 然后再重启它们,就好像什么都没有发生过。这个设计使得Storm异常的稳定。
5.
conf.setNumWorkers(4) 表示设置了4个worker来执行整个topology的所有组件
builder.setBolt("boltA",new BoltA(), 4) ---->指明 boltA组件的线程数excutors总共有4个 builder.setBolt("boltB",new BoltB(), 4) ---->指明 boltB组件的线程数excutors总共有4个 builder.setSpout("randomSpout",new RandomSpout(), 2) ---->指明randomSpout组件的线程数excutors总共有4个
-----意味着整个topology中执行所有组件的总线程数为4+4+2=10个 ----worker数量是4个,有可能会出现这样的负载情况, worker-1有2个线程,worker-2有2个线程,worker-3有3个线程,worker-4有3个线程
如果指定某个组件的具体task并发实例数 builder.setSpout("randomspout", new RandomWordSpout(), 4).setNumTasks(8); ----意味着对于这个组件的执行线程excutor来说,一个excutor将执行8/4=2个task
6.kafka
1)kafka是一个分布式的消息系统
2)kafka集群中的服务器都叫做broker
3)kafka有两类客户端,一类叫producer,一类叫做consumer,客户端和broker服务器之间采用tcp协议连接
4)kafka中不同业务系统的消息可以通过topic进行区分,而且每一个消息topic都会被分区,以分担消息读写的负载。
5)每一个分区都可以有多个副本,以防止数据的丢失
6)某一个分区中的数据如果需要更新,都必须通过该分区所有副本中的leader来更新
7)消费者可以分组,比如有两个消费者A和B,共同消费一个topic:order_info,A和B所消费的消息不会重复。