[tdigest] Remove code (#40933)

I added this in anticipation of wanting to use it, but that hasn't transpired... and we're seeing flaky tests. Removing for now.

Closes #40933

COPYBARA_INTEGRATE_REVIEW=https://github.com/grpc/grpc/pull/40933 from ctiller:tdigest-- ba911bfe96
PiperOrigin-RevId: 858731635
This commit is contained in:
Craig Tiller
2026-01-20 13:36:02 -08:00
committed by Copybara-Service
parent 525487ef32
commit b1eb9dea81
10 changed files with 0 additions and 1586 deletions

115
CMakeLists.txt generated
View File

@@ -1839,7 +1839,6 @@ if(gRPC_BUILD_TESTS)
add_dependencies(buildtests_cxx tcp_server_posix_test)
endif()
add_dependencies(buildtests_cxx tcp_socket_utils_test)
add_dependencies(buildtests_cxx tdigest_test)
add_dependencies(buildtests_cxx test_core_credentials_transport_ssl_ssl_credentials_test)
add_dependencies(buildtests_cxx test_core_credentials_transport_tls_tls_credentials_test)
add_dependencies(buildtests_cxx test_core_event_engine_posix_timer_heap_test)
@@ -33744,120 +33743,6 @@ target_link_libraries(tcp_socket_utils_test
)
endif()
if(gRPC_BUILD_TESTS)
add_executable(tdigest_test
src/core/channelz/channel_trace.cc
src/core/channelz/channelz.cc
src/core/channelz/channelz_registry.cc
src/core/channelz/property_list.cc
src/core/channelz/text_encode.cc
src/core/ext/upb-gen/google/protobuf/any.upb_minitable.c
src/core/ext/upb-gen/google/protobuf/duration.upb_minitable.c
src/core/ext/upb-gen/google/protobuf/empty.upb_minitable.c
src/core/ext/upb-gen/google/protobuf/timestamp.upb_minitable.c
src/core/ext/upb-gen/google/rpc/status.upb_minitable.c
src/core/ext/upb-gen/src/proto/grpc/channelz/v2/channelz.upb_minitable.c
src/core/ext/upb-gen/src/proto/grpc/channelz/v2/property_list.upb_minitable.c
src/core/ext/upb-gen/src/proto/grpc/channelz/v2/service.upb_minitable.c
src/core/ext/upbdefs-gen/google/protobuf/any.upbdefs.c
src/core/ext/upbdefs-gen/google/protobuf/duration.upbdefs.c
src/core/ext/upbdefs-gen/google/protobuf/empty.upbdefs.c
src/core/ext/upbdefs-gen/google/protobuf/timestamp.upbdefs.c
src/core/ext/upbdefs-gen/src/proto/grpc/channelz/v2/channelz.upbdefs.c
src/core/ext/upbdefs-gen/src/proto/grpc/channelz/v2/property_list.upbdefs.c
src/core/ext/upbdefs-gen/src/proto/grpc/channelz/v2/service.upbdefs.c
src/core/lib/address_utils/parse_address.cc
src/core/lib/address_utils/sockaddr_utils.cc
src/core/lib/channel/channel_args.cc
src/core/lib/debug/trace.cc
src/core/lib/debug/trace_flags.cc
src/core/lib/experiments/config.cc
src/core/lib/experiments/experiments.cc
src/core/lib/iomgr/closure.cc
src/core/lib/iomgr/combiner.cc
src/core/lib/iomgr/error.cc
src/core/lib/iomgr/exec_ctx.cc
src/core/lib/iomgr/iomgr_internal.cc
src/core/lib/iomgr/sockaddr_utils_posix.cc
src/core/lib/iomgr/socket_utils_windows.cc
src/core/lib/slice/percent_encoding.cc
src/core/lib/slice/slice.cc
src/core/lib/slice/slice_buffer.cc
src/core/lib/slice/slice_string_helpers.cc
src/core/lib/surface/channel_stack_type.cc
src/core/lib/transport/connectivity_state.cc
src/core/lib/transport/status_conversion.cc
src/core/telemetry/histogram_view.cc
src/core/telemetry/stats.cc
src/core/telemetry/stats_data.cc
src/core/util/backoff.cc
src/core/util/glob.cc
src/core/util/grpc_check.cc
src/core/util/grpc_if_nametoindex_posix.cc
src/core/util/grpc_if_nametoindex_unsupported.cc
src/core/util/json/json_reader.cc
src/core/util/json/json_writer.cc
src/core/util/latent_see.cc
src/core/util/per_cpu.cc
src/core/util/postmortem_emit.cc
src/core/util/ref_counted_string.cc
src/core/util/shared_bit_gen.cc
src/core/util/status_helper.cc
src/core/util/tdigest.cc
src/core/util/time.cc
src/core/util/uri.cc
src/core/util/work_serializer.cc
test/core/util/tdigest_test.cc
)
if(WIN32 AND MSVC)
if(BUILD_SHARED_LIBS)
target_compile_definitions(tdigest_test
PRIVATE
"GPR_DLL_IMPORTS"
)
endif()
endif()
target_compile_features(tdigest_test PUBLIC cxx_std_17)
target_include_directories(tdigest_test
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/include
${_gRPC_ADDRESS_SORTING_INCLUDE_DIR}
${_gRPC_RE2_INCLUDE_DIR}
${_gRPC_SSL_INCLUDE_DIR}
${_gRPC_UPB_GENERATED_DIR}
${_gRPC_UPB_GRPC_GENERATED_DIR}
${_gRPC_UPB_INCLUDE_DIR}
${_gRPC_XXHASH_INCLUDE_DIR}
${_gRPC_ZLIB_INCLUDE_DIR}
third_party/googletest/googletest/include
third_party/googletest/googletest
third_party/googletest/googlemock/include
third_party/googletest/googlemock
${_gRPC_PROTO_GENS_DIR}
)
target_link_libraries(tdigest_test
${_gRPC_ALLTARGETS_LIBRARIES}
gtest
upb_textformat_lib
absl::btree
absl::flat_hash_map
absl::inlined_vector
absl::function_ref
absl::hash
absl::type_traits
absl::statusor
absl::string_view
absl::span
absl::utility
${_gRPC_BENCHMARK_LIBRARIES}
gpr
)
endif()
if(gRPC_BUILD_TESTS)

181
build_autogenerated.yaml generated
View File

@@ -24228,187 +24228,6 @@ targets:
- gtest
- grpc
uses_polling: false
- name: tdigest_test
gtest: true
build: test
language: c++
headers:
- src/core/channelz/channel_trace.h
- src/core/channelz/channelz.h
- src/core/channelz/channelz_registry.h
- src/core/channelz/property_list.h
- src/core/channelz/text_encode.h
- src/core/ext/transport/chttp2/transport/http2_status.h
- src/core/ext/upb-gen/google/protobuf/any.upb.h
- src/core/ext/upb-gen/google/protobuf/any.upb_minitable.h
- src/core/ext/upb-gen/google/protobuf/duration.upb.h
- src/core/ext/upb-gen/google/protobuf/duration.upb_minitable.h
- src/core/ext/upb-gen/google/protobuf/empty.upb.h
- src/core/ext/upb-gen/google/protobuf/empty.upb_minitable.h
- src/core/ext/upb-gen/google/protobuf/timestamp.upb.h
- src/core/ext/upb-gen/google/protobuf/timestamp.upb_minitable.h
- src/core/ext/upb-gen/google/rpc/status.upb.h
- src/core/ext/upb-gen/google/rpc/status.upb_minitable.h
- src/core/ext/upb-gen/src/proto/grpc/channelz/v2/channelz.upb.h
- src/core/ext/upb-gen/src/proto/grpc/channelz/v2/channelz.upb_minitable.h
- src/core/ext/upb-gen/src/proto/grpc/channelz/v2/property_list.upb.h
- src/core/ext/upb-gen/src/proto/grpc/channelz/v2/property_list.upb_minitable.h
- src/core/ext/upb-gen/src/proto/grpc/channelz/v2/service.upb.h
- src/core/ext/upb-gen/src/proto/grpc/channelz/v2/service.upb_minitable.h
- src/core/ext/upbdefs-gen/google/protobuf/any.upbdefs.h
- src/core/ext/upbdefs-gen/google/protobuf/duration.upbdefs.h
- src/core/ext/upbdefs-gen/google/protobuf/empty.upbdefs.h
- src/core/ext/upbdefs-gen/google/protobuf/timestamp.upbdefs.h
- src/core/ext/upbdefs-gen/src/proto/grpc/channelz/v2/channelz.upbdefs.h
- src/core/ext/upbdefs-gen/src/proto/grpc/channelz/v2/property_list.upbdefs.h
- src/core/ext/upbdefs-gen/src/proto/grpc/channelz/v2/service.upbdefs.h
- src/core/lib/address_utils/parse_address.h
- src/core/lib/address_utils/sockaddr_utils.h
- src/core/lib/channel/channel_args.h
- src/core/lib/debug/trace.h
- src/core/lib/debug/trace_flags.h
- src/core/lib/debug/trace_impl.h
- src/core/lib/experiments/config.h
- src/core/lib/experiments/experiments.h
- src/core/lib/iomgr/closure.h
- src/core/lib/iomgr/combiner.h
- src/core/lib/iomgr/error.h
- src/core/lib/iomgr/exec_ctx.h
- src/core/lib/iomgr/iomgr_internal.h
- src/core/lib/iomgr/port.h
- src/core/lib/iomgr/resolved_address.h
- src/core/lib/iomgr/sockaddr.h
- src/core/lib/iomgr/sockaddr_posix.h
- src/core/lib/iomgr/sockaddr_windows.h
- src/core/lib/iomgr/socket_utils.h
- src/core/lib/slice/percent_encoding.h
- src/core/lib/slice/slice.h
- src/core/lib/slice/slice_buffer.h
- src/core/lib/slice/slice_internal.h
- src/core/lib/slice/slice_refcount.h
- src/core/lib/slice/slice_string_helpers.h
- src/core/lib/surface/channel_stack_type.h
- src/core/lib/transport/connectivity_state.h
- src/core/lib/transport/status_conversion.h
- src/core/telemetry/histogram_view.h
- src/core/telemetry/stats.h
- src/core/telemetry/stats_data.h
- src/core/util/atomic_utils.h
- src/core/util/avl.h
- src/core/util/backoff.h
- src/core/util/bitset.h
- src/core/util/down_cast.h
- src/core/util/dual_ref_counted.h
- src/core/util/function_signature.h
- src/core/util/glob.h
- src/core/util/grpc_check.h
- src/core/util/grpc_if_nametoindex.h
- src/core/util/json/json.h
- src/core/util/json/json_reader.h
- src/core/util/json/json_writer.h
- src/core/util/latent_see.h
- src/core/util/manual_constructor.h
- src/core/util/match.h
- src/core/util/memory_usage.h
- src/core/util/notification.h
- src/core/util/orphanable.h
- src/core/util/overload.h
- src/core/util/per_cpu.h
- src/core/util/postmortem_emit.h
- src/core/util/ref_counted.h
- src/core/util/ref_counted_ptr.h
- src/core/util/ref_counted_string.h
- src/core/util/shared_bit_gen.h
- src/core/util/single_set_ptr.h
- src/core/util/spinlock.h
- src/core/util/status_helper.h
- src/core/util/tdigest.h
- src/core/util/time.h
- src/core/util/upb_utils.h
- src/core/util/uri.h
- src/core/util/work_serializer.h
- third_party/upb/upb/generated_code_support.h
src:
- src/core/channelz/channel_trace.cc
- src/core/channelz/channelz.cc
- src/core/channelz/channelz_registry.cc
- src/core/channelz/property_list.cc
- src/core/channelz/text_encode.cc
- src/core/ext/upb-gen/google/protobuf/any.upb_minitable.c
- src/core/ext/upb-gen/google/protobuf/duration.upb_minitable.c
- src/core/ext/upb-gen/google/protobuf/empty.upb_minitable.c
- src/core/ext/upb-gen/google/protobuf/timestamp.upb_minitable.c
- src/core/ext/upb-gen/google/rpc/status.upb_minitable.c
- src/core/ext/upb-gen/src/proto/grpc/channelz/v2/channelz.upb_minitable.c
- src/core/ext/upb-gen/src/proto/grpc/channelz/v2/property_list.upb_minitable.c
- src/core/ext/upb-gen/src/proto/grpc/channelz/v2/service.upb_minitable.c
- src/core/ext/upbdefs-gen/google/protobuf/any.upbdefs.c
- src/core/ext/upbdefs-gen/google/protobuf/duration.upbdefs.c
- src/core/ext/upbdefs-gen/google/protobuf/empty.upbdefs.c
- src/core/ext/upbdefs-gen/google/protobuf/timestamp.upbdefs.c
- src/core/ext/upbdefs-gen/src/proto/grpc/channelz/v2/channelz.upbdefs.c
- src/core/ext/upbdefs-gen/src/proto/grpc/channelz/v2/property_list.upbdefs.c
- src/core/ext/upbdefs-gen/src/proto/grpc/channelz/v2/service.upbdefs.c
- src/core/lib/address_utils/parse_address.cc
- src/core/lib/address_utils/sockaddr_utils.cc
- src/core/lib/channel/channel_args.cc
- src/core/lib/debug/trace.cc
- src/core/lib/debug/trace_flags.cc
- src/core/lib/experiments/config.cc
- src/core/lib/experiments/experiments.cc
- src/core/lib/iomgr/closure.cc
- src/core/lib/iomgr/combiner.cc
- src/core/lib/iomgr/error.cc
- src/core/lib/iomgr/exec_ctx.cc
- src/core/lib/iomgr/iomgr_internal.cc
- src/core/lib/iomgr/sockaddr_utils_posix.cc
- src/core/lib/iomgr/socket_utils_windows.cc
- src/core/lib/slice/percent_encoding.cc
- src/core/lib/slice/slice.cc
- src/core/lib/slice/slice_buffer.cc
- src/core/lib/slice/slice_string_helpers.cc
- src/core/lib/surface/channel_stack_type.cc
- src/core/lib/transport/connectivity_state.cc
- src/core/lib/transport/status_conversion.cc
- src/core/telemetry/histogram_view.cc
- src/core/telemetry/stats.cc
- src/core/telemetry/stats_data.cc
- src/core/util/backoff.cc
- src/core/util/glob.cc
- src/core/util/grpc_check.cc
- src/core/util/grpc_if_nametoindex_posix.cc
- src/core/util/grpc_if_nametoindex_unsupported.cc
- src/core/util/json/json_reader.cc
- src/core/util/json/json_writer.cc
- src/core/util/latent_see.cc
- src/core/util/per_cpu.cc
- src/core/util/postmortem_emit.cc
- src/core/util/ref_counted_string.cc
- src/core/util/shared_bit_gen.cc
- src/core/util/status_helper.cc
- src/core/util/tdigest.cc
- src/core/util/time.cc
- src/core/util/uri.cc
- src/core/util/work_serializer.cc
- test/core/util/tdigest_test.cc
deps:
- gtest
- upb_textformat_lib
- absl/container:btree
- absl/container:flat_hash_map
- absl/container:inlined_vector
- absl/functional:function_ref
- absl/hash:hash
- absl/meta:type_traits
- absl/status:statusor
- absl/strings:string_view
- absl/types:span
- absl/utility:utility
- benchmark
- gpr
benchmark: true
defaults: benchmark
uses_polling: false
- name: test_core_credentials_transport_ssl_ssl_credentials_test
gtest: true
build: test

View File

@@ -4402,24 +4402,6 @@ grpc_cc_library(
deps = ["//:gpr_platform"],
)
grpc_cc_library(
name = "tdigest",
srcs = [
"util/tdigest.cc",
],
hdrs = [
"util/tdigest.h",
],
external_deps = [
"absl/log",
"absl/status",
"absl/strings",
],
deps = [
"grpc_check",
],
)
grpc_cc_library(
name = "certificate_provider_factory",
hdrs = [

View File

@@ -1,420 +0,0 @@
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/util/tdigest.h"

#include <algorithm>
#include <cmath>
#include <cstdint>
#include <limits>
#include <string>
#include <vector>

#include "absl/log/log.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_split.h"

#include "src/core/util/grpc_check.h"
namespace grpc_core {
namespace {
// Value returned by queries on an empty digest.
constexpr double kNan = std::numeric_limits<double>::quiet_NaN();
// Largest compression factor we will honor.
constexpr double kMaxCompression = 1e6;
constexpr double kPi = 3.14159265358979323846;

// Clamps `compression` to at most kMaxCompression.
double BoundedCompression(double compression) {
  static_assert(8 * kMaxCompression < std::numeric_limits<int64_t>::max(),
                "kMaxCompression must be smaller than max_int64/8.");
  return compression < kMaxCompression ? compression : kMaxCompression;
}

// Upper bound on the number of centroids a merging t-digest with this
// compression factor can hold.
size_t MaxCentroids(double compression) {
  const double bounded = BoundedCompression(compression);
  return static_cast<size_t>(std::ceil(bounded)) * 2;
}

// Weighted average of val1 and val2; weights must be non-negative and not
// both zero.
double LinearInterpolate(double val1, double val2, double weight1,
                         double weight2) {
  GRPC_DCHECK_GE(weight1, 0);
  GRPC_DCHECK_GE(weight2, 0);
  GRPC_DCHECK_GT(weight1 + weight2, 0);
  const double weighted_sum = val1 * weight1 + val2 * weight2;
  return weighted_sum / (weight1 + weight2);
}
}  // namespace
TDigest::TDigest(double compression) { Reset(compression); }
void TDigest::Reset(double compression) {
compression_ = BoundedCompression(compression);
// Set the default batch_size to 4 times the number of centroids.
batch_size_ = static_cast<int64_t>(4 * MaxCentroids(compression_));
GRPC_DCHECK(compression_ == 0.0 || batch_size_ > 0);
centroids_.reserve(MaxCentroids(compression_) + batch_size_);
centroids_.clear();
merged_ = 0;
unmerged_ = 0;
min_ = std::numeric_limits<double>::max();
max_ = std::numeric_limits<double>::lowest();
sum_ = 0;
count_ = 0;
}
// Records `count` occurrences of `val`.
void TDigest::Add(double val, int64_t count) {
  if (count == 0) return;
  // A single sample is considered discrete: it is its own centroid until
  // the next merge folds it in.
  UpdateStats(/*min=*/val, /*max=*/val, /*sum=*/val * count, count);
  AddUnmergedCentroid(CentroidPod(val, count));
}
// Appends `centroid` to the unmerged tail; once a full batch has
// accumulated, folds everything into the merged prefix.
void TDigest::AddUnmergedCentroid(const CentroidPod& centroid) {
  GRPC_DCHECK_LT(unmerged_, batch_size_);
  centroids_.push_back(centroid);
  if (++unmerged_ == batch_size_) DoMerge();
}
void TDigest::Merge(const TDigest& that) {
if (compression_ == 0.0) {
Reset(that.Compression());
}
UpdateStats(that.Min(), that.Max(), that.Sum(), that.Count());
for (const auto& centroid : that.centroids_) {
AddUnmergedCentroid(centroid);
}
}
// k(q, delta) scale function from the t-digest paper (Figure 1): maps a
// quantile in [0, 1] to a (fractional) centroid index.
double TDigest::QuantileToCentroid(double quantile) const {
  const double angle = std::asin(2 * quantile - 1);
  return compression_ * (angle + kPi / 2) / kPi;
}

// Inverse of QuantileToCentroid: maps a centroid index back to a quantile.
double TDigest::CentroidToQuantile(double centroid) const {
  if (centroid > compression_) centroid = compression_;
  return (sin(centroid * kPi / compression_ - kPi / 2) + 1) / 2;
}
// Merges all unmerged centroids into the sorted, merged prefix of
// centroids_, in place, following the progressive merging algorithm from
// the t-digest paper.
void TDigest::DoMerge() {
  if (unmerged_ == 0) {
    return;
  }
  // We first sort the centroids, and assume the first centroid is merged,
  // and the rest are unmerged.
  GRPC_DCHECK(!centroids_.empty());
  std::sort(centroids_.begin(), centroids_.end());
  unmerged_ += merged_ - 1;
  merged_ = 1;
  const int64_t total_count = count_;
  double q0 = 0;
  // This is actually S * q_{limit} in the paper, not exactly q_limit.
  // But, keeping the scaled value results in eliminating the division in the
  // hotpath. Also, it is closer to the reference implementation.
  double q_limit = total_count * CentroidToQuantile(q0 + 1);
  // When non-discrete, the sum value may change due to floating point errors
  // every time centroids are merged. We must correct this each time by keeping
  // it as much in sync with current centroids as possible to keep this error
  // bounded.
  sum_ = 0;
  auto last_merged = centroids_.begin();
  auto first_unmerged = last_merged + 1;
  int64_t merged_count = last_merged->count;
  for (; unmerged_ > 0; --unmerged_, ++first_unmerged) {
    // Simply merge, if the last merged centroid has enough room for the last
    // unmerged element.
    if (first_unmerged->count + merged_count <= q_limit) {
      // Note that here we use the Welford's method, and
      // count must be updated before mean.
      last_merged->count += first_unmerged->count;
      last_merged->mean +=
          ((first_unmerged->mean - last_merged->mean) * first_unmerged->count) /
          last_merged->count;
      merged_count += first_unmerged->count;
      continue;
    }
    // Now we need to move onto the next centroid to merge the first unmerged.
    q0 = QuantileToCentroid(static_cast<double>(merged_count) / total_count);
    q_limit = total_count * CentroidToQuantile(q0 + 1);
    merged_count += first_unmerged->count;
    // Close out the current centroid: fold its mass into sum_ and start a
    // new merged centroid from the first unmerged one.
    sum_ += last_merged->mean * last_merged->count;
    ++merged_;
    ++last_merged;
    *last_merged = *first_unmerged;
  }
  // Account for the final open centroid, then drop the tail consumed by the
  // in-place merge.
  sum_ += last_merged->mean * last_merged->count;
  unmerged_ = 0;
  centroids_.resize(merged_);
  if (!centroids_.empty()) {
    // Keep min_/max_ consistent with the surviving centroid means.
    min_ = std::min(min_, centroids_.front().mean);
    max_ = std::max(max_, centroids_.back().mean);
  }
  GRPC_DCHECK_LE(centroids_.size(), MaxCentroids(compression_));
}
// We use linear interpolation between mid points of centroids when calculating
// Cdf() and Quantile(). All unmerged centroids are merged first so that they
// are strongly ordered, then we use linear interpolation with points:
//
// (percentile, value) = (0, min), (count[0] / 2, mean[0]), ..
// ((count[i-1]+count[i])/2, mean[i]), ..
// (count[last], max)
//
// the CDF from centroids with interpolation points marked with '*':
//
// count
// |
// +c[2]| --------*
// (=tot)| | |
// | * |
// +c[1]| --------| |
// | | |
// | * |
// +c[0]| -----| |
// | | |
// | * |
// | | |
// 0 *----------------------------- value
// min m[0] m[1] m[2] max
//
// Returns the cumulative probability P(X <= val), using linear interpolation
// between centroid mid-points as illustrated in the diagram above.
// Returns NaN on an empty digest.
double TDigest::Cdf(double val) {
  DoMerge();
  if (merged_ == 0) {
    return kNan;
  }
  if (val < min_) {
    return 0;
  }
  // We diverge from the spec here. If value == max == min, we return 1.
  if (val >= max_) {
    return 1;
  }
  GRPC_DCHECK_NE(min_, max_);
  if (merged_ == 1) {
    // Single centroid: interpolate linearly between min_ and max_.
    // Bug fix: the original divided by (min_ - max_), a negative
    // denominator, producing a negative and *decreasing* CDF.
    return (val - min_) / (max_ - min_);
  }
  if (val < centroids_[0].mean) {
    // Between min_ and the first centroid's mid-point.
    return LinearInterpolate(
        0.0, static_cast<double>(centroids_[0].count) / count_ / 2.0,
        centroids_[0].mean - val, val - min_);
  }
  if (val >= centroids_.back().mean) {
    // Between the last centroid's mid-point and max_.
    return LinearInterpolate(
        1.0 - static_cast<double>(centroids_.back().count) / count_ / 2.0, 1,
        max_ - val, val - centroids_.back().mean);
  }
  // accum_count tracks the rank (in samples) of centroid i's mid-point.
  double accum_count = centroids_[0].count / 2.0;
  for (size_t i = 0; i < centroids_.size(); ++i) {
    if (centroids_[i].mean == val) {
      double prev_accum_count = accum_count;
      // We may have centroids of the same mean. We need to sum their counts
      // and then interpolate.
      for (; centroids_[i + 1].mean == val; ++i) {
        accum_count += centroids_[i].count + centroids_[i + 1].count;
      }
      return (prev_accum_count + accum_count) / 2.0 / count_;
    }
    if (centroids_[i].mean <= val && val < centroids_[i + 1].mean) {
      auto mean1 = centroids_[i].mean;
      auto mean2 = centroids_[i + 1].mean;
      double mean_ratio;
      // guard against double madness.
      if (mean2 <= mean1) {
        mean_ratio = 1;
      } else {
        mean_ratio = (val - mean1) / (mean2 - mean1);
      }
      double delta_count =
          (centroids_[i].count + centroids_[i + 1].count) / 2.0;
      return (accum_count + delta_count * mean_ratio) / count_;
    }
    accum_count += (centroids_[i].count + centroids_[i + 1].count) / 2.0;
  }
  // Unreachable given the guards above; flag loudly in debug builds.
  LOG(DFATAL) << "Cannot measure CDF for: " << val;
  return kNan;
}
// Returns an approximate value for the given `quantile` in [0, 1] by
// walking centroid mid-points and linearly interpolating between the two
// that bracket the requested rank (see the CDF diagram above).
// Returns NaN on an empty digest.
double TDigest::Quantile(double quantile) {
  GRPC_DCHECK_LE(quantile, 1);
  GRPC_DCHECK_GE(quantile, 0);
  DoMerge();
  if (merged_ == 0) {
    return kNan;
  }
  if (merged_ == 1) {
    // Single centroid: its mean is the only available estimate.
    return centroids_[0].mean;
  }
  // Target rank, measured in sample count rather than probability.
  const double quantile_count = quantile * count_;
  // (prev_count, prev_val)..(this_count, this_val) is the interpolation
  // segment under consideration; we start from (0, min_).
  double prev_count = 0;
  double prev_val = min_;
  double this_count = centroids_[0].count / 2.0;
  double this_val = centroids_[0].mean;
  for (size_t i = 0; i < centroids_.size(); ++i) {
    if (quantile_count < this_count) {
      break;
    }
    prev_count = this_count;
    prev_val = this_val;
    if (i == centroids_.size() - 1) {
      // Interpolate between max and the last centroid.
      this_count = count_;
      this_val = max_;
    } else {
      // Advance to the mid-point between centroids i and i+1.
      this_count += (centroids_[i].count + centroids_[i + 1].count) / 2.0;
      this_val = centroids_[i + 1].mean;
    }
  }
  return LinearInterpolate(prev_val, this_val, this_count - quantile_count,
                           quantile_count - prev_count);
}
std::string TDigest::ToString() {
std::string str = absl::StrCat(compression_);
if (count_ <= 1) {
if (count_ == 0) {
// Note the string representation serializes min/max = 0 when empty.
return absl::StrAppendFormat(&str, "/0/0/0/0");
}
return absl::StrAppendFormat(&str, "/%0.17g", centroids_.front().mean);
}
DoMerge();
absl::StrAppendFormat(&str, "/%0.17g/%0.17g/%0.17g/%d", min_, max_, sum_,
count_);
for (auto& centroid : centroids_) {
absl::StrAppendFormat(&str, "/%0.17g:%d", centroid.mean, centroid.count);
}
return str;
}
// Restores this t-digest from the serialized form produced by ToString().
// Accepts the empty string as "not set". Returns InvalidArgumentError on
// malformed input, in which case the state of this digest is undefined.
absl::Status TDigest::FromString(absl::string_view string) {
  // Accept an empty string as 'not set'.
  // Although ToString() never produces an empty string, an empty string is
  // still expected when a t-Digest is missing.
  if (string.empty()) {
    Reset(0);
    return absl::OkStatus();
  }
  const std::vector<absl::string_view> tokens = absl::StrSplit(string, '/');
  auto iter = tokens.begin();
  // First token (compression and discrete).
  if (iter == tokens.end() || iter->empty()) {
    return absl::InvalidArgumentError("No compression/discrete.");
  }
  double double_val;
  if (!absl::SimpleAtod(iter->substr(0, iter->length()), &double_val) ||
      double_val < 0) {
    return absl::InvalidArgumentError(
        absl::StrCat("Invalid double_val/discrete: ", *iter));
  }
  Reset(double_val);
  if (++iter == tokens.end()) {
    return absl::InvalidArgumentError("Unexpected end of string.");
  }
  // Single-valued t-Digest (exactly one token after the compression).
  if ((iter + 1) == tokens.end()) {
    if (!absl::SimpleAtod(*iter, &double_val)) {
      return absl::InvalidArgumentError(
          absl::StrCat("Invalid single-value: ", *iter));
    }
    Add(double_val, 1);
    return absl::OkStatus();
  }
  // Parse min/max/sum/count.
  double min = 0.0, max = 0.0, sum = 0.0;
  int64_t count = 0;
  if (iter == tokens.end() || !absl::SimpleAtod(*iter, &min) ||
      ++iter == tokens.end() || !absl::SimpleAtod(*iter, &max) ||
      ++iter == tokens.end() || !absl::SimpleAtod(*iter, &sum) ||
      ++iter == tokens.end() || !absl::SimpleAtoi(*iter, &count)) {
    return absl::InvalidArgumentError("Invalid min, max, sum, or count.");
  }
  // Empty. Note the string representation serializes min/max = 0 when empty.
  if (++iter == tokens.end()) {
    if (min != 0 || max != 0 || count != 0 || sum != 0) {
      return absl::InvalidArgumentError(
          "Empty t-Digest with non-zero min, max, sum, or count.");
    }
    return absl::OkStatus();
  }
  // Parse centroids, each formatted as "mean:count".
  int64_t int_val = 0;
  for (; iter != tokens.end(); ++iter) {
    const auto pos = iter->find_first_of(':');
    if (pos == absl::string_view::npos ||
        !absl::SimpleAtod(iter->substr(0, pos), &double_val) ||
        !absl::SimpleAtoi(iter->substr(pos + 1), &int_val)) {
      return absl::InvalidArgumentError(
          absl::StrCat("Invalid centroid: ", *iter));
    }
    Add(double_val, int_val);
  }
  DoMerge();
  // Adding centroids above tracked min/max of centroid means; restore the
  // serialized extremes, which bound the original samples.
  min_ = min;
  max_ = max;
  if (centroids_.empty()) {
    return absl::OkStatus();
  }
  // Validate min/max/sum/count.
  GRPC_DCHECK_LT(std::abs(sum - sum_), 1e-10) << "Invalid sum value.";
  if (count != count_) {
    return absl::InvalidArgumentError("Invalid count value.");
  }
  return absl::OkStatus();
}
// Approximate RAM footprint: the object itself plus the centroid buffer.
size_t TDigest::MemUsageBytes() const {
  const size_t buffer_bytes = centroids_.capacity() * sizeof(CentroidPod);
  return sizeof(TDigest) + buffer_bytes;
}
} // namespace grpc_core

View File

@@ -1,197 +0,0 @@
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef GRPC_SRC_CORE_UTIL_TDIGEST_H
#define GRPC_SRC_CORE_UTIL_TDIGEST_H
#include <cstddef>
#include <cstdint>
#include <memory>
#include <optional>
#include <string>
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
namespace grpc_core {
// Represents a t-digest [1].
//
// t-digest is a structure that can store accurate accumulation of quantiles
// and other rank-based statistics, over a stream of data.
//
// There are different flavors of t-digest, but here we only implement a merging
// t-digest.
//
// Users can add values to a t-digest, and also merge t-digests.
//
// [1] Ted Dunning and Otmar Ertl, "COMPUTING EXTREMELY ACCURATE QUANTILES USING
// t-DIGESTS", go/tdigest.
//
// Note on thread-safety: This class provides no thread-safety guarantee. Access
// to the methods of this class must be synchronized externally by the user.
class TDigest final {
 public:
  // Non-copyable and non-movable; use Swap() or Merge() to transfer state.
  TDigest(const TDigest&) = delete;
  TDigest(TDigest&&) = delete;
  TDigest& operator=(const TDigest&) = delete;
  TDigest& operator=(TDigest&&) = delete;
  // Creates a t-digest with the given compression factor (aka delta).
  //
  // The number of centroids kept in a t-digest is in O(compression).
  // A t-digest should keep less than 2*compression.
  explicit TDigest(double compression);
  // Discards all accumulated state and re-initializes for `compression`.
  void Reset(double compression);
  // Adds `count` number of `val` to t-digest.
  void Add(double val, int64_t count);
  // Adds a single value with a count of 1 to the t-digest.
  void Add(double val) { Add(val, 1); }
  // Merges `that` t-digest into `this` t-digest.
  void Merge(const TDigest& that);
  // Returns an approximate quantile of values stored in the t-digest. Inclusive
  // i.e. largest value that <= quantile.
  //
  // `quantile` can be any real value between 0 and 1. For example, 0.99 would
  // return the 99th percentile.
  double Quantile(double quantile);
  // Returns the cumulative probability corresponding to the given value.
  // Inclusive i.e. probability that <= val.
  double Cdf(double val);
  // Returns the minimum of all values added to the t-digest.
  double Min() const { return min_; }
  // Returns the maximum of all values added to the t-digest.
  double Max() const { return max_; }
  // Returns the sum of all values added to the t-digest.
  double Sum() const { return sum_; }
  // Returns the count of all values added to the t-digest.
  int64_t Count() const { return count_; }
  // Returns the compression factor of the t-digest.
  double Compression() const { return compression_; }
  // Returns the string representation of this t-digest. The string format is
  // external and compatible with all implementations of this library.
  std::string ToString();
  // Restores the t-digest from the string representation.
  // Returns an error if `string` is mal-formed where the state of this t-digest
  // is undefined.
  absl::Status FromString(absl::string_view string);
  // Returns the (approximate) size in bytes of storing this t-digest in RAM.
  // Useful when a TDigest is used as the accumulator in a Flume AccumulateFn.
  size_t MemUsageBytes() const;
  // Exchanges the entire state of this digest with `that` member-by-member.
  void Swap(TDigest& that) {
    std::swap(compression_, that.compression_);
    std::swap(batch_size_, that.batch_size_);
    std::swap(centroids_, that.centroids_);
    std::swap(merged_, that.merged_);
    std::swap(unmerged_, that.unmerged_);
    std::swap(min_, that.min_);
    std::swap(max_, that.max_);
    std::swap(sum_, that.sum_);
    std::swap(count_, that.count_);
  }
 private:
  // Centroid is the primitive construct in t-digest.
  // A centroid has a mean and a count.
  struct CentroidPod {
    CentroidPod() : CentroidPod(0, 0) {}
    CentroidPod(double mean, int64_t count) : mean(mean), count(count) {}
    double mean;
    int64_t count;
    bool operator<(const CentroidPod& that) const {
      // For centroids with the same mean, we want to have the centroids
      // with a larger mass in front of the queue.
      //
      // See http://github.com/tdunning/t-digest/issues/78 for the discussion.
      return mean < that.mean || (mean == that.mean && count > that.count);
    }
  };
  // Adds a centroid to the unmerged list, and merge the unmerged centroids
  // when we have `batch_size` of unmerged centroids.
  void AddUnmergedCentroid(const CentroidPod& centroid);
  // Merges the batch of unmerged points and centroids.
  //
  // This is an in-place implementation of the progressive merging algorithm,
  // and does work solely using the centroids_ vector.
  void DoMerge();
  // Converts a quantile to the approximate centroid index.
  //
  // This is the k(q,delta) function in the t-digest paper.
  // See Figure 1 for more details.
  double QuantileToCentroid(double quantile) const;
  // Converts a centroid index to an approximate quantile.
  //
  // This is the _inverse_ of k(q,delta) function in the t-digest paper.
  // See Figure 1 for more details.
  double CentroidToQuantile(double centroid) const;
  // Updates min, max, sum, count.
  void UpdateStats(double min, double max, double sum, int64_t count) {
    if (count <= 0) return;
    if (min < min_) min_ = min;
    if (max > max_) max_ = max;
    count_ += count;
    sum_ += sum;
  }
  // Compression factor (aka delta).
  //
  // When zero, to be determined from the first merge.
  double compression_;
  // Maximum number of unmerged elements.
  int64_t batch_size_;
  // All centroids merged and unmerged. Unmerged centroids can actually be a
  // value or a centroid.
  // NOTE(review): std::vector is used here but <vector> is not included
  // directly by this header — relies on a transitive include; consider
  // adding it.
  std::vector<CentroidPod> centroids_;
  // Number of centroids that are already merged.
  int64_t merged_;
  // Number of centroids and values that are added but not merged yet.
  int64_t unmerged_;
  // Minimum of all values and centroid means.
  double min_;
  // Maximum of all values and centroid means.
  double max_;
  // Sum of all values and centroid means added.
  double sum_;
  // Count of all values and centroids added.
  int64_t count_;
};
} // namespace grpc_core
#endif // GRPC_SRC_CORE_UTIL_TDIGEST_H

View File

@@ -63,31 +63,6 @@ grpc_cc_test(
],
)
grpc_cc_test(
name = "tdigest_test",
srcs = ["tdigest_test.cc"],
external_deps = [
"gtest",
"absl/random",
"absl/container:flat_hash_map",
"benchmark",
],
uses_event_engine = False,
uses_polling = False,
deps = [
"//src/core:tdigest",
],
)
grpc_cc_benchmark(
name = "bm_tdigest",
srcs = ["bm_tdigest.cc"],
monitoring = HISTORY,
deps = [
"//src/core:tdigest",
],
)
grpc_cc_benchmark(
name = "bm_latent_see",
srcs = ["bm_latent_see.cc"],
@@ -906,17 +881,6 @@ grpc_cc_test(
],
)
grpc_fuzz_test(
name = "tdigest_fuzztest",
srcs = ["tdigest_fuzztest.cc"],
external_deps = [
"fuzztest",
"fuzztest_main",
"gtest",
],
deps = ["//src/core:tdigest"],
)
grpc_fuzz_test(
name = "useful_fuzztest",
srcs = ["useful_fuzztest.cc"],

View File

@@ -1,59 +0,0 @@
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <benchmark/benchmark.h>
#include <random>
#include "src/core/util/tdigest.h"
namespace grpc_core {
// Benchmarks TDigest::Add across compression factors (state.range(0)),
// feeding one page worth of exponentially-distributed samples per batch.
static void BM_AddWithCompression(benchmark::State& state) {
  // kNumValues is 512 with a 4k page.
  const size_t kNumValues = sysconf(_SC_PAGE_SIZE) / sizeof(double);
  std::vector<double> vals;
  vals.reserve(kNumValues);
  // Fixed seed keeps runs comparable across invocations.
  std::mt19937 gen(1234);
  std::exponential_distribution<double> exp_dist;
  // Fix: loop index was `int`, causing a signed/unsigned comparison against
  // the size_t bound.
  for (size_t idx = 0; idx < kNumValues; idx++) {
    vals.push_back(exp_dist(gen));
  }
  TDigest tdigest(/*compression=*/state.range(0));
  while (state.KeepRunningBatch(kNumValues)) {
    for (double val : vals) {
      tdigest.Add(val);
    }
  }
  state.SetItemsProcessed(state.iterations());
}
BENCHMARK(BM_AddWithCompression)->Arg(1)->Arg(10)->Arg(100)->Arg(1000);
} // namespace grpc_core
// Some distros have RunSpecifiedBenchmarks under the benchmark namespace,
// and others do not. This allows us to support both modes.
namespace benchmark {
// Qualified-call shim: some benchmark distributions declare
// RunSpecifiedBenchmarks inside namespace benchmark, others at global scope.
// Calling it unqualified from inside this namespace resolves in either case.
void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); }
} // namespace benchmark
// Entry point: parse --benchmark_* flags, then run all registered benchmarks
// through the namespace-compatibility shim.
int main(int argc, char** argv) {
::benchmark::Initialize(&argc, argv);
benchmark::RunTheBenchmarksNamespaced();
return 0;
}

View File

@@ -1,56 +0,0 @@
// Copyright 2023 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <string>
#include <utility>
#include <vector>
#include "fuzztest/fuzztest.h"
#include "src/core/util/tdigest.h"
#include "gtest/gtest.h"
using fuzztest::InRange;
using fuzztest::VectorOf;
namespace grpc_core {
// Computes the exact `quantile` of `samples` by sorting a copy and linearly
// interpolating between the two surrounding order statistics. Returns 0.0
// when the requested quantile falls before the first sample.
double GetTrueQuantile(const std::vector<double>& samples, double quantile) {
  std::vector<double> sorted = samples;
  std::sort(sorted.begin(), sorted.end());
  const double exact_idx =
      static_cast<double>(sorted.size()) * quantile - 1;
  const double lo = std::floor(exact_idx);
  if (lo < 0.0) return 0.0;
  const double hi = std::ceil(exact_idx);
  if (lo == hi) {
    return sorted[static_cast<size_t>(lo)];
  }
  // Weight each neighbor by its distance from the exact fractional index.
  return sorted[static_cast<size_t>(lo)] * (hi - exact_idx) +
         sorted[static_cast<size_t>(hi)] * (exact_idx - lo);
}
// Fuzz property: a t-digest built from `values` at `compression` must report
// a quantile within 1.0 of the exact (sorted-order) quantile.
void QuantilesMatch(std::vector<double> values, double compression,
                    double quantile) {
  TDigest digest(compression);
  for (double v : values) {
    digest.Add(v);
  }
  const double expected = GetTrueQuantile(values, quantile);
  EXPECT_NEAR(digest.Quantile(quantile), expected, 1.0);
}
FUZZ_TEST(MyTestSuite, QuantilesMatch)
.WithDomains(VectorOf(InRange(0.0, 10.0)).WithMinSize(100),
InRange(20, 2000), InRange(0, 1));
} // namespace grpc_core

View File

@@ -1,480 +0,0 @@
// Copyright 2024 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/core/util/tdigest.h"
#include <algorithm>
#include <cmath>
#include <functional>
#include <limits>
#include <random>
#include <string>
#include <utility>
#include <vector>
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "absl/container/flat_hash_map.h"
#include "absl/random/random.h"
using testing::DoubleNear;
using testing::NanSensitiveDoubleEq;
namespace grpc_core {
namespace {
constexpr double kNan = std::numeric_limits<double>::quiet_NaN();
// Computes the exact `quantile` of `samples` by sorting a copy and linearly
// interpolating between the two surrounding order statistics.
// Returns 0.0 when the requested quantile falls before the first sample;
// without this guard, quantile < 1/size yields idx_left == -1 and an
// out-of-bounds read (the twin helper in tdigest_fuzztest.cc has the guard).
double GetTrueQuantile(const std::vector<double>& samples, double quantile) {
  std::vector<double> s = samples;
  double true_idx = static_cast<double>(s.size()) * quantile - 1;
  double idx_left = std::floor(true_idx);
  if (idx_left < 0.0) return 0.0;
  double idx_right = std::ceil(true_idx);
  std::sort(s.begin(), s.end());
  if (idx_left == idx_right) {
    return s[static_cast<size_t>(idx_left)];
  }
  // Interpolate: each neighbor is weighted by the other's distance from the
  // exact fractional index.
  return s[static_cast<size_t>(idx_left)] * (idx_right - true_idx) +
         s[static_cast<size_t>(idx_right)] * (true_idx - idx_left);
}
// Computes the exact empirical CDF of `val` over `samples`: the fraction of
// samples that are <= val. Values below the minimum map to 0 and values at
// or above the maximum map to exactly 1.
// Uses std::upper_bound (first element > val) on the sorted copy instead of
// the hand-rolled linear scan; the resulting rank is identical.
// NOTE(review): like the original, this assumes `samples` is non-empty.
double GetTrueCdf(const std::vector<double>& samples, double val) {
  std::vector<double> s = samples;
  std::sort(s.begin(), s.end());
  if (val < s.front()) {
    return 0;
  }
  if (val >= s.back()) {
    return 1;
  }
  const auto rank = std::upper_bound(s.begin(), s.end(), val) - s.begin();
  return static_cast<double>(rank) / static_cast<double>(samples.size());
}
} // namespace
// Reset(compression) must discard prior state and adopt the new compression.
TEST(TDigestTest, Reset) {
TDigest tdigest(100);
EXPECT_EQ(tdigest.Compression(), 100);
tdigest.Reset(50);
EXPECT_EQ(tdigest.Compression(), 50);
tdigest.Reset(20);
EXPECT_EQ(tdigest.Compression(), 20);
}
// The Count/Min/Max/Sum summary statistics must track each Add() exactly,
// while Compression() stays fixed.
TEST(TDigestTest, Stats) {
TDigest tdigest(100);
tdigest.Add(10);
EXPECT_EQ(1, tdigest.Count());
EXPECT_EQ(10, tdigest.Min());
EXPECT_EQ(10, tdigest.Max());
EXPECT_EQ(10, tdigest.Sum());
EXPECT_EQ(100, tdigest.Compression());
tdigest.Add(20);
EXPECT_EQ(2, tdigest.Count());
EXPECT_EQ(10, tdigest.Min());
EXPECT_EQ(20, tdigest.Max());
EXPECT_EQ(30, tdigest.Sum());
EXPECT_EQ(100, tdigest.Compression());
}
// Repeatedly merges the accumulated digest into a fresh single-valued digest
// and swaps it back. Quantiles of an empty digest are NaN; after kMerges
// rounds every quantile must equal the lone value 10.
TEST(TDigestTest, MergeMultipleIntoSingleValued) {
constexpr double kMerges = 100;
constexpr double kCompression = 100;
TDigest tdigest(kCompression);
// Empty digest: all quantiles are NaN.
auto p01 = tdigest.Quantile(.01);
auto p50 = tdigest.Quantile(.50);
auto p99 = tdigest.Quantile(.99);
EXPECT_THAT(p01, NanSensitiveDoubleEq(kNan));
EXPECT_THAT(p50, NanSensitiveDoubleEq(kNan));
EXPECT_THAT(p99, NanSensitiveDoubleEq(kNan));
for (int i = 0; i < kMerges; i++) {
TDigest new_tdigest(kCompression);
new_tdigest.Add(10);
new_tdigest.Merge(tdigest);
// Keep the merged result as the accumulator for the next round.
new_tdigest.Swap(tdigest);
}
EXPECT_EQ(kMerges, tdigest.Count());
p01 = tdigest.Quantile(.01);
p50 = tdigest.Quantile(.50);
p99 = tdigest.Quantile(.99);
EXPECT_THAT(p01, NanSensitiveDoubleEq(10));
EXPECT_THAT(p50, NanSensitiveDoubleEq(10));
EXPECT_THAT(p99, NanSensitiveDoubleEq(10));
}
// Mirror of the test above: merges a fresh single-valued digest INTO the
// accumulator each round; all quantiles must still equal the lone value 10.
TEST(TDigestTest, MergeSingleIntoMultipleValued) {
constexpr double kMerges = 100;
constexpr double kCompression = 100;
TDigest tdigest(kCompression);
// Empty digest: all quantiles are NaN.
auto p01 = tdigest.Quantile(.01);
auto p50 = tdigest.Quantile(.50);
auto p99 = tdigest.Quantile(.99);
EXPECT_THAT(p01, NanSensitiveDoubleEq(kNan));
EXPECT_THAT(p50, NanSensitiveDoubleEq(kNan));
EXPECT_THAT(p99, NanSensitiveDoubleEq(kNan));
for (int i = 0; i < kMerges; i++) {
TDigest new_tdigest(kCompression);
new_tdigest.Add(10);
tdigest.Merge(new_tdigest);
}
EXPECT_EQ(kMerges, tdigest.Count());
p01 = tdigest.Quantile(.01);
p50 = tdigest.Quantile(.50);
p99 = tdigest.Quantile(.99);
EXPECT_THAT(p01, NanSensitiveDoubleEq(10));
EXPECT_THAT(p50, NanSensitiveDoubleEq(10));
EXPECT_THAT(p99, NanSensitiveDoubleEq(10));
}
TEST(TDigestTest, CdfBetweenLastCentroidAndMax) {
// Make sure we return the correct CDF value for an element between the last
// centroid and maximum.
constexpr double kCompression = 10;
TDigest tdigest(kCompression);
// Three progressively sparser bands of 100 samples each: 1..100, 100..10000,
// and 200..20000, so the upper tail is dominated by wide centroids.
for (int i = 1; i <= 100; i++) {
tdigest.Add(i);
}
for (int i = 1; i <= 100; i++) {
tdigest.Add(i * 100);
}
for (int i = 1; i <= 100; i++) {
tdigest.Add(i * 200);
}
auto cdf_min = tdigest.Cdf(1);
EXPECT_THAT(cdf_min, NanSensitiveDoubleEq(0));
auto cdf_max = tdigest.Cdf(20000);
EXPECT_THAT(cdf_max, NanSensitiveDoubleEq(1));
// Just below the maximum: CDF should be very close to, but not exactly, 1.
auto cdf_tail = tdigest.Cdf(20000 - 1);
EXPECT_THAT(cdf_tail, DoubleNear(0.9999, 1e-4));
}
TEST(TDigestTest, CdfMostlyMin) {
// Make sure we return the correct CDF value when most samples are the
// minimum value.
constexpr double kCompression = 10;
constexpr double kMin = 0;
constexpr double kMax = 1;
TDigest tdigest(kCompression);
// 100 copies of the minimum plus a single maximum sample.
for (int i = 0; i < 100; i++) {
tdigest.Add(kMin);
}
tdigest.Add(kMax);
auto cdf_min = tdigest.Cdf(kMin);
EXPECT_THAT(cdf_min, DoubleNear(0.98, 1e-3));
auto cdf_max = tdigest.Cdf(kMax);
EXPECT_THAT(cdf_max, NanSensitiveDoubleEq(1));
}
TEST(TDigestTest, SingletonInACrowd) {
// Add a test case similar to what is reported upstream:
// https://github.com/tdunning/t-digest/issues/89
//
// We want to make sure when we have 10k samples of a specific number,
// we do not lose information about a single sample at the tail.
constexpr int kCrowdSize = 10000;
constexpr double kCompression = 100;
TDigest tdigest(kCompression);
for (int i = 0; i < kCrowdSize; i++) {
tdigest.Add(10);
}
// The singleton outlier at the tail.
tdigest.Add(20);
// Every quantile below 1 is the crowd value; quantile 1 must still report
// the singleton.
EXPECT_THAT(tdigest.Quantile(0), NanSensitiveDoubleEq(10));
EXPECT_THAT(tdigest.Quantile(0.5), NanSensitiveDoubleEq(10));
EXPECT_THAT(tdigest.Quantile(0.8), NanSensitiveDoubleEq(10));
EXPECT_THAT(tdigest.Quantile(0.9), NanSensitiveDoubleEq(10));
EXPECT_THAT(tdigest.Quantile(0.99), NanSensitiveDoubleEq(10));
EXPECT_THAT(tdigest.Quantile(0.999), NanSensitiveDoubleEq(10));
EXPECT_THAT(tdigest.Quantile(1), NanSensitiveDoubleEq(20));
}
// A digest constructed with compression 0 is "to be determined": merging a
// populated digest into it must adopt that digest's compression, and later
// merges must keep the already-established compression.
TEST(TDigestTest, MergeDifferentCompressions) {
TDigest tdigest_1(100);
TDigest tdigest_2(200);
for (int i = 0; i < 10; ++i) {
tdigest_1.Add(1);
tdigest_2.Add(2);
}
{
// Compression/discrete should remain "to be determined".
TDigest tdigest_0(0);
tdigest_0.Merge(tdigest_0);
EXPECT_EQ(0, tdigest_0.Compression());
}
{
// Should take compression/discrete of tdigest_1.
TDigest tdigest_0(0);
tdigest_0.Merge(tdigest_1);
EXPECT_EQ(100, tdigest_0.Compression());
EXPECT_EQ(1, tdigest_0.Max());
EXPECT_EQ(10, tdigest_0.Count());
}
{
// Should take compression/discrete of tdigest_2.
TDigest tdigest_0(0);
tdigest_0.Merge(tdigest_2);
EXPECT_EQ(200, tdigest_0.Compression());
EXPECT_EQ(2, tdigest_0.Max());
EXPECT_EQ(10, tdigest_0.Count());
// Now should succeed without changing compression/discrete.
tdigest_0.Merge(tdigest_1);
EXPECT_EQ(200, tdigest_0.Compression());
EXPECT_EQ(2, tdigest_0.Max());
EXPECT_EQ(20, tdigest_0.Count());
}
}
// Sample generators for Cdf() and Percentile() precision tests.
// Returns a vector of `val` repeated `count` times.
// Uses the std::vector fill constructor instead of a hand-rolled
// reserve/push_back loop.
std::vector<double> ConstantSamples(int count, double val) {
  return std::vector<double>(count, val);
}
// Returns `count` samples drawn from Normal(mean, stdev) using a
// default-seeded absl::BitGen (so the draws vary from run to run).
std::vector<double> NormalSamples(int count, double mean, double stdev) {
  absl::BitGen rng;
  std::vector<double> samples;
  samples.reserve(count);
  for (int i = 0; i < count; ++i) {
    samples.push_back(absl::Gaussian(rng, mean, stdev));
  }
  return samples;
}
// Returns `count` samples drawn uniformly at random in [from, to) — the
// default interval of absl::Uniform — each cast to T before being stored
// (T=int yields a discrete, integer-valued distribution).
template <typename T = double>
std::vector<double> UniformSamples(int count, double from, double to) {
  absl::BitGen rng;
  std::vector<double> samples;
  samples.reserve(count);
  for (int i = 0; i < count; ++i) {
    samples.push_back(static_cast<T>(absl::Uniform(rng, from, to)));
  }
  return samples;
}
// Parameters for one quantile/CDF precision test case.
struct PrecisionTestParam {
// Function to get samples vector.
std::function<std::vector<double>()> generate_samples;
// Map of {percentile or val, max error bound}.
absl::flat_hash_map<double, double> max_errors;
};
// Fixture for Quantile() precision tests; each param supplies a sample
// generator plus per-quantile error bounds.
class QuantilePrecisionTest
: public ::testing::TestWithParam<PrecisionTestParam> {
public:
static constexpr double kCompression = 100;
// Expected centroid budget (2 * kCompression); appears unreferenced by the
// test bodies in this file.
static constexpr double kMaxCentroids = 200;
};
// Tests Quantile() error against the true (sorted-order) quantile for every
// {quantile, bound} pair supplied by the test parameter.
TEST_P(QuantilePrecisionTest, QuantilePrecisionTest) {
  // We expect higher precision near both ends.
  const absl::flat_hash_map<double, double>& max_error_bounds =
      GetParam().max_errors;
  const std::vector<double> samples = GetParam().generate_samples();
  TDigest tdigest(kCompression);
  for (double i : samples) {
    tdigest.Add(i);
  }
  for (const auto& p : max_error_bounds) {
    double quantile = p.first;
    // std::abs guarantees the double overload; unqualified abs may bind to
    // ::abs(int) on some toolchains, truncating the error to an integer.
    double error = std::abs(tdigest.Quantile(quantile) -
                            GetTrueQuantile(samples, quantile));
    EXPECT_LE(error, p.second)
        << "(quantile=" << quantile << ") actual:" << tdigest.Quantile(quantile)
        << " expected:" << GetTrueQuantile(samples, quantile);
  }
}
// Same as QuantilePrecisionTest, but routes samples through intermediate
// 1000-sample digests merged into the accumulator, verifying that Merge()
// keeps quantile error within the same bounds.
TEST_P(QuantilePrecisionTest, MergeQuantilePrecisionTest) {
  // We expect higher precision near both ends.
  const absl::flat_hash_map<double, double>& max_error_bounds =
      GetParam().max_errors;
  const std::vector<double> samples = GetParam().generate_samples();
  TDigest tdigest(kCompression);
  TDigest temp(kCompression);
  for (double i : samples) {
    temp.Add(i);
    if (temp.Count() == 1000) {
      tdigest.Merge(temp);
      temp.Reset(kCompression);
    }
  }
  tdigest.Merge(temp);  // flush the final partial batch
  ASSERT_EQ(tdigest.Count(), samples.size());
  for (const auto& p : max_error_bounds) {
    double quantile = p.first;
    // std::abs guarantees the double overload; unqualified abs may bind to
    // ::abs(int) on some toolchains, truncating the error to an integer.
    double error = std::abs(tdigest.Quantile(quantile) -
                            GetTrueQuantile(samples, quantile));
    EXPECT_LE(error, p.second)
        << "(quantile=" << quantile << ") actual:" << tdigest.Quantile(quantile)
        << " expected:" << GetTrueQuantile(samples, quantile);
  }
}
// Error bounds per distribution: exact for constant input, within ~1 unit
// for normal samples, and wider absolute bounds for the 10k-wide uniform
// range (errors scale with the data's spread).
INSTANTIATE_TEST_SUITE_P(
QuantilePrecisionTest, QuantilePrecisionTest,
::testing::Values(
// Constant
PrecisionTestParam{[]() { return ConstantSamples(100000, 10); },
{{0.01, 0}, {0.5, 0}, {0.99, 0}}},
// Continuous samples
PrecisionTestParam{[]() { return NormalSamples(100000, 0, 5); },
{{0.01, 0.5}, {0.5, 1}, {0.99, 0.5}}},
PrecisionTestParam{[]() { return UniformSamples(100000, -5000, 5000); },
{{0.01, 22}, {0.5, 70}, {0.99, 22}}}));
// Fixture for Cdf() precision tests; each param supplies a sample generator
// plus per-value error bounds.
class CdfPrecisionTest : public ::testing::TestWithParam<PrecisionTestParam> {
public:
static constexpr double kCompression = 100;
// Expected centroid budget (2 * kCompression); appears unreferenced by the
// test bodies in this file.
static constexpr double kMaxCentroids = 200;
};
// Tests Cdf() error against the true empirical CDF for every {value, bound}
// pair supplied by the test parameter.
TEST_P(CdfPrecisionTest, CdfPrecisionTest) {
  // We expect higher precision near both ends.
  const absl::flat_hash_map<double, double>& max_error_bounds =
      GetParam().max_errors;
  const std::vector<double> samples = GetParam().generate_samples();
  TDigest tdigest(kCompression);
  for (double i : samples) {
    tdigest.Add(i);
  }
  ASSERT_EQ(tdigest.Count(), samples.size());
  for (const auto& p : max_error_bounds) {
    double val = p.first;
    // std::abs guarantees the double overload; unqualified abs may bind to
    // ::abs(int) on some toolchains, truncating the error to an integer.
    double error = std::abs(tdigest.Cdf(val) - GetTrueCdf(samples, val));
    EXPECT_LE(error, p.second)
        << "(val=" << val << ") actual:" << tdigest.Cdf(val)
        << " expected:" << GetTrueCdf(samples, val);
  }
}
// Same as CdfPrecisionTest, but routes samples through intermediate
// 1000-sample digests merged into the accumulator, verifying that Merge()
// keeps CDF error within the same bounds.
TEST_P(CdfPrecisionTest, MergeCdfPrecisionTest) {
  // We expect higher precision near both ends.
  const absl::flat_hash_map<double, double>& max_error_bounds =
      GetParam().max_errors;
  const std::vector<double> samples = GetParam().generate_samples();
  TDigest tdigest(kCompression);
  TDigest temp(kCompression);
  for (double i : samples) {
    temp.Add(i);
    if (temp.Count() == 1000) {
      tdigest.Merge(temp);
      temp.Reset(kCompression);
    }
  }
  tdigest.Merge(temp);  // flush the final partial batch
  ASSERT_EQ(tdigest.Count(), samples.size());
  for (const auto& p : max_error_bounds) {
    double val = p.first;
    // std::abs guarantees the double overload; unqualified abs may bind to
    // ::abs(int) on some toolchains, truncating the error to an integer.
    double error = std::abs(tdigest.Cdf(val) - GetTrueCdf(samples, val));
    EXPECT_LE(error, p.second)
        << "(val=" << val << ") actual:" << tdigest.Cdf(val)
        << " expected:" << GetTrueCdf(samples, val);
  }
}
// CDF error bounds per distribution: exact for constant input and outside
// the sampled range; small relative bounds elsewhere. The last two cases use
// integer-valued (discrete) uniform samples, dense and sparse respectively.
INSTANTIATE_TEST_SUITE_P(
CdfPrecisionTest, CdfPrecisionTest,
::testing::Values(
// Constant.
PrecisionTestParam{[]() { return ConstantSamples(100000, 10); },
{{9, 0}, {10, 0}, {11, 0}}},
// Continuous samples.
PrecisionTestParam{[]() { return NormalSamples(100000, 0, 5); },
{{-10, 0.005}, {0.0, 0.007}, {10, 0.005}}},
PrecisionTestParam{[]() { return UniformSamples(100000, -5000, 5000); },
{{-5000.1, 0},
{-4900, 0.005},
{0, 0.007},
{4900, 0.005},
{5000, 0}}},
// Dense and sparse samples.
PrecisionTestParam{
[]() { return UniformSamples<int>(100000, 0, 10); },
{{-0.01, 0}, {0.01, 0.03}, {5.0, 0.05}, {9.99, 0.03}, {10, 0}}},
PrecisionTestParam{
[]() { return UniformSamples<int>(100000, -10000, 10000); },
{{-10001, 0},
{-9900, 0.005},
{0, 0.008},
{9900, 0.005},
{10000, 0}}}));
// Deserializing an empty string must succeed and leave an empty digest whose
// serialized form encodes zeroed stats ("0/0/0/0/0").
TEST(TDigestEmptyStringTest, Test) {
TDigest tdigest(0);
ASSERT_TRUE(tdigest.FromString("").ok());
EXPECT_EQ(tdigest.ToString(), "0/0/0/0/0");
}
} // namespace grpc_core
// gtest entry point.
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

View File

@@ -9271,30 +9271,6 @@
],
"uses_polling": false
},
{
"args": [],
"benchmark": true,
"ci_platforms": [
"linux",
"mac",
"posix",
"windows"
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"flaky": false,
"gtest": true,
"language": "c++",
"name": "tdigest_test",
"platforms": [
"linux",
"mac",
"posix",
"windows"
],
"uses_polling": false
},
{
"args": [],
"benchmark": false,