diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index d71b2ce8253..c44033bf49f 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -152,7 +152,7 @@ void ReplicatedMergeTreeQueue::insertUnlocked( if (entry->type == LogEntry::ALTER_METADATA) { LOG_TRACE(log, "Adding alter metadata version " << entry->alter_version << " to the queue"); - alter_chain.addMetadataAlter(entry->alter_version, state_lock); + alter_chain.addMetadataAlter(entry->alter_version, entry->have_mutation, state_lock); } } @@ -231,7 +231,7 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval( if (entry->type == LogEntry::ALTER_METADATA) { LOG_TRACE(log, "Finishing metadata alter with version " << entry->alter_version); - alter_chain.finishMetadataAlter(entry->alter_version, entry->have_mutation, state_lock); + alter_chain.finishMetadataAlter(entry->alter_version, state_lock); } } else @@ -1357,12 +1357,12 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep { LOG_TRACE(log, "Marking mutation " << znode << " done because it is <= mutation_pointer (" << mutation_pointer << ")"); mutation.is_done = true; + alter_chain.finishDataAlter(mutation.entry->alter_version, lock); if (mutation.parts_to_do.size() != 0) { LOG_INFO(log, "Seems like we jumped over mutation " << znode << " when downloaded part with bigger mutation number." 
<< " It's OK, tasks for rest parts will be skipped, but probably a lot of mutations were executed concurrently on different replicas."); mutation.parts_to_do.clear(); - alter_chain.finishDataAlter(mutation.entry->alter_version, lock); } } else if (mutation.parts_to_do.size() == 0) diff --git a/dbms/src/Storages/MergeTree/ReplicatedQueueAlterChain.h b/dbms/src/Storages/MergeTree/ReplicatedQueueAlterChain.h index cef9cb28e85..f4b21dc2b1d 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedQueueAlterChain.h +++ b/dbms/src/Storages/MergeTree/ReplicatedQueueAlterChain.h @@ -32,8 +32,8 @@ private: }; std::map queue_state; - public: + int getHeadAlterVersion(std::lock_guard & /*state_lock*/) const { if (!queue_state.empty()) @@ -44,25 +44,25 @@ public: void addMutationForAlter(int alter_version, std::lock_guard & /*state_lock*/) { if (!queue_state.count(alter_version)) - queue_state.emplace(alter_version, AlterState{false, false}); + queue_state.emplace(alter_version, AlterState{true, false}); else queue_state[alter_version].data_finished = false; } - void addMetadataAlter(int alter_version, std::lock_guard & /*state_lock*/) + void addMetadataAlter(int alter_version, bool have_mutation, std::lock_guard & /*state_lock*/) { if (!queue_state.count(alter_version)) - queue_state.emplace(alter_version, AlterState{false, true}); + queue_state.emplace(alter_version, AlterState{false, !have_mutation}); else queue_state[alter_version].metadata_finished = false; } - void finishMetadataAlter(int alter_version, bool have_mutation, std::unique_lock & /*state_lock*/) + void finishMetadataAlter(int alter_version, std::unique_lock & /*state_lock*/) { assert(!queue_state.empty()); assert(queue_state.begin()->first == alter_version); - if (!have_mutation) + if (queue_state[alter_version].data_finished) queue_state.erase(alter_version); else queue_state[alter_version].metadata_finished = true; diff --git a/dbms/tests/queries/0_stateless/01079_parallel_alter_detach_table.reference 
b/dbms/tests/queries/0_stateless/01079_parallel_alter_detach_table.reference new file mode 100644 index 00000000000..ed688ba26d2 --- /dev/null +++ b/dbms/tests/queries/0_stateless/01079_parallel_alter_detach_table.reference @@ -0,0 +1,11 @@ +1725 +1725 +1725 +Starting alters +Finishing alters +1 +0 +1 +0 +1 +0 diff --git a/dbms/tests/queries/0_stateless/01079_parallel_alter_detach_table.sh b/dbms/tests/queries/0_stateless/01079_parallel_alter_detach_table.sh new file mode 100755 index 00000000000..8adad72d837 --- /dev/null +++ b/dbms/tests/queries/0_stateless/01079_parallel_alter_detach_table.sh @@ -0,0 +1,108 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +. $CURDIR/../shell_config.sh + +REPLICAS=3 + +for i in `seq $REPLICAS`; do + $CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS concurrent_alter_detach_$i" +done + +for i in `seq $REPLICAS`; do + $CLICKHOUSE_CLIENT --query "CREATE TABLE concurrent_alter_detach_$i (key UInt64, value1 UInt8, value2 UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/concurrent_alter_mt', '$i') ORDER BY key SETTINGS max_replicated_mutations_in_queue=1000, number_of_free_entries_in_pool_to_execute_mutation=0,max_replicated_merges_in_queue=1000" +done + +$CLICKHOUSE_CLIENT --query "INSERT INTO concurrent_alter_detach_1 SELECT number, number + 10, number from numbers(10)" +$CLICKHOUSE_CLIENT --query "INSERT INTO concurrent_alter_detach_1 SELECT number, number + 10, number from numbers(10, 40)" + +for i in `seq $REPLICAS`; do + $CLICKHOUSE_CLIENT --query "SELECT SUM(value1) FROM concurrent_alter_detach_$i" +done + +INITIAL_SUM=`$CLICKHOUSE_CLIENT --query "SELECT SUM(value1) FROM concurrent_alter_detach_1"` + +# These alters mostly require not only a metadata change +# but also conversion of data. They are also all compatible +# with each other, so they can be executed concurrently.
+function correct_alter_thread() +{ + TYPES=(Float64 String UInt8 UInt32) + while true; do + REPLICA=$(($RANDOM % 3 + 1)) + TYPE=${TYPES[$RANDOM % ${#TYPES[@]} ]} + $CLICKHOUSE_CLIENT --query "ALTER TABLE concurrent_alter_detach_$REPLICA MODIFY COLUMN value1 $TYPE SETTINGS replication_alter_partitions_sync=2"; # additionally we don't wait for anything, for heavier concurrency + sleep 0.$RANDOM + done +} + +# This thread adds some data to the table. After we finish we can check that +# all our data have the same types. +# Insert queries will sometimes fail because of wrong types. +function insert_thread() +{ + + VALUES=(7.0 7 '7') + while true; do + REPLICA=$(($RANDOM % 3 + 1)) + VALUE=${VALUES[$RANDOM % ${#VALUES[@]} ]} + $CLICKHOUSE_CLIENT --query "INSERT INTO concurrent_alter_detach_$REPLICA VALUES($RANDOM, $VALUE, $VALUE)" + sleep 0.$RANDOM + done +} + +function detach_attach_thread() +{ + while true; do + REPLICA=$(($RANDOM % 3 + 1)) + $CLICKHOUSE_CLIENT --query "DETACH TABLE concurrent_alter_detach_$REPLICA" + sleep 0.$RANDOM + $CLICKHOUSE_CLIENT --query "ATTACH TABLE concurrent_alter_detach_$REPLICA" + done +} + +echo "Starting alters" +export -f correct_alter_thread; +export -f insert_thread; +export -f detach_attach_thread; + +TIMEOUT=30 + +# Sometimes we detach and attach tables +timeout $TIMEOUT bash -c detach_attach_thread 2> /dev/null & +timeout $TIMEOUT bash -c detach_attach_thread 2> /dev/null & + +timeout $TIMEOUT bash -c correct_alter_thread 2> /dev/null & +timeout $TIMEOUT bash -c correct_alter_thread 2> /dev/null & + +timeout $TIMEOUT bash -c insert_thread 2> /dev/null & +timeout $TIMEOUT bash -c insert_thread 2> /dev/null & +timeout $TIMEOUT bash -c insert_thread 2> /dev/null & +timeout $TIMEOUT bash -c insert_thread 2> /dev/null & +timeout $TIMEOUT bash -c insert_thread 2> /dev/null & + +wait + +echo "Finishing alters" + +for i in `seq $REPLICAS`; do + $CLICKHOUSE_CLIENT --query "ATTACH TABLE concurrent_alter_detach_$i" 2> /dev/null +done + + +for i in `seq
$REPLICAS`; do + $CLICKHOUSE_CLIENT --query "SYSTEM SYNC REPLICA concurrent_alter_detach_$i" +done + + +# This alter will finish all previous ones, but replica 1 may still not be up-to-date +while [[ $($CLICKHOUSE_CLIENT --query "ALTER TABLE concurrent_alter_detach_1 MODIFY COLUMN value1 String SETTINGS replication_alter_partitions_sync=2" 2>&1) ]]; do + sleep 1 +done + +for i in `seq $REPLICAS`; do + $CLICKHOUSE_CLIENT --query "SYSTEM SYNC REPLICA concurrent_alter_detach_$i" + $CLICKHOUSE_CLIENT --query "SELECT SUM(toUInt64(value1)) > $INITIAL_SUM FROM concurrent_alter_detach_$i" + $CLICKHOUSE_CLIENT --query "SELECT COUNT() FROM system.mutations WHERE is_done=0" # all mutations have to be done + $CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS concurrent_alter_detach_$i" +done