1.spark rdd为什么不能嵌套?
譬如 val rdd1=sc.parallel(range(1,100))
val rdd2=sc.parallel(range(1,100))
rdd1.map(x=>rdd.count())
因为rdd的构造器中rdd(@trancient sc:SparkContext),这个sc是不可序列化的,而rdd的map之类的操作,需要把参数序列化,
这样就会出问题,sc就成了null,会报空值异常.
为什么sc要设置为不要序列化? 因为sc本身就不能序列化,没有继承serializble接口.
scala中如何使用正则进行抽取字符串中想要的内容?使用模式匹配,例如:
val regex="localhost[(.*)]".r
val master="localhost[4]"
master match{
case regex(threads) => converttoInt(threads)
case _=> println("not found")
}
2.spark rdd 中的partitioner有什么用?
确定数据是如何被划分到partitions里面的,譬如说rdd.repartition就使用了这个类.
rdd内容是1,2,3,4,5,设置了2个分区,每一个分区的数据会是怎么样的?
查看代码你会发现是会分成1,2一组,3,4,5一组
那rdd的partitioner有什么作用呢?
你调用rdd.repartition的时候,数据就根据这个进行重新分组了.
还有如果是转换成新的rdd出现了shuffle的时候就使用这个partitioner.
如果你想把数据重新分组,分成大于3的一组,其他的为一组,如何做?
import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.HashPartitioner object Test extends App { //在windows平台上调试运行,需要设置hadoop的home System.setProperty("hadoop.home.dir", "E:\app\hadoop"); val sparkConf = new SparkConf() sparkConf.setMaster("local[2]").setAppName(Test.getClass.toString().dropRight(1)) val sc = new SparkContext(sparkConf) val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5), 2) .map(i => if(i>3) (1,i) else (0,i)) //.repartition(2) .partitionBy(new HashPartitioner(2)) .map( {case (a,b)=>b }) println(rdd.toDebugString) rdd.foreachPartition { p => p.foreach { l => println(l) } println(p.hashCode()) } sc.stop() }