Storm 流式计算
1. 概念
1.1 离线计算和实时计算
离线计算:批量获取数据、批量传输数据、周期性批量计算数据、数据展示
代表技术:Sqoop批量导入数据、HDFS批量存储数据、MapReduce批量计算数据、Hive批量计算数据、zookeeper任务调度
1、hivesql
2、调度平台
3、Hadoop集群运维
4、数据清洗(脚本语言)
5、元数据管理
6、数据稽查
7、数据仓库模型架构
流式计算:数据实时产生、数据实时传输、数据实时计算、实时展示
代表技术:Flume实时获取数据、Kafka/metaq实时数据存储、Storm/JStorm实时数据计算、Redis实时结果缓存、持久化存储(mysql)。
一句话总结:将源源不断产生的数据实时收集并实时计算,尽可能快的得到计算结果
区别:
最大的区别:实时收集、实时计算、实时展示
1.2 Storm是什么?
Flume实时采集, 低延迟
Kafka消息队列, 低延迟
Storm实时计算, 低延迟
Redis实时存储, 低延迟
Storm用来实时处理数据,特点:低延迟、高可用、分布式、可扩展、数据不丢失。提供简单容易理解的接口,便于开发。
1.3 Storm与Hadoop的区别
- Storm用于实时计算,Hadoop用于离线计算。
- Storm处理的数据保存在内存中,源源不断;Hadoop处理的数据保存在文件系统中,一批一批。
- Storm的数据通过网络传输进来;Hadoop的数据保存在磁盘中。
- Storm与Hadoop的编程模型相似
Job:任务名称
JobTracker:项目经理
TaskTracker:开发组长、产品经理
Child:负责开发的人员
Mapper/Reduce:开发人员中的两种角色,一种是服务器开发、一种是客户端开发
Topology:任务名称
Nimbus:项目经理
Supervisor:开组长、产品经理
Worker:开人员
Spout/Bolt:开人员中的两种角色,一种是服务器开发、一种是客户端开发
Storm:进程、线程常驻内存运行,数据不进入磁盘,数据通过网络传递。
MapReduce:为TB、PB级别数据设计的批处理计算框架。
1.4 Storm应用场景及行业案例
Storm用来实时计算源源不断产生的数据,如同流水线生产。
- 运用场景
日志分析
从海量日志中分析出特定的数据,并将分析的结果存入外部存储器用来辅佐决策。
管道系统
将一个数据从一个系统传输到另外一个系统,比如将数据库同步到Hadoop
消息转化器
将接受到的消息按照某种格式进行转化,存储到另外一个系统如消息中间件
- 行业案例
一淘-实时分析系统:实时分析用户的属性,并反馈给搜索引擎
最初,用户属性分析是通过每天在云梯上定时运行的MR job来完成的。为了满足实时性的要求,希望能够实时分析用户的行为日志,将最新的用户属性反馈给搜索引擎,能够为用户展现最贴近其当前需求的结果。
携程-网站性能监控:实时分析系统监控携程网的网站性能
利用HTML5提供的performance标准获得可用的指标,并记录日志。Storm集群实时分析日志和入库。使用DRPC聚合成报表,通过历史数据对比等判断规则,触发预警事件。
阿里妈妈-用户画像:实时计算用户的兴趣数据
为了更加精准投放广告,阿里妈妈后台计算引擎需要维护每个用户的兴趣点(理想状态是,你对什么感兴趣,就向你投放哪类广告)。用户兴趣主要基于用户的历史行为、用户的实时查询、用户的实时点击、用户的地理信息而得,其中实时查询、实时点击等用户行为都是实时数据。考虑到系统的实时性,阿里妈妈使用Storm维护用户兴趣数据,并在此基础上进行受众定向的广告投放。
2. Storm核心组件(重要)
Nimbus:是整个集群的控管核心,负责topology的提交、运行状态监控、任务重新分配等工作。
zookeeper就是一个管理者,监控者,Storm的所有的状态信息都是保存在Zookeeper里面,nimbus通过在zookeeper上面写状态信息来分配任务,supervisor,task通过从zookeeper中读状态来领取任务,同时supervisor, task也会定义发送心跳信息到zookeeper,使得nimbus可以监控整个storm集群的状态,从而可以重启一些挂掉的task。ZooKeeper使得整个storm集群十分的健壮,任何一台工作机器挂掉都没有关系,只要重启然后从zookeeper上面重新获取状态信息就可以了。
Supervisor:负责接受nimbus分配的任务,启动和停止属于自己管理的worker进程。通过配置文件设置当前supervisor上启动多少个worker,默认4个。
Worker:运行具体处理组件逻辑的进程(在Supervisor)。Worker运行的任务类型只有两种,一种是Spout任务,一种是Bolt任务。
Task:worker中每一个spout/bolt的线程称为一个task. 在storm0.8之后,task不再与物理线程对应,不同spout/bolt的task可能会共享一个物理线程,该线程称为executor。
总体描述:nimbus下命令(分配任务),zk监督执行(心跳监控,worker、supurvisor的心跳都归它管),supervisor领旨(下载代码),招募人马(创建worker和线程等),worker、executor就给我干活!task就是具体要干的活。
Storm集群中有两类节点:主控节点(Master Node)和工作节点(Worker Node)。其中,主控节点只有一个,而工作节点可以有多个。
主控节点运行一个称为Nimbus的守护进程类似于Hadoop的JobTracker。Nimbus负责在集群中分发代码,对节点分配任务,并监视主机故障。
每个工作节点运行一个称为Supervisor的守护进程。Supervisor监听其主机上已经分配的主机的作业,启动和停止Nimbus已经分配的工作进程。
流分组,是拓扑定义中的一部分,为每个Bolt指定应该接收哪个流作为输入。流分组定义了流/元组如何在Bolt的任务之间进行分发。Storm内置了8种流分组方式。
Worker是Spout/Bolt中运行具体处理逻辑的进程。一个worker就是一个进程,进程里面包含一个或多个线程。
一个线程就是一个executor,一个线程会处理一个或多个任务。
一个任务就是一个task。
3. Storm编程模型(重要)
Topology:Storm中运行的一个实时应用程序的名称,因为各个组件间的消息流动而形成逻辑上的拓扑结构。(拓扑-DAG有向无环图的实现)
把实时应用程序的运行逻辑打成jar包后提交到Storm的拓扑(Topology)。Storm的拓扑类似于MapReduce的作业(Job)。其主要的区别是,MapReduce的作业最终会完成,而一个拓扑永远都在运行直到它被杀死。一个拓扑是一个图的Spout和Bolt的连接流分组。
Spout:在一个topology中获取源数据流的组件,Spout是拓扑的流的来源,是一个拓扑中产生源数据流的组件。通常情况下,Spout会从外部数据源中读取数据,然后转换为拓扑内部的源数据。
Spout可以是可靠的,也可以是不可靠的。如果Storm处理元组失败,可靠的Spout能够重新发射,而不可靠的Spout就尽快忘记发出的元组。
Spout可以发出超过一个流。
Spout的主要方法是nextTuple()。NextTuple()会发出一个新的Tuple到拓扑,如果没有新的元组发出,则简单返回。
Spout的其他方法是ack()和fail()。当Storm检测到一个元组从Spout发出时,ack()和fail()会被调用,要么成功完成通过拓扑,要么未能完成。Ack()和fail()仅被可靠的Spout调用。
IRichSpout是Spout必须实现的接口。
通常情况下spout会从外部数据源中读取数据,然后转换为topology内部的源数据。
Bolt:接受数据然后执行处理的组件,用户可以在其中执行自己想要的操作。
在拓扑中所有处理都在Bolt中完成,Bolt是流的处理节点,从一个拓扑接收数据,然后执行进行处理的组件。Bolt可以完成过滤、业务处理、连接运算、连接与访问数据库等任何操作。
Bolt是一个被动的角色,接口中有一个execute()方法,在接收到消息后会调用此方法,用户可以在其中执行自己希望的操作。
Bolt可以完成简单的流的转换,而完成复杂的流的转换通常需要多个步骤,因此需要多个Bolt。
Bolt可以发出超过一个的流。
Tuple:是消息传递的基本单元,是一个命名的值列表,元组中的字段可以是任何类型的对象。Storm使用元组作为其数据模型,元组支持所有的基本类型、字符串和字节数组作为字段值,只要实现类型的序列化接口就可以使用该类型的对象。元组本来应该是一个key-value的Map,但是由于各个组件间传递的元组的字段名称已经事先定义好,所以只要按序把元组填入各个value即可,所以元组是一个value的List。
Stream:是一个无界的元组系列。源源不断传递的元组就组成了流,在分布式环境中并行地进行创建和处理。
4. 流式计算一般架构图(重要)
5. Storm vs Spark Streaming
5.1 Storm适用场景
- 需要纯实时,不能忍受1秒以上延迟的场景下使用,比如金融系统
- 对于延迟需求很高的纯粹的流处理工作负载
- 需求主要集中在流处理与CEP(即复杂事件处理)式处理层面
- 若还需要针对高峰低峰时间段,动态调整实时计算程序的并行度,以最大限度利用集群资源(通常是在小型公司,集群资源紧张的情况),也可以考虑用Storm
- 如果一个大数据应用系统,它就是纯粹的实时计算,不需要在中间执行SQL交互式查询、复杂的transformation算子等,那么用Storm是比较好的选择
5.2 Spark Streaming适用场景
- 如果对上述适用于Storm的几点,一条都不满足的实时场景,即:不要求纯实时,不要求动态调整并行度等,那么可以考虑使用Spark Streaming
- 考虑使用Spark Streaming最主要的一个因素,应该是针对整个项目进行宏观的考虑,即:如果一个项目除了实时计算之外,还包括了离线批处理、交互式查询等业务功能,而且实时计算中,可能还会牵扯到高延迟批处理、交互式查询等功能,那么就应该首选Spark生态,用Spark Core开发离线批处理,用Spark SQL开发交互式查询,用Spark Streaming开发实时计算,三者可以无缝整合,给系统提供非常高的可扩展性 Spark Streaming与Storm的优劣分析事实上,Spark Streaming绝对谈不上比Storm优秀。
- 必须利用交互式shell通过API调用实现数据探索
5.3 技术特点对比
对比项 | storm | spark |
---|---|---|
处理方式 | 流式数据处理、移动数据(数据流入计算节点) | 批处理数据、移动计算(针对数据形成任务进行计算) |
延迟性 | >=100ms | 2s左右 |
吞吐量 | Low | High |
容错性 | ack组件进行数据流的跟踪,开销大 | 通过lineage以及在内存维护两份数据备份进行容错 |
事务性 | 通过跟踪机制能保证每个记录至少被处理一次,如果需要保证状态只更新一次的话,需要由用户自己来实现。 | 保证数据只被处理一次,并且是在批处理的层次级别。对于statefull的计算,对事务性比较高的话,Spark streaming要更好一些。 |
动态调整并行度 | 支持 | 不支持 |
数据处理保证 | at least once(实现采用record-level acknowledgments),Trident可以支持storm 提供exactly once语义。 | exactly once(实现采用Chandy-Lamport 算法,即marker-checkpoint ) |
如果对延迟要求不高的情况下,建议使用Spark Streaming,丰富的高级API,使用简单,天然对接Spark生态栈中的其他组件,吞吐量大,部署简单,UI界面也做的更加智能,社区活跃度较高,有问题响应速度也是比较快的,比较适合做流式的ETL,而且Spark的发展势头也是有目共睹的,相信未来性能和功能将会更加完善。
6. 分组策略和并发度
6.1 分组策略(Stream Grouping)
stream grouping用来定义一个stream应该如何分配给Bolts上面的多个Executors(多线程、多并发)。Storm里面有7种类型的stream grouping
1)Shuffle Grouping: 随机分组,轮询,平均分配。随机派发stream里面的tuple,保证每个bolt接收到的tuple数目大致相同。
2)Fields Grouping**:按字段分组**,比如按userid来分组,具有同样userid的tuple会被分到相同的Bolts里的一个task,而不同的userid则会被分配到不同的bolts里的task。
3)All Grouping**:广播发送**,对于每一个tuple,所有的bolts都会收到。
4)Global Grouping**:全局分组**,这个tuple被分配到storm中的一个bolt的其中一个task。再具体一点就是分配给id值最低的那个task。
5)Non Grouping**:不分组**,这stream grouping个分组的意思是说stream不关心到底谁会收到它的tuple。目前这种分组和Shuffle grouping是一样的效果。在多线程情况下不平均分配。
6)Direct Grouping**:直接分组**,这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪个task处理这个消息。只有被声明为Direct Stream的消息流可以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理者可以通过TopologyContext来获取处理它的消息的task的id (OutputCollector.emit方法也会返回task的id)。
7)Local or Shuffle Grouping**:**如果目标bolt有一个或者多个task在同一个工作进程中,tuple将会被随机发送给这些tasks。否则,和普通的Shuffle Grouping行为一致。
8**)customer Grouping**:自定义,相当于MapReduce自己自己去实现一个partition。
6.2 并发度
6.2.1 场景分析
1)单线程下:加减乘除、全局汇总
2)多线程下:局部加减乘除、持久化DB等
(1)思考:如何计算:word总数和word个数?并且在高并发下完成
前者是统计总行数,后者是去重word个数;
类似企业场景:计算网站PV和UV
(2)网站最常用的两个指标:
PV(page views):count (session_id) 即页面浏览量。
UV(user views):count(distinct session_id) 即独立访客数。
a)用ip地址分析
指访问某个站点或点击某个网页的不同IP地址的人数。在同一天内,UV只记录第一次进入网站的具有独立IP的访问者,在同一天内再次访问该网站则不计数。
b)用Cookie分析UV值
当客户端第一次访问某个网站服务器的时候,网站服务器会给这个客户端的电脑发出一个CookieCCookieCookie
实时处理的业务场景主要包括:汇总型(如网站PV、销售额、订单数)、去重型(网站UV、顾客数、销售商品数)
6.2.2 并发度
并发度:用户指定一个任务,可以被多个线程执行,并发度的数量等于线程executor的数量。
Task就是具体的处理逻辑对象,一个executor线程可以执行一个或多个tasks,但一般默认每个executor只执行一个task,所以我们往往认为task就是执行线程,其实不是。
Task代表最大并发度,一个component的task数是不会改变的,但是一个componet的executer数目是会发生变化的(storm rebalance命令),task数>=executor数,executor数代表实际并发数。
对于并发度的配置, 在storm里面可以在多个地方进行配置, 优先级为:
defaults.yaml < storm.yaml < topology-specific configuration
<internal component-specific configuration < external component-specific configuration
worker processes的数目, 可以通过配置文件和代码中配置, worker就是执行进程, 所以考虑并发的效果, 数目至少应该大亍machines的数目
executor的数目, component的并发线程数,只能在代码中配置(通过setBolt和setSpout的参数), 例如, setBolt(“green-bolt”, new GreenBolt(), 2)
tasks的数目, 可以不配置, 默认和executor1:1, 也可以通过setNumTasks()配置
Topology的worker数通过config设置,即执行该topology的worker(java)进程数。它可以通过 storm rebalance 命令任意调整。
TopologyBuilder topologyBuilder = new TopologyBuilder();
Config conf = new Config();
conf.setNumWorkers(2); // 用2个worker
topologyBuilder.setSpout("blue-spout", new BlueSpout(), 2); // 设置2个并发度
topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2).setNumTasks(4).shuffleGrouping("blue-spout"); // 设置2个并发度,4个任务
topologyBuilder.setBolt("yellow-bolt", new YellowBolt(), 6).shuffleGrouping("green-bolt"); // 设置6个并发度
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("mytopology", conf, topologyBuilder.createTopology());
3个组件的并发度加起来是10,就是说拓扑一共有10个executor,一共有2个worker,每个worker产生10 / 2 = 5条线程。
绿色的bolt配置成2个executor和4个task。为此每个executor为这个bolt运行2个task。
动态的改变并行度
Storm支持在不 restart topology 的情况下, 动态的改变(增减) worker processes 的数目和 executors 的数目, 称为rebalancing. 通过Storm web UI,或者通过storm rebalance命令实现:
storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10
集群部署
1、 集群部署的基本流程
集群部署的流程:下载安装包、解压安装包、修改配置文件、分发安装包、启动集群
注意:
启动zookeeper集群
所有的集群上都需要配置hosts
vi /etc/hosts
192.168.239.128 storm01 zk01 hadoop01
192.168.239.129 storm02 zk02 hadoop02
192.168.239.130 storm03 zk03 hadoop03
2、 集群部署的基础环境准备
安装前的准备工作(zk集群已经部署完毕)
关闭防火墙
chkconfig iptables off && setenforce 0
创建用户
groupadd realtime && useradd realtime && usermod -a -G realtime realtime
创建工作目录并赋权
mkdir /export
mkdir /export/servers
chmod 755 -R /export
切换到realtime用户下
su realtime
3、Storm集群部署
3.1、上传安装包
3.2、解压安装包
tar -zxvf apache-storm-0.9.5.tar.gz -C /export/servers/
cd /export/servers/
ln -s apache-storm-0.9.5 storm
3.3、修改配置文件
mv /export/servers/storm/conf/storm.yaml /export/servers/storm/conf/storm.yaml.bak
vi /export/servers/storm/conf/storm.yaml
输入以下内容:
3.4、分发安装包
scp -r /export/servers/apache-storm-0.9.5 storm02:/export/servers
然后分别在各机器上创建软连 接
cd /export/servers/
ln -s apache-storm-0.9.5 storm
3.5、启动集群
在nimbus.host所属的机器上启动 nimbus服务
cd /export/servers/storm/bin/
nohup ./storm nimbus &
在nimbus.host所属的机器上启动ui服务
cd /export/servers/storm/bin/
nohup ./storm ui &
在其它个点击上启动supervisor服务
cd /export/servers/storm/bin/
nohup ./storm supervisor &
3.6、查看集群
访问nimbus.host:/8080,即可看到storm的ui界面。