From 192393037987d38faf7c71dc901d50bcad7ba644 Mon Sep 17 00:00:00 2001 From: alesapin Date: Wed, 5 Feb 2020 19:30:02 +0300 Subject: [PATCH] Fix mutations finalization task --- .../Storages/MergeTree/ActiveDataPartSet.h | 17 +++ .../MergeTree/ReplicatedMergeTreeQueue.cpp | 56 +++++--- .../MergeTree/ReplicatedMergeTreeQueue.h | 24 ++-- ...allel_alter_replicated_zookeeper.reference | 17 +++ ...076_parallel_alter_replicated_zookeeper.sh | 122 ++++++++++++++++++ 5 files changed, 213 insertions(+), 23 deletions(-) create mode 100644 dbms/tests/queries/0_stateless/01076_parallel_alter_replicated_zookeeper.reference create mode 100755 dbms/tests/queries/0_stateless/01076_parallel_alter_replicated_zookeeper.sh diff --git a/dbms/src/Storages/MergeTree/ActiveDataPartSet.h b/dbms/src/Storages/MergeTree/ActiveDataPartSet.h index 17b30205bd5..beb538e2ca5 100644 --- a/dbms/src/Storages/MergeTree/ActiveDataPartSet.h +++ b/dbms/src/Storages/MergeTree/ActiveDataPartSet.h @@ -55,6 +55,18 @@ public: return remove(MergeTreePartInfo::fromPartName(part_name, format_version)); } + /// Remove part and all covered parts from active set + bool removePartAndCoveredParts(const String & part_name) + { + Strings parts_covered_by = getPartsCoveredBy(MergeTreePartInfo::fromPartName(part_name, format_version)); + bool result = true; + result &= remove(part_name); + for (const auto & part : parts_covered_by) + result &= remove(part); + + return result; + } + /// If not found, return an empty string. String getContainingPart(const MergeTreePartInfo & part_info) const; String getContainingPart(const String & name) const; @@ -66,6 +78,11 @@ public: size_t size() const; + void clear() + { + part_info_to_name.clear(); + } + MergeTreeDataFormatVersion getFormatVersion() const { return format_version; } private: diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 7fd08788704..3a869aa1e86 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -130,7 +130,7 @@ void ReplicatedMergeTreeQueue::insertUnlocked( for (const String & virtual_part_name : entry->getVirtualPartNames()) { virtual_parts.add(virtual_part_name); - updateMutationsPartsToDo(virtual_part_name, /* add = */ true); + addPartToMutations(virtual_part_name); } /// Put 'DROP PARTITION' entries at the beginning of the queue not to make superfluous fetches of parts that will be eventually deleted @@ -200,12 +200,16 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval( for (const String & virtual_part_name : entry->getVirtualPartNames()) { Strings replaced_parts; + /// In most cases we will replace only current parts, but sometimes + /// we can even replace virtual parts. For example when we failed to + /// GET source part and dowloaded merged/mutated part instead. current_parts.add(virtual_part_name, &replaced_parts); + virtual_parts.add(virtual_part_name, &replaced_parts); /// Each part from `replaced_parts` should become Obsolete as a result of executing the entry. - /// So it is one less part to mutate for each mutation with block number greater than part_info.getDataVersion() + /// So it is one less part to mutate for each mutation with block number greater or equal than part_info.getDataVersion() for (const String & replaced_part_name : replaced_parts) - updateMutationsPartsToDo(replaced_part_name, /* add = */ false); + removePartFromMutations(replaced_part_name); } String drop_range_part_name; @@ -226,13 +230,13 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval( { /// Because execution of the entry is unsuccessful, `virtual_part_name` will never appear /// so we won't need to mutate it. - updateMutationsPartsToDo(virtual_part_name, /* add = */ false); + removePartFromMutations(virtual_part_name); } } } -void ReplicatedMergeTreeQueue::updateMutationsPartsToDo(const String & part_name, bool add) +void ReplicatedMergeTreeQueue::removePartFromMutations(const String & part_name) { auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version); auto in_partition = mutations_by_partition.find(part_info.partition_id); @@ -241,15 +245,15 @@ void ReplicatedMergeTreeQueue::updateMutationsPartsToDo(const String & part_name bool some_mutations_are_probably_done = false; - auto from_it = in_partition->second.upper_bound(part_info.getDataVersion()); + auto from_it = in_partition->second.lower_bound(part_info.getDataVersion()); for (auto it = from_it; it != in_partition->second.end(); ++it) { MutationStatus & status = *it->second; - status.parts_to_do += (add ? +1 : -1); - if (status.parts_to_do <= 0) + status.parts_to_do.removePartAndCoveredParts(part_name); + if (status.parts_to_do.size() == 0) some_mutations_are_probably_done = true; - if (!add && !status.latest_failed_part.empty() && part_info.contains(status.latest_failed_part_info)) + if (!status.latest_failed_part.empty() && part_info.contains(status.latest_failed_part_info)) { status.latest_failed_part.clear(); status.latest_failed_part_info = MergeTreePartInfo(); @@ -262,6 +266,20 @@ void ReplicatedMergeTreeQueue::updateMutationsPartsToDo(const String & part_name storage.mutations_finalizing_task->schedule(); } +void ReplicatedMergeTreeQueue::addPartToMutations(const String & part_name) +{ + auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version); + auto in_partition = mutations_by_partition.find(part_info.partition_id); + if (in_partition == mutations_by_partition.end()) + return; + + auto from_it = in_partition->second.upper_bound(part_info.getDataVersion()); + for (auto it = from_it; it != in_partition->second.end(); ++it) + { + MutationStatus & status = *it->second; + status.parts_to_do.add(part_name); + } +} void ReplicatedMergeTreeQueue::updateTimesInZooKeeper( zkutil::ZooKeeperPtr zookeeper, @@ -629,7 +647,7 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C for (const ReplicatedMergeTreeMutationEntryPtr & entry : new_mutations) { - auto & mutation = mutations_by_znode.emplace(entry->znode_name, MutationStatus(entry)) + auto & mutation = mutations_by_znode.emplace(entry->znode_name, MutationStatus(entry, format_version)) .first->second; for (const auto & pair : entry->block_numbers) @@ -640,7 +658,9 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C } /// Initialize `mutation.parts_to_do`. First we need to mutate all parts in `current_parts`. - mutation.parts_to_do += getPartNamesToMutate(*entry, current_parts).size(); + Strings current_parts_to_mutate = getPartNamesToMutate(*entry, current_parts); + for (const String & current_part_to_mutate : current_parts_to_mutate) + mutation.parts_to_do.add(current_part_to_mutate); /// And next we would need to mutate all parts with getDataVersion() greater than /// mutation block number that would appear as a result of executing the queue. @@ -651,11 +671,11 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C auto part_info = MergeTreePartInfo::fromPartName(produced_part_name, format_version); auto it = entry->block_numbers.find(part_info.partition_id); if (it != entry->block_numbers.end() && it->second > part_info.getDataVersion()) - ++mutation.parts_to_do; + mutation.parts_to_do.add(produced_part_name); } } - if (mutation.parts_to_do == 0) + if (mutation.parts_to_do.size() == 0) some_mutations_are_probably_done = true; } } @@ -1277,8 +1297,14 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep { LOG_TRACE(log, "Marking mutation " << znode << " done because it is <= mutation_pointer (" << mutation_pointer << ")"); mutation.is_done = true; + if (mutation.parts_to_do.size() != 0) + { + LOG_INFO(log, "Seems like we jumped over mutation " << znode << " when downloaded part with bigger mutation number." + << " It's OK, tasks for rest parts will be skipped, but probably a lot of mutations were executed concurrently on different replicas."); + mutation.parts_to_do.clear(); + } } - else if (mutation.parts_to_do == 0) + else if (mutation.parts_to_do.size() == 0) { LOG_TRACE(log, "Will check if mutation " << mutation.entry->znode_name << " is done"); candidates.push_back(mutation.entry); @@ -1417,7 +1443,7 @@ std::vector ReplicatedMergeTreeQueue::getMutationsStatu { const MutationStatus & status = pair.second; const ReplicatedMergeTreeMutationEntry & entry = *status.entry; - const Names parts_to_mutate = getPartNamesToMutate(entry, current_parts); + Names parts_to_mutate = status.parts_to_do.getParts(); for (const MutationCommand & command : entry.commands) { diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index 4e199068667..885a432e3cf 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -97,18 +97,21 @@ private: struct MutationStatus { - MutationStatus(const ReplicatedMergeTreeMutationEntryPtr & entry_) + MutationStatus(const ReplicatedMergeTreeMutationEntryPtr & entry_, MergeTreeDataFormatVersion format_version_) : entry(entry_) + , parts_to_do(format_version_) { } ReplicatedMergeTreeMutationEntryPtr entry; - /// A number of parts that should be mutated/merged or otherwise moved to Obsolete state for this mutation to complete. - Int64 parts_to_do = 0; + /// Parts we have to mutate to complete mutation. We use ActiveDataPartSet structure + /// to be able to manage covering and covered parts. + ActiveDataPartSet parts_to_do; - /// Note that is_done is not equivalent to parts_to_do == 0 - /// (even if parts_to_do == 0 some relevant parts can still commit in the future). + /// Note that is_done is not equivalent to parts_to_do.size() == 0 + /// (even if parts_to_do.size() == 0 some relevant parts can still commit in the future). + /// Also we can jump over mutation when we dowload mutated part from other replica. bool is_done = false; String latest_failed_part; @@ -191,9 +194,14 @@ private: std::optional & max_processed_insert_time_changed, std::unique_lock & state_lock); - /// If the new part appears (add == true) or becomes obsolete (add == false), update parts_to_do of all affected mutations. - /// Notifies storage.mutations_finalizing_task if some mutations are probably finished. - void updateMutationsPartsToDo(const String & part_name, bool add); + /// Add part for mutations with block_number > part.getDataVersion() + void addPartToMutations(const String & part_name); + + /// Remove part from mutations which were assigned to mutate it + /// with block_number > part.getDataVersion() + /// and block_number == part.getDataVersion() + /// ^ (this may happen if we downloaded mutated part from other replica) + void removePartFromMutations(const String & part_name); /// Update the insertion times in ZooKeeper. void updateTimesInZooKeeper(zkutil::ZooKeeperPtr zookeeper, diff --git a/dbms/tests/queries/0_stateless/01076_parallel_alter_replicated_zookeeper.reference b/dbms/tests/queries/0_stateless/01076_parallel_alter_replicated_zookeeper.reference new file mode 100644 index 00000000000..ff9c6824f00 --- /dev/null +++ b/dbms/tests/queries/0_stateless/01076_parallel_alter_replicated_zookeeper.reference @@ -0,0 +1,17 @@ +1725 +1725 +1725 +1725 +1725 +Starting alters +Finishing alters +1 +0 +1 +0 +1 +0 +1 +0 +1 +0 diff --git a/dbms/tests/queries/0_stateless/01076_parallel_alter_replicated_zookeeper.sh b/dbms/tests/queries/0_stateless/01076_parallel_alter_replicated_zookeeper.sh new file mode 100755 index 00000000000..8f4df35e749 --- /dev/null +++ b/dbms/tests/queries/0_stateless/01076_parallel_alter_replicated_zookeeper.sh @@ -0,0 +1,122 @@ +#!/usr/bin/env bash + + +# This test checks mutations concurrent execution with concurrent inserts. +# There was a bug in mutations finalization, when mutation finishes not after all +# MUTATE_PART tasks execution, but after GET of already mutated part from other replica. +# To test it we stop replicated sends on some replicas to delay fetch of required parts for mutation. +# Since our replication queue executing tasks concurrently it may happen, that we dowload already mutated +# part before source part. + + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +. $CURDIR/../shell_config.sh + +REPLICAS=5 + +for i in `seq $REPLICAS`; do + $CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS concurrent_alter_mt_$i" +done + +for i in `seq $REPLICAS`; do + $CLICKHOUSE_CLIENT --query "CREATE TABLE concurrent_alter_mt_$i (key UInt64, value1 UInt64, value2 String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/concurrent_alter_mt', '$i') ORDER BY key" +done + +$CLICKHOUSE_CLIENT --query "INSERT INTO concurrent_alter_mt_1 SELECT number, number + 10, toString(number) from numbers(10)" +$CLICKHOUSE_CLIENT --query "INSERT INTO concurrent_alter_mt_1 SELECT number, number + 10, toString(number) from numbers(10, 40)" + +for i in `seq $REPLICAS`; do + $CLICKHOUSE_CLIENT --query "SYSTEM SYNC REPLICA concurrent_alter_mt_$i" +done + +for i in `seq $REPLICAS`; do + $CLICKHOUSE_CLIENT --query "SELECT SUM(value1) FROM concurrent_alter_mt_$i" +done + +INITIAL_SUM=`$CLICKHOUSE_CLIENT --query "SELECT SUM(value1) FROM concurrent_alter_mt_1"` + +# Run mutation on random replica +function correct_alter_thread() +{ + TYPES=(Float64 String UInt8 UInt32) + while true; do + REPLICA=$(($RANDOM % 5 + 1)) + TYPE=${TYPES[$RANDOM % ${#TYPES[@]} ]} + $CLICKHOUSE_CLIENT --query "ALTER TABLE concurrent_alter_mt_$REPLICA UPDATE value1 = value1 + 1 WHERE 1"; + sleep 0.$RANDOM + done +} + +# This thread add some data to table. +function insert_thread() +{ + + VALUES=(7 8 9) + while true; do + REPLICA=$(($RANDOM % 5 + 1)) + VALUE=${VALUES[$RANDOM % ${#VALUES[@]} ]} + $CLICKHOUSE_CLIENT --query "INSERT INTO concurrent_alter_mt_$REPLICA VALUES($RANDOM, $VALUE, toString($VALUE))" + sleep 0.$RANDOM + done +} + +# Stop sends for some times on random replica +function stop_sends_thread() +{ + + while true; do + REPLICA=$(($RANDOM % 5 + 1)) + $CLICKHOUSE_CLIENT --query "SYSTEM STOP REPLICATED SENDS concurrent_alter_mt_$REPLICA" + sleep 0.$RANDOM + sleep 0.$RANDOM + sleep 0.$RANDOM + $CLICKHOUSE_CLIENT --query "SYSTEM START REPLICATED SENDS concurrent_alter_mt_$REPLICA" + sleep 0.$RANDOM + done +} + + +echo "Starting alters" + +export -f correct_alter_thread; +export -f insert_thread; +export -f stop_sends_thread; + +TIMEOUT=30 + +timeout $TIMEOUT bash -c stop_sends_thread 2> /dev/null & + +timeout $TIMEOUT bash -c correct_alter_thread 2> /dev/null & + +timeout $TIMEOUT bash -c insert_thread 2> /dev/null & +timeout $TIMEOUT bash -c insert_thread 2> /dev/null & +timeout $TIMEOUT bash -c insert_thread 2> /dev/null & +timeout $TIMEOUT bash -c insert_thread 2> /dev/null & +timeout $TIMEOUT bash -c insert_thread 2> /dev/null & +timeout $TIMEOUT bash -c insert_thread 2> /dev/null & +timeout $TIMEOUT bash -c insert_thread 2> /dev/null & +timeout $TIMEOUT bash -c insert_thread 2> /dev/null & +timeout $TIMEOUT bash -c insert_thread 2> /dev/null & + +wait + +echo "Finishing alters" + +for i in `seq $REPLICAS`; do + $CLICKHOUSE_CLIENT --query "SYSTEM START REPLICATED SENDS concurrent_alter_mt_$i" +done + +# This alter will wait for all previous mutations +$CLICKHOUSE_CLIENT --query "ALTER TABLE concurrent_alter_mt_1 UPDATE value1 = value1 + 1 WHERE 1 SETTINGS mutations_sync = 2" + +# maybe it's redundant +for i in `seq $REPLICAS`; do + $CLICKHOUSE_CLIENT --query "SYSTEM SYNC REPLICA concurrent_alter_mt_$i" +done + + +for i in `seq $REPLICAS`; do + $CLICKHOUSE_CLIENT --query "SELECT SUM(toUInt64(value1)) > $INITIAL_SUM FROM concurrent_alter_mt_$i" + $CLICKHOUSE_CLIENT --query "SELECT COUNT() FROM system.mutations WHERE is_done=0" # all mutations have to be done + $CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS concurrent_alter_mt_$i" +done