From 260a4687f05030a6f7505752705288496c6b7191 Mon Sep 17 00:00:00 2001 From: alesapin Date: Mon, 10 Feb 2020 19:55:09 +0300 Subject: [PATCH] Something working --- .../ReplicatedMergeTreeBlockOutputStream.cpp | 1 + .../MergeTree/ReplicatedMergeTreeQueue.cpp | 33 ++-- .../MergeTree/ReplicatedQueueAlterState.h | 150 ++++++------------ .../Storages/StorageReplicatedMergeTree.cpp | 28 ++-- 4 files changed, 88 insertions(+), 124 deletions(-) diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp index 0442d964e5e..bddf8434e1c 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp @@ -256,6 +256,7 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo zkutil::CreateMode::PersistentSequential)); ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/columns", storage.getMetadataVersion())); + ops.emplace_back(zkutil::makeSetRequest(storage.zookeeper_path + "/alter_intention_counter", "", -1)); /// Deletes the information that the block number is used for writing. block_number_lock->getUnlockOps(ops); diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 7c7ab2644f4..8bfa974ce45 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -167,7 +167,6 @@ void ReplicatedMergeTreeQueue::insertUnlocked( LOG_DEBUG(log, "ADDING DATA ENTRY WITH ALTER VERSION:" << entry->alter_version << " FOR PART:" << entry->source_parts[0] << " to " << entry->getVirtualPartNames()[0]); //LOG_DEBUG(log, "ADDING DATA ENTRY WITH ALTER VERSION:" << entry->alter_version); //std::cerr << "INSERT MUTATE PART:" << entry->alter_version << std::endl; - alter_sequence.addDataAlterIfEmpty(entry->alter_version, state_lock); } } @@ -246,10 +245,10 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval( if (entry->type == LogEntry::ALTER_METADATA) { + LOG_DEBUG(log, "FIN ALTER FOR PART with ALTER VERSION:" << entry->alter_version); //std::cerr << "Alter have mutation:" << entry->have_mutation << std::endl; alter_sequence.finishMetadataAlter(entry->alter_version, entry->have_mutation, state_lock); - LOG_DEBUG(log, "FIN ALTER FOR PART with ALTER VERSION:" << entry->alter_version); } if (entry->type == LogEntry::MUTATE_PART) { @@ -726,6 +725,9 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C if (mutation.parts_to_do.size() == 0) some_mutations_are_probably_done = true; + + if (entry->alter_version != -1) + alter_sequence.addMutationForAlter(entry->alter_version, entry->block_numbers, state_lock); } } @@ -987,6 +989,12 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( MergeTreeData & data, std::lock_guard & state_lock) const { + if (entry.type == LogEntry::GET_PART) + { + if (!alter_sequence.canExecuteGetEntry(entry.new_part_name, format_version, state_lock)) + return false; + } + if (entry.type == LogEntry::MERGE_PARTS || entry.type == LogEntry::GET_PART || entry.type == LogEntry::MUTATE_PART) @@ -1074,20 +1082,23 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( { //std::cerr << "Should we execute alter:"; - LOG_DEBUG(log, "Should we execute alter entry:" << entry.toString()); + LOG_DEBUG(log, "Should we execute alter entry:" << entry.znode_name << "\n"<< entry.toString()); LOG_DEBUG(log, "We are in front:" << (entry.znode_name == *entries_in_queue.begin())); + for (auto & log_entry : entries_in_queue) { LOG_DEBUG(log, "LogEntry:" << log_entry); } + for (auto & log_entry : queue) + { + LOG_DEBUG(log, "LogEntryData:" << log_entry->znode_name << "\n" << log_entry->toString()); + } //std::cerr << alter_sequence.canExecuteMetadataAlter(entry.alter_version, state_lock) << std::endl; - if (!alter_sequence.canExecuteMetadataAlter(entry.alter_version, state_lock) || *entries_in_queue.begin() != entry.znode_name) + if (*entries_in_queue.begin() != entry.znode_name || !alter_sequence.canExecuteMetaAlter(entry.alter_version, state_lock)) { - LOG_DEBUG(log, "No we shouldn't"); - out_postpone_reason = "Cannot execute alter metadata with version: " + std::to_string(entry.alter_version) - + " because current head is " + std::to_string(alter_sequence.queue.front().alter_version) - + " with state: " + std::to_string(alter_sequence.queue.front().state); + out_postpone_reason + = "Cannot execute alter metadata with because head smallest node is " + *entries_in_queue.begin() + " but we are " + entry.znode_name; return false; } else @@ -1101,13 +1112,11 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( //std::cerr << "Should we execute mutation:"; //std::cerr << alter_sequence.canExecuteDataAlter(entry.alter_version, state_lock) << std::endl; - LOG_DEBUG(log, "Should we execute mutation entry:" << entry.toString()); + LOG_DEBUG(log, "Should we execute mutation entry:" << entry.znode_name << "\n" << entry.toString()); if (!alter_sequence.canExecuteDataAlter(entry.alter_version, state_lock)) { LOG_DEBUG(log, "NOOOO"); - out_postpone_reason = "Cannot execute alter data with version: " + std::to_string(entry.alter_version) - + " because current head is " + std::to_string(alter_sequence.queue.front().alter_version) - + " with state: " + std::to_string(alter_sequence.queue.front().state); + out_postpone_reason = "Cannot execute alter data with version: " + std::to_string(entry.alter_version); return false; } else diff --git a/dbms/src/Storages/MergeTree/ReplicatedQueueAlterState.h b/dbms/src/Storages/MergeTree/ReplicatedQueueAlterState.h index a6a4b44da83..b359ce0dc14 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedQueueAlterState.h +++ b/dbms/src/Storages/MergeTree/ReplicatedQueueAlterState.h @@ -4,6 +4,7 @@ #include #include #include +#include namespace DB { @@ -16,22 +17,16 @@ namespace ErrorCodes class AlterSequence { private: - enum AlterState - { - APPLY_METADATA_CHANGES, - APPLY_DATA_CHANGES, - DATA_CHANGES_NOT_NEEDED, - }; - struct AlterInQueue { - int alter_version; - AlterState state; std::map block_numbers; + bool metadata_finished = false; - AlterInQueue(int alter_version_, AlterState state_) - : alter_version(alter_version_) - , state(state_) + AlterInQueue() = default; + + AlterInQueue(const std::map & block_numbers_, bool metadata_finished_) + : block_numbers(block_numbers_) + , metadata_finished(metadata_finished_) { } }; @@ -43,65 +38,60 @@ public: : log(log_) { } - std::deque queue; + std::map queue_state; bool empty() const { - return queue.empty(); + return queue_state.empty(); + } + + void addMutationForAlter(int alter_version, const std::map & block_numbers, std::lock_guard & /*state_lock*/) + { + LOG_DEBUG(log, "Adding mutation with alter version:" << alter_version); + queue_state.emplace(alter_version, AlterInQueue(block_numbers, true)); } void addMetadataAlter(int alter_version, std::lock_guard & /*state_lock*/) { - if (!queue.empty() && queue.front().alter_version > alter_version) - { - throw Exception("Alter not in order " + std::to_string(alter_version), ErrorCodes::LOGICAL_ERROR); - } - queue.emplace_back(alter_version, AlterState::APPLY_METADATA_CHANGES); + LOG_DEBUG(log, "Adding meta with alter version:" << alter_version); + if (!queue_state.count(alter_version)) + queue_state.emplace(alter_version, AlterInQueue({}, false)); + else + queue_state[alter_version].metadata_finished = false; } - void addDataAlterIfEmpty(int alter_version, std::lock_guard & /*state_lock*/) + bool canExecuteGetEntry(const String & part_name, MergeTreeDataFormatVersion format_version, std::lock_guard & /*state_lock*/) const { - if (queue.empty()) - queue.emplace_back(alter_version, AlterState::APPLY_DATA_CHANGES); + if (empty()) + return true; + + MergeTreePartInfo info = MergeTreePartInfo::fromPartName(part_name, format_version); + if (queue_state.begin()->second.block_numbers.count(info.partition_id)) + return info.getDataVersion() < queue_state.begin()->second.block_numbers.at(info.partition_id); + return true; - //if (queue.front().alter_version != alter_version) - //{ - // throw Exception( - // "Alter head has another version number " - // + std::to_string(queue.front().alter_version) + " than ours " + std::to_string(alter_version), - // ErrorCodes::LOGICAL_ERROR); - //} } void finishMetadataAlter(int alter_version, bool have_data_alter, std::unique_lock & /*state_lock*/) { - if (queue.empty()) + if (queue_state.empty()) { throw Exception("Queue shouldn't be empty on metadata alter", ErrorCodes::LOGICAL_ERROR); } - LOG_DEBUG( - log, - "FINISHING METADATA ALTER WITH VERSION:" << alter_version << " AND HAVE DATA ALTER: " << have_data_alter - << " QUEUE HEAD:" << queue.front().alter_version << " state:" << queue.front().state); - if (queue.front().alter_version != alter_version) - throw Exception("Finished metadata alter with version " + std::to_string(alter_version) + " but current alter in queue is " + std::to_string(queue.front().alter_version), ErrorCodes::LOGICAL_ERROR); - - if (have_data_alter && queue.front().state == AlterState::APPLY_METADATA_CHANGES) + if (queue_state.begin()->first != alter_version) { - LOG_DEBUG( - log, - "FINISHING METADATA ALTER WITH VERSION:" << alter_version << " AND SWITCHING QUEUE STATE"); + LOG_DEBUG(log, "Finished metadata alter with version " + std::to_string(alter_version) + " but current alter in queue is " + std::to_string(queue_state.begin()->first)); + throw Exception("Finished metadata alter with version " + std::to_string(alter_version) + " but current alter in queue is " + std::to_string(queue_state.begin()->first), ErrorCodes::LOGICAL_ERROR); + } - //std::cerr << "Switching head state:" << AlterState::APPLY_DATA_CHANGES << std::endl; - queue.front().state = AlterState::APPLY_DATA_CHANGES; + if (!have_data_alter) + { + queue_state.erase(alter_version); } else { - LOG_DEBUG(log, "FINISHING METADATA ALTER WITH VERSION:" << alter_version << " AND DOING POP"); - - //std::cerr << "JUST POP FRONT\n"; - queue.pop_front(); + queue_state[alter_version].metadata_finished = true; } } @@ -109,69 +99,29 @@ public: { /// queue can be empty after load of finished mutation without move of mutation pointer - if (queue.empty()) + if (queue_state.empty()) { LOG_DEBUG(log, "FINISHING DATA ALTER WITH VERSION:" << alter_version << " BUT QUEUE EMPTY"); return; } - //std::cerr << "Finishing data alter:" << alter_version << std::endl; - if (queue.front().alter_version != alter_version) - { - for (auto & state : queue) - { - if (state.alter_version == alter_version) - { - LOG_DEBUG(log, "FINISHING DATA ALTER WITH VERSION:" << alter_version << " BUT HEAD IS NOT SAME SO MAKE DATA_CHANGED_NOT_NEEDED"); - state.state = AlterState::DATA_CHANGES_NOT_NEEDED; - return; - } - } - } - //if (queue.front().alter_version != alter_version) - //{ - // LOG_DEBUG(log, "FINISHING DATA ALTER WITH VERSION:" << alter_version << " BUT QUEUE VERSION IS " << queue.front().alter_version << " state " << queue.front().state); - // throw Exception( - // "Finished data alter with version " + std::to_string(alter_version) + " but current alter in queue is " - // + std::to_string(queue.front().alter_version), - // ErrorCodes::LOGICAL_ERROR); - //} - if (queue.front().state != AlterState::APPLY_DATA_CHANGES) - { - LOG_DEBUG( - log, "FINISHING DATA ALTER WITH VERSION:" << alter_version << " BUT STATE IS METADATA"); - queue.front().state = AlterState::DATA_CHANGES_NOT_NEEDED; - return; - } - - LOG_DEBUG( - log, - "FINISHING DATA ALTER WITH VERSION:" << alter_version << " QUEUE VERSION IS " << queue.front().alter_version << " STATE " - << queue.front().state); - queue.pop_front(); - } - - bool canExecuteMetadataAlter(int alter_version, std::lock_guard & /*state_lock*/) const - { - if (queue.empty()) - throw Exception("QUEUE EMPTY ON METADATA", ErrorCodes::LOGICAL_ERROR); - LOG_DEBUG(log, "CHECK METADATADATA ALTER WITH VERSION:" << alter_version << " BUT QUEUE HEAD IS " << queue.front().alter_version); - - //std::cerr << "Alter queue front:" << queue.front().alter_version << " state:" << queue.front().state << std::endl; - return queue.front().alter_version == alter_version; + LOG_DEBUG(log, "FINISH DATA ALTER: " << alter_version); + queue_state.erase(alter_version); } bool canExecuteDataAlter(int alter_version, std::lock_guard & /*state_lock*/) const { - if (queue.empty()) - throw Exception("QUEUE EMPTY ON DATA", ErrorCodes::LOGICAL_ERROR); - //std::cerr << "Alter queue front:" << queue.front().alter_version << " state:" << queue.front().state << std::endl; - //std::cerr << "CAn execute:" << alter_version << std::endl; - //std::cerr << "FRont version:" << queue.front().alter_version << std::endl; - //std::cerr << "State:" << queue.front().state << std::endl; - LOG_DEBUG(log, "CHECK DATA ALTER WITH VERSION:" << alter_version << " BUT QUEUE HEAD IS " << queue.front().alter_version << " state:" << queue.front().state); - return queue.front().alter_version == alter_version && queue.front().state == AlterState::APPLY_DATA_CHANGES; + LOG_DEBUG(log, "Can execute data alter:" << alter_version); + for (auto [key, value] : queue_state) + { + LOG_DEBUG(log, "Key:" << key << " is metadata finished:" << value.metadata_finished); + } + return queue_state.at(alter_version).metadata_finished; + } + bool canExecuteMetaAlter(int alter_version, std::lock_guard & /*state_lock*/) const + { + return queue_state.empty() || queue_state.begin()->first == alter_version; } }; diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index d732d569b99..cc1311a8760 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -397,6 +397,7 @@ void StorageReplicatedMergeTree::createNewZooKeeperNodes() zookeeper->createIfNotExists(zookeeper_path + "/quorum", String()); zookeeper->createIfNotExists(zookeeper_path + "/quorum/last_part", String()); zookeeper->createIfNotExists(zookeeper_path + "/quorum/failed_parts", String()); + zookeeper->createIfNotExists(zookeeper_path + "/alter_intention_counter", String()); /// Tracking lag of replicas. zookeeper->createIfNotExists(replica_path + "/min_unprocessed_insert_time", String()); @@ -3290,7 +3291,7 @@ void StorageReplicatedMergeTree::alter( { assertNotReadonly(); - LOG_DEBUG(log, "Doing ALTER"); + LOG_DEBUG(log, "Doing ALTER FROM " << metadata_version); auto maybe_mutation_commands = params.getMutationCommands(getInMemoryMetadata()); auto table_id = getStorageID(); @@ -3394,7 +3395,6 @@ void StorageReplicatedMergeTree::alter( bool have_mutation = false; std::optional lock_holder; - size_t partitions_count = 0; if (!maybe_mutation_commands.empty()) { String mutations_path = zookeeper_path + "/mutations"; @@ -3406,24 +3406,20 @@ void StorageReplicatedMergeTree::alter( Coordination::Stat mutations_stat; zookeeper->get(mutations_path, &mutations_stat); + Coordination::Stat intention_counter_stat; + zookeeper->get(zookeeper_path + "/alter_intention_counter", &intention_counter_stat); lock_holder.emplace( - zookeeper_path + "/block_numbers", "block-", zookeeper_path + "/temp", *zookeeper); + zookeeper_path + "/block_numbers", "block-", zookeeper_path + "/temp", *zookeeper); - Coordination::Stat block_numbers_version; - zookeeper->get(zookeeper_path + "/block_numbers", &block_numbers_version); + ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/alter_intention_counter", intention_counter_stat.version)); for (const auto & lock : lock_holder->getLocks()) { - Coordination::Stat partition_stat; mutation_entry.block_numbers[lock.partition_id] = lock.number; - zookeeper->get(zookeeper_path + "/block_numbers/" + lock.partition_id, &partition_stat); - ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/block_numbers/" + lock.partition_id, partition_stat.version)); - partitions_count++; + LOG_DEBUG(log, "ALLOCATED:" << lock.number << " FOR VERSION:" << metadata_version + 1); } mutation_entry.create_time = time(nullptr); - /// We have to be sure, that no inserts happened - ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/block_numbers", block_numbers_version.version)); ops.emplace_back(zkutil::makeSetRequest(mutations_path, String(), mutations_stat.version)); ops.emplace_back( @@ -3435,6 +3431,8 @@ void StorageReplicatedMergeTree::alter( Coordination::Responses results; int32_t rc = zookeeper->tryMulti(ops, results); + LOG_DEBUG(log, "ALTER REQUESTED TO" << entry.alter_version); + //std::cerr << "Results size:" << results.size() << std::endl; //std::cerr << "Have mutation:" << have_mutation << std::endl; @@ -3445,7 +3443,7 @@ void StorageReplicatedMergeTree::alter( { //std::cerr << "In have mutation\n"; //std::cerr << "INDEX:" << results.size() - 2 << std::endl; - String alter_path = dynamic_cast(*results[results.size() - 4 - partitions_count]).path_created; + String alter_path = dynamic_cast(*results[results.size() - 4]).path_created; //std::cerr << "Alter path:" << alter_path << std::endl; entry.znode_name = alter_path.substr(alter_path.find_last_of('/') + 1); @@ -3474,7 +3472,11 @@ void StorageReplicatedMergeTree::alter( std::vector unwaited; //std::cerr << "Started wait for alter\n"; if (query_context.getSettingsRef().replication_alter_partitions_sync == 2) + { + LOG_DEBUG(log, "Start waiting for metadata alter"); unwaited = waitForAllReplicasToProcessLogEntry(entry, false); + LOG_DEBUG(log, "Finished waiting for metadata alter"); + } else if (query_context.getSettingsRef().replication_alter_partitions_sync == 1) waitForReplicaToProcessLogEntry(replica_name, entry); //std::cerr << "FInished wait for alter\n"; @@ -3487,7 +3489,9 @@ void StorageReplicatedMergeTree::alter( if (mutation_znode) { //std::cerr << "Started wait for mutation:" << *mutation_znode << std::endl; + LOG_DEBUG(log, "Start waiting for mutation"); waitMutation(*mutation_znode, query_context.getSettingsRef().replication_alter_partitions_sync); + LOG_DEBUG(log, "Finished waiting for mutation"); //std::cerr << "FInished wait for mutation\n"; } }