Spark和Storm
Spark基于MapReduce算法实现的分布式计算,不同于MapReduce的是,作业中间结果可以保存在内存中,而不要再读写HDFS,
Spark适用于数据挖掘和机器学习等需要迭代的MapReduce算法
Spark Streaming是建立在Spark上的实时计算框架,可以结合流式、批处理和交互式进行查询和实时计算,
基本原理是将Stream数据分成小的时间片段,以类似batch批量处理的方式来处理这些小部分数据
Spark Streaming相比于基于Record的其他处理框架(Storm),弹性分布式数据集更容易实现高效的容错处理;
此外小批量处理的方式使得它可以同时兼容批量和实时数据处理的逻辑和算法,方便了一些需要历史数据和实时数据联合分析的特定应用场合
Spark Streaming和Storm两个框架都提供了可扩展性和容错性,根本区别在于处理模型,Storm处理的是每次传入的一个事件,
而Spark Streaming是处理某个时间段窗口内的事件流。因此,Storm处理一个时间可以达到极低的延迟。
Hadoop和Storm
Topology=spout+Bolt
Hadoop上运行的是Job(Mapper/Reducer),Storm上运行的是Topology(Spout/Bolt),Job会运行结束,Topology会一直运行下去
Hadoop集群包含(Master Node/Worker Node),对应到Storm集群上的(主节点Nimbus/工作节点Supervisor)
Hadoop集群上的(JobTracker/TaskTracker)对应到Storm集群上的(Nimbus/Supervisor)
Storm架构
Supervisor-->Worker(n个Executor(n个Task))-->Topology
Nimbus和Supervisor通过Zookeeper通信,并且这两个进程都是无状态和快速失败的,所有状态只存在于Zookeeper和本地磁盘上
Spout获取数据源的数据,调用nextTuple函数,发射数据供Bolt消费,发射的数据单元叫Tuple(消息传递的基本单元),源源不断的Tuple组成了Stream
客户端提交Topology代码到Nimbus,Nimbus针对该Topology建立本地的目录,Nimbus中的调度器根据Topology的配置计算Task,并把Task分配到不同的Worker上,调度的结果
写入ZooKeeper中,ZooKeeper上建立assignments节点,存储Task和Supervisor中Worker的对应关系。在ZooKeeper上创建workerbeats节点来监控Worker的心跳。
Supervisor去ZooKeeper上获取分配的Tasks信息,启动一个或者多个Worker来执行,每个Worker上运行多个Task,Task由Executor来具体执行。
Worker根据Topology信息初始化建立Task之间的链接,相同Worker类的Task通过DisrupterQueue来通信,不同Worker间默认采用Netty来通信,然后整个Topology就运行起来了
topologies包含所有Topology的静态信息,而cluster中包含了Topology的运行态信息,根据topologies和cluster中的信息,就可以进行真正的调度分配
在worker中,线程间通信使用的是Disruptor,而进程间的通信也就是Worker跟Worker之间的通信使用的是IContext接口实现,也可能是Netty和ZMQ,默认使用Netty
在storm中的backtype.storm.task包中含有若干上下文(GeneralTopologyContextWorkerTopologyContextTopologyContext),用于记录Topology或者Storm中信息
StormTopology类中定义了很多可以操作读取内部信息的方法
Task是在Executor中,通过调用mk-task方法来创建一个新的task,并通过调用mk-task-data函数为该Task创建对应的数据
Topology、work、Executor、task以及组件关系
一个组件(spout/bolt)包含的Executor数量是由在提交Topology时设置的并行度决定的
Topology最终会调度成一个或多个worker,每个worker即为一个真正的操作系统执行进程,
每个worker又可以有多个task,每个task是storm中进行计算的最小的运行单位,也就是spout或者bolt实例,
spout 的nextTuple()会在同一个循环内被ack()和fail()周期性的调用。没有任务时它必须释放对线程的控制,其它方法才有机会得以执行。
executor是worker生成的线程,该线程运行着相同的组件(spout或bolt)的一个或多个task,一个task执行着实际的数据处理
1个组件的task数量总是一样的,但是1个组件的executor的数量可以改变,thread的数量<=task的数量
parallelism_hint参数指定的是bolt的初始的executor的数量
eg
1)
builder.setSpout("id",new Spout(),2);//两个线程执行spout
builder.setBolt("id",new Bolt(),2);//两个线程执行bolt
stormConf.setNumworkers(3);//work数
因为每一个worker默认都会占用一个executor(每个executor会启动一个acker任务(task)),7个executor,7个task
可以在topology中取消acker任务,这样的话就不会多出来一个executor和任务了
加上stormConf.setNumAckers(0);2个executor,2个task
2) 一个线程executor 执行多个 任务task(默认一个executor对应一个task)
int worknum = 3;
builder.setSpout("spout", new RandomSpout(),worknum).setNumTasks(worknum*2); 3个executor,6个task
builder.setBolt("bolt", new SenqueceBolt(),2*worknum).shuffleGrouping("spout").setNumTasks(worknum*2); 6个executor,6个task
conf.setNumWorkers(worknum);
conf.setNumAckers(0);
9个executor,12个task
Stream分组,即消息的分区方法,共7种内置分组方式,也可以通过CustomStreamGrouping接口来定义自己的分组
1 shuffle分组 保证同一级Bolt上的每个Task处理的Tuple数量一致
2 Fields分组 根据Tuple中的某一个Field或者多个Field的值来划分,具有相同Field的会被分发到同一个Task上
3 All分组 所有的Tuple都会分发到所有的Task上,为每个接收数据的实例复制一份元组副本。这种分组方式用于向bolts发送信号。
4 Global分组 整个Stream会选择一个Task作为分发的目的地,通常是具有最新ID的Task
5 None分组 等同于shuffle分组
6 Direct分组 产生数据的Spout/Botl自己明确决定这个Tuple被Bolt的哪些Task所消费,需要使用OutputCollector的emitDirect方法来实现
7 Local or shuffle分组 如果目标Bolt中的一个或多个Task和当前产生数据的Task在同一个Worker进程中,那么就走内部的线程间通信,
将Tuple直接发给在当前Worker进程中的目的Task。否则,同Shuffle分组
可靠性
Spout中发射一个新的Tuple时为其制定一个MessageId,这个MessageId可以是任意的Object对象,多个Stream Tuple可以共用一个MessageId,
Storm会告知用户每一个消息单元是否在一个制定的时间内被完全处理,在Storm中,使用了Acker来解决消息处理的可靠性
消息树
不管是在spout里emit还是bolt里emit的消息,框架都会给这个消息加一个64位的随机数当做id,
其实是<root_id,randomID>这样的一个结构,即每个tuple除了上游给他创建了一个随机64位id外
还带有一个不变的root_id,来自spout task同一个消息树的ack信息都发给同一个acker bolt
jstorm使用的是取模hash算法,只需要对spout的tuple 64位id取模就行了。这样基本上可以满足上面2点要求,
因为spout tuple的id会透传给下游的全部消息树节点,因此,bolt也会正确路由到那个acker bolt
acker bolt弄了一个map来做这件事情,key就是spout tuple id即消息树的root id(64)位的,
一个root id代表一个消息树;value则是一个value对,第一个value是task id(spout),
这些都属于元数据,第二个value是一个64位的数字,这个64位的数字代表了一棵消息树的状态,
当整个value变成0了,说明,消息树被“完全处理”了,就找这个pair的第一个value,发消息就行了。
slot概念
一个节点的slot的数量用来表示某个节点的资源的容量或者说是能力的大小,因而slot是 Hadoop的资源单位
supervisor.slots.ports Storm的slot最好设置成OS核数的整数倍,同时由于storm是基于内存的实时计算,
slot数不要大于每台物理机可运行slot个数:(物理内存-虚拟内存)/单个java进程最大可占用内存数
worker.childopts storm的worker进程的java限制,有效地设置该参数能够在Topology异常时进行原因分析
-XX:+HeapDumpOnOutOfMemoryError 当内存使用量超过Xmx时,java进程将被JVM杀掉同时会生产java_pidxxx.hprof文件,便于使用MemoryAnalyzer分析内存使用情况
schedule-topology方法和DefaultScheduler的default-schedule的有一些相似的逻辑,主要根据当前的可用资源完成对Topology的任务分配,包括获得当前的可用Slot资源,计算当前
Topology所能使用的全部Slot数目、对Slot重新分配和进行排序以及得到最后的分配信息等
Storm运维监控
Ganglia是一个跨平台可扩展的、高性能计算系统下的分布式监控系统,监控Storm主机的信息
zabbix来监控 Nimbus和Supervisor进程,当发现进程挂掉后可以重启并报警
通过Storm UI来监控和调试相关应用,以Storm Metric、ZooKeeper目录以及Hook等方式帮助完成一些深入的调试和监控
鉴于Storm作为一个平台提供给不同的业务共用,进行资源隔离室必须的,开源资源隔离方案有CGoup(其他封装方案)、
YARN及StormOnYarn(可以让Storm、Hadoop、Spark等共同运行在同一套集群上)
storm文件结构
service cgconfig start
在/cgroup目录下会生成 blkio cpu cpuacct cpuset devices freezer memory net_cls子目录,每个子目录对应一个控制项,每个子目录下都会存在以下配置文件
cgroup.procs 文件内容为受控制的进程ID
notify_no_release 文件内容设置为1时,当没有可控制进程是,出发release_agent指定的内容
release_agent 文件内容为可执行文件、命令
tasks 文件内容为受控制的线程ID
内存设置memory.oom_disable为0或1,可以控制使用的内存超过限制的内存时时杀死还是进入休眠
cpu是基于CPU时间片进行的资源带哦度,cpuset是基于CPU核心进行的资源调度
service cgconfig status
Storm开发
Trident是基于Storm的高级抽象,除了提供实时流聚合、分组、过滤等功能,还提供了对数据持久化和事务性操作,保证了Tuple只能被处理一次并且不丢失
每次发送数据,Tuple被分成一组组的batch,每一个batch分配一个唯一的事务ID,batch之间的更新严格有序
分布式RPC(DRPC)用于对Storm上大量的函数调用进行并行计算过程,分布式RPC通过DRPC服务器协调接收一个RPC请求,发送请求到Storm Topology,并从Storm Topology接收结果;
通常应用分布式RPC对Trident存储的各种数据源进行并行查询
用户画像建模
比较流行成熟的SQL-ON-Hadoop是Spark SQL,Mesos Spark或者Yarn Spark