This commit is contained in:
Robert Schulze 2024-08-19 18:50:29 +00:00
parent e0044bbf32
commit f5f8a0bc39
No known key found for this signature in database
GPG Key ID: 26703B55FB13728A
18 changed files with 116 additions and 31 deletions

View File

@ -727,6 +727,17 @@ void LocalServer::processConfig()
}
global_context->setIndexMarkCache(index_mark_cache_policy, index_mark_cache_size, index_mark_cache_size_ratio);
String skipping_index_cache_policy = server_settings.skipping_index_cache_policy;
size_t skipping_index_cache_size = server_settings.skipping_index_cache_size;
size_t skipping_index_cache_max_count = server_settings.skipping_index_cache_max_entries;
double skipping_index_cache_size_ratio = server_settings.skipping_index_cache_size_ratio;
if (skipping_index_cache_size > max_cache_size)
{
skipping_index_cache_size = max_cache_size;
LOG_INFO(log, "Lowered skipping index cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(skipping_index_cache_size));
}
global_context->setSkippingIndexCache(skipping_index_cache_policy, skipping_index_cache_size, skipping_index_cache_max_count, skipping_index_cache_size_ratio);
size_t mmap_cache_size = server_settings.mmap_cache_size;
if (mmap_cache_size > max_cache_size)
{

View File

@ -1434,6 +1434,17 @@ try
}
global_context->setIndexMarkCache(index_mark_cache_policy, index_mark_cache_size, index_mark_cache_size_ratio);
String skipping_index_cache_policy = config().getString("skipping_index_cache_policy", DEFAULT_SKIPPING_INDEX_CACHE_POLICY);
size_t skipping_index_cache_size = config().getUInt64("skipping_index_cache_size", DEFAULT_SKIPPING_INDEX_CACHE_MAX_SIZE);
size_t skipping_index_cache_max_entries = config().getUInt64("skipping_index_cache_max_entries", DEFAULT_SKIPPING_INDEX_CACHE_MAX_ENTRIES);
double skipping_index_cache_size_ratio = config().getDouble("skipping_index_cache_size_ratio", DEFAULT_SKIPPING_INDEX_CACHE_SIZE_RATIO);
if (skipping_index_cache_size > max_cache_size)
{
skipping_index_cache_size = max_cache_size;
LOG_INFO(log, "Lowered skipping index cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(skipping_index_cache_size));
}
global_context->setSkippingIndexCache(skipping_index_cache_policy, skipping_index_cache_size, skipping_index_cache_max_entries, skipping_index_cache_size_ratio);
size_t mmap_cache_size = server_settings.mmap_cache_size;
if (mmap_cache_size > max_cache_size)
{

View File

@ -101,6 +101,11 @@ UInt64 BloomFilter::isEmpty() const
return true;
}
size_t BloomFilter::memoryUsageBytes() const
{
return filter.capacity() * sizeof(filter[0]);
}
bool operator== (const BloomFilter & a, const BloomFilter & b)
{
for (size_t i = 0; i < a.words; ++i)

View File

@ -55,6 +55,8 @@ public:
/// For debug.
UInt64 isEmpty() const;
size_t memoryUsageBytes() const;
friend bool operator== (const BloomFilter & a, const BloomFilter & b);
private:

View File

@ -77,6 +77,8 @@ public:
const GinSegmentWithRowIdRangeVector & getFilter() const { return rowid_ranges; }
GinSegmentWithRowIdRangeVector & getFilter() { return rowid_ranges; }
size_t memoryUsageBytes() const { return rowid_ranges.capacity() * sizeof(rowid_ranges[0]); }
private:
/// Filter parameters
const GinFilterParameters & params;

View File

@ -85,6 +85,14 @@ bool MergeTreeIndexGranuleBloomFilter::empty() const
return !total_rows;
}
size_t MergeTreeIndexGranuleBloomFilter::memoryUsageBytes() const
{
size_t sum = 0;
for (const auto & bloom_filter : bloom_filters)
sum += bloom_filter->memoryUsageBytes();
return sum;
}
void MergeTreeIndexGranuleBloomFilter::deserializeBinary(ReadBuffer & istr, MergeTreeIndexVersion version)
{
if (version != 1)

View File

@ -23,6 +23,8 @@ public:
bool empty() const override;
size_t memoryUsageBytes() const override;
void serializeBinary(WriteBuffer & ostr) const override;
void deserializeBinary(ReadBuffer & istr, MergeTreeIndexVersion version) override;

View File

@ -66,6 +66,15 @@ void MergeTreeIndexGranuleBloomFilterText::deserializeBinary(ReadBuffer & istr,
}
size_t MergeTreeIndexGranuleBloomFilterText::memoryUsageBytes() const
{
size_t sum = 0;
for (const auto & bloom_filter : bloom_filters)
sum += bloom_filter.memoryUsageBytes();
return sum;
}
MergeTreeIndexAggregatorBloomFilterText::MergeTreeIndexAggregatorBloomFilterText(
const Names & index_columns_,
const String & index_name_,

View File

@ -25,6 +25,8 @@ struct MergeTreeIndexGranuleBloomFilterText final : public IMergeTreeIndexGranul
bool empty() const override { return !has_elems; }
size_t memoryUsageBytes() const override;
const String index_name;
const BloomFilterParameters params;

View File

@ -86,6 +86,15 @@ void MergeTreeIndexGranuleFullText::deserializeBinary(ReadBuffer & istr, MergeTr
}
size_t MergeTreeIndexGranuleFullText::memoryUsageBytes() const
{
size_t sum = 0;
for (const auto & gin_filter : gin_filters)
sum += gin_filter.memoryUsageBytes();
return sum;
}
MergeTreeIndexAggregatorFullText::MergeTreeIndexAggregatorFullText(
GinIndexStorePtr store_,
const Names & index_columns_,

View File

@ -23,6 +23,8 @@ struct MergeTreeIndexGranuleFullText final : public IMergeTreeIndexGranule
bool empty() const override { return !has_elems; }
size_t memoryUsageBytes() const override;
const String index_name;
const GinFilterParameters params;
GinFilters gin_filters;

View File

@ -23,6 +23,8 @@ public:
bool empty() const override { return is_empty; }
size_t memoryUsageBytes() const override { return sizeof(*this); }
~MergeTreeIndexGranuleHypothesis() override = default;
const String & index_name;

View File

@ -23,6 +23,8 @@ struct MergeTreeIndexGranuleMinMax final : public IMergeTreeIndexGranule
bool empty() const override { return hyperrectangle.empty(); }
size_t memoryUsageBytes() const override { return hyperrectangle.capacity() * sizeof(hyperrectangle[0]); }
const String index_name;
const Block index_sample_block;

View File

@ -31,6 +31,8 @@ struct MergeTreeIndexGranuleSet final : public IMergeTreeIndexGranule
size_t size() const { return block.rows(); }
bool empty() const override { return !size(); }
size_t memoryUsageBytes() const override { return block.bytes(); }
~MergeTreeIndexGranuleSet() override = default;
const String index_name;

View File

@ -162,6 +162,12 @@ String USearchIndexWithSerialization::Statistics::toString() const
max_level, connectivity, size, capacity, ReadableSize(memory_usage), bytes_per_vector, scalar_words, nodes, edges, max_edges);
}
size_t USearchIndexWithSerialization::memoryUsageBytes() const
{
return Base::memory_usage();
}
MergeTreeIndexGranuleVectorSimilarity::MergeTreeIndexGranuleVectorSimilarity(
const String & index_name_,
const Block & index_sample_block_,

View File

@ -53,6 +53,8 @@ public:
};
Statistics getStatistics() const;
size_t memoryUsageBytes() const;
};
using USearchIndexWithSerializationPtr = std::shared_ptr<USearchIndexWithSerialization>;
@ -82,6 +84,8 @@ struct MergeTreeIndexGranuleVectorSimilarity final : public IMergeTreeIndexGranu
bool empty() const override { return !index || index->size() == 0; }
size_t memoryUsageBytes() const override { return index->memoryUsageBytes(); }
const String index_name;
const Block index_sample_block;
const unum::usearch::metric_kind_t metric_kind;

View File

@ -60,6 +60,8 @@ struct IMergeTreeIndexGranule
virtual void deserializeBinary(ReadBuffer & istr, MergeTreeIndexVersion version) = 0;
virtual bool empty() const = 0;
virtual size_t memoryUsageBytes() const = 0;
};
using MergeTreeIndexGranulePtr = std::shared_ptr<IMergeTreeIndexGranule>;

View File

@ -1,64 +1,68 @@
#pragma once
#include <Common/CacheBase.h>
#include <Common/ProfileEvents.h>
#include <Common/SipHash.h>
#include <Storages/MergeTree/MergeTreeIndices.h>
namespace ProfileEvents
{
extern const Event SecondaryIndexCacheMisses;
extern const Event SecondaryIndexCacheHits;
extern const Event SecondaryIndexCacheBytesEvicted;
extern const Event SkippingIndexCacheMisses;
extern const Event SkippingIndexCacheHits;
extern const Event SkippingIndexCacheBytesEvicted;
}
namespace CurrentMetrics
{
extern const Metric SecondaryIndexCacheSize;
extern const Metric SkippingIndexCacheSize;
}
namespace DB
{
struct SecondaryIndexCacheCell
struct SkippingIndexCacheCell
{
MergeTreeIndexGranulePtr granule;
size_t memory_bytes;
static constexpr size_t ENTRY_OVERHEAD_BYTES_GUESS = 200;
SecondaryIndexCacheCell(MergeTreeIndexGranulePtr g)
MergeTreeIndexGranulePtr granule;
size_t memory_bytes;
explicit SkippingIndexCacheCell(MergeTreeIndexGranulePtr g)
: granule(std::move(g))
, memory_bytes(0)
/// , memory_bytes(granule->memoryUsageBytes() + ENTRY_OVERHEAD_BYTES_GUESS)
, memory_bytes(0)
{
CurrentMetrics::add(CurrentMetrics::SecondaryIndexCacheSize, memory_bytes);
CurrentMetrics::add(CurrentMetrics::SkippingIndexCacheSize, memory_bytes);
}
~SecondaryIndexCacheCell()
~SkippingIndexCacheCell()
{
CurrentMetrics::sub(CurrentMetrics::SecondaryIndexCacheSize, memory_bytes);
CurrentMetrics::sub(CurrentMetrics::SkippingIndexCacheSize, memory_bytes);
}
SecondaryIndexCacheCell(const SecondaryIndexCacheCell &) = delete;
SecondaryIndexCacheCell & operator=(const SecondaryIndexCacheCell &) = delete;
SkippingIndexCacheCell(const SkippingIndexCacheCell &) = delete;
SkippingIndexCacheCell & operator=(const SkippingIndexCacheCell &) = delete;
};
struct SecondaryIndexCacheWeightFunction
struct SkippingIndexCacheWeightFunction
{
size_t operator()(const SecondaryIndexCacheCell & x) const
size_t operator()(const SkippingIndexCacheCell & cell) const
{
return x.memory_bytes;
return cell.memory_bytes;
}
};
/// Cache of deserialized index granules.
class SecondaryIndexCache : public CacheBase<UInt128, SecondaryIndexCacheCell, UInt128TrivialHash, SecondaryIndexCacheWeightFunction>
/// Cache of deserialized skipping index granules.
class SkippingIndexCache : public CacheBase<UInt128, SkippingIndexCacheCell, UInt128TrivialHash, SkippingIndexCacheWeightFunction>
{
public:
using Base = CacheBase<UInt128, SecondaryIndexCacheCell, UInt128TrivialHash, SecondaryIndexCacheWeightFunction>;
using Base = CacheBase<UInt128, SkippingIndexCacheCell, UInt128TrivialHash, SkippingIndexCacheWeightFunction>;
SecondaryIndexCache(const String & cache_policy, size_t max_size_in_bytes, size_t max_count, double size_ratio)
: Base(cache_policy, max_size_in_bytes, max_count, size_ratio) {}
SkippingIndexCache(const String & cache_policy, size_t max_size_in_bytes, size_t max_count, double size_ratio)
: Base(cache_policy, max_size_in_bytes, max_count, size_ratio)
{}
static UInt128 hash(const String & path_to_data_part, const String & index_name, size_t index_mark)
{
@ -73,16 +77,16 @@ public:
template <typename LoadFunc>
MergeTreeIndexGranulePtr getOrSet(const Key & key, LoadFunc && load)
{
auto wrapped_load = [&]() -> std::shared_ptr<SecondaryIndexCacheCell> {
auto wrapped_load = [&]() -> std::shared_ptr<SkippingIndexCacheCell> {
MergeTreeIndexGranulePtr granule = load();
return std::make_shared<SecondaryIndexCacheCell>(std::move(granule));
return std::make_shared<SkippingIndexCacheCell>(std::move(granule));
};
auto result = Base::getOrSet(key, wrapped_load);
auto result = Base::getOrSet(key, wrapped_load);
if (result.second)
ProfileEvents::increment(ProfileEvents::SecondaryIndexCacheMisses);
ProfileEvents::increment(ProfileEvents::SkippingIndexCacheMisses);
else
ProfileEvents::increment(ProfileEvents::SecondaryIndexCacheHits);
ProfileEvents::increment(ProfileEvents::SkippingIndexCacheHits);
return result.first->granule;
}
@ -90,10 +94,10 @@ public:
private:
void onRemoveOverflowWeightLoss(size_t weight_loss) override
{
ProfileEvents::increment(ProfileEvents::SecondaryIndexCacheBytesEvicted, weight_loss);
ProfileEvents::increment(ProfileEvents::SkippingIndexCacheBytesEvicted, weight_loss);
}
};
using SecondaryIndexCachePtr = std::shared_ptr<SecondaryIndexCache>;
using SkippingIndexCachePtr = std::shared_ptr<SkippingIndexCache>;
}