摘要
1.适用场合
2.算法简介
3.代码例子
4.Spark RangePartitioner 中的应用(待补充)
内容
1.适用场合:从包含n个项目的集合S中选取k个样本,其中n为一很大或未知的数量,尤其适用于不能把所有n个项目都存放到主内存的情况。
2.算法简介:
- 从S中抽取首k项放入「水塘」中
- 对于每一个S[i]项(i ≥ k)
随机产生一个范围0到i的整数r
若r < k 则把水塘中的第r项换成S[i]项
深入:papers
3.代码例子:

4.Spark RangePartitioner 中的应用( 对应的类名SamplingUtils )
def reservoirSampleAndCount[T: ClassTag](
input: Iterator[T],
k: Int,
seed: Long = Random.nextLong())
: (Array[T], Long) = {
val reservoir = new Array[T](k)
// Put the first k elements in the reservoir.
var i = 0
while (i < k && input.hasNext) {
val item = input.next()
reservoir(i) = item
i += 1
}
// If we have consumed all the elements, return them. Otherwise do the replacement.
if (i < k) {
// If input size < k, trim the array to return only an array of input size.
val trimReservoir = new Array[T](i)
System.arraycopy(reservoir, 0, trimReservoir, 0, i)
(trimReservoir, i)
} else {
// If input size > k, continue the sampling process.
// 随机替换蓄水池中第0到l的一个数
var l = i.toLong
val rand = new XORShiftRandom(seed)
while (input.hasNext) {
val item = input.next()
val replacementIndex = (rand.nextDouble() * l).toLong
if (replacementIndex < k) {
reservoir(replacementIndex.toInt) = item
}
l += 1
}
(reservoir, l)
}
}