diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index a191f77fd7a..e6c4750b4d7 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -24,7 +24,6 @@ ReplicatedMergeTreeQueue::ReplicatedMergeTreeQueue(StorageReplicatedMergeTree & , format_version(storage.format_version) , current_parts(format_version) , virtual_parts(format_version) - , alter_sequence(&Logger::get(storage_.getStorageID().table_name)) {} @@ -140,8 +139,6 @@ void ReplicatedMergeTreeQueue::insertUnlocked( else queue.push_front(entry); - entries_in_queue.insert(entry->znode_name); - if (entry->type == LogEntry::GET_PART) { inserts_by_time.insert(entry); @@ -153,21 +150,7 @@ void ReplicatedMergeTreeQueue::insertUnlocked( } } if (entry->type == LogEntry::ALTER_METADATA) - { - LOG_DEBUG(log, "ADDING METADATA ENTRY WITH ALTER VERSION:" << entry->alter_version); - //for (auto & log_entry : entries_in_queue) - //{ - // LOG_DEBUG(log, "LogEntry:" << log_entry); - //} alter_sequence.addMetadataAlter(entry->alter_version, state_lock); - } - - if (entry->type == LogEntry::MUTATE_PART && entry->alter_version != -1) - { - LOG_DEBUG(log, "ADDING DATA ENTRY WITH ALTER VERSION:" << entry->alter_version << " FOR PART:" << entry->source_parts[0] << " to " << entry->getVirtualPartNames()[0]); - //LOG_DEBUG(log, "ADDING DATA ENTRY WITH ALTER VERSION:" << entry->alter_version); - //std::cerr << "INSERT MUTATE PART:" << entry->alter_version << std::endl; - } } @@ -243,16 +226,7 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval( } if (entry->type == LogEntry::ALTER_METADATA) - { - LOG_DEBUG(log, "FIN ALTER FOR PART with ALTER VERSION:" << entry->alter_version); - //std::cerr << "Alter have mutation:" << entry->have_mutation << std::endl; alter_sequence.finishMetadataAlter(entry->alter_version, entry->have_mutation, state_lock); - - } - if (entry->type == LogEntry::MUTATE_PART) - { - LOG_DEBUG(log, "FIN MUTATION FOR PART:" << entry->source_parts[0] << " with ALTER VERSION:" << entry->alter_version); - } } else { @@ -260,25 +234,20 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval( { /// Because execution of the entry is unsuccessful, `virtual_part_name` will never appear /// so we won't need to mutate it. - LOG_DEBUG(log, "REMOVING PART FROM MUTATIONS:" << virtual_part_name); removePartFromMutations(virtual_part_name); } } - entries_in_queue.erase(entry->znode_name); } void ReplicatedMergeTreeQueue::removePartFromMutations(const String & part_name) { - LOG_DEBUG(log, "Removing part from mutations:" << part_name); + //LOG_DEBUG(log, "Removing part from mutations:" << part_name); auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version); auto in_partition = mutations_by_partition.find(part_info.partition_id); if (in_partition == mutations_by_partition.end()) - { - LOG_DEBUG(log, "Not found partition in mutations for part:" << part_name); return; - } bool some_mutations_are_probably_done = false; @@ -287,7 +256,7 @@ void ReplicatedMergeTreeQueue::removePartFromMutations(const String & part_name) { MutationStatus & status = *it->second; - LOG_DEBUG(log, "Removing part name:" << part_name << " from mutation:" << status.entry->znode_name << " block number :" << status.entry->block_numbers.begin()->second); + //LOG_DEBUG(log, "Removing part name:" << part_name << " from mutation:" << status.entry->znode_name << " block number :" << status.entry->block_numbers.begin()->second); status.parts_to_do.removePartAndCoveredParts(part_name); if (status.parts_to_do.size() == 0) some_mutations_are_probably_done = true; @@ -626,7 +595,6 @@ Names ReplicatedMergeTreeQueue::getCurrentPartNamesToMutate(ReplicatedMergeTreeM void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, Coordination::WatchCallback watch_callback) { std::lock_guard lock(update_mutations_mutex); - //std::cerr << "UPdating mutations\n"; Strings entries_in_zk = zookeeper->getChildrenWatch(zookeeper_path + "/mutations", nullptr, watch_callback); StringSet entries_in_zk_set(entries_in_zk.begin(), entries_in_zk.end()); @@ -696,7 +664,6 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C for (const ReplicatedMergeTreeMutationEntryPtr & entry : new_mutations) { - LOG_DEBUG(log, "PROCESSING MUTATION:" << entry->znode_name); auto & mutation = mutations_by_znode.emplace(entry->znode_name, MutationStatus(entry, format_version)) .first->second; @@ -729,7 +696,7 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C some_mutations_are_probably_done = true; if (entry->alter_version != -1) - alter_sequence.addMutationForAlter(entry->alter_version, entry->block_numbers, state_lock); + alter_sequence.addMutationForAlter(entry->alter_version, state_lock); } } @@ -995,13 +962,6 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( || entry.type == LogEntry::GET_PART || entry.type == LogEntry::MUTATE_PART) { - //if (!entry.actual_new_part_name.empty() - // && !alter_sequence.canExecuteGetEntry(entry.actual_new_part_name, format_version, state_lock)) - // return false; - - //if (!entry.new_part_name.empty() && !alter_sequence.canExecuteGetEntry(entry.new_part_name, format_version, state_lock)) - // return false; - for (const String & new_part_name : entry.getBlockingPartNames()) { if (!isNotCoveredByFuturePartsImpl(new_part_name, out_postpone_reason, state_lock)) @@ -1082,50 +1042,21 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( } if (entry.type == LogEntry::ALTER_METADATA) - { //std::cerr << "Should we execute alter:"; - - - LOG_DEBUG(log, "Should we execute alter entry:" << entry.znode_name << "\n"<< entry.toString()); - LOG_DEBUG(log, "We are in front:" << (entry.znode_name == *entries_in_queue.begin())); - - for (auto & log_entry : entries_in_queue) - { - LOG_DEBUG(log, "LogEntry:" << log_entry); - } - for (auto & log_entry : queue) - { - LOG_DEBUG(log, "LogEntryData:" << log_entry->znode_name << "\n" << log_entry->toString()); - } - - //std::cerr << alter_sequence.canExecuteMetadataAlter(entry.alter_version, state_lock) << std::endl; + { if (!alter_sequence.canExecuteMetaAlter(entry.alter_version, state_lock)) { - out_postpone_reason - = "Cannot execute alter metadata with because head smallest node is " + *entries_in_queue.begin() + " but we are " + entry.znode_name; + out_postpone_reason = "Alter is not started, because more old alter is executing right now"; return false; } - else - { - LOG_DEBUG(log, "YESSS"); - } } if (entry.type == LogEntry::MUTATE_PART && entry.alter_version != -1) { - //std::cerr << "Should we execute mutation:"; - //std::cerr << alter_sequence.canExecuteDataAlter(entry.alter_version, state_lock) << std::endl; - - LOG_DEBUG(log, "Should we execute mutation entry:" << entry.znode_name << "\n" << entry.toString()); if (!alter_sequence.canExecuteDataAlter(entry.alter_version, state_lock)) { - LOG_DEBUG(log, "NOOOO"); - out_postpone_reason = "Cannot execute alter data with version: " + std::to_string(entry.alter_version); + out_postpone_reason = "Cannot execute alter data with version: " + std::to_string(entry.alter_version) + " because metadata alter is not finished yet"; return false; } - else - { - LOG_DEBUG(log, "YUESS"); - } } return true; diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index aa065e7bc33..9ce9ba9d40c 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -73,8 +73,6 @@ private: */ Queue queue; - StringSet entries_in_queue; - InsertsByTime inserts_by_time; time_t min_unprocessed_insert_time = 0; time_t max_processed_insert_time = 0; @@ -348,12 +346,6 @@ public: /// Locks queue's mutex. bool addFuturePartIfNotCoveredByThem(const String & part_name, LogEntry & entry, String & reject_reason); - bool checkCanFetchPart(const String & part_name) - { - std::lock_guard lock(state_mutex); - return alter_sequence.canExecuteGetEntry(part_name, format_version, lock); - } - /// A blocker that stops selects from the queue ActionBlocker actions_blocker; diff --git a/dbms/src/Storages/MergeTree/ReplicatedQueueAlterState.h b/dbms/src/Storages/MergeTree/ReplicatedQueueAlterState.h index 10be2059be0..a13b20a44a0 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedQueueAlterState.h +++ b/dbms/src/Storages/MergeTree/ReplicatedQueueAlterState.h @@ -19,128 +19,72 @@ class AlterSequence private: struct AlterInQueue { - std::map block_numbers; bool metadata_finished = false; + bool data_finished = false; AlterInQueue() = default; - AlterInQueue(const std::map & block_numbers_, bool metadata_finished_) - : block_numbers(block_numbers_) - , metadata_finished(metadata_finished_) + AlterInQueue(bool metadata_finished_, bool data_finished_) + : metadata_finished(metadata_finished_) + , data_finished(data_finished_) { } }; - Poco::Logger * log; - -public: - AlterSequence(Poco::Logger * log_) - : log(log_) - { - } std::map queue_state; +public: bool empty() const { return queue_state.empty(); } - void addMutationForAlter(int alter_version, const std::map & block_numbers, std::lock_guard & /*state_lock*/) + void addMutationForAlter(int alter_version, std::lock_guard & /*state_lock*/) { - LOG_DEBUG(log, "Adding mutation with alter version:" << alter_version); - if (queue_state.count(alter_version)) - queue_state[alter_version].block_numbers = block_numbers; + if (!queue_state.count(alter_version)) + queue_state.emplace(alter_version, AlterInQueue(true, false)); else - queue_state.emplace(alter_version, AlterInQueue(block_numbers, true)); + queue_state[alter_version].data_finished = false; } void addMetadataAlter(int alter_version, std::lock_guard & /*state_lock*/) { - //LOG_DEBUG(log, "Adding meta with alter version:" << alter_version); if (!queue_state.count(alter_version)) - queue_state.emplace(alter_version, AlterInQueue({}, false)); + queue_state.emplace(alter_version, AlterInQueue(false, true)); else queue_state[alter_version].metadata_finished = false; } - bool canExecuteGetEntry(const String & part_name, MergeTreeDataFormatVersion format_version, std::lock_guard & /*state_lock*/) const + void finishMetadataAlter(int alter_version, bool have_mutation, std::unique_lock & /*state_lock*/) { - if (empty()) - return true; + assert(!queue_state.empty()); + assert(queue_state.begin()->first == alter_version); - MergeTreePartInfo info = MergeTreePartInfo::fromPartName(part_name, format_version); - LOG_DEBUG(log, "Checking can fetch:" << part_name); - - for (const auto & [block_number, state] : queue_state) - { - - LOG_DEBUG(log, "Looking at alter:" << block_number << " with part name:" << part_name); - if (!state.block_numbers.empty()) - { - LOG_DEBUG(log, "Block number:" << block_number << " has part name " << part_name << " version " << state.block_numbers.at(info.partition_id) << " metadata is done:" << state.metadata_finished); - if (!state.metadata_finished) - return info.getDataVersion() < state.block_numbers.at(info.partition_id) && info.max_block < state.block_numbers.at(info.partition_id); - else - return info.getDataVersion() <= state.block_numbers.at(info.partition_id) && info.max_block <= state.block_numbers.at(info.partition_id); - } - } - //LOG_DEBUG(log, "Nobody has block number for part " << part_name); - return true; - - } - - void finishMetadataAlter(int alter_version, bool have_data_alter, std::unique_lock & /*state_lock*/) - { - - if (queue_state.empty()) - { - throw Exception("Queue shouldn't be empty on metadata alter", ErrorCodes::LOGICAL_ERROR); - } - - if (queue_state.begin()->first != alter_version) - { - //LOG_DEBUG(log, "Finished metadata alter with version " + std::to_string(alter_version) + " but current alter in queue is " + std::to_string(queue_state.begin()->first)); - throw Exception("Finished metadata alter with version " + std::to_string(alter_version) + " but current alter in queue is " + std::to_string(queue_state.begin()->first), ErrorCodes::LOGICAL_ERROR); - } - - LOG_DEBUG(log, "FINISH METADATA ALTER: " << alter_version); - if (!have_data_alter) - { + if (!have_mutation) queue_state.erase(alter_version); - } else - { queue_state[alter_version].metadata_finished = true; - } } void finishDataAlter(int alter_version, std::lock_guard & /*state_lock*/) { - /// queue can be empty after load of finished mutation without move of mutation pointer if (queue_state.empty()) - { - //LOG_DEBUG(log, "FINISHING DATA ALTER WITH VERSION:" << alter_version << " BUT QUEUE EMPTY"); - return; - } + assert(queue_state.count(alter_version)); - LOG_DEBUG(log, "FINISH DATA ALTER: " << alter_version); - if (!queue_state.count(alter_version)) - std::terminate(); - queue_state.erase(alter_version); + if (queue_state[alter_version].metadata_finished) + queue_state.erase(alter_version); + else + queue_state[alter_version].data_finished = true; } bool canExecuteDataAlter(int alter_version, std::lock_guard & /*state_lock*/) const { - LOG_DEBUG(log, "Can execute data alter:" << alter_version); - for (auto [key, value] : queue_state) - { - LOG_DEBUG(log, "Key:" << key << " is metadata finished:" << value.metadata_finished); - } if (!queue_state.count(alter_version)) return true; return queue_state.at(alter_version).metadata_finished; } + bool canExecuteMetaAlter(int alter_version, std::lock_guard & /*state_lock*/) const { return queue_state.empty() || queue_state.begin()->first == alter_version; diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 1996f33867f..a13fd8ead07 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -777,16 +777,6 @@ void StorageReplicatedMergeTree::checkPartChecksumsAndAddCommitOps(const zkutil: if (part_name.empty()) part_name = part->name; - //try - //{ - // check(part->columns); - //} - //catch (...) - //{ - // LOG_DEBUG(log, "EXCEPTION ADDING PART:" << part_name << " VERSION:" << getMetadataVersion()); - // throw; - //} - auto local_part_header = ReplicatedMergeTreePartHeader::fromColumnsAndChecksums( part->columns, part->checksums); @@ -2617,12 +2607,8 @@ String StorageReplicatedMergeTree::findReplicaHavingCoveringPart(LogEntry & entr String largest_part_found; Strings parts = zookeeper->getChildren(zookeeper_path + "/replicas/" + replica + "/parts"); - bool source_part_found = false; for (const String & part_on_replica : parts) { - if (part_on_replica == entry.new_part_name) - source_part_found = true; - if (part_on_replica == entry.new_part_name || MergeTreePartInfo::contains(part_on_replica, entry.new_part_name, format_version)) { @@ -2642,19 +2628,11 @@ String StorageReplicatedMergeTree::findReplicaHavingCoveringPart(LogEntry & entr if (!the_same_part) { String reject_reason; - if (queue.checkCanFetchPart(largest_part_found)) + if (!queue.addFuturePartIfNotCoveredByThem(largest_part_found, entry, reject_reason)) { - if (!queue.addFuturePartIfNotCoveredByThem(largest_part_found, entry, reject_reason)) - { - LOG_INFO(log, "Will not fetch part " << largest_part_found << " covering " << entry.new_part_name << ". " << reject_reason); - return {}; - } + LOG_INFO(log, "Will not fetch part " << largest_part_found << " covering " << entry.new_part_name << ". " << reject_reason); + return {}; } - if (source_part_found) - return replica; - else - LOG_INFO( - log, "NOT FOUND ANYTHING"); } else { diff --git a/dbms/tests/queries/0_stateless/01079_parallel_alter_modify_zookeeper.sh b/dbms/tests/queries/0_stateless/01079_parallel_alter_modify_zookeeper.sh index 572beb88389..c5488a1d3e6 100755 --- a/dbms/tests/queries/0_stateless/01079_parallel_alter_modify_zookeeper.sh +++ b/dbms/tests/queries/0_stateless/01079_parallel_alter_modify_zookeeper.sh @@ -74,7 +74,7 @@ export -f correct_alter_thread; export -f insert_thread; -TIMEOUT=120 +TIMEOUT=500 timeout $TIMEOUT bash -c correct_alter_thread 2> /dev/null & @@ -103,7 +103,8 @@ done for i in `seq $REPLICAS`; do + $CLICKHOUSE_CLIENT --query "SYSTEM SYNC REPLICA concurrent_alter_mt_$i" $CLICKHOUSE_CLIENT --query "SELECT SUM(toUInt64(value1)) > $INITIAL_SUM FROM concurrent_alter_mt_$i" $CLICKHOUSE_CLIENT --query "SELECT COUNT() FROM system.mutations WHERE is_done=0" # all mutations have to be done - #$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS concurrent_alter_mt_$i" + $CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS concurrent_alter_mt_$i" done