From b26a8b5622d6516c92a0d045ab6d1e63b488f8bb Mon Sep 17 00:00:00 2001 From: CurtizJ Date: Tue, 11 Feb 2020 16:41:26 +0300 Subject: [PATCH] choose part type while selecting parts to merge --- .../Storages/MergeTree/IMergeTreeDataPart.cpp | 21 +-------- .../Storages/MergeTree/IMergeTreeDataPart.h | 3 +- dbms/src/Storages/MergeTree/MergeTreeData.cpp | 2 +- .../MergeTree/MergeTreeDataMergerMutator.cpp | 21 ++++----- .../MergeTree/MergeTreeDataMergerMutator.h | 1 + .../MergeTree/MergeTreeDataPartCompact.cpp | 2 +- .../MergeTree/MergeTreeDataPartType.cpp | 43 +++++++++++++++++++ .../MergeTree/MergeTreeDataPartType.h | 33 ++++++++++++-- .../MergeTree/ReplicatedMergeTreeLogEntry.cpp | 11 ++++- .../MergeTree/ReplicatedMergeTreeLogEntry.h | 2 + dbms/src/Storages/StorageMergeTree.cpp | 1 + .../Storages/StorageReplicatedMergeTree.cpp | 11 +++-- .../src/Storages/StorageReplicatedMergeTree.h | 1 + 13 files changed, 111 insertions(+), 41 deletions(-) create mode 100644 dbms/src/Storages/MergeTree/MergeTreeDataPartType.cpp diff --git a/dbms/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/dbms/src/Storages/MergeTree/IMergeTreeDataPart.cpp index c6dfe4a76cb..a2667712356 100644 --- a/dbms/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/dbms/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -340,7 +340,7 @@ void IMergeTreeDataPart::assertOnDisk() const { if (!isStoredOnDisk()) throw Exception("Data part '" + name + "' with type '" - + typeToString(getType()) + "' is not stored on disk", ErrorCodes::LOGICAL_ERROR); + + getType().toString() + "' is not stored on disk", ErrorCodes::LOGICAL_ERROR); } @@ -416,7 +416,7 @@ void IMergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checks void IMergeTreeDataPart::loadIndexGranularity() { - throw Exception("Method 'loadIndexGranularity' is not implemented for part with type " + typeToString(getType()), ErrorCodes::NOT_IMPLEMENTED); + throw Exception("Method 'loadIndexGranularity' is not implemented for part with type " + getType().toString(), ErrorCodes::NOT_IMPLEMENTED); } void IMergeTreeDataPart::loadIndex() @@ -779,23 +779,6 @@ void IMergeTreeDataPart::remove() const } } -String IMergeTreeDataPart::typeToString(Type type) -{ - switch (type) - { - case Type::WIDE: - return "Wide"; - case Type::COMPACT: - return "Compact"; - case Type::IN_MEMORY: - return "InMemory"; - case Type::UNKNOWN: - return "Unknown"; - } - - __builtin_unreachable(); -} - String IMergeTreeDataPart::getRelativePathForDetachedPart(const String & prefix) const { /// Do not allow underscores in the prefix because they are used as separators. diff --git a/dbms/src/Storages/MergeTree/IMergeTreeDataPart.h b/dbms/src/Storages/MergeTree/IMergeTreeDataPart.h index fb91a46ca74..65b22d90734 100644 --- a/dbms/src/Storages/MergeTree/IMergeTreeDataPart.h +++ b/dbms/src/Storages/MergeTree/IMergeTreeDataPart.h @@ -113,8 +113,7 @@ public: Type getType() const { return part_type; } - static String typeToString(Type type); - String getTypeName() const { return typeToString(getType()); } + String getTypeName() const { return getType().toString(); } void setColumns(const NamesAndTypesList & new_columns); diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index 7ef141327fc..2673a9826f5 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -1704,7 +1704,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::createPart( const String & name, const MergeTreePartInfo & part_info, const DiskPtr & disk, const String & relative_path) const { - auto type = MergeTreeDataPartType::UNKNOWN; + MergeTreeDataPartType type; auto full_path = getFullPathOnDisk(disk) + relative_path + "/"; auto mrk_ext = MergeTreeIndexGranularityInfo::getMrkExtensionFromFS(full_path); diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 6ad04e4e0f2..6d93809817e 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -89,19 +89,25 @@ void FutureMergedMutatedPart::assign(MergeTreeData::DataPartsVector parts_) UInt32 max_level = 0; Int64 max_mutation = 0; + size_t sum_rows = 0; + size_t sum_bytes_uncompressed = 0; for (const auto & part : parts) { max_level = std::max(max_level, part->info.level); max_mutation = std::max(max_mutation, part->info.mutation); + sum_rows += part->rows_count; + sum_bytes_uncompressed += part->getTotalColumnsSize().data_uncompressed; } + const auto & storage = parts.front()->storage; + type = storage.choosePartType(sum_bytes_uncompressed, sum_rows); part_info.partition_id = parts.front()->info.partition_id; part_info.min_block = parts.front()->info.min_block; part_info.max_block = parts.back()->info.max_block; part_info.level = max_level + 1; part_info.mutation = max_mutation; - if (parts.front()->storage.format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING) + if (storage.format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING) { DayNum min_date = DayNum(std::numeric_limits::max()); DayNum max_date = DayNum(std::numeric_limits::min()); @@ -576,20 +582,14 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor all_columns, data.sorting_key_expr, data.skip_indices, data.merging_params, gathering_columns, gathering_column_names, merging_columns, merging_column_names); - size_t sum_input_rows_upper_bound = merge_entry->total_rows_count; - size_t estimated_bytes_uncompressed = 0; - for (const auto & part : parts) - estimated_bytes_uncompressed += part->getTotalColumnsSize().data_uncompressed; - MergeTreeData::MutableDataPartPtr new_data_part = data.createPart( future_part.name, + future_part.type, future_part.part_info, space_reservation->getDisk(), - all_columns, - estimated_bytes_uncompressed, - sum_input_rows_upper_bound, TMP_PREFIX + future_part.name); + new_data_part->setColumns(all_columns); new_data_part->partition.assign(future_part.getPartition()); new_data_part->is_temp = true; @@ -607,6 +607,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor need_remove_expired_values = false; } + size_t sum_input_rows_upper_bound = merge_entry->total_rows_count; MergeAlgorithm merge_alg = chooseMergeAlgorithm(parts, sum_input_rows_upper_bound, gathering_columns, deduplicate, need_remove_expired_values); LOG_DEBUG(log, "Selected MergeAlgorithm: " << ((merge_alg == MergeAlgorithm::Vertical) ? "Vertical" : "Horizontal")); @@ -990,7 +991,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor auto new_data_part = data.createPart( future_part.name, - source_part->getType(), + future_part.type, future_part.part_info, space_reservation->getDisk(), "tmp_mut_" + future_part.name); diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h index e475037f32b..36e0687a19b 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h @@ -18,6 +18,7 @@ struct FutureMergedMutatedPart { String name; String path; + MergeTreeDataPartType type; MergeTreePartInfo part_info; MergeTreeData::DataPartsVector parts; diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataPartCompact.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataPartCompact.cpp index 8f36ce5e828..aab80365fe1 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataPartCompact.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataPartCompact.cpp @@ -78,7 +78,7 @@ ColumnSize MergeTreeDataPartCompact::getTotalColumnsSize() const if (bin_checksum != checksums.files.end()) { total_size.data_compressed += bin_checksum->second.file_size; - total_size.data_compressed += bin_checksum->second.uncompressed_size; + total_size.data_uncompressed += bin_checksum->second.uncompressed_size; } auto mrk_checksum = checksums.files.find(DATA_FILE_NAME + index_granularity_info.marks_file_extension); diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataPartType.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataPartType.cpp new file mode 100644 index 00000000000..f8a38af5496 --- /dev/null +++ b/dbms/src/Storages/MergeTree/MergeTreeDataPartType.cpp @@ -0,0 +1,43 @@ +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int UNKNOWN_PART_TYPE; +} + +void MergeTreeDataPartType::fromString(const String & str) +{ + if (str == "Wide") + value = WIDE; + else if (str == "Compact") + value = COMPACT; + else if (str == "InMemory") + value = IN_MEMORY; + else if (str == "Unknown") + value = UNKNOWN; + else + throw DB::Exception("Unexpected string for part type: " + str, ErrorCodes::UNKNOWN_PART_TYPE); +} + +String MergeTreeDataPartType::toString() const +{ + switch (value) + { + case WIDE: + return "Wide"; + case COMPACT: + return "Compact"; + case IN_MEMORY: + return "InMemory"; + case UNKNOWN: + return "Unknown"; + } + + __builtin_unreachable(); +} + +} diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataPartType.h b/dbms/src/Storages/MergeTree/MergeTreeDataPartType.h index ed229414e3b..a7d176f7baa 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataPartType.h +++ b/dbms/src/Storages/MergeTree/MergeTreeDataPartType.h @@ -1,11 +1,17 @@ #pragma once +#include + namespace DB { - /// Types of data part format. - enum class MergeTreeDataPartType + +/// Types of data part format. +class MergeTreeDataPartType +{ +public: + enum Value { - /// Data of each is stored in one or several (for complex types) files. + /// Data of each column is stored in one or several (for complex types) files. /// Every data file is followed by marks file. WIDE, @@ -17,4 +23,25 @@ namespace DB UNKNOWN, }; + + MergeTreeDataPartType() : value(UNKNOWN) {} + MergeTreeDataPartType(Value value_) : value(value_) {} + + bool operator==(const MergeTreeDataPartType & other) const + { + return value == other.value; + } + + bool operator!=(const MergeTreeDataPartType & other) const + { + return !(*this == other); + } + + void fromString(const String & str); + String toString() const; + +private: + Value value; +}; + } diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp index 29878cc064e..b871f75130b 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp @@ -12,7 +12,7 @@ namespace DB void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const { - out << "format version: 4\n" + out << "format version: 5\n" << "create_time: " << LocalDateTime(create_time ? create_time : time(nullptr)) << "\n" << "source replica: " << source_replica << '\n' << "block_id: " << escape << block_id << '\n'; @@ -29,6 +29,7 @@ void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const out << s << '\n'; out << "into\n" << new_part_name; out << "\ndeduplicate: " << deduplicate; + out << "\npart_type: " << new_part_type.toString(); break; case DROP_RANGE: @@ -82,7 +83,7 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in) in >> "format version: " >> format_version >> "\n"; - if (format_version < 1 || format_version > 4) + if (format_version < 1 || format_version > 5) throw Exception("Unknown ReplicatedMergeTreeLogEntry format version: " + DB::toString(format_version), ErrorCodes::UNKNOWN_FORMAT_VERSION); if (format_version >= 2) @@ -120,6 +121,12 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in) in >> new_part_name; if (format_version >= 4) in >> "\ndeduplicate: " >> deduplicate; + if (format_version >= 5) + { + String part_type_str; + in >> "\npart_type: " >> type_str; + new_part_type.fromString(type_str); + } } else if (type_str == "drop" || type_str == "detach") { diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h index ca8c9315fa9..d761e958828 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h @@ -4,6 +4,7 @@ #include #include #include +#include #include #include @@ -72,6 +73,7 @@ struct ReplicatedMergeTreeLogEntryData /// The name of resulting part for GET_PART and MERGE_PARTS /// Part range for DROP_RANGE and CLEAR_COLUMN String new_part_name; + MergeTreeDataPartType new_part_type; String block_id; /// For parts of level zero, the block identifier for deduplication (node name in /blocks/). mutable String actual_new_part_name; /// GET_PART could actually fetch a part covering 'new_part_name'. diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index 199b6bfc149..1e5f72f9307 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -745,6 +745,7 @@ bool StorageMergeTree::tryMutatePart() future_part.parts.push_back(part); future_part.part_info = new_part_info; future_part.name = part->getNewName(new_part_info); + future_part.type = part->getType(); tagger.emplace(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}), *this, true); break; diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 439849fd8e5..5267f19b7df 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -1220,6 +1220,7 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM future_mutated_part.part_info = new_part_info; future_mutated_part.name = entry.new_part_name; future_mutated_part.updatePath(*this, reserved_space); + future_mutated_part.type = source_part->getType(); auto table_id = getStorageID(); MergeList::EntryPtr merge_entry = global_context.getMergeList().insert( @@ -2279,7 +2280,7 @@ void StorageReplicatedMergeTree::mergeSelectingTask() merger_mutator.selectPartsToMerge(future_merged_part, false, max_source_parts_size_for_merge, merge_pred)) { success = createLogEntryToMergeParts(zookeeper, future_merged_part.parts, - future_merged_part.name, deduplicate, force_ttl); + future_merged_part.name, future_merged_part.type, deduplicate, force_ttl); } /// If there are many mutations in queue it may happen, that we cannot enqueue enough merges to merge all new parts else if (max_source_part_size_for_mutation > 0 && queue.countMutations() > 0 @@ -2344,6 +2345,7 @@ bool StorageReplicatedMergeTree::createLogEntryToMergeParts( zkutil::ZooKeeperPtr & zookeeper, const DataPartsVector & parts, const String & merged_name, + const MergeTreeDataPartType & merged_part_type, bool deduplicate, bool force_ttl, ReplicatedMergeTreeLogEntryData * out_log_entry) @@ -2380,12 +2382,15 @@ bool StorageReplicatedMergeTree::createLogEntryToMergeParts( entry.type = LogEntry::MERGE_PARTS; entry.source_replica = replica_name; entry.new_part_name = merged_name; + entry.new_part_type = merged_part_type; entry.deduplicate = deduplicate; entry.force_ttl = force_ttl; entry.create_time = time(nullptr); for (const auto & part : parts) + { entry.source_parts.push_back(part->name); + } String path_created = zookeeper->create(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential); entry.znode_name = path_created.substr(path_created.find_last_of('/') + 1); @@ -3152,7 +3157,7 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p future_merged_part, disk_space, can_merge, partition_id, true, nullptr); ReplicatedMergeTreeLogEntryData merge_entry; if (selected && !createLogEntryToMergeParts(zookeeper, future_merged_part.parts, - future_merged_part.name, deduplicate, force_ttl, &merge_entry)) + future_merged_part.name, future_merged_part.type, deduplicate, force_ttl, &merge_entry)) return handle_noop("Can't create merge queue node in ZooKeeper"); if (merge_entry.type != ReplicatedMergeTreeLogEntryData::Type::EMPTY) merge_entries.push_back(std::move(merge_entry)); @@ -3189,7 +3194,7 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p ReplicatedMergeTreeLogEntryData merge_entry; if (!createLogEntryToMergeParts(zookeeper, future_merged_part.parts, - future_merged_part.name, deduplicate, force_ttl, &merge_entry)) + future_merged_part.name, future_merged_part.type, deduplicate, force_ttl, &merge_entry)) return handle_noop("Can't create merge queue node in ZooKeeper"); if (merge_entry.type != ReplicatedMergeTreeLogEntryData::Type::EMPTY) merge_entries.push_back(std::move(merge_entry)); diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.h b/dbms/src/Storages/StorageReplicatedMergeTree.h index 9c7c36a19b8..080a19b66d5 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.h +++ b/dbms/src/Storages/StorageReplicatedMergeTree.h @@ -438,6 +438,7 @@ private: zkutil::ZooKeeperPtr & zookeeper, const DataPartsVector & parts, const String & merged_name, + const MergeTreeDataPartType & merged_part_type, bool deduplicate, bool force_ttl, ReplicatedMergeTreeLogEntryData * out_log_entry = nullptr);