spark2-submit --class SparkKafka --master yarn --executor-memory 1G --num-executors 6 --driver-memory 1g --conf spark.driver.supervise=true --conf spark.dynamicAllocation.maxExecutors=6 --conf spark.streaming.kafka.maxRatePerPartition=100 recommend-1.0-SNAPSHOT.jar
主要原因是spark.dynamicAllocation.maxExecutors这个配置,
在CDH中,默认开启了动态资源占用,即资源如果空余时,SparkStreaming会自动按照并发度(并行的block数)来占用资源,而spark-streaming作为一个实时处理系统,在大多数时候是不需要太多资源的。
为了限制spark streaming最多分配的executor数,可以配置spark.dynamicAllocation.maxExecutors为动态资源分配的上限。num-executors其实是资源初始化时所取的值,所以其实还是有用的。
这里要注意的是开源是默认没有开启动态资源占用的,可以通过spark.dynamicAllocation.enabled=true这一配置来开启,如果配置了这一项,同时还需要开启external-shuffle-service,保证在动态回收不再工作的executor的时候不会中断在executor上的shuffle过程spark.shuffle.service.enabled=true。
本配置调试的Spark版本为2.3.1