From 526428ffe0f58782d5fdd8c4f9cf3952d0098783 Mon Sep 17 00:00:00 2001 From: alesapin Date: Wed, 12 Feb 2020 13:00:24 +0300 Subject: [PATCH] Better --- .../MergeTree/ReplicatedMergeTreeQueue.cpp | 18 +++++++------- .../MergeTree/ReplicatedQueueAlterState.h | 24 ++++++++++++------- 2 files changed, 24 insertions(+), 18 deletions(-) diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 6b0322d1e7e..1a1e7fd4aee 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -288,7 +288,7 @@ void ReplicatedMergeTreeQueue::removePartFromMutations(const String & part_name) { MutationStatus & status = *it->second; - LOG_DEBUG(log, "Removing part name:" << part_name << " from mutation:" << status.entry->znode_name); + LOG_DEBUG(log, "Removing part name:" << part_name << " from mutation:" << status.entry->znode_name << " block number :" << status.entry->block_numbers.begin()->second); status.parts_to_do.removePartAndCoveredParts(part_name); if (status.parts_to_do.size() == 0) some_mutations_are_probably_done = true; @@ -995,19 +995,17 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( MergeTreeData & data, std::lock_guard & state_lock) const { - if (entry.type == LogEntry::GET_PART) - { - if (!entry.actual_new_part_name.empty() && !alter_sequence.canExecuteGetEntry(entry.actual_new_part_name, format_version, state_lock)) - return false; - - if (!entry.new_part_name.empty() && !alter_sequence.canExecuteGetEntry(entry.new_part_name, format_version, state_lock)) - return false; - } - if (entry.type == LogEntry::MERGE_PARTS || entry.type == LogEntry::GET_PART || entry.type == LogEntry::MUTATE_PART) { + if (!entry.actual_new_part_name.empty() + && !alter_sequence.canExecuteGetEntry(entry.actual_new_part_name, format_version, state_lock)) + return false; + + if (!entry.new_part_name.empty() && !alter_sequence.canExecuteGetEntry(entry.new_part_name, format_version, state_lock)) + return false; + for (const String & new_part_name : entry.getBlockingPartNames()) { if (!isNotCoveredByFuturePartsImpl(new_part_name, out_postpone_reason, state_lock)) diff --git a/dbms/src/Storages/MergeTree/ReplicatedQueueAlterState.h b/dbms/src/Storages/MergeTree/ReplicatedQueueAlterState.h index 7e10a81247c..d065ee0a350 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedQueueAlterState.h +++ b/dbms/src/Storages/MergeTree/ReplicatedQueueAlterState.h @@ -55,7 +55,7 @@ public: void addMetadataAlter(int alter_version, std::lock_guard & /*state_lock*/) { - LOG_DEBUG(log, "Adding meta with alter version:" << alter_version); + //LOG_DEBUG(log, "Adding meta with alter version:" << alter_version); if (!queue_state.count(alter_version)) queue_state.emplace(alter_version, AlterInQueue({}, false)); else @@ -73,14 +73,17 @@ public: for (const auto & [block_number, state] : queue_state) { - LOG_DEBUG(log, "Looking at block:" << block_number << " with part name:" << part_name); - if (state.block_numbers.count(info.partition_id)) + LOG_DEBUG(log, "Looking at alter:" << block_number << " with part name:" << part_name); + if (!state.block_numbers.empty()) { - LOG_DEBUG(log, "Block number:" << block_number << " has part name " << part_name << " version " << state.block_numbers.at(info.partition_id)); - return info.getDataVersion() < state.block_numbers.at(info.partition_id); + LOG_DEBUG(log, "Block number:" << block_number << " has part name " << part_name << " version " << state.block_numbers.at(info.partition_id) << " metadata is done:" << state.metadata_finished); + if (!state.metadata_finished) + return info.getDataVersion() < state.block_numbers.at(info.partition_id); + else + return info.getDataVersion() <= state.block_numbers.at(info.partition_id); } } - LOG_DEBUG(log, "Nobody has block number for part " << part_name); + //LOG_DEBUG(log, "Nobody has block number for part " << part_name); return true; } @@ -95,10 +98,11 @@ public: if (queue_state.begin()->first != alter_version) { - LOG_DEBUG(log, "Finished metadata alter with version " + std::to_string(alter_version) + " but current alter in queue is " + std::to_string(queue_state.begin()->first)); + //LOG_DEBUG(log, "Finished metadata alter with version " + std::to_string(alter_version) + " but current alter in queue is " + std::to_string(queue_state.begin()->first)); throw Exception("Finished metadata alter with version " + std::to_string(alter_version) + " but current alter in queue is " + std::to_string(queue_state.begin()->first), ErrorCodes::LOGICAL_ERROR); } + LOG_DEBUG(log, "FINISH METADATA ALTER: " << alter_version); if (!have_data_alter) { queue_state.erase(alter_version); @@ -115,12 +119,14 @@ public: /// queue can be empty after load of finished mutation without move of mutation pointer if (queue_state.empty()) { - LOG_DEBUG(log, "FINISHING DATA ALTER WITH VERSION:" << alter_version << " BUT QUEUE EMPTY"); + //LOG_DEBUG(log, "FINISHING DATA ALTER WITH VERSION:" << alter_version << " BUT QUEUE EMPTY"); return; } LOG_DEBUG(log, "FINISH DATA ALTER: " << alter_version); + if (!queue_state.count(alter_version)) + std::terminate(); queue_state.erase(alter_version); } @@ -133,6 +139,8 @@ public: } if (alter_version < queue_state.begin()->first) return true; + if (!queue_state.count(alter_version)) + std::terminate(); return queue_state.at(alter_version).metadata_finished; } bool canExecuteMetaAlter(int alter_version, std::lock_guard & /*state_lock*/) const