From 9e7adf4cbe66aa63910ea4926b4a31df7d9298db Mon Sep 17 00:00:00 2001 From: CurtizJ Date: Mon, 25 Nov 2019 23:19:43 +0300 Subject: [PATCH] polymorphic parts (development) --- .../MergeTree/IMergeTreeDataPartWriter.cpp | 2 +- dbms/src/Storages/MergeTree/MergeTreeData.cpp | 23 +++++----- dbms/src/Storages/MergeTree/MergeTreeData.h | 5 ++- .../MergeTree/MergeTreeDataMergerMutator.cpp | 43 ++++++++++--------- .../MergeTree/MergeTreeDataWriter.cpp | 9 ++-- .../MergeTree/MergeTreeIndexReader.cpp | 1 + .../MergeTree/MergeTreeMarksLoader.cpp | 7 +-- .../Storages/MergeTree/MergeTreeMarksLoader.h | 2 +- .../MergeTree/MergeTreeReaderCompact.cpp | 9 ++-- .../MergeTree/MergeTreeReaderStream.cpp | 8 ++-- .../MergeTree/MergedBlockOutputStream.cpp | 2 +- 11 files changed, 58 insertions(+), 53 deletions(-) diff --git a/dbms/src/Storages/MergeTree/IMergeTreeDataPartWriter.cpp b/dbms/src/Storages/MergeTree/IMergeTreeDataPartWriter.cpp index dbf12110987..e44ace2e521 100644 --- a/dbms/src/Storages/MergeTree/IMergeTreeDataPartWriter.cpp +++ b/dbms/src/Storages/MergeTree/IMergeTreeDataPartWriter.cpp @@ -126,7 +126,7 @@ void fillIndexGranularityImpl( index_granularity_for_block = std::min(fixed_index_granularity_rows, index_granularity_for_block); /// FIXME correct index granularity for compact - index_granularity_for_block = rows_in_block; + // index_granularity_for_block = rows_in_block; /// FIXME: split/join last mark for compact parts for (size_t current_row = index_offset; current_row < rows_in_block; current_row += index_granularity_for_block) diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index a833cb66f98..dba837a80df 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -1579,10 +1579,10 @@ void MergeTreeData::createConvertExpression(const DataPartPtr & part, const Name } -MergeTreeDataPartType MergeTreeData::choosePartType(size_t bytes_on_disk, size_t 
rows_count) const +MergeTreeDataPartType MergeTreeData::choosePartType(size_t bytes_uncompressed, size_t rows_count) const { const auto settings = getSettings(); - if (bytes_on_disk < settings->min_bytes_for_wide_part || rows_count < settings->min_rows_for_wide_part) + if (bytes_uncompressed < settings->min_bytes_for_wide_part || rows_count < settings->min_rows_for_wide_part) return MergeTreeDataPartType::COMPACT; return MergeTreeDataPartType::WIDE; @@ -1601,15 +1601,18 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(const String & name, throw Exception("Unknown part type", ErrorCodes::LOGICAL_ERROR); } -MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(const String & name, - const MergeTreePartInfo & part_info, const DiskSpace::DiskPtr & disk, - size_t bytes_on_disk, size_t rows_count, const String & relative_path) const +MergeTreeData::MutableDataPartPtr MergeTreeData::createPart( + const String & name, + const MergeTreePartInfo & part_info, + const DiskSpace::DiskPtr & disk, + const NamesAndTypesList & columns, + size_t bytes_uncompressed, + size_t rows_count, + const String & relative_path) const { - auto part = createPart(name, choosePartType(bytes_on_disk, rows_count), part_info, disk, relative_path); - - part->bytes_on_disk = bytes_on_disk; - part->rows_count = rows_count; - + auto part = createPart(name, choosePartType(bytes_uncompressed, rows_count), part_info, disk, relative_path); + part->setColumns(columns); + /// Don't save rows_count count here as it can change later return part; } diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.h b/dbms/src/Storages/MergeTree/MergeTreeData.h index 183027a179d..be1fa069f1b 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.h +++ b/dbms/src/Storages/MergeTree/MergeTreeData.h @@ -180,10 +180,11 @@ public: MergeTreeDataPartType choosePartType(size_t bytes_on_disk, size_t rows_count) const; - - + /// After this methods setColumns must be called + /// FIXME make this inside this function 
MutableDataPartPtr createPart(const String & name, const MergeTreePartInfo & part_info,const DiskSpace::DiskPtr & disk, + const NamesAndTypesList & columns, size_t bytes_on_disk, size_t rows_num, const String & relative_path) const; MutableDataPartPtr createPart(const String & name, diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 185cf510301..9f65a2fe964 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -573,11 +573,11 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor future_part.name, future_part.part_info, space_reservation->getDisk(), + all_columns, estimated_bytes_uncompressed, sum_input_rows_upper_bound, TMP_PREFIX + future_part.name); - new_data_part->setColumns(all_columns); new_data_part->partition.assign(future_part.getPartition()); new_data_part->is_temp = true; @@ -958,15 +958,32 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor else LOG_TRACE(log, "Mutating part " << source_part->name << " to mutation version " << future_part.part_info.mutation); + auto in = mutations_interpreter.execute(table_lock_holder); + const auto & updated_header = mutations_interpreter.getUpdatedHeader(); - MergeTreeData::MutableDataPartPtr new_data_part = data.createPart( - future_part.name, source_part->getType(), - future_part.part_info, space_reservation->getDisk(), + NamesAndTypesList all_columns = data.getColumns().getAllPhysical(); + + const auto & source_column_names = source_part->columns.getNames(); + const auto & updated_column_names = updated_header.getNames(); + + NameSet new_columns_set(source_column_names.begin(), source_column_names.end()); + new_columns_set.insert(updated_column_names.begin(), updated_column_names.end()); + auto new_columns = all_columns.filter(new_columns_set); + + auto new_data_part = 
data.createPart( + future_part.name, + future_part.part_info, + space_reservation->getDisk(), + std::move(new_columns), + source_part->bytes_on_disk, + source_part->rows_count, "tmp_mut_" + future_part.name); new_data_part->is_temp = true; new_data_part->ttl_infos = source_part->ttl_infos; - new_data_part->index_granularity_info = source_part->index_granularity_info; + + /// FIXME Now it's wrong code. Check if nothing will break + // new_data_part->index_granularity_info = source_part->index_granularity_info; String new_part_tmp_path = new_data_part->getFullPath(); @@ -981,10 +998,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor Poco::File(new_part_tmp_path).createDirectories(); - auto in = mutations_interpreter.execute(table_lock_holder); - const auto & updated_header = mutations_interpreter.getUpdatedHeader(); - - NamesAndTypesList all_columns = data.getColumns().getAllPhysical(); const auto data_settings = data.getSettings(); Block in_header = in->getHeader(); @@ -1141,18 +1154,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor WriteBufferFromFile out_checksums(new_part_tmp_path + "checksums.txt", 4096); new_data_part->checksums.write(out_checksums); } - - /// Write the columns list of the resulting part in the same order as all_columns. - Names source_column_names = source_part->columns.getNames(); - NameSet source_columns_name_set(source_column_names.begin(), source_column_names.end()); - for (auto it = all_columns.begin(); it != all_columns.end();) - { - if (source_columns_name_set.count(it->name) || updated_header.has(it->name)) - ++it; - else - it = new_data_part->columns.erase(it); - } - new_data_part->setColumns(all_columns); { /// Write a file with a description of columns. 
WriteBufferFromFile out_columns(new_part_tmp_path + "columns.txt", 4096); diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp index 81acbb5e600..5eea3f38c46 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp @@ -203,10 +203,14 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa size_t expected_size = block.bytes(); auto reservation = data.reserveSpace(expected_size); + NamesAndTypesList columns = data.getColumns().getAllPhysical().filter(block.getNames()); + auto new_data_part = data.createPart( part_name, new_part_info, reservation->getDisk(), - expected_size, block.rows(), + columns, + expected_size, + block.rows(), TMP_PREFIX + part_name); new_data_part->partition = std::move(partition); @@ -263,9 +267,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa /// either default lz4 or compression method with zero thresholds on absolute and relative part size. 
auto compression_codec = data.global_context.chooseCompressionCodec(0, 0); - NamesAndTypesList columns = data.getColumns().getAllPhysical().filter(block.getNames()); - new_data_part->setColumns(columns); - MergedBlockOutputStream out(new_data_part, columns, compression_codec); out.writePrefix(); diff --git a/dbms/src/Storages/MergeTree/MergeTreeIndexReader.cpp b/dbms/src/Storages/MergeTree/MergeTreeIndexReader.cpp index ce852d1fa46..af899bba402 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeIndexReader.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeIndexReader.cpp @@ -26,6 +26,7 @@ MergeTreeIndexGranulePtr MergeTreeIndexReader::read() { auto granule = index->createIndexGranule(); granule->deserializeBinary(*stream.data_buffer); + std::cerr << "(MergeTreeIndexReader) granule.empty(): " << granule->empty() << "\n"; return granule; } diff --git a/dbms/src/Storages/MergeTree/MergeTreeMarksLoader.cpp b/dbms/src/Storages/MergeTree/MergeTreeMarksLoader.cpp index 0ae72df739d..2dbdc6d7ad6 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeMarksLoader.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeMarksLoader.cpp @@ -27,22 +27,23 @@ const MarkInCompressedFile & MergeTreeMarksLoader::getMark(size_t row_index, siz void MergeTreeMarksLoader::loadMarks() { + auto load = std::bind(load_func, mrk_path); if (mark_cache) { auto key = mark_cache->hash(mrk_path); if (save_marks_in_cache) { - marks = mark_cache->getOrSet(key, load_func); + marks = mark_cache->getOrSet(key, load); } else { marks = mark_cache->get(key); if (!marks) - marks = load_func(); + marks = load(); } } else - marks = load_func(); + marks = load(); if (!marks) throw Exception("Failed to load marks: " + mrk_path, ErrorCodes::LOGICAL_ERROR); diff --git a/dbms/src/Storages/MergeTree/MergeTreeMarksLoader.h b/dbms/src/Storages/MergeTree/MergeTreeMarksLoader.h index 8c3bc1d5dff..aefcaff7aa1 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeMarksLoader.h +++ b/dbms/src/Storages/MergeTree/MergeTreeMarksLoader.h @@ -7,7 
+7,7 @@ class MergeTreeMarksLoader { public: using MarksPtr = MarkCache::MappedPtr; - using LoadFunc = std::function<MarksPtr()>; + using LoadFunc = std::function<MarksPtr(const String &)>; MergeTreeMarksLoader() {} diff --git a/dbms/src/Storages/MergeTree/MergeTreeReaderCompact.cpp b/dbms/src/Storages/MergeTree/MergeTreeReaderCompact.cpp index fa42ada3fed..517c6212b1b 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeReaderCompact.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeReaderCompact.cpp @@ -129,11 +129,9 @@ void MergeTreeReaderCompact::initMarksLoader() if (marks_loader.initialized()) return; - std::string mrk_path = data_part->index_granularity_info.getMarksFilePath(path + NAME_OF_FILE_WITH_DATA); size_t columns_num = data_part->columns.size(); - /// FIXME pass mrk_path as argument - auto load = [this, columns_num, mrk_path]() -> MarkCache::MappedPtr + auto load = [this, columns_num](const String & mrk_path) -> MarkCache::MappedPtr { size_t file_size = Poco::File(mrk_path).getSize(); size_t marks_count = data_part->getMarksCount(); @@ -178,9 +176,8 @@ void MergeTreeReaderCompact::initMarksLoader() return res; }; - marks_loader = MergeTreeMarksLoader{mark_cache, mrk_path, load, settings.save_marks_in_cache, columns_num}; - - std::cerr << "(MergeTreeReaderCompact::loadMarks) end marks load..." 
<< "\n"; + auto mrk_path = data_part->index_granularity_info.getMarksFilePath(path + NAME_OF_FILE_WITH_DATA); + marks_loader = MergeTreeMarksLoader{mark_cache, std::move(mrk_path), load, settings.save_marks_in_cache, columns_num}; } void MergeTreeReaderCompact::seekToMark(size_t row_index, size_t column_index) diff --git a/dbms/src/Storages/MergeTree/MergeTreeReaderStream.cpp b/dbms/src/Storages/MergeTree/MergeTreeReaderStream.cpp index 45f509db78f..8fde48c6791 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeReaderStream.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeReaderStream.cpp @@ -108,13 +108,13 @@ void MergeTreeReaderStream::initMarksLoader() if (marks_loader.initialized()) return; - auto load = [this]() -> MarkCache::MappedPtr + auto load = [this](const String & mrk_path) -> MarkCache::MappedPtr { + std::cerr << "reading marks from path: " << mrk_path << "\n"; + std::cerr << "marks: " << marks_count << "\n"; /// Memory for marks must not be accounted as memory usage for query, because they are stored in shared cache. 
auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock(); - std::string mrk_path = index_granularity_info->getMarksFilePath(path_prefix); - size_t file_size = Poco::File(mrk_path).getSize(); size_t expected_file_size = index_granularity_info->mark_size_in_bytes * marks_count; if (expected_file_size != file_size) @@ -151,7 +151,7 @@ void MergeTreeReaderStream::initMarksLoader() }; auto mrk_path = index_granularity_info->getMarksFilePath(path_prefix); - marks_loader = MergeTreeMarksLoader{mark_cache, mrk_path, load, save_marks_in_cache}; + marks_loader = MergeTreeMarksLoader{mark_cache, std::move(mrk_path), load, save_marks_in_cache}; } diff --git a/dbms/src/Storages/MergeTree/MergedBlockOutputStream.cpp b/dbms/src/Storages/MergeTree/MergedBlockOutputStream.cpp index 2cb650c177d..9cfc920c8e7 100644 --- a/dbms/src/Storages/MergeTree/MergedBlockOutputStream.cpp +++ b/dbms/src/Storages/MergeTree/MergedBlockOutputStream.cpp @@ -146,7 +146,7 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart( new_part->rows_count = rows_count; new_part->modification_time = time(nullptr); - new_part->setColumns(*total_column_list); + // new_part->setColumns(*total_column_list); new_part->index = writer->releaseIndexColumns(); new_part->checksums = checksums; new_part->bytes_on_disk = checksums.getTotalSizeOnDisk();