Reservoir Sampling refers to a family of algorithms for sampling a fixed number of elements, uniformly at random, from an input of unknown length. In this article, I aim to give an overview of Reservoir Sampling usage, implementation and testing.
When/why is Reservoir Sampling useful?
The main benefit of Reservoir Sampling is that it provides an upper bound on memory usage that is invariant of the input stream length. This allows sampling input streams which are far larger in size than the available memory of a system. It is also useful in situations where computations on a small sample are both representative of the complete population and faster/more efficient than operating on the complete input.
In general, Reservoir Sampling is well suited to situations where data arrives sequentially over time and/or is too large to fit entirely into memory.
For example:
- Streaming data calculations.
- Testing/verifying a system in production.
  - Checking every input would impact performance too much.
- Estimating aggregate properties of a large dataset.
  - Total number of records satisfying a predicate.
- Type inference on semi-structured data.
  - Infer the type of each column in a large CSV file without exhaustively checking every row (see the sketch below).
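To make the CSV example concrete, here is a minimal sketch of sampling-based type inference. It uses the algorithm_r function defined later in this article; infer_column_types and _parses_as are hypothetical helpers:

import csv

def _parses_as(value: str, target_type) -> bool:
    # Hypothetical helper: does `value` parse as `target_type`?
    try:
        target_type(value)
        return True
    except ValueError:
        return False

def infer_column_types(path: str, n: int = 100):
    # Infer each column's type from a reservoir sample of n rows,
    # however many rows the file contains.
    with open(path, newline="") as f:
        reader = csv.reader(f)
        header = next(reader)
        rows = algorithm_r(reader, n)  # Uniform sample of n data rows
    types = {}
    for i, name in enumerate(header):
        values = [row[i] for row in rows]
        if all(_parses_as(v, int) for v in values):
            types[name] = int
        elif all(_parses_as(v, float) for v in values):
            types[name] = float
        else:
            types[name] = str
    return types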
How does Reservoir Sampling work?
Though many different implementations exist (more on some of these later), they all follow the same high-level approach. A reservoir (usually represented by an array) of elements is maintained whilst the input is read sequentially. New elements replace those in the reservoir with uniform probability. When the input is exhausted, the reservoir contains the final sample. For a sample size $n$, number of elements read so far $t$ and reservoir $R$:
- Initialise an empty reservoir of size $n$.
- Populate the reservoir with the first $n$ elements of the input: $R = [x_1, x_2, \dots, x_n]$.
- Each subsequent $t$th element ($t > n$) overwrites an element of the reservoir, chosen uniformly at random, with probability $n/t$. For any $t$th element: $P(x_t \text{ enters } R) = n/t$.
In pseudo-code:
# Populate the reservoir with the first n elements
reservoir = [read_next_element() for _ in range(n)]
t = n  # Number of elements read so far
while not EOF:
    t += 1
    m = int(t * random())  # 0 <= m < t
    if m < n:
        # Make the next element a candidate for the sample, replacing one at random
        reservoir[m] = read_next_element()
    else:
        # Discard the next element
        skip_next_element()
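To see why this yields a uniform sample, consider the $t$th element for $t > n$: it enters the reservoir with probability $n/t$, and each later element $j$ evicts it with probability $(n/j) \cdot (1/n) = 1/j$. Its probability of surviving into the final sample from an input of $N$ elements therefore telescopes:

$$P(x_t \in R_{\text{final}}) = \frac{n}{t} \prod_{j=t+1}^{N} \left(1 - \frac{1}{j}\right) = \frac{n}{t} \cdot \frac{t}{N} = \frac{n}{N}$$

which is independent of $t$. The same argument applies to the first $n$ elements.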
Another formulation of this approach was proposed by Vitter1 (more on this work later) which calculates the gap between elements being added to the reservoir and “jumps” through the input in chunks:
# Populate the reservoir with the first n elements
reservoir = [read_next_element() for _ in range(n)]
t = n  # Number of elements read so far
while not EOF:
    to_skip = calculate_skip(n, t)
    skip(to_skip)
    if not EOF:
        # Which element to replace
        m = int(n * random())  # 0 <= m < n
        reservoir[m] = read_next_element()
        t += to_skip + 1
Assuming that calculate_skip is not prohibitively expensive, this approach reduces CPU usage as it avoids generating a random number for every element in the input. (Algorithm L, discussed below, provides a concrete calculate_skip.)
How can Reservoir Sampling be implemented?
As mentioned previously, there are many well known and well studied implementations of Reservoir Sampling - each with their own advantages and disadvantages. I’ll detail several here with the intent of providing an understanding of the basic implementation and some of the most important improvements that have been made to it. The reader is encouraged to research further into other implementations.
Note: The following sections assume uniformly random number generation.
Algorithm R
Attributed to Alan G. Waterman by Knuth2, Algorithm R is the simplest implementation, following the basic framework outlined previously with no optimisation.
- Populate the reservoir with the first $n$ elements.
- For each subsequent $t$th element, generate a random integer $m$ with $0 \le m < t$.
- If $m < n$ (i.e. $m$ is a valid index into the reservoir), replace the $m$th element of the reservoir with the new one. Otherwise, discard the new element.
from itertools import islice
from random import randrange
from typing import Iterable

def algorithm_r(iterable: Iterable, n: int):
    iterable = iter(iterable)
    # Initialise reservoir with first n elements
    reservoir = list(islice(iterable, n))
    # t counts the total number of elements read so far, including x
    for t, x in enumerate(iterable, n + 1):
        m = randrange(t)  # 0 <= m < t
        if m < n:  # Make the next record a candidate, replacing one at random
            reservoir[m] = x
    return reservoir
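A quick usage example (exact output and counts will vary between runs):

from collections import Counter

# Sample 5 elements from a million-element stream
print(algorithm_r(range(1_000_000), 5))

# Each element should be sampled with probability n/N = 3/10, so over
# 10,000 runs each element of range(10) should appear roughly 3,000 times
counts = Counter(x for _ in range(10_000) for x in algorithm_r(range(10), 3))
print(counts)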
Algorithm R runs in $O(N)$ time because processing a single element takes constant time (dominated by generating a random number) and the entire input ($N$ elements) is read. The primary CPU cost of this algorithm is the random number generation.
Method 4 (Protection Reservoir)3
Suppose the entire input is available sequentially in a single file and a uniform random number $u \sim U(0, 1)$ is assigned to each item $x$:
# u: x
0.1765: 3
0.0214: 12
0.0093: 8
0.1238: 11
0.0615: 7
0.2852: 10
0.6827: 1
0.7547: 11
0.0946: 15
0.6403: 3
Let $u_{(n)}$ be the $n$th smallest of the $u$-values. The items with $u$-values less than or equal to $u_{(n)}$ now form a uniform random sample of size $n$ from the input. For $n = 5$:
0.1765: 3
0.0214: 12 <-- 2nd smallest
0.0093: 8  <-- Smallest
0.1238: 11 <-- 5th smallest (u_(n))
0.0615: 7  <-- 3rd smallest
0.2852: 10
0.6827: 1
0.7547: 11
0.0946: 15 <-- 4th smallest
0.6403: 3
Thus forming the sample $\{12, 8, 11, 7, 15\}$. Fan et al.3 use this idea to define Method 4:
- Populate the reservoir with the first $n$ elements.
- Assign each reservoir element a random number $u \sim U(0, 1)$ and store these in the $u$-array.
- For each subsequent $t$th element $x_t$:
  - Generate a random number $u_t \sim U(0, 1)$.
  - If $u_t$ is smaller than the largest value in the $u$-array, replace the largest value in the $u$-array with $u_t$ and replace the corresponding element of the reservoir with $x_t$.
This maintains the invariant that at any point through the input the reservoir contains a uniform random sample of the elements processed so far.
from itertools import islice
from random import random
from typing import Iterable

import numpy as np

def method_4(iterable: Iterable, n: int):
    iterable = iter(iterable)
    # Initialise reservoir with first n elements
    reservoir = np.array(list(islice(iterable, n)))
    # Assign random numbers to the first n elements
    u_array = np.array([random() for _ in range(n)])
    # Track the index of the largest u-value
    u_max_idx = u_array.argmax()
    for x in iterable:
        u = random()
        if u < u_array[u_max_idx]:
            # Replace largest u-value
            u_array[u_max_idx] = u
            # Replace corresponding reservoir element
            reservoir[u_max_idx] = x
            # Update largest u-value
            u_max_idx = u_array.argmax()
    return reservoir
As with Algorithm R, Method 4 requires $O(N)$ time and, due to the secondary $u$-array, may require more memory. Why discuss an arguably worse algorithm? Method 4 forms the basis for another algorithm which is significantly more efficient than both discussed so far.
Algorithm L (Geometric Jumps)
Algorithm L takes the idea from Method 4 and greatly improves its runtime to $O(n(1 + \log(N/n)))$ using several (in my opinion, rather genius) mathematical insights from Devroye4 and Li5. The improvement comes from following the second framework and “jumping” through the input directly to the elements which are to be added to the reservoir.
Insight 1: The size of the “jumps” between reservoir insertions follows a geometric distribution.
Let $w$ be the largest value in the $u$-array from Method 4 and $S$ be the number of elements to be skipped (the “jump” size) before the next reservoir insertion. Each new element enters the reservoir with probability $w$ (the probability that its $u$-value falls below $w$), so given $w$, $P(S = s) = (1 - w)^s w$.
Insight 2: The $u$-value of the next record to enter the reservoir has the $U(0, w)$ distribution.
As stated in Method 4, a new element enters the reservoir when its $u$-value is less than or equal to $w$. Insight 2 therefore follows from the fact that $u$-values are generated randomly with distribution $U(0, 1)$.
Insight 3: Maintaining the $u$-array is not necessary - only the variable $w$ is required.
After the reservoir has been populated with the first $n$ elements in the input, the $u$-array would contain a sample of $n$ values from $U(0, 1)$. $w$, therefore, has the same distribution as the largest value in a sample of size $n$ from $U(0, 1)$.
Subsequently, the next value, $w'$, has the same distribution as the largest value in a sample of size $n$ from $U(0, w)$.
So the distribution of $w$ is known, but how can this be used to generate a concrete value as part of the sampling algorithm? Well, the how is actually rather simple! Li provides the following expression to generate the largest value in a sample of size $n$ from $U(0, 1)$: $w = e^{\ln(u)/n} = u^{1/n}$, where $u \sim U(0, 1)$.
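As a quick empirical sanity check (not a proof), we can compare values generated by this expression against brute-force maxima of $n$ uniform random numbers:

from math import exp, log
from random import random
from statistics import mean

n, trials = 10, 100_000

# Brute force: take the maximum of n uniform random numbers
brute_force = [max(random() for _ in range(n)) for _ in range(trials)]
# Li's expression: generate the maximum in a single step
single_step = [exp(log(random()) / n) for _ in range(trials)]

# Both sample means should be close to the true mean, n / (n + 1) ~= 0.909
print(mean(brute_force), mean(single_step))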
However, the why requires some relatively involved mathematics and I (as Li did in his paper) won’t go into it here6. For now, let’s assume that Li is correct and move on to describe Algorithm L:
- Populate the reservoir with the first $n$ elements.
- Set $w = e^{\ln(u)/n}$, with $u \sim U(0, 1)$.
- Set $s = \lfloor \ln(u) / \ln(1 - w) \rfloor$, with a fresh $u \sim U(0, 1)$.
- Until the input is exhausted:
  - Skip over the next $s$ records.
  - Replace a uniformly random element of the reservoir with the next record.
  - Set $w = w \cdot e^{\ln(u)/n}$ and recompute $s$, each with a fresh $u \sim U(0, 1)$.
from itertools import islice
from math import exp, floor, log
from random import random, randrange
from typing import Any, Iterable, Optional

# Sentinel value to mark End Of Iterator
EOI = object()

def algorithm_l(iterable: Iterable, n: int):
    iterable = iter(iterable)
    # Initialise reservoir with first n elements
    reservoir = list(islice(iterable, n))
    w = exp(log(random()) / n)
    while True:
        s = floor(log(random()) / log(1 - w))
        # Skip s elements and take the next one
        next_elem = nth(iterable, s, default=EOI)
        if next_elem is EOI:
            return reservoir
        reservoir[randrange(0, n)] = next_elem
        w *= exp(log(random()) / n)

def nth(iterable: Iterable, n: int, default: Optional[Any] = None):
    """Returns the ``n``th (zero-indexed) item in ``iterable`` or a default value.

    Credit: https://docs.python.org/3.7/library/itertools.html#itertools-recipes
    """
    return next(islice(iterable, n, None), default)
As mentioned previously, Algorithm L has $O(n(1 + \log(N/n)))$ runtime. I encourage the reader to take a look through the extensive proof of this by Li in the original paper5.
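To get a feel for the difference in practice, a rough comparison of the two implementations is easy to run (timings are machine-dependent and purely illustrative):

from timeit import timeit

# Sample 100 elements from a 10-million element stream with each algorithm.
# Algorithm L generates far fewer random numbers, so it should be noticeably
# faster; exact timings will vary.
for fn in (algorithm_r, algorithm_l):
    elapsed = timeit(lambda: fn(range(10_000_000), 100), number=1)
    print(f"{fn.__name__}: {elapsed:.2f}s")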
How can a Reservoir Sampling implementation be practically tested for correctness?
Excellent! We know how multiple different Reservoir Sampling algorithms work and have the mathematical proofs of their correctness. But what about an actual implementation? How can we tell if the code follows the algorithm correctly and thus the proofs still hold? All Reservoir Sampling algorithms share two main correctness properties, based on the output sample:
- Size $n$.
- Uniformly distributed across the input.
The first is trivial to test - run the algorithm in question over the input and check the output size is equal to $n$. The second property, on the other hand, requires a different approach. As the sampling is probabilistic (through the use of randomisation), simple example-based7 tests are insufficient - each test run will produce a different answer. Statistical tests can be used instead, whereby a test asserts against some statistical property of the result as opposed to its concrete value. The key to statistical testing is choosing which property to use.
One approach that (in my opinion) fits well with Reservoir Sampling involves using the output to calculate the Maximum Likelihood Estimator (MLE) for a parameter of the input distribution. If the estimate matches the real parameter value (within some tolerance), then we have strong evidence that the algorithm is implemented correctly.
- Generate input data with a known distribution and parameters.
- Sample the data using the algorithm under test.
- Calculate the MLE of a parameter of the distribution using the output from step 2.
- Test the MLE is within some small tolerance of the real value.
The larger the input and sample sizes, the smaller the tolerance can be.
Assuming we have access to a method of generating the input data, the rate parameter $\lambda$ of an Exponential distribution works nicely for Reservoir Sampling as all the information required to calculate the MLE for $\lambda$ is already available. Given a random variable $X \sim \text{Exp}(\lambda)$ and samples $x_1, \dots, x_n$ from $X$, the MLE for the rate $\lambda$ is the reciprocal of the sample mean:

$$\hat{\lambda} = \frac{n}{\sum_{i=1}^{n} x_i}$$

For example, samples with mean 5 give $\hat{\lambda} = 1/5 = 0.2$.
To use this in a statistical test:
- Choose a value for $\lambda$.
- Generate input data with distribution $\text{Exp}(\lambda)$.
- Choose a value for $n$.
- Pass the input into a Reservoir Sampling algorithm to generate a sample $x_1, \dots, x_n$.
- Calculate $\hat{\lambda} = n / \sum_{i=1}^{n} x_i$.
- Assert $\hat{\lambda} \approx \lambda$, within the chosen tolerance.
import pytest
from numpy.random import default_rng

def test_sample_distribution():
    # Test parameters - adjust to affect the statistical significance of the test.
    n = 1000  # Sample size.
    population_size = n * 1000  # Input size.
    rate = 0.1  # Exponential distribution rate parameter.
    tolerance = 0.1  # How close to the real rate the MLE needs to be.
    # Generate exponentially distributed data with known parameters
    rng = default_rng()
    population = rng.exponential(
        1 / rate,  # numpy uses the scale parameter, which is the inverse of the rate
        population_size,
    )
    # Sort the population; a sampler biased towards either end of the
    # stream would then skew the estimate
    population.sort()
    # Run Reservoir Sampling to generate a sample
    # (reservoir_algorithm is the implementation under test, e.g. algorithm_l)
    sample = reservoir_algorithm(population, n)
    # Calculate the MLE for the rate parameter of the population distribution
    rate_mle = len(sample) / sum(sample)
    assert rate_mle == pytest.approx(rate, rel=tolerance)
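One caveat: because the assertion is statistical, a correct implementation will still occasionally fail such a test. Seeding the generators (e.g. passing a fixed seed to default_rng) makes failures reproducible, while larger input/sample sizes or a wider tolerance reduce the false-failure rate.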
Conclusion
Reservoir Sampling describes a common framework for probabilistic sampling algorithms that can be useful in situations where a sample from an input of unknown size is required and where the complete input will not fit into memory. The progression from the simplest implementation of Algorithm R to Algorithm L (and others not discussed here) clearly demonstrates how re-framing a problem can lead to powerful insights that make a real difference. In addition, I think this is a great example where “doing the simple thing” first leads to better results in the future - I very much doubt that Li would ever have come up with Algorithm L had Algorithm R not been proposed first.
1. Jeffrey S. Vitter. 1985. Random sampling with a reservoir. ACM Trans. Math. Softw. 11, 1 (March 1985), 37–57. DOI: https://doi.org/10.1145/3147.3165
2. Donald E. Knuth. 1997. The Art of Computer Programming, Volume 2 (3rd ed.): Seminumerical Algorithms, 144. Addison-Wesley Longman Publishing Co., Inc., USA.
3. C. W. Fan, M. E. Muller, and I. Rezucha. 1962. Development of sampling plans by using sequential (item by item) selection techniques and digital computers. J. Am. Statist. Assoc. 57 (June 1962), 387–402.
4. Luc Devroye. 1986. Non-Uniform Random Variate Generation. Springer, New York, NY.
5. Kim-Hung Li. 1994. Reservoir-sampling algorithms of time complexity O(n(1 + log(N/n))). ACM Trans. Math. Softw. 20, 4 (Dec. 1994), 485–486. DOI: https://doi.org/10.1145/198429.198435
6. But I may write up a separate article for it!
7. Given some input $x$, the algorithm always produces some output $y$.