From a0635ed390a267fd3ec274821f33f78fd05ee767 Mon Sep 17 00:00:00 2001 From: CurtizJ Date: Mon, 3 Feb 2020 15:46:25 +0300 Subject: [PATCH] better marks reading --- dbms/src/DataStreams/MarkInCompressedFile.h | 11 ++- .../MergeTree/MergeTreeMarksLoader.cpp | 80 ++++++++++++++++--- .../Storages/MergeTree/MergeTreeMarksLoader.h | 23 +++--- .../MergeTree/MergeTreeReaderCompact.cpp | 56 ++----------- .../MergeTree/MergeTreeReaderCompact.h | 1 - .../MergeTree/MergeTreeReaderStream.cpp | 60 +------------- .../MergeTree/MergeTreeReaderStream.h | 2 - 7 files changed, 101 insertions(+), 132 deletions(-) diff --git a/dbms/src/DataStreams/MarkInCompressedFile.h b/dbms/src/DataStreams/MarkInCompressedFile.h index 46d078f2b76..62886ffad57 100644 --- a/dbms/src/DataStreams/MarkInCompressedFile.h +++ b/dbms/src/DataStreams/MarkInCompressedFile.h @@ -40,6 +40,15 @@ struct MarkInCompressedFile }; -using MarksInCompressedFile = PODArray; +class MarksInCompressedFile : public PODArray +{ +public: + MarksInCompressedFile(size_t n) : PODArray(n) {} + + void read(ReadBuffer & buffer, size_t from, size_t count) + { + buffer.readStrict(reinterpret_cast(data() + from), count * sizeof(MarkInCompressedFile)); + } +}; } diff --git a/dbms/src/Storages/MergeTree/MergeTreeMarksLoader.cpp b/dbms/src/Storages/MergeTree/MergeTreeMarksLoader.cpp index 8bdc8fc8307..452d46a4751 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeMarksLoader.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeMarksLoader.cpp @@ -1,50 +1,106 @@ #include +#include +#include +#include namespace DB { +namespace ErrorCodes +{ + extern const int CORRUPTED_DATA; + extern const int LOGICAL_ERROR; +} + MergeTreeMarksLoader::MergeTreeMarksLoader( MarkCache * mark_cache_, const String & mrk_path_, - const LoadFunc & load_func_, + size_t marks_count_, + const MergeTreeIndexGranularityInfo & index_granularity_info_, bool save_marks_in_cache_, - size_t columns_num_) + size_t columns_in_mark_) : mark_cache(mark_cache_) , mrk_path(mrk_path_) - , load_func(load_func_) + , marks_count(marks_count_) + , index_granularity_info(index_granularity_info_) , save_marks_in_cache(save_marks_in_cache_) - , columns_num(columns_num_) {} + , columns_in_mark(columns_in_mark_) {} const MarkInCompressedFile & MergeTreeMarksLoader::getMark(size_t row_index, size_t column_index) { if (!marks) loadMarks(); - if (column_index >= columns_num) - throw Exception("Column index: " + toString(column_index) - + " is out of range [0, " + toString(columns_num) + ")", ErrorCodes::LOGICAL_ERROR); - return (*marks)[row_index * columns_num + column_index]; +#ifndef NDEBUG + if (column_index >= columns_in_mark) + throw Exception("Column index: " + toString(column_index) + + " is out of range [0, " + toString(columns_in_mark) + ")", ErrorCodes::LOGICAL_ERROR); +#endif + + return (*marks)[row_index * columns_in_mark + column_index]; +} + +MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl() +{ + /// Memory for marks must not be accounted as memory usage for query, because they are stored in shared cache. + auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock(); + + size_t file_size = Poco::File(mrk_path).getSize(); + size_t mark_size = index_granularity_info.getMarkSizeInBytes(columns_in_mark); + size_t expected_file_size = mark_size * marks_count; + + if (expected_file_size != file_size) + throw Exception( + "Bad size of marks file '" + mrk_path + "': " + std::to_string(file_size) + ", must be: " + std::to_string(expected_file_size), + ErrorCodes::CORRUPTED_DATA); + + auto res = std::make_shared(marks_count * columns_in_mark); + + if (!index_granularity_info.is_adaptive) + { + /// Read directly to marks. + ReadBufferFromFile buffer(mrk_path, file_size, -1, reinterpret_cast(res->data())); + + if (buffer.eof() || buffer.buffer().size() != file_size) + throw Exception("Cannot read all marks from file " + mrk_path, ErrorCodes::CANNOT_READ_ALL_DATA); + } + else + { + ReadBufferFromFile buffer(mrk_path, file_size, -1); + size_t i = 0; + while (!buffer.eof()) + { + res->read(buffer, i * columns_in_mark, columns_in_mark); + buffer.seek(sizeof(size_t), SEEK_CUR); + ++i; + } + + if (i * mark_size != file_size) + throw Exception("Cannot read all marks from file " + mrk_path, ErrorCodes::CANNOT_READ_ALL_DATA); + } + res->protect(); + return res; } void MergeTreeMarksLoader::loadMarks() { - auto load = std::bind(load_func, mrk_path); if (mark_cache) { auto key = mark_cache->hash(mrk_path); if (save_marks_in_cache) { - marks = mark_cache->getOrSet(key, load); + auto callback = std::bind(&MergeTreeMarksLoader::loadMarksImpl, this); + marks = mark_cache->getOrSet(key, callback); } else { marks = mark_cache->get(key); if (!marks) - marks = load(); + marks = loadMarksImpl(); } } else - marks = load(); + marks = loadMarksImpl(); if (!marks) throw Exception("Failed to load marks: " + mrk_path, ErrorCodes::LOGICAL_ERROR); diff --git a/dbms/src/Storages/MergeTree/MergeTreeMarksLoader.h b/dbms/src/Storages/MergeTree/MergeTreeMarksLoader.h index 316be9d051d..7e5b75ed407 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeMarksLoader.h +++ b/dbms/src/Storages/MergeTree/MergeTreeMarksLoader.h @@ -3,19 +3,20 @@ namespace DB { +struct MergeTreeIndexGranularityInfo; + class MergeTreeMarksLoader { public: using MarksPtr = MarkCache::MappedPtr; - using LoadFunc = std::function; - MergeTreeMarksLoader() {} - - MergeTreeMarksLoader(MarkCache * mark_cache_, - const String & mrk_path_, - const LoadFunc & load_func_, + MergeTreeMarksLoader( + MarkCache * mark_cache_, + const String & mrk_path, + size_t marks_count_, + const MergeTreeIndexGranularityInfo & index_granularity_info_, bool save_marks_in_cache_, - size_t columns_num_ = 1); + size_t columns_num_in_mark_ = 1); const MarkInCompressedFile & getMark(size_t row_index, size_t column_index = 0); @@ -24,12 +25,14 @@ public: private: MarkCache * mark_cache = nullptr; String mrk_path; - LoadFunc load_func; + size_t marks_count; + const MergeTreeIndexGranularityInfo & index_granularity_info; bool save_marks_in_cache = false; - size_t columns_num; - MarksPtr marks; + size_t columns_num_in_mark_; + MarkCache::MappedPtr marks; void loadMarks(); + MarkCache::MappedPtr loadMarksImpl(); }; } diff --git a/dbms/src/Storages/MergeTree/MergeTreeReaderCompact.cpp b/dbms/src/Storages/MergeTree/MergeTreeReaderCompact.cpp index e7027ec52d5..1d884bcab0e 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeReaderCompact.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeReaderCompact.cpp @@ -19,11 +19,14 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(const MergeTreeData::DataPartPtr const NamesAndTypesList & columns_, UncompressedCache * uncompressed_cache_, MarkCache * mark_cache_, const MarkRanges & mark_ranges_, const MergeTreeReaderSettings & settings_, const ValueSizeMap & avg_value_size_hints_, const ReadBufferFromFileBase::ProfileCallback & profile_callback_, clockid_t clock_type_) - : IMergeTreeReader(data_part_, columns_ - , uncompressed_cache_, mark_cache_, mark_ranges_ - , settings_, avg_value_size_hints_) + : IMergeTreeReader(data_part_, columns_, + uncompressed_cache_, mark_cache_, mark_ranges_, + settings_, avg_value_size_hints_) + , marks_loader(mark_cache, + data_part->index_granularity_info.getMarksFilePath(path + MergeTreeDataPartCompact::DATA_FILE_NAME), + data_part->getMarksCount(), data_part->index_granularity_info, + settings.save_marks_in_cache, data_part->getColumns().size()) { - initMarksLoader(); size_t buffer_size = settings.max_read_buffer_size; const String full_data_path = path + MergeTreeDataPartCompact::DATA_FILE_NAME + MergeTreeDataPartCompact::DATA_FILE_EXTENSION; @@ -194,51 +197,6 @@ void MergeTreeReaderCompact::readData( } -void MergeTreeReaderCompact::initMarksLoader() -{ - if (marks_loader.initialized()) - return; - - size_t columns_num = data_part->getColumns().size(); - - auto load = [this, columns_num](const String & mrk_path) -> MarkCache::MappedPtr - { - size_t file_size = Poco::File(mrk_path).getSize(); - size_t marks_count = data_part->getMarksCount(); - size_t mark_size_in_bytes = data_part->index_granularity_info.getMarkSizeInBytes(columns_num); - - size_t expected_file_size = mark_size_in_bytes * marks_count; - if (expected_file_size != file_size) - throw Exception( - "Bad size of marks file '" + mrk_path + "': " + std::to_string(file_size) + ", must be: " + std::to_string(expected_file_size), - ErrorCodes::CORRUPTED_DATA); - - /// Memory for marks must not be accounted as memory usage for query, because they are stored in shared cache. - auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock(); - - auto res = std::make_shared(marks_count * columns_num); - - ReadBufferFromFile buffer(mrk_path, file_size); - size_t i = 0; - - while (!buffer.eof()) - { - buffer.readStrict(reinterpret_cast(res->data() + i * columns_num), sizeof(MarkInCompressedFile) * columns_num); - buffer.seek(sizeof(size_t), SEEK_CUR); - ++i; - } - - if (i * mark_size_in_bytes != file_size) - throw Exception("Cannot read all marks from file " + mrk_path, ErrorCodes::CANNOT_READ_ALL_DATA); - - res->protect(); - return res; - }; - - auto mrk_path = data_part->index_granularity_info.getMarksFilePath(path + MergeTreeDataPartCompact::DATA_FILE_NAME); - marks_loader = MergeTreeMarksLoader{mark_cache, std::move(mrk_path), load, settings.save_marks_in_cache, columns_num}; -} - void MergeTreeReaderCompact::seekToMark(size_t row_index, size_t column_index) { MarkInCompressedFile mark = marks_loader.getMark(row_index, column_index); diff --git a/dbms/src/Storages/MergeTree/MergeTreeReaderCompact.h b/dbms/src/Storages/MergeTree/MergeTreeReaderCompact.h index 55a3ab15bac..29a97fe7c5a 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeReaderCompact.h +++ b/dbms/src/Storages/MergeTree/MergeTreeReaderCompact.h @@ -46,7 +46,6 @@ private: size_t next_mark = 0; std::optional> last_read_granule; - void initMarksLoader(); void seekToMark(size_t row_index, size_t column_index); void readData(const String & name, IColumn & column, const IDataType & type, diff --git a/dbms/src/Storages/MergeTree/MergeTreeReaderStream.cpp b/dbms/src/Storages/MergeTree/MergeTreeReaderStream.cpp index b70c3c8b573..b132bda0f9e 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeReaderStream.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeReaderStream.cpp @@ -16,7 +16,7 @@ namespace ErrorCodes MergeTreeReaderStream::MergeTreeReaderStream( - const String & path_prefix_,const String & data_file_extension_, size_t marks_count_, + const String & path_prefix_, const String & data_file_extension_, size_t marks_count_, const MarkRanges & all_mark_ranges, const MergeTreeReaderSettings & settings, MarkCache * mark_cache_, @@ -26,15 +26,13 @@ MergeTreeReaderStream::MergeTreeReaderStream( : path_prefix(path_prefix_), data_file_extension(data_file_extension_), marks_count(marks_count_) , mark_cache(mark_cache_), save_marks_in_cache(settings.save_marks_in_cache) , index_granularity_info(index_granularity_info_) + , marks_loader(mark_cache, index_granularity_info->getMarksFilePath(path_prefix), + marks_count, *index_granularity_info, save_marks_in_cache) { /// Compute the size of the buffer. size_t max_mark_range_bytes = 0; size_t sum_mark_range_bytes = 0; - /// Care should be taken to not load marks when the part is empty (marks_count == 0). - - initMarksLoader(); - for (const auto & mark_range : all_mark_ranges) { size_t left_mark = mark_range.begin; @@ -106,58 +104,6 @@ MergeTreeReaderStream::MergeTreeReaderStream( } -void MergeTreeReaderStream::initMarksLoader() -{ - if (marks_loader.initialized()) - return; - - auto load = [this](const String & mrk_path) -> MarkCache::MappedPtr - { - /// Memory for marks must not be accounted as memory usage for query, because they are stored in shared cache. - auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock(); - - size_t file_size = Poco::File(mrk_path).getSize(); - size_t mark_size = index_granularity_info->getMarkSizeInBytes(); - - size_t expected_file_size = mark_size * marks_count; - if (expected_file_size != file_size) - throw Exception( - "Bad size of marks file '" + mrk_path + "': " + std::to_string(file_size) + ", must be: " + std::to_string(expected_file_size), - ErrorCodes::CORRUPTED_DATA); - - auto res = std::make_shared(marks_count); - - if (!index_granularity_info->is_adaptive) - { - /// Read directly to marks. - ReadBufferFromFile buffer(mrk_path, file_size, -1, reinterpret_cast(res->data())); - - if (buffer.eof() || buffer.buffer().size() != file_size) - throw Exception("Cannot read all marks from file " + mrk_path, ErrorCodes::CANNOT_READ_ALL_DATA); - } - else - { - ReadBufferFromFile buffer(mrk_path, file_size, -1); - size_t i = 0; - while (!buffer.eof()) - { - readIntBinary((*res)[i].offset_in_compressed_file, buffer); - readIntBinary((*res)[i].offset_in_decompressed_block, buffer); - buffer.seek(sizeof(size_t), SEEK_CUR); - ++i; - } - if (i * mark_size != file_size) - throw Exception("Cannot read all marks from file " + mrk_path, ErrorCodes::CANNOT_READ_ALL_DATA); - } - res->protect(); - return res; - }; - - auto mrk_path = index_granularity_info->getMarksFilePath(path_prefix); - marks_loader = MergeTreeMarksLoader{mark_cache, std::move(mrk_path), load, save_marks_in_cache}; -} - - void MergeTreeReaderStream::seekToMark(size_t index) { MarkInCompressedFile mark = marks_loader.getMark(index); diff --git a/dbms/src/Storages/MergeTree/MergeTreeReaderStream.h b/dbms/src/Storages/MergeTree/MergeTreeReaderStream.h index 8356fed8382..ccadbcd8e4a 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeReaderStream.h +++ b/dbms/src/Storages/MergeTree/MergeTreeReaderStream.h @@ -31,8 +31,6 @@ public: ReadBuffer * data_buffer; private: - void initMarksLoader(); - std::string path_prefix; std::string data_file_extension;