325 lines
13 KiB
C++
325 lines
13 KiB
C++
|
/*
|
||
|
* Copyright 2021 Google LLC
|
||
|
*
|
||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||
|
* you may not use this file except in compliance with the License.
|
||
|
* You may obtain a copy of the License at
|
||
|
*
|
||
|
* https://www.apache.org/licenses/LICENSE-2.0
|
||
|
*
|
||
|
* Unless required by applicable law or agreed to in writing, software
|
||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
|
* See the License for the specific language governing permissions and
|
||
|
* limitations under the License.
|
||
|
*/
|
||
|
#include "compactor_stack.h"
|
||
|
|
||
|
#include <cmath>
|
||
|
#include <cstdint>
|
||
|
#include <type_traits>
|
||
|
#include <vector>
|
||
|
|
||
|
#include "gmock/gmock.h"
|
||
|
#include "random_generator.h"
|
||
|
|
||
|
namespace dist_proc {
|
||
|
namespace aggregation {
|
||
|
namespace internal {
|
||
|
|
||
|
namespace {
|
||
|
|
||
|
using ::testing::AnyOf;
|
||
|
using ::testing::Contains;
|
||
|
|
||
|
struct CompactorsTestParams {
|
||
|
int num_items;
|
||
|
int capacity;
|
||
|
};
|
||
|
|
||
|
std::vector<CompactorsTestParams> GenCompactorsTestParams() {
|
||
|
std::vector<CompactorsTestParams> testcases;
|
||
|
for (int num_items = 1; num_items < 1000; num_items += std::ceil(std::log(1 + num_items))) {
|
||
|
for (int capacity = 2; capacity < 1030; capacity *= 2) {
|
||
|
testcases.push_back({num_items, capacity});
|
||
|
}
|
||
|
}
|
||
|
return testcases;
|
||
|
}
|
||
|
|
||
|
class AddsToCompactorsTest : public ::testing::TestWithParam<CompactorsTestParams> {
|
||
|
protected:
|
||
|
AddsToCompactorsTest() : random_() {
|
||
|
}
|
||
|
MTRandomGenerator random_;
|
||
|
};
|
||
|
|
||
|
TEST_P(AddsToCompactorsTest, AddsToCompactorsTestWeightOne) {
|
||
|
// This test should not depend on the seed, since we only test
|
||
|
// num_stored_items, capacity and sampler weight.
|
||
|
std::random_device rd;
|
||
|
std::mt19937 gen(rd());
|
||
|
std::uniform_int_distribution<uint64_t> dis(0, std::numeric_limits<uint64_t>::max());
|
||
|
const CompactorsTestParams params = GetParam();
|
||
|
CompactorStack compactor_stack(1000, 100000, &random_);
|
||
|
KllSampler sampler(&compactor_stack);
|
||
|
while (sampler.capacity() < params.capacity) {
|
||
|
sampler.DoubleCapacity();
|
||
|
}
|
||
|
for (int i = 0; i < params.num_items; i++) {
|
||
|
sampler.Add(dis(gen));
|
||
|
}
|
||
|
EXPECT_EQ(compactor_stack.num_stored_items(), params.num_items / sampler.capacity());
|
||
|
if (sampler.sampled_item_and_weight().has_value()) {
|
||
|
EXPECT_EQ(sampler.sampled_item_and_weight().value().second,
|
||
|
params.num_items % sampler.capacity());
|
||
|
} else {
|
||
|
EXPECT_EQ(0, params.num_items % sampler.capacity());
|
||
|
}
|
||
|
}
|
||
|
|
||
|
TEST_P(AddsToCompactorsTest, AddWithWeightToCompactorsTest) {
|
||
|
// This test should not depend on the seed, since we only test
|
||
|
// num_stored_items, capacity and sampler weight.
|
||
|
std::random_device rd;
|
||
|
std::mt19937 gen(rd());
|
||
|
std::uniform_int_distribution<uint64_t> dis(0, std::numeric_limits<uint64_t>::max());
|
||
|
const CompactorsTestParams params = GetParam();
|
||
|
CompactorStack compactor_stack(1000, 100000, &random_);
|
||
|
KllSampler sampler(&compactor_stack);
|
||
|
while (sampler.capacity() < params.capacity) {
|
||
|
sampler.DoubleCapacity();
|
||
|
}
|
||
|
int remaining_num_items = params.num_items;
|
||
|
while (remaining_num_items > 0) {
|
||
|
// Add +1 to weight to avoid adding values with 0 weight.
|
||
|
int weight = random_.UnbiasedUniform(remaining_num_items) + 1;
|
||
|
sampler.AddWithWeight(dis(gen), weight);
|
||
|
remaining_num_items -= weight;
|
||
|
}
|
||
|
EXPECT_EQ(compactor_stack.num_stored_items(), params.num_items / sampler.capacity());
|
||
|
if (sampler.sampled_item_and_weight().has_value()) {
|
||
|
EXPECT_EQ(sampler.sampled_item_and_weight().value().second,
|
||
|
params.num_items % sampler.capacity());
|
||
|
} else {
|
||
|
EXPECT_EQ(0, params.num_items % sampler.capacity());
|
||
|
}
|
||
|
}
|
||
|
|
||
|
INSTANTIATE_TEST_SUITE_P(AddsToCompactorsTestCases, AddsToCompactorsTest,
|
||
|
::testing::ValuesIn(GenCompactorsTestParams()));
|
||
|
class KllQuantileUseSamplerTest : public ::testing::Test {
|
||
|
protected:
|
||
|
KllQuantileUseSamplerTest() {
|
||
|
}
|
||
|
~KllQuantileUseSamplerTest() override {
|
||
|
}
|
||
|
MTRandomGenerator random_;
|
||
|
};
|
||
|
|
||
|
TEST_F(KllQuantileUseSamplerTest, ZeroCapacityAfterReplacedWithSampler) {
|
||
|
CompactorStack compactor_stack(10, 10, &random_);
|
||
|
for (int i = 0; i < 200000; i++) {
|
||
|
compactor_stack.Add(random_.UnbiasedUniform(std::numeric_limits<uint64_t>::max()));
|
||
|
}
|
||
|
EXPECT_TRUE(compactor_stack.IsSamplerOn());
|
||
|
const auto& compactors = compactor_stack.compactors();
|
||
|
for (int i = 0; i < compactor_stack.lowest_active_level(); i++) {
|
||
|
EXPECT_EQ(compactors[i].capacity(), 0u);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
TEST_F(KllQuantileUseSamplerTest, NumStoredItemsWithSampledItem) {
|
||
|
CompactorStack compactor_stack(10, 10, &random_);
|
||
|
for (int i = 0; i < 2000; i++) {
|
||
|
compactor_stack.Add(random_.UnbiasedUniform(std::numeric_limits<uint64_t>::max()));
|
||
|
}
|
||
|
EXPECT_TRUE(compactor_stack.IsSamplerOn());
|
||
|
|
||
|
int num_items_in_compactors = 0;
|
||
|
for (const auto& compactor : compactor_stack.compactors()) {
|
||
|
if (compactor.capacity() > 0) {
|
||
|
num_items_in_compactors += compactor.size();
|
||
|
}
|
||
|
}
|
||
|
if (compactor_stack.sampled_item_and_weight().has_value()) {
|
||
|
EXPECT_EQ(compactor_stack.num_stored_items(), num_items_in_compactors + 1);
|
||
|
} else {
|
||
|
EXPECT_EQ(compactor_stack.num_stored_items(), num_items_in_compactors);
|
||
|
compactor_stack.Add(random_.UnbiasedUniform(std::numeric_limits<uint64_t>::max()));
|
||
|
EXPECT_EQ(compactor_stack.num_stored_items(), num_items_in_compactors + 1);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
TEST_F(KllQuantileUseSamplerTest, ResetWithSampler) {
|
||
|
// Set a fixed seed for this test, as it is not given that there are 40 items
|
||
|
// in the compactor stack after 2000 insertions.
|
||
|
MTRandomGenerator random(10);
|
||
|
CompactorStack compactor_stack(10, 10, &random);
|
||
|
for (int i = 0; i < 2000; i++) {
|
||
|
compactor_stack.Add(random_.UnbiasedUniform(std::numeric_limits<uint64_t>::max()));
|
||
|
}
|
||
|
EXPECT_TRUE(compactor_stack.IsSamplerOn());
|
||
|
EXPECT_GE(compactor_stack.num_stored_items(), 40);
|
||
|
EXPECT_LE(compactor_stack.num_stored_items(), 2000);
|
||
|
EXPECT_GE(compactor_stack.compactors().size(), 2u);
|
||
|
|
||
|
compactor_stack.Reset();
|
||
|
EXPECT_FALSE(compactor_stack.IsSamplerOn());
|
||
|
EXPECT_FALSE(compactor_stack.sampled_item_and_weight().has_value());
|
||
|
EXPECT_EQ(compactor_stack.num_stored_items(), 0);
|
||
|
EXPECT_EQ(compactor_stack.compactors().size(), 1u);
|
||
|
|
||
|
for (int i = 0; i < 2000; i++) {
|
||
|
compactor_stack.Add(random_.UnbiasedUniform(std::numeric_limits<uint64_t>::max()));
|
||
|
}
|
||
|
EXPECT_TRUE(compactor_stack.IsSamplerOn());
|
||
|
EXPECT_GE(compactor_stack.num_stored_items(), 40);
|
||
|
EXPECT_LE(compactor_stack.num_stored_items(), 2000);
|
||
|
EXPECT_GE(compactor_stack.compactors().size(), 2u);
|
||
|
}
|
||
|
|
||
|
struct AddWithSamplerParam {
|
||
|
int64_t inv_eps;
|
||
|
int64_t inv_delta;
|
||
|
int64_t num_items;
|
||
|
};
|
||
|
|
||
|
class AddWithSamplerTest : public ::testing::TestWithParam<AddWithSamplerParam> {};
|
||
|
|
||
|
TEST_P(AddWithSamplerTest, AddWithSampler) {
|
||
|
const AddWithSamplerParam params = GetParam();
|
||
|
std::random_device rd;
|
||
|
MTRandomGenerator random(rd());
|
||
|
CompactorStack compactor_stack(params.inv_eps, params.inv_delta, &random);
|
||
|
|
||
|
int upbound = (random.UnbiasedUniform(3) + 1) * params.num_items + 1;
|
||
|
for (int i = 0; i < upbound; i++) {
|
||
|
compactor_stack.Add(random.UnbiasedUniform(std::numeric_limits<uint64_t>::max()));
|
||
|
}
|
||
|
EXPECT_TRUE(compactor_stack.IsSamplerOn());
|
||
|
|
||
|
int lowest_active_level = compactor_stack.lowest_active_level();
|
||
|
int size_before_add = compactor_stack.compactors()[lowest_active_level].size();
|
||
|
uint64_t previous_sampled_item = -1;
|
||
|
if (compactor_stack.sampled_item_and_weight().has_value()) {
|
||
|
previous_sampled_item = compactor_stack.sampled_item_and_weight()->first;
|
||
|
}
|
||
|
// Add additional value sufficiently many times to complete full sampler
|
||
|
// cycle.
|
||
|
for (int i = 0; i < compactor_stack.sampler_capacity(); i++) {
|
||
|
compactor_stack.Add(10);
|
||
|
}
|
||
|
|
||
|
int size_after_add = compactor_stack.compactors()[lowest_active_level].size();
|
||
|
if (size_after_add > 0) {
|
||
|
EXPECT_EQ(size_after_add, size_before_add + 1);
|
||
|
EXPECT_THAT(compactor_stack.compactors()[lowest_active_level],
|
||
|
AnyOf(Contains(10), Contains(previous_sampled_item)));
|
||
|
}
|
||
|
}
|
||
|
|
||
|
TEST_P(AddWithSamplerTest, AddWithWeightWithSampler) {
|
||
|
// Set a fixed seed, since the tests depends on which level is propagated
|
||
|
// in the compactor stack.
|
||
|
uint64_t seed = 3;
|
||
|
const AddWithSamplerParam params = GetParam();
|
||
|
MTRandomGenerator random = MTRandomGenerator(seed);
|
||
|
CompactorStack compactor_stack(params.inv_eps, params.inv_delta, &random);
|
||
|
|
||
|
for (int i = 0; i < params.num_items; i++) {
|
||
|
compactor_stack.Add(random.UnbiasedUniform(std::numeric_limits<uint32_t>::max()));
|
||
|
}
|
||
|
ASSERT_TRUE(compactor_stack.IsSamplerOn());
|
||
|
if (!compactor_stack.sampled_item_and_weight().has_value()) {
|
||
|
compactor_stack.Add(random.UnbiasedUniform(std::numeric_limits<uint64_t>::max()));
|
||
|
}
|
||
|
int lowest_active_level = compactor_stack.lowest_active_level();
|
||
|
int size_before_add_lowest_level = compactor_stack.compactors()[lowest_active_level].size();
|
||
|
int size_before_add_level_plus2 = compactor_stack.compactors()[lowest_active_level + 2].size();
|
||
|
int size_before_add_level_plus3 = compactor_stack.compactors()[lowest_active_level + 3].size();
|
||
|
uint64_t previous_sampled_item = -1;
|
||
|
if (compactor_stack.sampled_item_and_weight().has_value()) {
|
||
|
previous_sampled_item = compactor_stack.sampled_item_and_weight()->first;
|
||
|
}
|
||
|
|
||
|
// Expected additions: one item from the sampler to the lowest active level;
|
||
|
// one item to the lowest active level + 2.
|
||
|
int weight = 5 * (1 << lowest_active_level) - 1;
|
||
|
compactor_stack.AddWithWeight(10.0, weight);
|
||
|
|
||
|
int size_after_add_lowest_level = compactor_stack.compactors()[lowest_active_level].size();
|
||
|
// > 0 to make sure we are not checking compact case
|
||
|
if (size_after_add_lowest_level > 0) {
|
||
|
EXPECT_EQ(size_after_add_lowest_level, size_before_add_lowest_level + 1);
|
||
|
EXPECT_THAT(compactor_stack.compactors()[lowest_active_level],
|
||
|
AnyOf(Contains(10.0), Contains(previous_sampled_item)));
|
||
|
}
|
||
|
|
||
|
// Level+2 is expected to change.
|
||
|
// Exact behavior also subject to seed - a lower level might be compacted
|
||
|
// into this one, or this one might be compacted up.
|
||
|
// Test only checks the uncompacted case. (Different from Java version)
|
||
|
int size_after_add_level_plus2 = compactor_stack.compactors()[lowest_active_level + 2].size();
|
||
|
if (size_after_add_level_plus2 >
|
||
|
// make sure we are not checking comact case
|
||
|
size_before_add_level_plus2) {
|
||
|
EXPECT_EQ(size_after_add_level_plus2, size_before_add_level_plus2 + 1);
|
||
|
EXPECT_THAT(compactor_stack.compactors()[lowest_active_level + 2], Contains(10.0));
|
||
|
int size_after_add_level_plus3 =
|
||
|
compactor_stack.compactors()[lowest_active_level + 3].size();
|
||
|
EXPECT_EQ(size_after_add_level_plus3, size_before_add_level_plus3);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
TEST_P(AddWithSamplerTest, CompactorsStillBigXorSamplerIsOn) {
|
||
|
std::random_device rd;
|
||
|
uint64_t seed = rd();
|
||
|
const AddWithSamplerParam params = GetParam();
|
||
|
MTRandomGenerator random = MTRandomGenerator(seed);
|
||
|
CompactorStack compactor_stack(params.inv_eps, params.inv_delta, &random);
|
||
|
for (int i = 0; i < params.num_items; i++) {
|
||
|
compactor_stack.Add(random.UnbiasedUniform(std::numeric_limits<uint64_t>::max()));
|
||
|
int lowest_compactor_capacity = compactor_stack.TargetCapacityAtLevel(0);
|
||
|
EXPECT_TRUE((lowest_compactor_capacity > 2) ^ (compactor_stack.IsSamplerOn()));
|
||
|
}
|
||
|
}
|
||
|
|
||
|
TEST_P(AddWithSamplerTest, SampledItemIsPresent) {
|
||
|
std::random_device rd;
|
||
|
uint64_t seed = rd();
|
||
|
const AddWithSamplerParam params = GetParam();
|
||
|
MTRandomGenerator random = MTRandomGenerator(seed);
|
||
|
CompactorStack compactor_stack(params.inv_eps, params.inv_delta, &random);
|
||
|
|
||
|
for (int i = 0; i < params.num_items; i++) {
|
||
|
compactor_stack.Add(random.UnbiasedUniform(std::numeric_limits<uint64_t>::max()));
|
||
|
}
|
||
|
int sampled_item_weight = (compactor_stack.sampled_item_and_weight().has_value())
|
||
|
? compactor_stack.sampled_item_and_weight().value().second
|
||
|
: 0;
|
||
|
compactor_stack.Add(random.UnbiasedUniform(std::numeric_limits<uint64_t>::max()));
|
||
|
int sampled_item_weight_after_add =
|
||
|
(compactor_stack.sampled_item_and_weight().has_value())
|
||
|
? compactor_stack.sampled_item_and_weight().value().second
|
||
|
: 0;
|
||
|
sampled_item_weight = std::max(sampled_item_weight, sampled_item_weight_after_add);
|
||
|
EXPECT_GT(sampled_item_weight, 0);
|
||
|
EXPECT_LE(sampled_item_weight, (1 << compactor_stack.lowest_active_level()));
|
||
|
}
|
||
|
|
||
|
INSTANTIATE_TEST_SUITE_P(AddWithSamplerTestCases, AddWithSamplerTest,
|
||
|
::testing::ValuesIn(std::vector<AddWithSamplerParam>{
|
||
|
{10, 10, 2400},
|
||
|
{10, 100, 4600},
|
||
|
{10, 1000, 5000},
|
||
|
{50, 10000, 5000000},
|
||
|
{100, 100, 1250000},
|
||
|
{100, 1000, 2000000}}));
|
||
|
|
||
|
} // namespace
|
||
|
|
||
|
} // namespace internal
|
||
|
} // namespace aggregation
|
||
|
} // namespace dist_proc
|