Merge pull request #27961 from amosbird/indexcache

Dedicated Mark/Uncompressed cache for skip indices
This commit is contained in:
Maksim Kita 2021-10-15 13:31:44 +03:00 committed by GitHub
commit 97fa53d6b2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 149 additions and 3 deletions

View File

@ -544,6 +544,17 @@ void LocalServer::processConfig()
if (mark_cache_size)
global_context->setMarkCache(mark_cache_size);
/// Size of cache for uncompressed blocks of MergeTree indices. Zero means disabled.
size_t index_uncompressed_cache_size = config().getUInt64("index_uncompressed_cache_size", 0);
if (index_uncompressed_cache_size)
global_context->setIndexUncompressedCache(index_uncompressed_cache_size);
/// Size of cache for index marks (index of MergeTree skip indices). It is necessary.
/// Specify default value for index_mark_cache_size explicitly!
size_t index_mark_cache_size = config().getUInt64("index_mark_cache_size", 0);
if (index_mark_cache_size)
global_context->setIndexMarkCache(index_mark_cache_size);
/// A cache for mmapped files.
size_t mmap_cache_size = config().getUInt64("mmap_cache_size", 1000); /// The choice of default is arbitrary.
if (mmap_cache_size)

View File

@ -940,6 +940,17 @@ if (ThreadFuzzer::instance().isEffective())
}
global_context->setMarkCache(mark_cache_size);
/// Size of cache for uncompressed blocks of MergeTree indices. Zero means disabled.
size_t index_uncompressed_cache_size = config().getUInt64("index_uncompressed_cache_size", 0);
if (index_uncompressed_cache_size)
global_context->setIndexUncompressedCache(index_uncompressed_cache_size);
/// Size of cache for index marks (index of MergeTree skip indices). It is necessary.
/// Specify default value for index_mark_cache_size explicitly!
size_t index_mark_cache_size = config().getUInt64("index_mark_cache_size", 0);
if (index_mark_cache_size)
global_context->setIndexMarkCache(index_mark_cache_size);
/// A cache for mmapped files.
size_t mmap_cache_size = config().getUInt64("mmap_cache_size", 1000); /// The choice of default is arbitrary.
if (mmap_cache_size)

View File

@ -546,6 +546,22 @@ void AsynchronousMetrics::update(std::chrono::system_clock::time_point update_ti
}
}
{
if (auto index_mark_cache = getContext()->getIndexMarkCache())
{
new_values["IndexMarkCacheBytes"] = index_mark_cache->weight();
new_values["IndexMarkCacheFiles"] = index_mark_cache->count();
}
}
{
if (auto index_uncompressed_cache = getContext()->getIndexUncompressedCache())
{
new_values["IndexUncompressedCacheBytes"] = index_uncompressed_cache->weight();
new_values["IndexUncompressedCacheCells"] = index_uncompressed_cache->count();
}
}
{
if (auto mmap_cache = getContext()->getMMappedFileCache())
{

View File

@ -211,6 +211,8 @@ struct ContextSharedPart
std::unique_ptr<AccessControlManager> access_control_manager;
mutable UncompressedCachePtr uncompressed_cache; /// The cache of decompressed blocks.
mutable MarkCachePtr mark_cache; /// Cache of marks in compressed files.
mutable UncompressedCachePtr index_uncompressed_cache; /// The cache of decompressed blocks for MergeTree indices.
mutable MarkCachePtr index_mark_cache; /// Cache of marks in compressed files of MergeTree indices.
mutable MMappedFileCachePtr mmap_cache; /// Cache of mmapped files to avoid frequent open/map/unmap/close and to reuse from several threads.
ProcessList process_list; /// Executing queries at the moment.
MergeList merge_list; /// The list of executable merge (for (Replicated)?MergeTree)
@ -1573,6 +1575,56 @@ void Context::dropMarkCache() const
}
void Context::setIndexUncompressedCache(size_t max_size_in_bytes)
{
auto lock = getLock();
if (shared->index_uncompressed_cache)
throw Exception("Index uncompressed cache has been already created.", ErrorCodes::LOGICAL_ERROR);
shared->index_uncompressed_cache = std::make_shared<UncompressedCache>(max_size_in_bytes);
}
UncompressedCachePtr Context::getIndexUncompressedCache() const
{
auto lock = getLock();
return shared->index_uncompressed_cache;
}
void Context::dropIndexUncompressedCache() const
{
auto lock = getLock();
if (shared->index_uncompressed_cache)
shared->index_uncompressed_cache->reset();
}
void Context::setIndexMarkCache(size_t cache_size_in_bytes)
{
auto lock = getLock();
if (shared->index_mark_cache)
throw Exception("Index mark cache has been already created.", ErrorCodes::LOGICAL_ERROR);
shared->index_mark_cache = std::make_shared<MarkCache>(cache_size_in_bytes);
}
MarkCachePtr Context::getIndexMarkCache() const
{
auto lock = getLock();
return shared->index_mark_cache;
}
void Context::dropIndexMarkCache() const
{
auto lock = getLock();
if (shared->index_mark_cache)
shared->index_mark_cache->reset();
}
void Context::setMMappedFileCache(size_t cache_size_in_num_entries)
{
auto lock = getLock();
@ -1607,6 +1659,12 @@ void Context::dropCaches() const
if (shared->mark_cache)
shared->mark_cache->reset();
if (shared->index_uncompressed_cache)
shared->index_uncompressed_cache->reset();
if (shared->index_mark_cache)
shared->index_mark_cache->reset();
if (shared->mmap_cache)
shared->mmap_cache->reset();
}

View File

@ -696,6 +696,16 @@ public:
std::shared_ptr<MarkCache> getMarkCache() const;
void dropMarkCache() const;
/// Create a cache of index uncompressed blocks of specified size. This can be done only once.
void setIndexUncompressedCache(size_t max_size_in_bytes);
std::shared_ptr<UncompressedCache> getIndexUncompressedCache() const;
void dropIndexUncompressedCache() const;
/// Create a cache of index marks of specified size. This can be done only once.
void setIndexMarkCache(size_t cache_size_in_bytes);
std::shared_ptr<MarkCache> getIndexMarkCache() const;
void dropIndexMarkCache() const;
/// Create a cache of mapped files to avoid frequent open/map/unmap/close and to reuse from several threads.
void setMMappedFileCache(size_t cache_size_in_num_entries);
std::shared_ptr<MMappedFileCache> getMMappedFileCache() const;

View File

@ -280,6 +280,14 @@ BlockIO InterpreterSystemQuery::execute()
getContext()->checkAccess(AccessType::SYSTEM_DROP_UNCOMPRESSED_CACHE);
system_context->dropUncompressedCache();
break;
case Type::DROP_INDEX_MARK_CACHE:
getContext()->checkAccess(AccessType::SYSTEM_DROP_MARK_CACHE);
system_context->dropIndexMarkCache();
break;
case Type::DROP_INDEX_UNCOMPRESSED_CACHE:
getContext()->checkAccess(AccessType::SYSTEM_DROP_UNCOMPRESSED_CACHE);
system_context->dropIndexUncompressedCache();
break;
case Type::DROP_MMAP_CACHE:
getContext()->checkAccess(AccessType::SYSTEM_DROP_MMAP_CACHE);
system_context->dropMMappedFileCache();
@ -746,6 +754,8 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
case Type::DROP_COMPILED_EXPRESSION_CACHE: [[fallthrough]];
#endif
case Type::DROP_UNCOMPRESSED_CACHE:
case Type::DROP_INDEX_MARK_CACHE:
case Type::DROP_INDEX_UNCOMPRESSED_CACHE:
{
required_access.emplace_back(AccessType::SYSTEM_DROP_CACHE);
break;

View File

@ -24,6 +24,8 @@ public:
DROP_DNS_CACHE,
DROP_MARK_CACHE,
DROP_UNCOMPRESSED_CACHE,
DROP_INDEX_MARK_CACHE,
DROP_INDEX_UNCOMPRESSED_CACHE,
DROP_MMAP_CACHE,
#if USE_EMBEDDED_COMPILER
DROP_COMPILED_EXPRESSION_CACHE,

View File

@ -849,6 +849,9 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
if (settings.read_overflow_mode_leaf == OverflowMode::THROW && settings.max_rows_to_read_leaf)
leaf_limits = SizeLimits(settings.max_rows_to_read_leaf, 0, settings.read_overflow_mode_leaf);
auto mark_cache = context->getIndexMarkCache();
auto uncompressed_cache = context->getIndexUncompressedCache();
auto process_part = [&](size_t part_index)
{
auto & part = parts[part_index];
@ -885,6 +888,8 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
reader_settings,
total_granules,
granules_dropped,
mark_cache.get(),
uncompressed_cache.get(),
log);
index_and_condition.total_granules.fetch_add(total_granules, std::memory_order_relaxed);
@ -1424,6 +1429,8 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
const MergeTreeReaderSettings & reader_settings,
size_t & total_granules,
size_t & granules_dropped,
MarkCache * mark_cache,
UncompressedCache * uncompressed_cache,
Poco::Logger * log)
{
const std::string & path_prefix = part->getFullRelativePath() + index_helper->getFileName();
@ -1449,6 +1456,8 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
index_helper, part,
index_marks_count,
ranges,
mark_cache,
uncompressed_cache,
reader_settings);
MarkRanges res;

View File

@ -90,6 +90,8 @@ private:
const MergeTreeReaderSettings & reader_settings,
size_t & total_granules,
size_t & granules_dropped,
MarkCache * mark_cache,
UncompressedCache * uncompressed_cache,
Poco::Logger * log);
struct PartFilterCounters

View File

@ -11,13 +11,15 @@ std::unique_ptr<MergeTreeReaderStream> makeIndexReader(
MergeTreeData::DataPartPtr part,
size_t marks_count,
const MarkRanges & all_mark_ranges,
MarkCache * mark_cache,
UncompressedCache * uncompressed_cache,
MergeTreeReaderSettings settings)
{
return std::make_unique<MergeTreeReaderStream>(
part->volume->getDisk(),
part->getFullRelativePath() + index->getFileName(), extension, marks_count,
all_mark_ranges,
std::move(settings), nullptr, nullptr,
std::move(settings), mark_cache, uncompressed_cache,
part->getFileSizeOrZero(index->getFileName() + extension),
&part->index_granularity_info,
ReadBufferFromFileBase::ProfileCallback{}, CLOCK_MONOTONIC_COARSE);
@ -29,14 +31,27 @@ namespace DB
{
MergeTreeIndexReader::MergeTreeIndexReader(
MergeTreeIndexPtr index_, MergeTreeData::DataPartPtr part_, size_t marks_count_, const MarkRanges & all_mark_ranges_,
MergeTreeIndexPtr index_,
MergeTreeData::DataPartPtr part_,
size_t marks_count_,
const MarkRanges & all_mark_ranges_,
MarkCache * mark_cache,
UncompressedCache * uncompressed_cache,
MergeTreeReaderSettings settings)
: index(index_)
{
const std::string & path_prefix = part_->getFullRelativePath() + index->getFileName();
auto index_format = index->getDeserializedFormat(part_->volume->getDisk(), path_prefix);
stream = makeIndexReader(index_format.extension, index_, part_, marks_count_, all_mark_ranges_, std::move(settings));
stream = makeIndexReader(
index_format.extension,
index_,
part_,
marks_count_,
all_mark_ranges_,
mark_cache,
uncompressed_cache,
std::move(settings));
version = index_format.version;
stream->seekToStart();

View File

@ -16,6 +16,8 @@ public:
MergeTreeData::DataPartPtr part_,
size_t marks_count_,
const MarkRanges & all_mark_ranges_,
MarkCache * mark_cache,
UncompressedCache * uncompressed_cache,
MergeTreeReaderSettings settings);
~MergeTreeIndexReader();