diff --git a/programs/local/LocalServer.cpp b/programs/local/LocalServer.cpp index 957bda4d75c..5e9b1570b66 100644 --- a/programs/local/LocalServer.cpp +++ b/programs/local/LocalServer.cpp @@ -262,6 +262,17 @@ try if (mark_cache_size) global_context->setMarkCache(mark_cache_size); + /// Size of cache for uncompressed blocks of MergeTree indices. Zero means disabled. + size_t index_uncompressed_cache_size = config().getUInt64("index_uncompressed_cache_size", 0); + if (index_uncompressed_cache_size) + global_context->setIndexUncompressedCache(index_uncompressed_cache_size); + + /// Size of cache for index marks (index of MergeTree skip indices). It is necessary. + /// Specify default value for index_mark_cache_size explicitly! + size_t index_mark_cache_size = config().getUInt64("index_mark_cache_size", 0); + if (index_mark_cache_size) + global_context->setIndexMarkCache(index_mark_cache_size); + /// A cache for mmapped files. size_t mmap_cache_size = config().getUInt64("mmap_cache_size", 1000); /// The choice of default is arbitrary. if (mmap_cache_size) diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index 7c1527bf4b0..55283e7cf46 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -942,6 +942,17 @@ if (ThreadFuzzer::instance().isEffective()) } global_context->setMarkCache(mark_cache_size); + /// Size of cache for uncompressed blocks of MergeTree indices. Zero means disabled. + size_t index_uncompressed_cache_size = config().getUInt64("index_uncompressed_cache_size", 0); + if (index_uncompressed_cache_size) + global_context->setIndexUncompressedCache(index_uncompressed_cache_size); + + /// Size of cache for index marks (index of MergeTree skip indices). It is necessary. + /// Specify default value for index_mark_cache_size explicitly! + size_t index_mark_cache_size = config().getUInt64("index_mark_cache_size", 0); + if (index_mark_cache_size) + global_context->setIndexMarkCache(index_mark_cache_size); + /// A cache for mmapped files. size_t mmap_cache_size = config().getUInt64("mmap_cache_size", 1000); /// The choice of default is arbitrary. if (mmap_cache_size) diff --git a/src/Interpreters/AsynchronousMetrics.cpp b/src/Interpreters/AsynchronousMetrics.cpp index fd02aa4abec..19c45257846 100644 --- a/src/Interpreters/AsynchronousMetrics.cpp +++ b/src/Interpreters/AsynchronousMetrics.cpp @@ -546,6 +546,22 @@ void AsynchronousMetrics::update(std::chrono::system_clock::time_point update_ti } } + { + if (auto index_mark_cache = getContext()->getIndexMarkCache()) + { + new_values["IndexMarkCacheBytes"] = index_mark_cache->weight(); + new_values["IndexMarkCacheFiles"] = index_mark_cache->count(); + } + } + + { + if (auto index_uncompressed_cache = getContext()->getIndexUncompressedCache()) + { + new_values["IndexUncompressedCacheBytes"] = index_uncompressed_cache->weight(); + new_values["IndexUncompressedCacheCells"] = index_uncompressed_cache->count(); + } + } + { if (auto mmap_cache = getContext()->getMMappedFileCache()) { diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 583f59c20b5..e11d5eb1c14 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -184,6 +184,8 @@ struct ContextSharedPart std::unique_ptr access_control_manager; mutable UncompressedCachePtr uncompressed_cache; /// The cache of decompressed blocks. mutable MarkCachePtr mark_cache; /// Cache of marks in compressed files. + mutable UncompressedCachePtr index_uncompressed_cache; /// The cache of decompressed blocks for MergeTree indices. + mutable MarkCachePtr index_mark_cache; /// Cache of marks in compressed files of MergeTree indices. mutable MMappedFileCachePtr mmap_cache; /// Cache of mmapped files to avoid frequent open/map/unmap/close and to reuse from several threads. ProcessList process_list; /// Executing queries at the moment. MergeList merge_list; /// The list of executable merge (for (Replicated)?MergeTree) @@ -1420,6 +1422,56 @@ void Context::dropMarkCache() const } +void Context::setIndexUncompressedCache(size_t max_size_in_bytes) +{ + auto lock = getLock(); + + if (shared->index_uncompressed_cache) + throw Exception("Index uncompressed cache has been already created.", ErrorCodes::LOGICAL_ERROR); + + shared->index_uncompressed_cache = std::make_shared(max_size_in_bytes); +} + + +UncompressedCachePtr Context::getIndexUncompressedCache() const +{ + auto lock = getLock(); + return shared->index_uncompressed_cache; +} + + +void Context::dropIndexUncompressedCache() const +{ + auto lock = getLock(); + if (shared->index_uncompressed_cache) + shared->index_uncompressed_cache->reset(); +} + + +void Context::setIndexMarkCache(size_t cache_size_in_bytes) +{ + auto lock = getLock(); + + if (shared->index_mark_cache) + throw Exception("Index mark cache has been already created.", ErrorCodes::LOGICAL_ERROR); + + shared->index_mark_cache = std::make_shared(cache_size_in_bytes); +} + +MarkCachePtr Context::getIndexMarkCache() const +{ + auto lock = getLock(); + return shared->index_mark_cache; +} + +void Context::dropIndexMarkCache() const +{ + auto lock = getLock(); + if (shared->index_mark_cache) + shared->index_mark_cache->reset(); +} + + void Context::setMMappedFileCache(size_t cache_size_in_num_entries) { auto lock = getLock(); @@ -1454,6 +1506,12 @@ void Context::dropCaches() const if (shared->mark_cache) shared->mark_cache->reset(); + if (shared->index_uncompressed_cache) + shared->index_uncompressed_cache->reset(); + + if (shared->index_mark_cache) + shared->index_mark_cache->reset(); + if (shared->mmap_cache) shared->mmap_cache->reset(); } diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 1b636deb532..9928022b841 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -667,6 +667,16 @@ public: std::shared_ptr getMarkCache() const; void dropMarkCache() const; + /// Create a cache of index uncompressed blocks of specified size. This can be done only once. + void setIndexUncompressedCache(size_t max_size_in_bytes); + std::shared_ptr getIndexUncompressedCache() const; + void dropIndexUncompressedCache() const; + + /// Create a cache of index marks of specified size. This can be done only once. + void setIndexMarkCache(size_t cache_size_in_bytes); + std::shared_ptr getIndexMarkCache() const; + void dropIndexMarkCache() const; + /// Create a cache of mapped files to avoid frequent open/map/unmap/close and to reuse from several threads. void setMMappedFileCache(size_t cache_size_in_num_entries); std::shared_ptr getMMappedFileCache() const; diff --git a/src/Interpreters/InterpreterSystemQuery.cpp b/src/Interpreters/InterpreterSystemQuery.cpp index d4ac555add0..03a23e9eb35 100644 --- a/src/Interpreters/InterpreterSystemQuery.cpp +++ b/src/Interpreters/InterpreterSystemQuery.cpp @@ -277,6 +277,14 @@ BlockIO InterpreterSystemQuery::execute() getContext()->checkAccess(AccessType::SYSTEM_DROP_UNCOMPRESSED_CACHE); system_context->dropUncompressedCache(); break; + case Type::DROP_INDEX_MARK_CACHE: + getContext()->checkAccess(AccessType::SYSTEM_DROP_MARK_CACHE); + system_context->dropIndexMarkCache(); + break; + case Type::DROP_INDEX_UNCOMPRESSED_CACHE: + getContext()->checkAccess(AccessType::SYSTEM_DROP_UNCOMPRESSED_CACHE); + system_context->dropIndexUncompressedCache(); + break; case Type::DROP_MMAP_CACHE: getContext()->checkAccess(AccessType::SYSTEM_DROP_MMAP_CACHE); system_context->dropMMappedFileCache(); @@ -721,6 +729,8 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster() case Type::DROP_COMPILED_EXPRESSION_CACHE: [[fallthrough]]; #endif case Type::DROP_UNCOMPRESSED_CACHE: + case Type::DROP_INDEX_MARK_CACHE: + case Type::DROP_INDEX_UNCOMPRESSED_CACHE: { required_access.emplace_back(AccessType::SYSTEM_DROP_CACHE); break; diff --git a/src/Parsers/ASTSystemQuery.cpp b/src/Parsers/ASTSystemQuery.cpp index 5d01e124b0e..0f52df2f4ae 100644 --- a/src/Parsers/ASTSystemQuery.cpp +++ b/src/Parsers/ASTSystemQuery.cpp @@ -30,6 +30,10 @@ const char * ASTSystemQuery::typeToString(Type type) return "DROP MARK CACHE"; case Type::DROP_UNCOMPRESSED_CACHE: return "DROP UNCOMPRESSED CACHE"; + case Type::DROP_INDEX_MARK_CACHE: + return "DROP INDEX MARK CACHE"; + case Type::DROP_INDEX_UNCOMPRESSED_CACHE: + return "DROP INDEX UNCOMPRESSED CACHE"; case Type::DROP_MMAP_CACHE: return "DROP MMAP CACHE"; #if USE_EMBEDDED_COMPILER diff --git a/src/Parsers/ASTSystemQuery.h b/src/Parsers/ASTSystemQuery.h index fa7b6ece59a..c88fd4bf7ad 100644 --- a/src/Parsers/ASTSystemQuery.h +++ b/src/Parsers/ASTSystemQuery.h @@ -24,6 +24,8 @@ public: DROP_DNS_CACHE, DROP_MARK_CACHE, DROP_UNCOMPRESSED_CACHE, + DROP_INDEX_MARK_CACHE, + DROP_INDEX_UNCOMPRESSED_CACHE, DROP_MMAP_CACHE, #if USE_EMBEDDED_COMPILER DROP_COMPILED_EXPRESSION_CACHE, diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index f5c1890154a..b36b64d5651 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -834,6 +834,9 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd if (settings.read_overflow_mode_leaf == OverflowMode::THROW && settings.max_rows_to_read_leaf) leaf_limits = SizeLimits(settings.max_rows_to_read_leaf, 0, settings.read_overflow_mode_leaf); + auto mark_cache = context->getIndexMarkCache(); + auto uncompressed_cache = context->getIndexUncompressedCache(); + auto process_part = [&](size_t part_index) { auto & part = parts[part_index]; @@ -870,6 +873,8 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd reader_settings, total_granules, granules_dropped, + mark_cache.get(), + uncompressed_cache.get(), log); index_and_condition.total_granules.fetch_add(total_granules, std::memory_order_relaxed); @@ -1408,6 +1413,8 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex( const MergeTreeReaderSettings & reader_settings, size_t & total_granules, size_t & granules_dropped, + MarkCache * mark_cache, + UncompressedCache * uncompressed_cache, Poco::Logger * log) { const std::string & path_prefix = part->getFullRelativePath() + index_helper->getFileName(); @@ -1433,6 +1440,8 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex( index_helper, part, index_marks_count, ranges, + mark_cache, + uncompressed_cache, reader_settings); MarkRanges res; diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h index 92c4382dc90..b114f498bf0 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h @@ -90,6 +90,8 @@ private: const MergeTreeReaderSettings & reader_settings, size_t & total_granules, size_t & granules_dropped, + MarkCache * mark_cache, + UncompressedCache * uncompressed_cache, Poco::Logger * log); struct PartFilterCounters diff --git a/src/Storages/MergeTree/MergeTreeIndexReader.cpp b/src/Storages/MergeTree/MergeTreeIndexReader.cpp index 0a0f2511914..fd7358967f3 100644 --- a/src/Storages/MergeTree/MergeTreeIndexReader.cpp +++ b/src/Storages/MergeTree/MergeTreeIndexReader.cpp @@ -11,13 +11,15 @@ std::unique_ptr makeIndexReader( MergeTreeData::DataPartPtr part, size_t marks_count, const MarkRanges & all_mark_ranges, + MarkCache * mark_cache, + UncompressedCache * uncompressed_cache, MergeTreeReaderSettings settings) { return std::make_unique( part->volume->getDisk(), part->getFullRelativePath() + index->getFileName(), extension, marks_count, all_mark_ranges, - std::move(settings), nullptr, nullptr, + std::move(settings), mark_cache, uncompressed_cache, part->getFileSizeOrZero(index->getFileName() + extension), &part->index_granularity_info, ReadBufferFromFileBase::ProfileCallback{}, CLOCK_MONOTONIC_COARSE); @@ -29,14 +31,27 @@ namespace DB { MergeTreeIndexReader::MergeTreeIndexReader( - MergeTreeIndexPtr index_, MergeTreeData::DataPartPtr part_, size_t marks_count_, const MarkRanges & all_mark_ranges_, + MergeTreeIndexPtr index_, + MergeTreeData::DataPartPtr part_, + size_t marks_count_, + const MarkRanges & all_mark_ranges_, + MarkCache * mark_cache, + UncompressedCache * uncompressed_cache, MergeTreeReaderSettings settings) : index(index_) { const std::string & path_prefix = part_->getFullRelativePath() + index->getFileName(); auto index_format = index->getDeserializedFormat(part_->volume->getDisk(), path_prefix); - stream = makeIndexReader(index_format.extension, index_, part_, marks_count_, all_mark_ranges_, std::move(settings)); + stream = makeIndexReader( + index_format.extension, + index_, + part_, + marks_count_, + all_mark_ranges_, + mark_cache, + uncompressed_cache, + std::move(settings)); version = index_format.version; stream->seekToStart(); diff --git a/src/Storages/MergeTree/MergeTreeIndexReader.h b/src/Storages/MergeTree/MergeTreeIndexReader.h index 4facd43c175..799dae154bf 100644 --- a/src/Storages/MergeTree/MergeTreeIndexReader.h +++ b/src/Storages/MergeTree/MergeTreeIndexReader.h @@ -16,6 +16,8 @@ public: MergeTreeData::DataPartPtr part_, size_t marks_count_, const MarkRanges & all_mark_ranges_, + MarkCache * mark_cache, + UncompressedCache * uncompressed_cache, MergeTreeReaderSettings settings); ~MergeTreeIndexReader();