This commit is contained in:
Robert Schulze 2024-08-27 17:29:12 -07:00 committed by GitHub
commit b34a261c14
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
31 changed files with 258 additions and 1 deletions

View File

@ -727,6 +727,17 @@ void LocalServer::processConfig()
}
global_context->setIndexMarkCache(index_mark_cache_policy, index_mark_cache_size, index_mark_cache_size_ratio);
String skipping_index_cache_policy = server_settings.skipping_index_cache_policy;
size_t skipping_index_cache_size = server_settings.skipping_index_cache_size;
size_t skipping_index_cache_max_count = server_settings.skipping_index_cache_max_entries;
double skipping_index_cache_size_ratio = server_settings.skipping_index_cache_size_ratio;
if (skipping_index_cache_size > max_cache_size)
{
skipping_index_cache_size = max_cache_size;
LOG_INFO(log, "Lowered skipping index cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(skipping_index_cache_size));
}
global_context->setSkippingIndexCache(skipping_index_cache_policy, skipping_index_cache_size, skipping_index_cache_max_count, skipping_index_cache_size_ratio);
size_t mmap_cache_size = server_settings.mmap_cache_size;
if (mmap_cache_size > max_cache_size)
{

View File

@ -1435,6 +1435,17 @@ try
}
global_context->setIndexMarkCache(index_mark_cache_policy, index_mark_cache_size, index_mark_cache_size_ratio);
String skipping_index_cache_policy = config().getString("skipping_index_cache_policy", DEFAULT_SKIPPING_INDEX_CACHE_POLICY);
size_t skipping_index_cache_size = config().getUInt64("skipping_index_cache_size", DEFAULT_SKIPPING_INDEX_CACHE_MAX_SIZE);
size_t skipping_index_cache_max_entries = config().getUInt64("skipping_index_cache_max_entries", DEFAULT_SKIPPING_INDEX_CACHE_MAX_ENTRIES);
double skipping_index_cache_size_ratio = config().getDouble("skipping_index_cache_size_ratio", DEFAULT_SKIPPING_INDEX_CACHE_SIZE_RATIO);
if (skipping_index_cache_size > max_cache_size)
{
skipping_index_cache_size = max_cache_size;
LOG_INFO(log, "Lowered skipping index cache size to {} because the system has limited RAM", formatReadableSizeWithBinarySuffix(skipping_index_cache_size));
}
global_context->setSkippingIndexCache(skipping_index_cache_policy, skipping_index_cache_size, skipping_index_cache_max_entries, skipping_index_cache_size_ratio);
size_t mmap_cache_size = server_settings.mmap_cache_size;
if (mmap_cache_size > max_cache_size)
{

View File

@ -161,6 +161,7 @@ enum class AccessType : uint8_t
M(SYSTEM_DROP_CONNECTIONS_CACHE, "SYSTEM DROP CONNECTIONS CACHE, DROP CONNECTIONS CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_MARK_CACHE, "SYSTEM DROP MARK, DROP MARK CACHE, DROP MARKS", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_UNCOMPRESSED_CACHE, "SYSTEM DROP UNCOMPRESSED, DROP UNCOMPRESSED CACHE, DROP UNCOMPRESSED", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_SKIPPING_INDEX_CACHE, "SYSTEM DROP SKIPPING INDEX CACHE, DROP SKIPPING INDEX CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_MMAP_CACHE, "SYSTEM DROP MMAP, DROP MMAP CACHE, DROP MMAP", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_QUERY_CACHE, "SYSTEM DROP QUERY, DROP QUERY CACHE, DROP QUERY", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_COMPILED_EXPRESSION_CACHE, "SYSTEM DROP COMPILED EXPRESSION, DROP COMPILED EXPRESSION CACHE, DROP COMPILED EXPRESSIONS", GLOBAL, SYSTEM_DROP_CACHE) \

View File

@ -265,6 +265,7 @@
M(FilesystemCacheDelayedCleanupElements, "Filesystem cache elements in background cleanup queue") \
M(FilesystemCacheHoldFileSegments, "Filesystem cache file segment which are currently hold as unreleasable") \
M(AsyncInsertCacheSize, "Number of async insert hash id in cache") \
M(SkippingIndexCacheSize, "Size of the skipping index cache in bytes") \
M(S3Requests, "S3 requests count") \
M(KeeperAliveConnections, "Number of alive connections") \
M(KeeperOutstandingRequests, "Number of outstanding requests") \

View File

@ -63,6 +63,9 @@
M(TableFunctionExecute, "Number of table function calls.") \
M(MarkCacheHits, "Number of times an entry has been found in the mark cache, so we didn't have to load a mark file.") \
M(MarkCacheMisses, "Number of times an entry has not been found in the mark cache, so we had to load a mark file in memory, which is a costly operation, adding to query latency.") \
M(SkippingIndexCacheHits, "Number of times an index granule has been found in the skipping index cache.") \
M(SkippingIndexCacheMisses, "Number of times an index granule has not been found in the skipping index cache and had to be read from disk.") \
M(SkippingIndexCacheBytesEvicted, "Approximate total size (memory usage) of index granules evicted from the secondary index cache.") \
M(QueryCacheHits, "Number of times a query result has been found in the query cache (and query computation was avoided). Only updated for SELECT queries with SETTING use_query_cache = 1.") \
M(QueryCacheMisses, "Number of times a query result has not been found in the query cache (and required query computation). Only updated for SELECT queries with SETTING use_query_cache = 1.") \
/* Each page cache chunk access increments exactly one of the following 5 PageCacheChunk* counters. */ \

View File

@ -98,6 +98,10 @@ static constexpr auto DEFAULT_INDEX_UNCOMPRESSED_CACHE_SIZE_RATIO = 0.5;
static constexpr auto DEFAULT_INDEX_MARK_CACHE_POLICY = "SLRU";
static constexpr auto DEFAULT_INDEX_MARK_CACHE_MAX_SIZE = 5_GiB;
static constexpr auto DEFAULT_INDEX_MARK_CACHE_SIZE_RATIO = 0.3;
static constexpr auto DEFAULT_SKIPPING_INDEX_CACHE_POLICY = "SLRU";
static constexpr auto DEFAULT_SKIPPING_INDEX_CACHE_MAX_SIZE = 5_GiB;
static constexpr auto DEFAULT_SKIPPING_INDEX_CACHE_SIZE_RATIO = 0.5;
static constexpr auto DEFAULT_SKIPPING_INDEX_CACHE_MAX_ENTRIES = 10'000'000;
static constexpr auto DEFAULT_MMAP_CACHE_MAX_SIZE = 1_KiB; /// chosen by rolling dice
static constexpr auto DEFAULT_COMPILED_EXPRESSION_CACHE_MAX_SIZE = 128_MiB;
static constexpr auto DEFAULT_COMPILED_EXPRESSION_CACHE_MAX_ENTRIES = 10'000;

View File

@ -93,6 +93,10 @@ namespace DB
M(String, index_mark_cache_policy, DEFAULT_INDEX_MARK_CACHE_POLICY, "Secondary index mark cache policy name.", 0) \
M(UInt64, index_mark_cache_size, DEFAULT_INDEX_MARK_CACHE_MAX_SIZE, "Size of cache for secondary index marks. Zero means disabled.", 0) \
M(Double, index_mark_cache_size_ratio, DEFAULT_INDEX_MARK_CACHE_SIZE_RATIO, "The size of the protected queue in the secondary index mark cache relative to the cache's total size.", 0) \
M(String, skipping_index_cache_policy, DEFAULT_SKIPPING_INDEX_CACHE_POLICY, "Skipping index cache policy name.", 0) \
M(UInt64, skipping_index_cache_size, DEFAULT_SKIPPING_INDEX_CACHE_MAX_SIZE, "Size of the cache for skipping index granules (in bytes). Zero means disabled.", 0) \
M(Double, skipping_index_cache_size_ratio, DEFAULT_SKIPPING_INDEX_CACHE_SIZE_RATIO, "The size of the protected queue in the skipping index cache relative to the cache's total size.", 0) \
M(UInt64, skipping_index_cache_max_entries, DEFAULT_SKIPPING_INDEX_CACHE_MAX_ENTRIES, "The maximum number of entries in the skipping index cache.", 0) \
M(UInt64, page_cache_chunk_size, 2 << 20, "Bytes per chunk in userspace page cache. Rounded up to a multiple of page size (typically 4 KiB) or huge page size (typically 2 MiB, only if page_cache_use_thp is enabled).", 0) \
M(UInt64, page_cache_mmap_size, 1 << 30, "Bytes per memory mapping in userspace page cache. Not important.", 0) \
M(UInt64, page_cache_size, 0, "Amount of virtual memory to map for userspace page cache. If page_cache_use_madv_free is enabled, it's recommended to set this higher than the machine's RAM size. Use 0 to disable userspace page cache.", 0) \

View File

@ -101,6 +101,11 @@ UInt64 BloomFilter::isEmpty() const
return true;
}
size_t BloomFilter::memoryUsageBytes() const
{
return filter.capacity() * sizeof(filter[0]);
}
bool operator== (const BloomFilter & a, const BloomFilter & b)
{
for (size_t i = 0; i < a.words; ++i)

View File

@ -55,6 +55,8 @@ public:
/// For debug.
UInt64 isEmpty() const;
size_t memoryUsageBytes() const;
friend bool operator== (const BloomFilter & a, const BloomFilter & b);
private:

View File

@ -31,6 +31,7 @@
#include <Storages/MergeTree/ReplicatedFetchList.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Storages/MergeTree/SkippingIndexCache.h>
#include <Storages/Distributed/DistributedSettings.h>
#include <Storages/CompressionCodecSelector.h>
#include <IO/S3Settings.h>
@ -297,6 +298,7 @@ struct ContextSharedPart : boost::noncopyable
mutable OnceFlag prefetch_threadpool_initialized;
mutable std::unique_ptr<ThreadPool> prefetch_threadpool; /// Threadpool for loading marks cache.
mutable UncompressedCachePtr index_uncompressed_cache TSA_GUARDED_BY(mutex); /// The cache of decompressed blocks for MergeTree indices.
mutable SkippingIndexCachePtr skipping_index_cache TSA_GUARDED_BY(mutex); /// Cache of deserialized secondary index granules.
mutable QueryCachePtr query_cache TSA_GUARDED_BY(mutex); /// Cache of query results.
mutable MarkCachePtr index_mark_cache TSA_GUARDED_BY(mutex); /// Cache of marks in compressed files of MergeTree indices.
mutable MMappedFileCachePtr mmap_cache TSA_GUARDED_BY(mutex); /// Cache of mmapped files to avoid frequent open/map/unmap/close and to reuse from several threads.
@ -3200,6 +3202,41 @@ void Context::clearMMappedFileCache() const
shared->mmap_cache->clear();
}
void Context::setSkippingIndexCache(const String & cache_policy, size_t max_size_in_bytes, size_t max_count, double size_ratio)
{
std::lock_guard lock(shared->mutex);
if (shared->skipping_index_cache)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Skipping index cache has been already created.");
shared->skipping_index_cache = std::make_shared<SkippingIndexCache>(cache_policy, max_size_in_bytes, max_count, size_ratio);
}
void Context::updateSkippingIndexCacheConfiguration(const Poco::Util::AbstractConfiguration & config)
{
std::lock_guard lock(shared->mutex);
if (!shared->skipping_index_cache)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Skipping index cache was not created yet.");
size_t max_size_in_bytes = config.getUInt64("skipping_index_cache_size", DEFAULT_SKIPPING_INDEX_CACHE_MAX_SIZE);
shared->skipping_index_cache->setMaxSizeInBytes(max_size_in_bytes);
}
SkippingIndexCachePtr Context::getSkippingIndexCache() const
{
SharedLockGuard lock(shared->mutex);
return shared->skipping_index_cache;
}
void Context::clearSkippingIndexCache() const
{
std::lock_guard lock(shared->mutex);
if (shared->skipping_index_cache)
shared->skipping_index_cache->clear();
}
void Context::setQueryCache(size_t max_size_in_bytes, size_t max_entries, size_t max_entry_size_in_bytes, size_t max_entry_size_in_rows)
{
std::lock_guard lock(shared->mutex);

View File

@ -85,6 +85,7 @@ class MarkCache;
class PageCache;
class MMappedFileCache;
class UncompressedCache;
class SkippingIndexCache;
class ProcessList;
class QueryStatus;
using QueryStatusPtr = std::shared_ptr<QueryStatus>;
@ -1066,6 +1067,11 @@ public:
std::shared_ptr<MarkCache> getIndexMarkCache() const;
void clearIndexMarkCache() const;
void setSkippingIndexCache(const String & cache_policy, size_t max_size_in_bytes, size_t max_entries, double size_ratio);
void updateSkippingIndexCacheConfiguration(const Poco::Util::AbstractConfiguration & config);
std::shared_ptr<SkippingIndexCache> getSkippingIndexCache() const;
void clearSkippingIndexCache() const;
void setMMappedFileCache(size_t max_cache_size_in_num_entries);
void updateMMappedFileCacheConfiguration(const Poco::Util::AbstractConfiguration & config);
std::shared_ptr<MMappedFileCache> getMMappedFileCache() const;

View File

@ -77,6 +77,8 @@ public:
const GinSegmentWithRowIdRangeVector & getFilter() const { return rowid_ranges; }
GinSegmentWithRowIdRangeVector & getFilter() { return rowid_ranges; }
size_t memoryUsageBytes() const { return rowid_ranges.capacity() * sizeof(rowid_ranges[0]); }
private:
/// Filter parameters
const GinFilterParameters & params;

View File

@ -364,6 +364,10 @@ BlockIO InterpreterSystemQuery::execute()
getContext()->checkAccess(AccessType::SYSTEM_DROP_UNCOMPRESSED_CACHE);
system_context->clearIndexUncompressedCache();
break;
case Type::DROP_SKIPPING_INDEX_CACHE:
getContext()->checkAccess(AccessType::SYSTEM_DROP_SKIPPING_INDEX_CACHE);
system_context->clearSkippingIndexCache();
break;
case Type::DROP_MMAP_CACHE:
getContext()->checkAccess(AccessType::SYSTEM_DROP_MMAP_CACHE);
system_context->clearMMappedFileCache();
@ -1288,6 +1292,7 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
case Type::DROP_UNCOMPRESSED_CACHE:
case Type::DROP_INDEX_MARK_CACHE:
case Type::DROP_INDEX_UNCOMPRESSED_CACHE:
case Type::DROP_SKIPPING_INDEX_CACHE:
case Type::DROP_FILESYSTEM_CACHE:
case Type::SYNC_FILESYSTEM_CACHE:
case Type::DROP_PAGE_CACHE:

View File

@ -410,6 +410,7 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState & s
case Type::DROP_INDEX_MARK_CACHE:
case Type::DROP_UNCOMPRESSED_CACHE:
case Type::DROP_INDEX_UNCOMPRESSED_CACHE:
case Type::DROP_SKIPPING_INDEX_CACHE:
case Type::DROP_COMPILED_EXPRESSION_CACHE:
case Type::DROP_S3_CLIENT_CACHE:
case Type::RESET_COVERAGE:

View File

@ -27,6 +27,7 @@ public:
DROP_UNCOMPRESSED_CACHE,
DROP_INDEX_MARK_CACHE,
DROP_INDEX_UNCOMPRESSED_CACHE,
DROP_SKIPPING_INDEX_CACHE,
DROP_MMAP_CACHE,
DROP_QUERY_CACHE,
DROP_COMPILED_EXPRESSION_CACHE,

View File

@ -658,6 +658,7 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
{
auto mark_cache = context->getIndexMarkCache();
auto uncompressed_cache = context->getIndexUncompressedCache();
auto skipping_index_cache = context->getSkippingIndexCache();
auto process_part = [&](size_t part_index)
{
@ -713,6 +714,7 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
reader_settings,
mark_cache.get(),
uncompressed_cache.get(),
skipping_index_cache.get(),
log);
stat.granules_dropped.fetch_add(total_granules - ranges.ranges.getNumberOfMarks(), std::memory_order_relaxed);
@ -734,7 +736,7 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
indices_and_condition.indices, indices_and_condition.condition,
part, ranges.ranges,
settings, reader_settings,
mark_cache.get(), uncompressed_cache.get(), log);
mark_cache.get(), uncompressed_cache.get(), skipping_index_cache.get(), log);
stat.total_granules.fetch_add(total_granules, std::memory_order_relaxed);
stat.granules_dropped.fetch_add(total_granules - ranges.ranges.getNumberOfMarks(), std::memory_order_relaxed);
@ -1352,6 +1354,7 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
const MergeTreeReaderSettings & reader_settings,
MarkCache * mark_cache,
UncompressedCache * uncompressed_cache,
SkippingIndexCache * skipping_index_cache,
LoggerPtr log)
{
if (!index_helper->getDeserializedFormat(part->getDataPartStorage(), index_helper->getFileName()))
@ -1388,6 +1391,7 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
index_ranges,
mark_cache,
uncompressed_cache,
skipping_index_cache,
reader_settings);
MarkRanges res;
@ -1468,6 +1472,7 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingMergedIndex(
const MergeTreeReaderSettings & reader_settings,
MarkCache * mark_cache,
UncompressedCache * uncompressed_cache,
SkippingIndexCache * skipping_index_cache,
LoggerPtr log)
{
for (const auto & index_helper : indices)
@ -1502,6 +1507,7 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingMergedIndex(
ranges,
mark_cache,
uncompressed_cache,
skipping_index_cache,
reader_settings));
}

View File

@ -93,6 +93,7 @@ private:
const MergeTreeReaderSettings & reader_settings,
MarkCache * mark_cache,
UncompressedCache * uncompressed_cache,
SkippingIndexCache * skipping_index_cache,
LoggerPtr log);
static MarkRanges filterMarksUsingMergedIndex(
@ -104,6 +105,7 @@ private:
const MergeTreeReaderSettings & reader_settings,
MarkCache * mark_cache,
UncompressedCache * uncompressed_cache,
SkippingIndexCache * skipping_index_cache,
LoggerPtr log);
struct PartFilterCounters

View File

@ -85,6 +85,14 @@ bool MergeTreeIndexGranuleBloomFilter::empty() const
return !total_rows;
}
size_t MergeTreeIndexGranuleBloomFilter::memoryUsageBytes() const
{
size_t sum = 0;
for (const auto & bloom_filter : bloom_filters)
sum += bloom_filter->memoryUsageBytes();
return sum;
}
void MergeTreeIndexGranuleBloomFilter::deserializeBinary(ReadBuffer & istr, MergeTreeIndexVersion version)
{
if (version != 1)

View File

@ -23,6 +23,8 @@ public:
bool empty() const override;
size_t memoryUsageBytes() const override;
void serializeBinary(WriteBuffer & ostr) const override;
void deserializeBinary(ReadBuffer & istr, MergeTreeIndexVersion version) override;

View File

@ -66,6 +66,15 @@ void MergeTreeIndexGranuleBloomFilterText::deserializeBinary(ReadBuffer & istr,
}
size_t MergeTreeIndexGranuleBloomFilterText::memoryUsageBytes() const
{
size_t sum = 0;
for (const auto & bloom_filter : bloom_filters)
sum += bloom_filter.memoryUsageBytes();
return sum;
}
MergeTreeIndexAggregatorBloomFilterText::MergeTreeIndexAggregatorBloomFilterText(
const Names & index_columns_,
const String & index_name_,

View File

@ -25,6 +25,8 @@ struct MergeTreeIndexGranuleBloomFilterText final : public IMergeTreeIndexGranul
bool empty() const override { return !has_elems; }
size_t memoryUsageBytes() const override;
const String index_name;
const BloomFilterParameters params;

View File

@ -86,6 +86,15 @@ void MergeTreeIndexGranuleFullText::deserializeBinary(ReadBuffer & istr, MergeTr
}
size_t MergeTreeIndexGranuleFullText::memoryUsageBytes() const
{
size_t sum = 0;
for (const auto & gin_filter : gin_filters)
sum += gin_filter.memoryUsageBytes();
return sum;
}
MergeTreeIndexAggregatorFullText::MergeTreeIndexAggregatorFullText(
GinIndexStorePtr store_,
const Names & index_columns_,

View File

@ -23,6 +23,8 @@ struct MergeTreeIndexGranuleFullText final : public IMergeTreeIndexGranule
bool empty() const override { return !has_elems; }
size_t memoryUsageBytes() const override;
const String index_name;
const GinFilterParameters params;
GinFilters gin_filters;

View File

@ -23,6 +23,8 @@ public:
bool empty() const override { return is_empty; }
size_t memoryUsageBytes() const override { return sizeof(*this); }
~MergeTreeIndexGranuleHypothesis() override = default;
const String & index_name;

View File

@ -23,6 +23,8 @@ struct MergeTreeIndexGranuleMinMax final : public IMergeTreeIndexGranule
bool empty() const override { return hyperrectangle.empty(); }
size_t memoryUsageBytes() const override { return hyperrectangle.capacity() * sizeof(hyperrectangle[0]); }
const String index_name;
const Block index_sample_block;

View File

@ -18,6 +18,7 @@ public:
const MarkRanges & all_mark_ranges_,
MarkCache * mark_cache,
UncompressedCache * uncompressed_cache,
SkippingIndexCache * skipping_index_cache,
MergeTreeReaderSettings settings);
~MergeTreeIndexReader();

View File

@ -31,6 +31,8 @@ struct MergeTreeIndexGranuleSet final : public IMergeTreeIndexGranule
size_t size() const { return block.rows(); }
bool empty() const override { return !size(); }
size_t memoryUsageBytes() const override { return block.bytes(); }
~MergeTreeIndexGranuleSet() override = default;
const String index_name;

View File

@ -162,6 +162,12 @@ String USearchIndexWithSerialization::Statistics::toString() const
max_level, connectivity, size, capacity, ReadableSize(memory_usage), bytes_per_vector, scalar_words, nodes, edges, max_edges);
}
size_t USearchIndexWithSerialization::memoryUsageBytes() const
{
return Base::memory_usage();
}
MergeTreeIndexGranuleVectorSimilarity::MergeTreeIndexGranuleVectorSimilarity(
const String & index_name_,
const Block & index_sample_block_,

View File

@ -53,6 +53,8 @@ public:
};
Statistics getStatistics() const;
size_t memoryUsageBytes() const;
};
using USearchIndexWithSerializationPtr = std::shared_ptr<USearchIndexWithSerialization>;
@ -82,6 +84,8 @@ struct MergeTreeIndexGranuleVectorSimilarity final : public IMergeTreeIndexGranu
bool empty() const override { return !index || index->size() == 0; }
size_t memoryUsageBytes() const override { return index->memoryUsageBytes(); }
const String index_name;
const Block index_sample_block;
const unum::usearch::metric_kind_t metric_kind;

View File

@ -60,6 +60,9 @@ struct IMergeTreeIndexGranule
virtual void deserializeBinary(ReadBuffer & istr, MergeTreeIndexVersion version) = 0;
virtual bool empty() const = 0;
/// The in-memory size of the granule. Not expected to be 100% accurate.
virtual size_t memoryUsageBytes() const = 0;
};
using MergeTreeIndexGranulePtr = std::shared_ptr<IMergeTreeIndexGranule>;

View File

@ -0,0 +1,103 @@
#pragma once
#include <Common/CacheBase.h>
#include <Common/ProfileEvents.h>
#include <Common/SipHash.h>
#include <Storages/MergeTree/MergeTreeIndices.h>
namespace ProfileEvents
{
extern const Event SkippingIndexCacheMisses;
extern const Event SkippingIndexCacheHits;
extern const Event SkippingIndexCacheBytesEvicted;
}
namespace CurrentMetrics
{
extern const Metric SkippingIndexCacheSize;
}
namespace DB
{
struct SkippingIndexCacheCell
{
static constexpr size_t ENTRY_OVERHEAD_BYTES_GUESS = 200;
MergeTreeIndexGranulePtr granule;
size_t memory_bytes;
explicit SkippingIndexCacheCell(MergeTreeIndexGranulePtr g)
: granule(std::move(g))
/// , memory_bytes(granule->memoryUsageBytes() + ENTRY_OVERHEAD_BYTES_GUESS)
, memory_bytes(0)
{
CurrentMetrics::add(CurrentMetrics::SkippingIndexCacheSize, memory_bytes);
}
~SkippingIndexCacheCell()
{
CurrentMetrics::sub(CurrentMetrics::SkippingIndexCacheSize, memory_bytes);
}
SkippingIndexCacheCell(const SkippingIndexCacheCell &) = delete;
SkippingIndexCacheCell & operator=(const SkippingIndexCacheCell &) = delete;
};
struct SkippingIndexCacheWeightFunction
{
size_t operator()(const SkippingIndexCacheCell & cell) const
{
return cell.memory_bytes;
}
};
/// Cache of deserialized skipping index granules.
class SkippingIndexCache : public CacheBase<UInt128, SkippingIndexCacheCell, UInt128TrivialHash, SkippingIndexCacheWeightFunction>
{
public:
using Base = CacheBase<UInt128, SkippingIndexCacheCell, UInt128TrivialHash, SkippingIndexCacheWeightFunction>;
SkippingIndexCache(const String & cache_policy, size_t max_size_in_bytes, size_t max_count, double size_ratio)
: Base(cache_policy, max_size_in_bytes, max_count, size_ratio)
{}
static UInt128 hash(const String & path_to_data_part, const String & index_name, size_t index_mark)
{
SipHash hash;
hash.update(path_to_data_part.data(), path_to_data_part.size() + 1);
hash.update(index_name.data(), index_name.size() + 1);
hash.update(index_mark);
return hash.get128();
}
/// LoadFunc should have signature () -> MergeTreeIndexGranulePtr.
template <typename LoadFunc>
MergeTreeIndexGranulePtr getOrSet(const Key & key, LoadFunc && load)
{
auto wrapped_load = [&]() -> std::shared_ptr<SkippingIndexCacheCell> {
MergeTreeIndexGranulePtr granule = load();
return std::make_shared<SkippingIndexCacheCell>(std::move(granule));
};
auto result = Base::getOrSet(key, wrapped_load);
if (result.second)
ProfileEvents::increment(ProfileEvents::SkippingIndexCacheMisses);
else
ProfileEvents::increment(ProfileEvents::SkippingIndexCacheHits);
return result.first->granule;
}
private:
void onRemoveOverflowWeightLoss(size_t weight_loss) override
{
ProfileEvents::increment(ProfileEvents::SkippingIndexCacheBytesEvicted, weight_loss);
}
};
using SkippingIndexCachePtr = std::shared_ptr<SkippingIndexCache>;
}