Dedicated Mark/Uncompressed cache for skip indices

This commit is contained in:
Amos Bird 2021-08-22 01:27:22 +08:00
parent cfa571cac4
commit a2256b1307
No known key found for this signature in database
GPG Key ID: 80D430DCBECFEDB4
12 changed files with 153 additions and 3 deletions

View File

@ -262,6 +262,17 @@ try
if (mark_cache_size) if (mark_cache_size)
global_context->setMarkCache(mark_cache_size); global_context->setMarkCache(mark_cache_size);
/// Size of cache for uncompressed blocks of MergeTree indices. Zero means disabled.
size_t index_uncompressed_cache_size = config().getUInt64("index_uncompressed_cache_size", 0);
if (index_uncompressed_cache_size)
global_context->setIndexUncompressedCache(index_uncompressed_cache_size);
/// Size of cache for index marks (index of MergeTree skip indices). It is necessary.
/// Specify default value for index_mark_cache_size explicitly!
size_t index_mark_cache_size = config().getUInt64("index_mark_cache_size", 0);
if (index_mark_cache_size)
global_context->setIndexMarkCache(index_mark_cache_size);
/// A cache for mmapped files. /// A cache for mmapped files.
size_t mmap_cache_size = config().getUInt64("mmap_cache_size", 1000); /// The choice of default is arbitrary. size_t mmap_cache_size = config().getUInt64("mmap_cache_size", 1000); /// The choice of default is arbitrary.
if (mmap_cache_size) if (mmap_cache_size)

View File

@ -942,6 +942,17 @@ if (ThreadFuzzer::instance().isEffective())
} }
global_context->setMarkCache(mark_cache_size); global_context->setMarkCache(mark_cache_size);
/// Size of cache for uncompressed blocks of MergeTree indices. Zero means disabled.
size_t index_uncompressed_cache_size = config().getUInt64("index_uncompressed_cache_size", 0);
if (index_uncompressed_cache_size)
global_context->setIndexUncompressedCache(index_uncompressed_cache_size);
/// Size of cache for index marks (index of MergeTree skip indices). It is necessary.
/// Specify default value for index_mark_cache_size explicitly!
size_t index_mark_cache_size = config().getUInt64("index_mark_cache_size", 0);
if (index_mark_cache_size)
global_context->setIndexMarkCache(index_mark_cache_size);
/// A cache for mmapped files. /// A cache for mmapped files.
size_t mmap_cache_size = config().getUInt64("mmap_cache_size", 1000); /// The choice of default is arbitrary. size_t mmap_cache_size = config().getUInt64("mmap_cache_size", 1000); /// The choice of default is arbitrary.
if (mmap_cache_size) if (mmap_cache_size)

View File

@ -546,6 +546,22 @@ void AsynchronousMetrics::update(std::chrono::system_clock::time_point update_ti
} }
} }
{
if (auto index_mark_cache = getContext()->getIndexMarkCache())
{
new_values["IndexMarkCacheBytes"] = index_mark_cache->weight();
new_values["IndexMarkCacheFiles"] = index_mark_cache->count();
}
}
{
if (auto index_uncompressed_cache = getContext()->getIndexUncompressedCache())
{
new_values["IndexUncompressedCacheBytes"] = index_uncompressed_cache->weight();
new_values["IndexUncompressedCacheCells"] = index_uncompressed_cache->count();
}
}
{ {
if (auto mmap_cache = getContext()->getMMappedFileCache()) if (auto mmap_cache = getContext()->getMMappedFileCache())
{ {

View File

@ -184,6 +184,8 @@ struct ContextSharedPart
std::unique_ptr<AccessControlManager> access_control_manager; std::unique_ptr<AccessControlManager> access_control_manager;
mutable UncompressedCachePtr uncompressed_cache; /// The cache of decompressed blocks. mutable UncompressedCachePtr uncompressed_cache; /// The cache of decompressed blocks.
mutable MarkCachePtr mark_cache; /// Cache of marks in compressed files. mutable MarkCachePtr mark_cache; /// Cache of marks in compressed files.
mutable UncompressedCachePtr index_uncompressed_cache; /// The cache of decompressed blocks for MergeTree indices.
mutable MarkCachePtr index_mark_cache; /// Cache of marks in compressed files of MergeTree indices.
mutable MMappedFileCachePtr mmap_cache; /// Cache of mmapped files to avoid frequent open/map/unmap/close and to reuse from several threads. mutable MMappedFileCachePtr mmap_cache; /// Cache of mmapped files to avoid frequent open/map/unmap/close and to reuse from several threads.
ProcessList process_list; /// Executing queries at the moment. ProcessList process_list; /// Executing queries at the moment.
MergeList merge_list; /// The list of executable merge (for (Replicated)?MergeTree) MergeList merge_list; /// The list of executable merge (for (Replicated)?MergeTree)
@ -1420,6 +1422,56 @@ void Context::dropMarkCache() const
} }
void Context::setIndexUncompressedCache(size_t max_size_in_bytes)
{
auto lock = getLock();
if (shared->index_uncompressed_cache)
throw Exception("Index uncompressed cache has been already created.", ErrorCodes::LOGICAL_ERROR);
shared->index_uncompressed_cache = std::make_shared<UncompressedCache>(max_size_in_bytes);
}
UncompressedCachePtr Context::getIndexUncompressedCache() const
{
auto lock = getLock();
return shared->index_uncompressed_cache;
}
void Context::dropIndexUncompressedCache() const
{
auto lock = getLock();
if (shared->index_uncompressed_cache)
shared->index_uncompressed_cache->reset();
}
void Context::setIndexMarkCache(size_t cache_size_in_bytes)
{
auto lock = getLock();
if (shared->index_mark_cache)
throw Exception("Index mark cache has been already created.", ErrorCodes::LOGICAL_ERROR);
shared->index_mark_cache = std::make_shared<MarkCache>(cache_size_in_bytes);
}
MarkCachePtr Context::getIndexMarkCache() const
{
auto lock = getLock();
return shared->index_mark_cache;
}
void Context::dropIndexMarkCache() const
{
auto lock = getLock();
if (shared->index_mark_cache)
shared->index_mark_cache->reset();
}
void Context::setMMappedFileCache(size_t cache_size_in_num_entries) void Context::setMMappedFileCache(size_t cache_size_in_num_entries)
{ {
auto lock = getLock(); auto lock = getLock();
@ -1454,6 +1506,12 @@ void Context::dropCaches() const
if (shared->mark_cache) if (shared->mark_cache)
shared->mark_cache->reset(); shared->mark_cache->reset();
if (shared->index_uncompressed_cache)
shared->index_uncompressed_cache->reset();
if (shared->index_mark_cache)
shared->index_mark_cache->reset();
if (shared->mmap_cache) if (shared->mmap_cache)
shared->mmap_cache->reset(); shared->mmap_cache->reset();
} }

View File

@ -667,6 +667,16 @@ public:
std::shared_ptr<MarkCache> getMarkCache() const; std::shared_ptr<MarkCache> getMarkCache() const;
void dropMarkCache() const; void dropMarkCache() const;
/// Create a cache of index uncompressed blocks of specified size. This can be done only once.
void setIndexUncompressedCache(size_t max_size_in_bytes);
std::shared_ptr<UncompressedCache> getIndexUncompressedCache() const;
void dropIndexUncompressedCache() const;
/// Create a cache of index marks of specified size. This can be done only once.
void setIndexMarkCache(size_t cache_size_in_bytes);
std::shared_ptr<MarkCache> getIndexMarkCache() const;
void dropIndexMarkCache() const;
/// Create a cache of mapped files to avoid frequent open/map/unmap/close and to reuse from several threads. /// Create a cache of mapped files to avoid frequent open/map/unmap/close and to reuse from several threads.
void setMMappedFileCache(size_t cache_size_in_num_entries); void setMMappedFileCache(size_t cache_size_in_num_entries);
std::shared_ptr<MMappedFileCache> getMMappedFileCache() const; std::shared_ptr<MMappedFileCache> getMMappedFileCache() const;

View File

@ -277,6 +277,14 @@ BlockIO InterpreterSystemQuery::execute()
getContext()->checkAccess(AccessType::SYSTEM_DROP_UNCOMPRESSED_CACHE); getContext()->checkAccess(AccessType::SYSTEM_DROP_UNCOMPRESSED_CACHE);
system_context->dropUncompressedCache(); system_context->dropUncompressedCache();
break; break;
case Type::DROP_INDEX_MARK_CACHE:
getContext()->checkAccess(AccessType::SYSTEM_DROP_MARK_CACHE);
system_context->dropIndexMarkCache();
break;
case Type::DROP_INDEX_UNCOMPRESSED_CACHE:
getContext()->checkAccess(AccessType::SYSTEM_DROP_UNCOMPRESSED_CACHE);
system_context->dropIndexUncompressedCache();
break;
case Type::DROP_MMAP_CACHE: case Type::DROP_MMAP_CACHE:
getContext()->checkAccess(AccessType::SYSTEM_DROP_MMAP_CACHE); getContext()->checkAccess(AccessType::SYSTEM_DROP_MMAP_CACHE);
system_context->dropMMappedFileCache(); system_context->dropMMappedFileCache();
@ -721,6 +729,8 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
case Type::DROP_COMPILED_EXPRESSION_CACHE: [[fallthrough]]; case Type::DROP_COMPILED_EXPRESSION_CACHE: [[fallthrough]];
#endif #endif
case Type::DROP_UNCOMPRESSED_CACHE: case Type::DROP_UNCOMPRESSED_CACHE:
case Type::DROP_INDEX_MARK_CACHE:
case Type::DROP_INDEX_UNCOMPRESSED_CACHE:
{ {
required_access.emplace_back(AccessType::SYSTEM_DROP_CACHE); required_access.emplace_back(AccessType::SYSTEM_DROP_CACHE);
break; break;

View File

@ -30,6 +30,10 @@ const char * ASTSystemQuery::typeToString(Type type)
return "DROP MARK CACHE"; return "DROP MARK CACHE";
case Type::DROP_UNCOMPRESSED_CACHE: case Type::DROP_UNCOMPRESSED_CACHE:
return "DROP UNCOMPRESSED CACHE"; return "DROP UNCOMPRESSED CACHE";
case Type::DROP_INDEX_MARK_CACHE:
return "DROP INDEX MARK CACHE";
case Type::DROP_INDEX_UNCOMPRESSED_CACHE:
return "DROP INDEX UNCOMPRESSED CACHE";
case Type::DROP_MMAP_CACHE: case Type::DROP_MMAP_CACHE:
return "DROP MMAP CACHE"; return "DROP MMAP CACHE";
#if USE_EMBEDDED_COMPILER #if USE_EMBEDDED_COMPILER

View File

@ -24,6 +24,8 @@ public:
DROP_DNS_CACHE, DROP_DNS_CACHE,
DROP_MARK_CACHE, DROP_MARK_CACHE,
DROP_UNCOMPRESSED_CACHE, DROP_UNCOMPRESSED_CACHE,
DROP_INDEX_MARK_CACHE,
DROP_INDEX_UNCOMPRESSED_CACHE,
DROP_MMAP_CACHE, DROP_MMAP_CACHE,
#if USE_EMBEDDED_COMPILER #if USE_EMBEDDED_COMPILER
DROP_COMPILED_EXPRESSION_CACHE, DROP_COMPILED_EXPRESSION_CACHE,

View File

@ -834,6 +834,9 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
if (settings.read_overflow_mode_leaf == OverflowMode::THROW && settings.max_rows_to_read_leaf) if (settings.read_overflow_mode_leaf == OverflowMode::THROW && settings.max_rows_to_read_leaf)
leaf_limits = SizeLimits(settings.max_rows_to_read_leaf, 0, settings.read_overflow_mode_leaf); leaf_limits = SizeLimits(settings.max_rows_to_read_leaf, 0, settings.read_overflow_mode_leaf);
auto mark_cache = context->getIndexMarkCache();
auto uncompressed_cache = context->getIndexUncompressedCache();
auto process_part = [&](size_t part_index) auto process_part = [&](size_t part_index)
{ {
auto & part = parts[part_index]; auto & part = parts[part_index];
@ -870,6 +873,8 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
reader_settings, reader_settings,
total_granules, total_granules,
granules_dropped, granules_dropped,
mark_cache.get(),
uncompressed_cache.get(),
log); log);
index_and_condition.total_granules.fetch_add(total_granules, std::memory_order_relaxed); index_and_condition.total_granules.fetch_add(total_granules, std::memory_order_relaxed);
@ -1408,6 +1413,8 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
const MergeTreeReaderSettings & reader_settings, const MergeTreeReaderSettings & reader_settings,
size_t & total_granules, size_t & total_granules,
size_t & granules_dropped, size_t & granules_dropped,
MarkCache * mark_cache,
UncompressedCache * uncompressed_cache,
Poco::Logger * log) Poco::Logger * log)
{ {
const std::string & path_prefix = part->getFullRelativePath() + index_helper->getFileName(); const std::string & path_prefix = part->getFullRelativePath() + index_helper->getFileName();
@ -1433,6 +1440,8 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
index_helper, part, index_helper, part,
index_marks_count, index_marks_count,
ranges, ranges,
mark_cache,
uncompressed_cache,
reader_settings); reader_settings);
MarkRanges res; MarkRanges res;

View File

@ -90,6 +90,8 @@ private:
const MergeTreeReaderSettings & reader_settings, const MergeTreeReaderSettings & reader_settings,
size_t & total_granules, size_t & total_granules,
size_t & granules_dropped, size_t & granules_dropped,
MarkCache * mark_cache,
UncompressedCache * uncompressed_cache,
Poco::Logger * log); Poco::Logger * log);
struct PartFilterCounters struct PartFilterCounters

View File

@ -11,13 +11,15 @@ std::unique_ptr<MergeTreeReaderStream> makeIndexReader(
MergeTreeData::DataPartPtr part, MergeTreeData::DataPartPtr part,
size_t marks_count, size_t marks_count,
const MarkRanges & all_mark_ranges, const MarkRanges & all_mark_ranges,
MarkCache * mark_cache,
UncompressedCache * uncompressed_cache,
MergeTreeReaderSettings settings) MergeTreeReaderSettings settings)
{ {
return std::make_unique<MergeTreeReaderStream>( return std::make_unique<MergeTreeReaderStream>(
part->volume->getDisk(), part->volume->getDisk(),
part->getFullRelativePath() + index->getFileName(), extension, marks_count, part->getFullRelativePath() + index->getFileName(), extension, marks_count,
all_mark_ranges, all_mark_ranges,
std::move(settings), nullptr, nullptr, std::move(settings), mark_cache, uncompressed_cache,
part->getFileSizeOrZero(index->getFileName() + extension), part->getFileSizeOrZero(index->getFileName() + extension),
&part->index_granularity_info, &part->index_granularity_info,
ReadBufferFromFileBase::ProfileCallback{}, CLOCK_MONOTONIC_COARSE); ReadBufferFromFileBase::ProfileCallback{}, CLOCK_MONOTONIC_COARSE);
@ -29,14 +31,27 @@ namespace DB
{ {
MergeTreeIndexReader::MergeTreeIndexReader( MergeTreeIndexReader::MergeTreeIndexReader(
MergeTreeIndexPtr index_, MergeTreeData::DataPartPtr part_, size_t marks_count_, const MarkRanges & all_mark_ranges_, MergeTreeIndexPtr index_,
MergeTreeData::DataPartPtr part_,
size_t marks_count_,
const MarkRanges & all_mark_ranges_,
MarkCache * mark_cache,
UncompressedCache * uncompressed_cache,
MergeTreeReaderSettings settings) MergeTreeReaderSettings settings)
: index(index_) : index(index_)
{ {
const std::string & path_prefix = part_->getFullRelativePath() + index->getFileName(); const std::string & path_prefix = part_->getFullRelativePath() + index->getFileName();
auto index_format = index->getDeserializedFormat(part_->volume->getDisk(), path_prefix); auto index_format = index->getDeserializedFormat(part_->volume->getDisk(), path_prefix);
stream = makeIndexReader(index_format.extension, index_, part_, marks_count_, all_mark_ranges_, std::move(settings)); stream = makeIndexReader(
index_format.extension,
index_,
part_,
marks_count_,
all_mark_ranges_,
mark_cache,
uncompressed_cache,
std::move(settings));
version = index_format.version; version = index_format.version;
stream->seekToStart(); stream->seekToStart();

View File

@ -16,6 +16,8 @@ public:
MergeTreeData::DataPartPtr part_, MergeTreeData::DataPartPtr part_,
size_t marks_count_, size_t marks_count_,
const MarkRanges & all_mark_ranges_, const MarkRanges & all_mark_ranges_,
MarkCache * mark_cache,
UncompressedCache * uncompressed_cache,
MergeTreeReaderSettings settings); MergeTreeReaderSettings settings);
~MergeTreeIndexReader(); ~MergeTreeIndexReader();