1.并行度
在direct方式下,sparkStreaming的task数量是等于kafka的分区数,kakfa单个分区的一般吞吐量为10M/s
常规设计下:kafka的分区数一般为broken节点的3,6,9倍比较合理
比如我的集群有6个broken节点,创建kafka的分区为18个,sparkStreaming的task也为18个,当然也可以适当放大分区,根据自己的数据量来合理规划集群及分区数
2.序列化
java的序列化很沉重,会序列化好多无关的(时间长)
举例:100000个简单的对象,序列化时间对比
java原生序列化时间:8637 ms
java原生反序列化时间:5851 ms
Kryo 序列化时间:455 ms
Kryo 反序列化时间:207 ms
对对象进行序列化注册
sparkConf.registerKryoClasses( Array( classOf[OrderInfo], classOf[Opt_alliance_business], classOf[DriverInfo], classOf[RegisterUsers] , classOf[Reservation] )
3.限流与背压
不开启背压:每秒钟从kafka每一个分区拉取的数据量是无限制--》刚启动程序时kafka堆积的数大量据都会直接被短时间进行消费,消费不及时,可能会发生内存溢出
开启限流:spark.streaming.kafka.maxRatePerPartition
开启背压:流启动之后 --》checkpoint --》metastore 流信息无法更改
举例:
sparkConf.set("spark.streaming.backpressure.initialRate" , "500") 初始速率500条/s
sparkConf.set("spark.streaming.backpressure.enabled" , "true") 开启压背
sparkConf.set("spark.streaming.kafka.maxRatePerPartition" , "5000") 最大速度不超过5000条
4.cpu空转流 -->task 如果task没拿到数据,也会进行资源消耗
spark.locality.wait 3s
5.不要在代码中判断这个表是否存在不要在实时代码里面判断表是否存在,耗时
6、推测执行
推测执行:就是把执行失败task的转移到另一个executor
场景:task失败造成重试(task启动、压缩、序列化),如果每次task执行3秒失败重试8次需要消耗24秒
sparkConf.set("spark.speculation.interval", "300") 推测执行间隔
sparkConf.set("spark.speculation.quantile","0.9") 推测执行完成的task的阈值
7.关于某个task的执行的任务运行两个小时都运行不完
场景:yarn日志报错:shuffle落地文件找不到、shuffle文件打不开 也会造成task失败 ,spark 4105 shuffle fetch 错误
原因:shuffle1 过程 writeshuffle=》落地(默认lz4)=》readshuffle,写的汇聚shuffle文件被下游的节点打不开或者读取不到,可能是压缩的原因,压缩文件打不开
spark4105错误地址:https://issues.apache.org/jira/browse/SPARK-4105
解决:开启推测执行 =》转移任务,关闭shuffle压缩设置(也就是增加了节点直接传输的文件大小,加大了IO),重新跑数据
8.hashshuffle与sortshuffle
https://www.jianshu.com/p/fafef67c203c
------------恢复内容结束------------