From 8d37c71f4178cc9a31938c61dd826ab2b729c1f1 Mon Sep 17 00:00:00 2001 From: Alexey Zatelepin Date: Fri, 20 Apr 2018 22:11:20 +0300 Subject: [PATCH] execute mutation merge entries --- .../Storages/MergeTree/MergeTreeMutation.cpp | 3 + .../MergeTree/ReplicatedMergeTreeQueue.cpp | 21 +++ .../MergeTree/ReplicatedMergeTreeQueue.h | 2 + .../Storages/StorageReplicatedMergeTree.cpp | 155 +++++++++++++++--- .../src/Storages/StorageReplicatedMergeTree.h | 4 +- 5 files changed, 165 insertions(+), 20 deletions(-) diff --git a/dbms/src/Storages/MergeTree/MergeTreeMutation.cpp b/dbms/src/Storages/MergeTree/MergeTreeMutation.cpp index 780fdaba999..5e897e25a25 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeMutation.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeMutation.cpp @@ -100,6 +100,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeMutation::executeOnPart(const MergeTr in->readSuffix(); out.writeSuffixAndFinalizePart(new_data_part); + if (!new_data_part->rows_count) + return nullptr; + return new_data_part; } diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 72ea0cadb14..ffded83805c 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -976,6 +976,27 @@ bool ReplicatedMergeTreeQueue::canMutatePart(const MergeTreePartInfo & part_info return true; } +MutationCommands ReplicatedMergeTreeQueue::getMutationCommands( + const MergeTreePartInfo & part_info, Int64 desired_mutation_version) const +{ + std::lock_guard lock(mutex); + + Int64 data_version = part_info.version ? part_info.version : part_info.min_block; + auto begin = mutations_by_block_number.upper_bound(data_version); + + auto end = mutations_by_block_number.find(desired_mutation_version); + if (end == mutations_by_block_number.end()) + throw Exception("Mutation with version " + toString(desired_mutation_version) + " not found", + ErrorCodes::LOGICAL_ERROR); + ++end; + + std::vector commands; + for (auto it = begin; it != end; ++it) + commands.insert(commands.end(), it->second->commands.commands.begin(), it->second->commands.commands.end()); + + return MutationCommands{commands}; +} + void ReplicatedMergeTreeQueue::disableMergesInRange(const String & part_name) { virtual_parts.add(part_name); diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index 6a446d9665d..4e425142f1a 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -217,6 +217,8 @@ public: bool canMutatePart(const MergeTreePartInfo & part_info, Int64 & desired_mutation_version) const; + MutationCommands getMutationCommands(const MergeTreePartInfo & part_info, Int64 desired_mutation_version) const; + /// Prohibit merges in the specified range. void disableMergesInRange(const String & part_name); diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index bea832b35c2..7a0e91d21c8 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -11,6 +11,7 @@ #include #include #include +#include #include @@ -1042,7 +1043,8 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry) } if (entry.type == LogEntry::GET_PART || - entry.type == LogEntry::MERGE_PARTS) + entry.type == LogEntry::MERGE_PARTS || + entry.type == LogEntry::MUTATE_PART) { /// If we already have this part or a part covering it, we do not need to do anything. /// The part may be still in the PreCommitted -> Committed transition so we first search @@ -1079,7 +1081,11 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry) } else if (entry.type == LogEntry::MERGE_PARTS) { - tryExecuteMerge(entry, do_fetch); + do_fetch = !tryExecuteMerge(entry); + } + else if (entry.type == LogEntry::MUTATE_PART) + { + do_fetch = !tryExecutePartMutation(entry); } else { @@ -1093,12 +1099,8 @@ bool StorageReplicatedMergeTree::executeLogEntry(const LogEntry & entry) } -void StorageReplicatedMergeTree::tryExecuteMerge(const StorageReplicatedMergeTree::LogEntry & entry, bool & do_fetch) +bool StorageReplicatedMergeTree::tryExecuteMerge(const StorageReplicatedMergeTree::LogEntry & entry) { - /// The caller has already decided to make the fetch - if (do_fetch) - return; - // Log source part names just in case { std::stringstream log_message; @@ -1133,8 +1135,8 @@ void StorageReplicatedMergeTree::tryExecuteMerge(const StorageReplicatedMergeTre if (!have_all_parts) { /// If you do not have all the necessary parts, try to take some already merged part from someone. - do_fetch = true; LOG_DEBUG(log, "Don't have all parts for merge " << entry.new_part_name << "; will try to fetch it instead"); + return false; } else if (entry.create_time + data.settings.prefer_fetch_merged_part_time_threshold.totalSeconds() <= time(nullptr)) { @@ -1150,15 +1152,12 @@ void StorageReplicatedMergeTree::tryExecuteMerge(const StorageReplicatedMergeTre String replica = findReplicaHavingPart(entry.new_part_name, true); /// NOTE excessive ZK requests for same data later, may remove. if (!replica.empty()) { - do_fetch = true; LOG_DEBUG(log, "Prefer to fetch " << entry.new_part_name << " from replica " << replica); + return false; } } } - if (do_fetch) - return; - /// Start to make the main work size_t estimated_space_for_merge = MergeTreeDataMerger::estimateDiskSpaceForMerge(parts); @@ -1169,7 +1168,6 @@ void StorageReplicatedMergeTree::tryExecuteMerge(const StorageReplicatedMergeTre auto table_lock = lockStructure(false, __PRETTY_FUNCTION__); MergeList::EntryPtr merge_entry = context.getMergeList().insert(database_name, table_name, entry.new_part_name, parts); - MergeTreeData::Transaction transaction; size_t aio_threshold = context.getSettings().min_bytes_to_use_direct_io; MergeTreeDataMerger::FuturePart future_merged_part(parts); @@ -1179,11 +1177,11 @@ void StorageReplicatedMergeTree::tryExecuteMerge(const StorageReplicatedMergeTre + entry.new_part_name + "`", ErrorCodes::BAD_DATA_PART_NAME); } - /// Logging - Stopwatch stopwatch; - ExecutionStatus execution_status; + MergeTreeData::Transaction transaction; MergeTreeData::MutableDataPartPtr part; + /// Logging + Stopwatch stopwatch; auto write_part_log = [&] (const ExecutionStatus & execution_status) { try @@ -1242,7 +1240,6 @@ void StorageReplicatedMergeTree::tryExecuteMerge(const StorageReplicatedMergeTre { if (MergeTreeDataPartChecksums::isBadChecksumsErrorCode(e.code())) { - do_fetch = true; transaction.rollback(); ProfileEvents::increment(ProfileEvents::DataAfterMergeDiffersFromReplica); @@ -1261,13 +1258,14 @@ void StorageReplicatedMergeTree::tryExecuteMerge(const StorageReplicatedMergeTre "We will download merged part from replica to force byte-identical result."); write_part_log(ExecutionStatus::fromCurrentException()); - return; + + return false; } throw; } - /** Removing old chunks from ZK and from the disk is delayed - see ReplicatedMergeTreeCleanupThread, clearOldParts. + /** Removing old parts from ZK and from the disk is delayed - see ReplicatedMergeTreeCleanupThread, clearOldParts. */ /** With `ZSESSIONEXPIRED` or `ZOPERATIONTIMEOUT`, we can inadvertently roll back local changes to the parts. @@ -1277,6 +1275,125 @@ void StorageReplicatedMergeTree::tryExecuteMerge(const StorageReplicatedMergeTre ProfileEvents::increment(ProfileEvents::ReplicatedPartMerges); write_part_log({}); + + return true; + } + catch (...) + { + write_part_log(ExecutionStatus::fromCurrentException()); + throw; + } +} + + +bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedMergeTree::LogEntry & entry) +{ + const String & source_part_name = entry.parts_to_merge.at(0); + LOG_TRACE(log, "Executing log entry to mutate part " << source_part_name << " to " << entry.new_part_name); + + MergeTreeData::DataPartPtr source_part = data.getActiveContainingPart(source_part_name); + if (!source_part) + { + LOG_DEBUG(log, "Source part for " << entry.new_part_name << " is not ready; will try to fetch it instead"); + return false; + } + + if (source_part->name != source_part_name) + { + throw Exception("Part " + source_part_name + " is covered by " + source_part->name + + " but should be mutated to " + entry.new_part_name + ". This is a bug.", + ErrorCodes::LOGICAL_ERROR); + } + + /// TODO - some better heuristic? + size_t estimated_space_for_result = source_part->bytes_on_disk; + + if (entry.create_time + data.settings.prefer_fetch_merged_part_time_threshold.totalSeconds() <= time(nullptr) + && estimated_space_for_result >= data.settings.prefer_fetch_merged_part_size_threshold) + { + /// If entry is old enough, and have enough size, and some replica has the desired part, + /// then prefer fetching from replica. + String replica = findReplicaHavingPart(entry.new_part_name, true); /// NOTE excessive ZK requests for same data later, may remove. + if (!replica.empty()) + { + LOG_DEBUG(log, "Prefer to fetch " << entry.new_part_name << " from replica " << replica); + return false; + } + } + + MergeTreePartInfo new_part_info = MergeTreePartInfo::fromPartName( + entry.new_part_name, data.format_version); + MutationCommands commands = queue.getMutationCommands(source_part->info, new_part_info.version); + + /// Can throw an exception. + DiskSpaceMonitor::ReservationPtr reserved_space = DiskSpaceMonitor::reserve(full_path, estimated_space_for_result); + + auto table_lock = lockStructure(false, __PRETTY_FUNCTION__); + + MergeTreeData::MutableDataPartPtr new_part; + MergeTreeData::Transaction transaction; + + /// Logging + Stopwatch stopwatch; + auto write_part_log = [&] (const ExecutionStatus & /* execution_status */) + { + try + { + /// TODO + return; + } + catch (...) + { + tryLogCurrentException(log, __PRETTY_FUNCTION__); + } + }; + + try + { + MergeTreeMutation mutation_executor(data, new_part_info.version, commands.commands); + new_part = mutation_executor.executeOnPart(source_part, context); + + if (new_part) + data.renameTempPartAndReplace(new_part, nullptr, &transaction); + else + throw Exception("The part was fully deleted, this case is not implemented yet", + ErrorCodes::NOT_IMPLEMENTED); /// TODO + + try + { + checkPartChecksumsAndCommit(transaction, new_part); + } + catch (const Exception & e) + { + if (MergeTreeDataPartChecksums::isBadChecksumsErrorCode(e.code())) + { + transaction.rollback(); + + ProfileEvents::increment(ProfileEvents::DataAfterMergeDiffersFromReplica); + + LOG_ERROR(log, getCurrentExceptionMessage(false) << ". " + "Data after mutation is not byte-identical to data on another replicas. " + "We will download merged part from replica to force byte-identical result."); + + write_part_log(ExecutionStatus::fromCurrentException()); + + return false; + } + + throw; + } + + /// TODO immediately delete the old part so that it doesn't waste space. + + /** With `ZSESSIONEXPIRED` or `ZOPERATIONTIMEOUT`, we can inadvertently roll back local changes to the parts. + * This is not a problem, because in this case the entry will remain in the queue, and we will try again. + */ + merge_selecting_event.set(); + /// TODO metrics (ProfileEvents::increment(ProfileEvents::ReplicatedPartMerges);) + + write_part_log({}); + + return true; } catch (...) { diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.h b/dbms/src/Storages/StorageReplicatedMergeTree.h index 8b86c219b2d..cefc4040f1e 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.h +++ b/dbms/src/Storages/StorageReplicatedMergeTree.h @@ -359,7 +359,9 @@ private: void executeDropRange(const LogEntry & entry); /// Do the merge or recommend to make the fetch instead of the merge - void tryExecuteMerge(const LogEntry & entry, bool & do_fetch); + bool tryExecuteMerge(const LogEntry & entry); + + bool tryExecutePartMutation(const LogEntry & entry); bool executeFetch(const LogEntry & entry);