diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAltersSequence.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAltersSequence.cpp new file mode 100644 index 00000000000..a5efab2bbb3 --- /dev/null +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAltersSequence.cpp @@ -0,0 +1,84 @@ +#include +#include + +namespace DB +{ + +int ReplicatedMergeTreeAltersSequence::getHeadAlterVersion(std::lock_guard & /*state_lock*/) const +{ + /// If the queue is empty, then we don't have a head version (return -1) + if (!queue_state.empty()) + return queue_state.begin()->first; + return -1; +} + +void ReplicatedMergeTreeAltersSequence::addMutationForAlter(int alter_version, std::lock_guard & /*state_lock*/) +{ + /// Metadata alter may have been added before, or + /// may be already finished if we start up after the metadata alter was finished. + if (!queue_state.count(alter_version)) + queue_state.emplace(alter_version, AlterState{.metadata_finished=true, .data_finished=false}); + else + queue_state[alter_version].data_finished = false; +} + +void ReplicatedMergeTreeAltersSequence::addMetadataAlter( + int alter_version, bool have_mutation, std::lock_guard & /*state_lock*/) +{ + if (!queue_state.count(alter_version)) + queue_state.emplace(alter_version, AlterState{.metadata_finished=false, .data_finished=!have_mutation}); + else /// Data alter can be added before. 
+ queue_state[alter_version].metadata_finished = false; +} + +void ReplicatedMergeTreeAltersSequence::finishMetadataAlter(int alter_version, std::unique_lock & /*state_lock*/) +{ + assert(!queue_state.empty()); + assert(queue_state.begin()->first == alter_version); + + /// If data stage finished (or was never added) then we can remove this alter + if (queue_state[alter_version].data_finished) + queue_state.erase(alter_version); + else + queue_state[alter_version].metadata_finished = true; +} + +void ReplicatedMergeTreeAltersSequence::finishDataAlter(int alter_version, std::lock_guard & /*state_lock*/) +{ + /// queue can be empty after load of finished mutation without move of mutation pointer + if (queue_state.empty()) + return; + + /// Mutations may finish multiple times (for example, after server restart, before update of mutation pointer) + if (alter_version >= queue_state.begin()->first) + { + /// All alter versions not smaller than head have to be present in queue. + assert(queue_state.count(alter_version)); + if (queue_state[alter_version].metadata_finished) + queue_state.erase(alter_version); + else + queue_state[alter_version].data_finished = true; + } +} + +bool ReplicatedMergeTreeAltersSequence::canExecuteDataAlter(int alter_version, std::lock_guard & /*state_lock*/) const +{ + if (queue_state.empty()) + return true; + + /// All versions smaller than head can be executed + if (alter_version < queue_state.begin()->first) + return true; + + return queue_state.at(alter_version).metadata_finished; +} + +bool ReplicatedMergeTreeAltersSequence::canExecuteMetaAlter(int alter_version, std::lock_guard & /*state_lock*/) const +{ + if (queue_state.empty()) + return true; + + /// We can execute only alters of metadata which are in head. 
+ return queue_state.begin()->first == alter_version; +} +} diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAltersSequence.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAltersSequence.h new file mode 100644 index 00000000000..85d4b2a9b96 --- /dev/null +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeAltersSequence.h @@ -0,0 +1,59 @@ +#pragma once + +#include +#include +#include + +namespace DB +{ +/// ALTERs in StorageReplicatedMergeTree have to be executed sequentially (one by one). +/// But ReplicatedMergeTreeQueue executes all entries almost concurrently. The only dependency between +/// entries is data parts, but they are not suitable in alters case. +/// +/// This class stores information about current alters in ReplicatedMergeTreeQueue, and controls their order of execution. +/// All methods have to be called under ReplicatedMergeTreeQueue state lock. +class ReplicatedMergeTreeAltersSequence +{ +private: + /// In the general case an alter consists of two stages: + /// alter data and alter metadata. First we alter storage metadata + /// and then we can apply corresponding data changes (MUTATE_PART). + /// After that, we can remove alter from this sequence (alter is processed). + struct AlterState + { + bool metadata_finished = false; + bool data_finished = false; + }; + +private: + /// alter_version -> AlterState. + std::map queue_state; + +public: + + /// Add mutation for alter (alter data stage). + void addMutationForAlter(int alter_version, std::lock_guard & /*state_lock*/); + + /// Add metadata for alter (alter metadata stage). If have_mutation=true, then we expect that the + /// corresponding mutation will be added. + void addMetadataAlter(int alter_version, bool have_mutation, std::lock_guard & /*state_lock*/); + + /// Finish metadata alter. If corresponding data alter finished, then we can remove + /// alter from sequence. + void finishMetadataAlter(int alter_version, std::unique_lock & /*state_lock*/); + + /// Finish data alter. 
If corresponding metadata alter finished, than we can remove + /// alter from sequence. + void finishDataAlter(int alter_version, std::lock_guard & /*state_lock*/); + + /// Check that we can execute this data alter, i.e. that its metadata stage is finished. + bool canExecuteDataAlter(int alter_version, std::lock_guard & /*state_lock*/) const; + + /// Check that we can execute metadata alter with this version (sequence is empty or this version is its head). + bool canExecuteMetaAlter(int alter_version, std::lock_guard & /*state_lock*/) const; + + /// Just returns smallest alter version in sequence (first entry), or -1 if the sequence is empty. + int getHeadAlterVersion(std::lock_guard & /*state_lock*/) const; +}; + +} diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h index 432cca5e906..2449daeb8ce 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h @@ -17,7 +17,7 @@ class WriteBuffer; /// in patitions. We will mutatate all parts with left number less than this numbers. /// /// These entries processed separately from main replication /log, and produce other entries -/// -- MUTATE_PART. +/// -- MUTATE_PART in main replication log. 
struct ReplicatedMergeTreeMutationEntry { void writeText(WriteBuffer & out) const; diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 2a49a366bbb..36915301d14 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -152,7 +152,7 @@ void ReplicatedMergeTreeQueue::insertUnlocked( if (entry->type == LogEntry::ALTER_METADATA) { LOG_TRACE(log, "Adding alter metadata version " << entry->alter_version << " to the queue"); - alter_chain.addMetadataAlter(entry->alter_version, entry->have_mutation, state_lock); + alter_sequence.addMetadataAlter(entry->alter_version, entry->have_mutation, state_lock); } } @@ -231,7 +231,7 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval( if (entry->type == LogEntry::ALTER_METADATA) { LOG_TRACE(log, "Finishing metadata alter with version " << entry->alter_version); - alter_chain.finishMetadataAlter(entry->alter_version, state_lock); + alter_sequence.finishMetadataAlter(entry->alter_version, state_lock); } } else @@ -701,7 +701,7 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C if (entry->isAlterMutation() && entry->znode_name > mutation_pointer) { LOG_TRACE(log, "Adding mutation " << entry->znode_name << " with alter version " << entry->alter_version << " to the queue"); - alter_chain.addMutationForAlter(entry->alter_version, state_lock); + alter_sequence.addMutationForAlter(entry->alter_version, state_lock); } } } @@ -746,7 +746,7 @@ ReplicatedMergeTreeMutationEntryPtr ReplicatedMergeTreeQueue::removeMutation( if (entry->isAlterMutation()) { LOG_DEBUG(log, "Removed alter " << entry->alter_version << " because mutation " + entry->znode_name + " were killed."); - alter_chain.finishDataAlter(entry->alter_version, state_lock); + alter_sequence.finishDataAlter(entry->alter_version, state_lock); } mutations_by_znode.erase(it); @@ -1059,9 
+1059,9 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( /// corresponding alter_version. if (entry.type == LogEntry::ALTER_METADATA) { - if (!alter_chain.canExecuteMetaAlter(entry.alter_version, state_lock)) + if (!alter_sequence.canExecuteMetaAlter(entry.alter_version, state_lock)) { - int head_alter = alter_chain.getHeadAlterVersion(state_lock); + int head_alter = alter_sequence.getHeadAlterVersion(state_lock); out_postpone_reason = "Cannot execute alter metadata with version: " + std::to_string(entry.alter_version) + " because another alter " + std::to_string(head_alter) + " must be executed before"; @@ -1072,9 +1072,9 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( /// If this MUTATE_PART is part of alter modify/drop query, than we have to execute them one by one if (entry.isAlterMutation()) { - if (!alter_chain.canExecuteDataAlter(entry.alter_version, state_lock)) + if (!alter_sequence.canExecuteDataAlter(entry.alter_version, state_lock)) { - int head_alter = alter_chain.getHeadAlterVersion(state_lock); + int head_alter = alter_sequence.getHeadAlterVersion(state_lock); if (head_alter == entry.alter_version) out_postpone_reason = "Cannot execute alter data with version: " + std::to_string(entry.alter_version) + " because metadata still not altered"; @@ -1182,7 +1182,8 @@ ReplicatedMergeTreeQueue::SelectedEntry ReplicatedMergeTreeQueue::selectEntryToP if (shouldExecuteLogEntry(**it, (*it)->postpone_reason, merger_mutator, data, lock)) { entry = *it; - /// We gave a chance for the entry, move it to the tail of the queue + /// We gave a chance for the entry, so move it to the tail of the queue + /// (splice to queue.end()). queue.splice(queue.end(), queue, it); break; } @@ -1209,6 +1210,8 @@ bool ReplicatedMergeTreeQueue::processEntry( try { + /// We don't have any backoff for failed entries, + /// we just count the number of tries for each of them. 
if (func(entry)) removeProcessedEntry(get_zookeeper(), entry); } @@ -1363,7 +1366,7 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep { LOG_TRACE(log, "Marking mutation " << znode << " done because it is <= mutation_pointer (" << mutation_pointer << ")"); mutation.is_done = true; - alter_chain.finishDataAlter(mutation.entry->alter_version, lock); + alter_sequence.finishDataAlter(mutation.entry->alter_version, lock); if (mutation.parts_to_do.size() != 0) { LOG_INFO(log, "Seems like we jumped over mutation " << znode << " when downloaded part with bigger mutation number." @@ -1409,7 +1412,7 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep if (entry->isAlterMutation()) { LOG_TRACE(log, "Finishing data alter with version " << entry->alter_version << " for entry " << entry->znode_name); - alter_chain.finishDataAlter(entry->alter_version, lock); + alter_sequence.finishDataAlter(entry->alter_version, lock); } } } diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index 9d40f1b28c0..43254d174f5 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -9,7 +9,7 @@ #include #include #include -#include +#include #include #include @@ -132,7 +132,9 @@ private: /// Provides only one simultaneous call to pullLogsToQueue. std::mutex pull_logs_to_queue_mutex; - ReplicatedQueueAlterChain alter_chain; + /// This sequence controls ALTERs execution in replication queue. + /// We need it because alters have to be executed sequentially (one by one). 
+ ReplicatedMergeTreeAltersSequence alter_sequence; /// List of subscribers /// A subscriber callback is called when an entry queue is deleted diff --git a/dbms/src/Storages/MergeTree/ReplicatedQueueAlterChain.h b/dbms/src/Storages/MergeTree/ReplicatedQueueAlterChain.h deleted file mode 100644 index cfe450c9744..00000000000 --- a/dbms/src/Storages/MergeTree/ReplicatedQueueAlterChain.h +++ /dev/null @@ -1,106 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include - -namespace DB -{ - -namespace ErrorCodes -{ - extern const int LOGICAL_ERROR; -} - -class ReplicatedQueueAlterChain -{ -private: - struct AlterState - { - bool metadata_finished = false; - bool data_finished = false; - - AlterState() = default; - - AlterState(bool metadata_finished_, bool data_finished_) - : metadata_finished(metadata_finished_) - , data_finished(data_finished_) - { - } - }; - -private: - std::map queue_state; -public: - - int getHeadAlterVersion(std::lock_guard & /*state_lock*/) const - { - if (!queue_state.empty()) - return queue_state.begin()->first; - return -1; - } - - void addMutationForAlter(int alter_version, std::lock_guard & /*state_lock*/) - { - if (!queue_state.count(alter_version)) - queue_state.emplace(alter_version, AlterState{true, false}); - else - queue_state[alter_version].data_finished = false; - } - - void addMetadataAlter(int alter_version, bool have_mutation, std::lock_guard & /*state_lock*/) - { - if (!queue_state.count(alter_version)) - queue_state.emplace(alter_version, AlterState{false, !have_mutation}); - else - queue_state[alter_version].metadata_finished = false; - } - - void finishMetadataAlter(int alter_version, std::unique_lock & /*state_lock*/) - { - assert(!queue_state.empty()); - assert(queue_state.begin()->first == alter_version); - - if (queue_state[alter_version].data_finished) - queue_state.erase(alter_version); - else - queue_state[alter_version].metadata_finished = true; - } - - void finishDataAlter(int alter_version, 
std::lock_guard & /*state_lock*/) - { - /// queue can be empty after load of finished mutation without move of mutation pointer - if (queue_state.empty()) - return; - - if (alter_version >= queue_state.begin()->first) - { - assert(queue_state.count(alter_version)); - if (queue_state[alter_version].metadata_finished) - queue_state.erase(alter_version); - else - queue_state[alter_version].data_finished = true; - } - } - - bool canExecuteDataAlter(int alter_version, std::lock_guard & /*state_lock*/) const - { - if (!queue_state.count(alter_version)) - return true; - - return queue_state.at(alter_version).metadata_finished; - } - - bool canExecuteMetaAlter(int alter_version, std::lock_guard & /*state_lock*/) const - { - if (queue_state.empty()) - return true; - - return queue_state.begin()->first == alter_version; - } - -}; - -}