Reservoir Sampling

Reservoir Sampling refers to a family of algorithms for sampling a fixed number of elements from an input of unknown length with uniform probability. In this article, I aim to give an overview of Reservoir Sampling usage, implementation, and testing.

When/why is Reservoir Sampling useful?

The main benefit of Reservoir Sampling is that it provides an upper bound on memory usage that is independent of the input stream length. This allows sampling of input streams which are far larger than the available memory of a system. It is also useful in situations where computations on a small sample are both representative of the complete population and faster/more efficient than operating on the complete input.

In general, Reservoir Sampling is well suited to situations where data arrives sequentially over time and/or is too large to fit entirely into memory.

For example:

  • Streaming data calculations.
  • Testing/verifying a system in production.
    • Checking every input would impact performance too much.
  • Estimating aggregate properties of a large dataset.
    • Total number of records satisfying a predicate.
  • Type inference on semi-structured data.
    • Infer the type of each column in a large CSV file without exhaustively checking every row.

How does Reservoir Sampling work?

Though many different implementations exist (more on some of these later), they all follow the same high-level approach. A reservoir (usually represented by an array) of elements is maintained whilst the input is read sequentially. New elements replace those in the reservoir with uniform probability. When the input is exhausted, the reservoir contains the final sample. For a sample size $n$, number of elements read so far $t$ and reservoir $R$:

  1. Initialise an empty reservoir of size $n$.

  2. The reservoir is populated with the first $n$ elements of the input:

    $R = (x_1, x_2, \ldots, x_n)$

  3. Each subsequent $t$th element ($t > n$) overwrites a uniformly random element of the reservoir with probability $n/t$. For any $(t > n)$th element:

    $P(x_t \in R) = n/t$

In pseudo-code:

reservoir = []
t = 0 # Number of elements read so far
while not EOF:
  x = read_next_element()
  t += 1 # x is the t-th element read (1-indexed)
  if t <= n:
    # Fill the reservoir with the first n elements
    reservoir.append(x)
  else:
    m = int(t * random()) # 0 <= m < t
    if m < n:
      # Make x a candidate for the sample, replacing one at random
      reservoir[m] = x

Another formulation of this approach was proposed by Vitter [1] (more on this work later), which calculates the gap between elements being added to the reservoir and “jumps” through the input in chunks:

# Fill the reservoir with the first n elements
reservoir = [read_next_element() for _ in range(n)]
t = n # Number of elements read so far
while not EOF:
  to_skip = calculate_skip(n, t)
  skip(to_skip)
  if not EOF:
    # Which reservoir element to replace
    m = int(n * random()) # 0 <= m < n
    reservoir[m] = read_next_element()
  t += to_skip + 1

Assuming that calculate_skip is not prohibitively expensive, this approach reduces CPU usage as it avoids generating a random number for every element in the input.

How can Reservoir Sampling be implemented?

As mentioned previously, there are many well-known and well-studied implementations of Reservoir Sampling - each with its own advantages and disadvantages. I’ll detail several here with the intent of providing an understanding of the basic implementation and some of the most important improvements that have been made to it. The reader is encouraged to research further into other implementations.

Note: The following sections assume uniformly random number generation.

Algorithm R

Attributed to Alan G. Waterman by Knuth [2], Algorithm R represents the simplest implementation, following the basic framework outlined previously with no optimisation.

  1. Populate the reservoir with the first $n$ elements.
  2. For each subsequent $t$th element, generate a random integer $m \in [0, t)$.
  3. If $m < n$ (i.e. $m$ is a valid index of the reservoir), replace the $m$th element of the reservoir with the new one. Otherwise, discard the new element.

from itertools import islice
from random import randrange
from typing import Iterable

def algorithm_r(iterable: Iterable, n: int):
    iterable = iter(iterable)
    # Initialise reservoir with first n elements
    reservoir = list(islice(iterable, n))

    for t, x in enumerate(iterable, n + 1):
        m = randrange(t)
        if m < n:  # Make the next record a candidate, replacing one at random
            reservoir[m] = x

    return reservoir

Algorithm R runs in $O(N)$ time because processing a single element takes constant time (dominated by generating a random number) and the entire input ($N$ elements) is read. The primary CPU cost of this algorithm is the random number generation.
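For example, a quick usage sketch (the stream and sample sizes here are arbitrary):

# Sample 10 elements uniformly from a stream of 1,000,000 integers
sample = algorithm_r(range(1_000_000), 10)

assert len(sample) == 10
print(sample)  # Different on every run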

Method 4 (Protection Reservoir) [3]

Suppose the entire input is available sequentially in a single file and a uniform random number $u \sim U(0, 1)$ is assigned to each item $x$:

# u: x
0.1765: 3
0.0214: 12
0.0093: 8
0.1238: 11
0.0615: 7
0.2852: 10
0.6827: 1
0.7547: 11
0.0946: 15
0.6403: 3

Let $t$ be the $n$th smallest of the $u$-values. The items with $u$-values less than or equal to $t$ now form a uniform random sample of size $n$ from the input. For $n = 5$:

0.1765: 3
0.0214: 12 <-- 2nd smallest
0.0093: 8  <-- Smallest
0.1238: 11 <-- 5th smallest (t)
0.0615: 7  <-- 3rd smallest
0.2852: 10
0.6827: 1
0.7547: 11
0.0946: 15 <-- 4th smallest
0.6403: 3

Thus forming the sample $(12, 8, 11, 7, 15)$. Fan et al. [3] use this idea to define Method 4:

  1. Populate the reservoir with the first $n$ elements.
  2. Assign each reservoir element a random number $u$ and store these in the $u$-array.
  3. For each subsequent $t$th element $x$:
    1. Generate a random number $u$.
    2. If $u$ is smaller than the largest value in the $u$-array, replace the largest value in the $u$-array with $u$ and replace the corresponding element of the reservoir with $x$.

This maintains the invariant that at any point through the input the reservoir contains a uniform random sample of the elements processed so far.

from itertools import islice
from random import random
from typing import Iterable

import numpy as np


def method_4(iterable: Iterable, n: int):
    iterable = iter(iterable)
    # Initialise reservoir with first n elements
    reservoir = np.array(list(islice(iterable, n)))
    # Assign random numbers to the first n elements
    u_array = np.array([random() for _ in range(n)])

    # Track the index of the largest u-value
    u_max_idx = u_array.argmax()
    for x in iterable:
        u = random()
        if u < u_array[u_max_idx]:
            # Replace the largest u-value with u
            u_array[u_max_idx] = u
            # Replace the corresponding reservoir element with x
            reservoir[u_max_idx] = x
            # Update the index of the largest u-value
            u_max_idx = u_array.argmax()

    return reservoir

As with Algorithm R, Method 4 requires $O(N)$ time and, due to the secondary $u$-array, may require more memory. Why discuss an arguably worse algorithm? Method 4 forms the basis for another algorithm which is significantly more efficient than both of those discussed so far.

Algorithm L (Geometric Jumps)

Algorithm L takes the idea from Method 4 and greatly improves its runtime to $O(n(1 + \log(N/n)))$ using several (in my opinion, rather genius) mathematical insights from Devroye [4] and Li [5]. The improvement comes from following the second framework and “jumping” through the input directly to the elements which are to be added to the reservoir.

Insight 1: The size of the “jumps” between reservoir insertions follows a geometric distribution.

Let $W$ be the largest value in the $u$-array from Method 4 and $S$ be the number of elements to be skipped (the “jump” size) before the next reservoir insertion. Given $W$,

$S = \lfloor \log(\text{random}()) / \log(1 - W) \rfloor$
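This can be checked empirically with a quick simulation (not part of the algorithm; the value of $W$ below is arbitrary). The skip sizes produced by the expression should follow a geometric distribution with success probability $W$, and therefore have mean $(1 - W)/W$:

from math import floor, log
from random import random

w = 0.3  # An arbitrary example value of W
trials = 100_000

# Skip sizes generated via the closed-form expression
skips = [floor(log(random()) / log(1 - w)) for _ in range(trials)]

# P(S = k) = (1 - w)^k * w, so the empirical mean should approach (1 - w) / w
print(sum(skips) / trials)  # ~2.33
print((1 - w) / w)          # 2.33...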

Insight 2: The $u$-value of the next record to enter the reservoir has the $U(0, W)$ distribution.

As stated in Method 4, a new element enters the reservoir when its $u$-value is less than or equal to $W$. Insight 2 therefore follows from the fact that $u$-values are generated randomly with distribution $U(0, 1)$.

Insight 3: Maintaining the $u$-array is not necessary - only the variable $W$ is required.

After the reservoir has been populated with the first $n$ elements in the input, the $u$-array would contain a sample of $n$ values from $U(0, 1)$. $W$, therefore, has the same distribution as the largest value in a sample of size $n$ from $U(0, 1)$.

Subsequently, the next value, $W'$, has the same distribution as the largest value in a sample of size $n$ from $U(0, W)$.

So the distribution of $W$ is known, but how can this be used to generate a concrete value as part of the sampling algorithm? Well, the how is actually rather simple! Li provides the following expression to generate the largest value in a sample of size $n$ from $U(0, b)$:

$\max(x_1, \ldots, x_n) = b \cdot \text{random}()^{1/n} = b \cdot \exp(\log(\text{random}())/n)$
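As a quick empirical sanity check (not a proof; the values of $n$ and $b$ below are arbitrary), both methods of generating the maximum should agree in distribution - for example, both empirical means should approach $b \cdot n/(n+1)$:

from random import random, uniform

n, b, trials = 5, 0.8, 100_000

# The maximum of n draws from U(0, b), sampled directly
direct = [max(uniform(0, b) for _ in range(n)) for _ in range(trials)]
# Li's single-draw equivalent
single = [b * random() ** (1 / n) for _ in range(trials)]

# Both means should approach b * n / (n + 1) = 0.666...
print(sum(direct) / trials, sum(single) / trials)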

However, the why requires some relatively involved mathematics and I (as Li did in his paper) won’t go into it here [6]. For now, let’s assume that Li is correct and move on to describe Algorithm L:

  1. Populate the reservoir with the first $n$ elements.
  2. Set $W = \exp(\log(\text{random}())/n)$.
  3. Until the input is exhausted:
    1. Set $S = \lfloor \log(\text{random}()) / \log(1 - W) \rfloor$.
    2. Skip over the next $S$ records.
    3. Replace a random element of the reservoir with the $(S+1)$th record.
    4. Set $W = W \cdot \exp(\log(\text{random}())/n)$.

from itertools import islice
from math import exp, floor, log
from random import random, randrange
from typing import Any, Iterable, Optional

# Sentinel value to mark End Of Iterator
EOI = object()


def algorithm_l(iterable: Iterable, n: int):
    iterable = iter(iterable)
    # Initialise reservoir with first n elements
    reservoir = list(islice(iterable, n))

    w = exp(log(random()) / n)

    while True:
        s = floor(log(random()) / log(1 - w))
        # Skip s elements; the (s + 1)th is the next to enter the reservoir
        next_elem = nth(iterable, s, default=EOI)

        if next_elem is EOI:
            return reservoir

        reservoir[randrange(0, n)] = next_elem
        w *= exp(log(random()) / n)


def nth(iterable: Iterable, n: int, default: Optional[Any] = None):
    """Returns the ``n``th item in ``iterable`` or a default value.

    Credit: https://docs.python.org/3.7/library/itertools.html#itertools-recipes
    """
    return next(islice(iterable, n, None), default)

As mentioned previously, Algorithm L has $O(n(1 + \log(N/n)))$ expected runtime. I encourage the reader to take a look through the extensive proof of this by Li in the original paper [5].
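For intuition (a rough sketch rather than a rigorous benchmark; exact numbers will vary with machine, $n$ and input size), the difference can be observed by timing the two implementations above:

from timeit import timeit

# Sample n = 100 elements from a stream of 1,000,000 integers
n, stream = 100, range(1_000_000)

# Algorithm R generates a random number for every element; Algorithm L only
# generates a few per reservoir insertion, so it should be substantially faster
print("Algorithm R:", timeit(lambda: algorithm_r(stream, n), number=5))
print("Algorithm L:", timeit(lambda: algorithm_l(stream, n), number=5))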

How can a Reservoir Sampling implementation be practically tested for correctness?

Excellent! We know how multiple different Reservoir Sampling algorithms work and have the mathematical proofs of their correctness. But what about an actual implementation? How can we tell if the code follows the algorithm correctly, and thus that the proofs still hold? All Reservoir Sampling algorithms share two main correctness properties, based on the output sample:

  1. Size $n$.
  2. Uniformly distributed across the input.

The first is trivial to test - run the algorithm in question over the input and check the output size is equal to $n$. The second property, on the other hand, requires a different approach. As the sampling is probabilistic (through the use of randomisation), simple example-based [7] tests are insufficient - each test run will produce a different answer. Statistical tests can be used instead, whereby a test asserts against some statistical property of the result as opposed to its concrete value. The key to statistical testing is choosing which property to use.
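For example, a minimal sketch of the size check (assuming the algorithm_r implementation from earlier is importable; the module name here is hypothetical):

from reservoir import algorithm_r  # Hypothetical module containing algorithm_r


def test_sample_size():
    n = 10
    # Any of the implementations discussed here could be substituted
    sample = algorithm_r(range(1_000), n)
    assert len(sample) == n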

One approach that (in my opinion) fits well with Reservoir Sampling involves using the output to calculate the Maximum Likelihood Estimator (MLE) for a parameter of the input distribution. If the estimate matches (within some tolerance) the real parameter value then we have strong evidence that the algorithm is implemented correctly.

  1. Generate input data with a known distribution and parameters.
  2. Sample the data using the algorithm under test.
  3. Calculate the MLE of a parameter of the distribution using the output from step 2.
  4. Test the MLE is within some small tolerance $\epsilon$ of the real value.

The larger the input and sample sizes, the smaller the tolerance can be.

Assuming we have access to a method of generating the input data, the rate parameter $\lambda$ of an Exponential distribution $\text{Exp}(\lambda)$ works nicely for Reservoir Sampling as all the information required to calculate the MLE for $\lambda$ is already available. Given a random variable $X \sim \text{Exp}(\lambda)$ and $n$ samples [8] $x_1, \ldots, x_n$ from $X$, the MLE for the rate is the reciprocal of the sample mean:

$\hat{\lambda} = \frac{n}{\sum_i x_i}$

To use this in a statistical test:

  1. Choose a value for $\lambda$.
  2. Generate input data with distribution $\text{Exp}(\lambda)$.
  3. Choose a value for $n$.
  4. Pass the input into a Reservoir Sampling algorithm to generate a sample $x_1, \ldots, x_n$.
  5. Calculate $\hat{\lambda} = \frac{n}{\sum_i x_i}$.
  6. Assert $\hat{\lambda} = \lambda \pm \epsilon$.

import pytest
from numpy.random import default_rng


def test_sample_distribution():
    # Test parameters - adjust to affect the statistical significance of the test.
    n = 1000  # Sample size
    population_size = n * 1000  # Input size.
    rate = 0.1  # Exponential distribution rate parameter.
    tolerance = 0.1  # How close to the real rate the MLE needs to be.

    # Generate exponentially distributed data with known parameters
    rng = default_rng()
    population = rng.exponential(
        1 / rate,  # numpy uses the scale parameter which is the inverse of the rate
        population_size,
    )
    population.sort()

    # Run Reservoir Sampling to generate a sample
    # (reservoir_algorithm is the implementation under test, e.g. algorithm_l)
    sample = reservoir_algorithm(population, n)

    # Calculate the MLE for the rate parameter of the population distribution
    rate_mle = len(sample) / sum(sample)

    assert pytest.approx(rate_mle, tolerance) == rate

Conclusion

Reservoir Sampling describes a common framework for probabilistic sampling algorithms that can be useful in situations where a sample from an input of unknown size is required and where the complete input will not fit into memory. The progression from the simplest implementation of Algorithm R to Algorithm L (and others not discussed here) clearly demonstrates how re-framing a problem can lead to powerful insights that make a real difference. In addition, I think this is a great example where “doing the simple thing” first leads to better results in the future - I very much doubt that Li would ever have come up with Algorithm L had Algorithm R not been proposed first.


  1. Jeffrey S. Vitter. 1985. Random sampling with a reservoir. ACM Trans. Math. Softw. 11, 1 (March 1985), 37-57. DOI: https://doi.org/10.1145/3147.3165

  2. Donald E. Knuth. 1997. The Art of Computer Programming, Volume 2 (3rd ed.): Seminumerical Algorithms, 144. Addison-Wesley Longman Publishing Co., Inc., USA.

  3. C. W. Fan, M. E. Muller, and I. Rezucha. 1962. Development of sampling plans by using sequential (item by item) selection techniques and digital computers. J. Am. Statist. Assoc. 57 (June 1962), 387-402.

  4. Luc Devroye. 1986. Non-Uniform Random Variate Generation. Springer, New York, NY.

  5. Kim-Hung Li. 1994. Reservoir-sampling algorithms of time complexity O(n(1 + log(N/n))). ACM Trans. Math. Softw. 20, 4 (Dec. 1994), 485-486. DOI: https://doi.org/10.1145/198429.198435

  6. But I may write up a separate article for it!

  7. Given some input $x$, the algorithm always produces some output $y$.

  8. Assuming IID samples.


Feedback

Contact me at ben@steadbytes.com or via Twitter @SteadBytes for feedback, questions, etc.