Apache Flink中实现的两种水塘抽样算法

因为Flink和我毕设有关, 所以这几天一直在看Flink的源码. Flink中实现了两种水塘抽样算法:

  1. Reservoir Sampler With Replacement https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithReplacement.html
  2. Reservoir Sampler Without Replacement https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/java/sampling/ReservoirSamplerWithoutReplacement.html

对应的paper是:

  1. Optimal Random Sampling from Distributed Streams Revisited http://researcher.watson.ibm.com/researcher/files/us-dpwoodru/tw11.pdf
  2. Random Sampling with a Reservoir http://www.cs.umd.edu/~samir/498/vitter.pdf

两种抽样的区别,(假设, k是已知的抽样数量):

  1. With Replacement是先取第一个样品, 然后做k个’复制'(这里的复制是对象是样品的value, 每个复制都有一个weight, 这个weight是随机生成的, 在Flink中, 用的是XOrShift随机数), 装满 大小为k的PriorityQueue中, 然后每次取下一个样品, 都做k个’复制’,然后如果’复制’的样品的weight大于当前PriorityQueue中, 就Remove PriorityQueue中队列顶端元素, 然后把当前’复制’放入PriorityQueue.
  2. Without Replacement是现取k个样品, 存入大小为k的PriorityQueue中, 每次取新的样品时, 算一个随机数, 如果这个随机数比当前PriorityQueue的队列顶端元素的weight值大,就Remove PriorityQueue中队列顶端元素, 然后放入当前元素.

值得一提的是, Flink中, 虽然在partition过程中实现了以上两种算法, 但是在reduce的过程, 并没有区别, 全部都用的是[2]算法, 这里我想,也许是为了保证reduce的速度, 减少随机数的计算时间, 而做的改变. 详情请见: https://github.com/eBay/Flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/sampling/DistributedRandomSampler.java