diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp index bddf8434e1c..bf410011fef 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp @@ -256,7 +256,7 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo zkutil::CreateMode::PersistentSequential)); ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/columns", storage.getMetadataVersion())); - ops.emplace_back(zkutil::makeSetRequest(storage.zookeeper_path + "/alter_intention_counter", "", -1)); + ops.emplace_back(zkutil::makeSetRequest(storage.zookeeper_path + "/block_numbers/" + part->info.partition_id, "", -1)); /// Deletes the information that the block number is used for writing. block_number_lock->getUnlockOps(ops); diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 8bfa974ce45..6b0322d1e7e 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -287,6 +287,8 @@ void ReplicatedMergeTreeQueue::removePartFromMutations(const String & part_name) for (auto it = from_it; it != in_partition->second.end(); ++it) { MutationStatus & status = *it->second; + + LOG_DEBUG(log, "Removing part name:" << part_name << " from mutation:" << status.entry->znode_name); status.parts_to_do.removePartAndCoveredParts(part_name); if (status.parts_to_do.size() == 0) some_mutations_are_probably_done = true; @@ -695,6 +697,7 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C for (const ReplicatedMergeTreeMutationEntryPtr & entry : new_mutations) { + LOG_DEBUG(log, "PROCESSING MUTATION:" << entry->znode_name); auto & mutation = mutations_by_znode.emplace(entry->znode_name, MutationStatus(entry, format_version)) .first->second; @@ -972,6 +975,9 @@ bool ReplicatedMergeTreeQueue::addFuturePartIfNotCoveredByThem(const String & pa { std::lock_guard lock(state_mutex); + if (!alter_sequence.canExecuteGetEntry(part_name, format_version, lock)) + return false; + if (isNotCoveredByFuturePartsImpl(part_name, reject_reason, lock)) { CurrentlyExecuting::setActualPartName(entry, part_name, *this); @@ -991,7 +997,10 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( { if (entry.type == LogEntry::GET_PART) { - if (!alter_sequence.canExecuteGetEntry(entry.new_part_name, format_version, state_lock)) + if (!entry.actual_new_part_name.empty() && !alter_sequence.canExecuteGetEntry(entry.actual_new_part_name, format_version, state_lock)) + return false; + + if (!entry.new_part_name.empty() && !alter_sequence.canExecuteGetEntry(entry.new_part_name, format_version, state_lock)) return false; } diff --git a/dbms/src/Storages/MergeTree/ReplicatedQueueAlterState.h b/dbms/src/Storages/MergeTree/ReplicatedQueueAlterState.h index b359ce0dc14..7e10a81247c 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedQueueAlterState.h +++ b/dbms/src/Storages/MergeTree/ReplicatedQueueAlterState.h @@ -47,7 +47,10 @@ public: void addMutationForAlter(int alter_version, const std::map<String, Int64> & block_numbers, std::lock_guard<std::mutex> & /*state_lock*/) { LOG_DEBUG(log, "Adding mutation with alter version:" << alter_version); - queue_state.emplace(alter_version, AlterInQueue(block_numbers, true)); + if (queue_state.count(alter_version)) + queue_state[alter_version].block_numbers = 
block_numbers; + else + queue_state.emplace(alter_version, AlterInQueue(block_numbers, true)); } void addMetadataAlter(int alter_version, std::lock_guard<std::mutex> & /*state_lock*/) @@ -65,8 +68,19 @@ public: return true; MergeTreePartInfo info = MergeTreePartInfo::fromPartName(part_name, format_version); - if (queue_state.begin()->second.block_numbers.count(info.partition_id)) - return info.getDataVersion() < queue_state.begin()->second.block_numbers.at(info.partition_id); + LOG_DEBUG(log, "Checking can fetch:" << part_name); + + for (const auto & [block_number, state] : queue_state) + { + + LOG_DEBUG(log, "Looking at block:" << block_number << " with part name:" << part_name); + if (state.block_numbers.count(info.partition_id)) + { + LOG_DEBUG(log, "Block number:" << block_number << " has part name " << part_name << " version " << state.block_numbers.at(info.partition_id)); + return info.getDataVersion() < state.block_numbers.at(info.partition_id); + } + } + LOG_DEBUG(log, "Nobody has block number for part " << part_name); return true; } @@ -117,6 +131,8 @@ public: { LOG_DEBUG(log, "Key:" << key << " is metadata finished:" << value.metadata_finished); } + if (alter_version < queue_state.begin()->first) + return true; return queue_state.at(alter_version).metadata_finished; } bool canExecuteMetaAlter(int alter_version, std::lock_guard<std::mutex> & /*state_lock*/) const diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index cc1311a8760..9bfefeb5e00 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -2265,6 +2265,7 @@ void StorageReplicatedMergeTree::mergeSelectingTask() LOG_DEBUG(log, "I'm not leader, I don't want to assign anything"); return; } + LOG_DEBUG(log, "Merge selecting started"); const auto storage_settings_ptr = getSettings(); const bool deduplicate = false; /// TODO: read deduplicate option from table config @@ -2304,6 +2305,7 @@ void StorageReplicatedMergeTree::mergeSelectingTask() if (max_source_parts_size_for_merge > 0 && merger_mutator.selectPartsToMerge(future_merged_part, false, max_source_parts_size_for_merge, merge_pred)) { + LOG_DEBUG(log, "ASSIGNING MERGE"); success = createLogEntryToMergeParts(zookeeper, future_merged_part.parts, future_merged_part.name, deduplicate, force_ttl); } @@ -2337,6 +2339,10 @@ void StorageReplicatedMergeTree::mergeSelectingTask() } } } + else + { + LOG_DEBUG(log, "TOO MANY MUTATIONS IN QUEUE"); + } } } catch (...) 
@@ -3395,6 +3401,7 @@ void StorageReplicatedMergeTree::alter( bool have_mutation = false; std::optional<EphemeralLocksInAllPartitions> lock_holder; + size_t partitions_count = 0; if (!maybe_mutation_commands.empty()) { String mutations_path = zookeeper_path + "/mutations"; @@ -3406,17 +3413,36 @@ void StorageReplicatedMergeTree::alter( Coordination::Stat mutations_stat; zookeeper->get(mutations_path, &mutations_stat); - Coordination::Stat intention_counter_stat; - zookeeper->get(zookeeper_path + "/alter_intention_counter", &intention_counter_stat); + //Coordination::Stat intention_counter_stat; + //zookeeper->get(zookeeper_path + "/alter_intention_counter", &intention_counter_stat); + Strings partitions = zookeeper->getChildren(zookeeper_path + "/block_numbers"); + + std::vector<std::future<Coordination::GetResponse>> partition_futures; + for (const String & partition : partitions) + partition_futures.push_back(zookeeper->asyncGet(zookeeper_path + "/block_numbers/" + partition)); + + std::unordered_map<String, int32_t> partition_versions; + for (size_t i = 0; i < partition_futures.size(); ++i) + { + auto stat = partition_futures[i].get().stat; + auto partition = partitions[i]; + partition_versions[partition] = stat.version; + LOG_DEBUG(log, "Partition version:" << partition << " stat version " << stat.version); + } + lock_holder.emplace( zookeeper_path + "/block_numbers", "block-", zookeeper_path + "/temp", *zookeeper); - ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/alter_intention_counter", intention_counter_stat.version)); - for (const auto & lock : lock_holder->getLocks()) { mutation_entry.block_numbers[lock.partition_id] = lock.number; - LOG_DEBUG(log, "ALLOCATED:" << lock.number << " FOR VERSION:" << metadata_version + 1); + //ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/alter_intention_counter", intention_counter_stat.version)); + ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/block_numbers/" + lock.partition_id, partition_versions[lock.partition_id])); + partitions_count++; + LOG_DEBUG( + log, + "ALLOCATED:" << lock.number << " FOR VERSION:" << metadata_version + 1 + << " Partition version:" << partition_versions[lock.partition_id] + 1 << " for partition " << lock.partition_id); } mutation_entry.create_time = time(nullptr); @@ -3431,7 +3457,7 @@ void StorageReplicatedMergeTree::alter( Coordination::Responses results; int32_t rc = zookeeper->tryMulti(ops, results); - LOG_DEBUG(log, "ALTER REQUESTED TO" << entry.alter_version); + LOG_DEBUG(log, "ALTER REQUESTED TO:" << entry.alter_version); //std::cerr << "Results size:" << results.size() << std::endl; //std::cerr << "Have mutation:" << have_mutation << std::endl; @@ -3443,7 +3469,7 @@ void StorageReplicatedMergeTree::alter( { //std::cerr << "In have mutation\n"; //std::cerr << "INDEX:" << results.size() - 2 << std::endl; - String alter_path = dynamic_cast<const Coordination::CreateResponse &>(*results[results.size() - 4]).path_created; + String alter_path = dynamic_cast<const Coordination::CreateResponse &>(*results[results.size() - 3 - partitions_count]).path_created; //std::cerr << "Alter path:" << alter_path << std::endl; entry.znode_name = alter_path.substr(alter_path.find_last_of('/') + 1); diff --git a/dbms/tests/queries/0_stateless/01079_parallel_alter_modify_zookeeper.reference b/dbms/tests/queries/0_stateless/01079_parallel_alter_modify_zookeeper.reference new file mode 100644 index 00000000000..ff9c6824f00 --- /dev/null +++ b/dbms/tests/queries/0_stateless/01079_parallel_alter_modify_zookeeper.reference @@ -0,0 +1,17 @@ +1725 +1725 +1725 +1725 +1725 +Starting alters +Finishing alters +1 +0 +1 +0 +1 +0 +1 +0 +1 +0 diff --git 
a/dbms/tests/queries/0_stateless/01079_parallel_alter_modify_zookeeper.sh b/dbms/tests/queries/0_stateless/01079_parallel_alter_modify_zookeeper.sh new file mode 100755 index 00000000000..572beb88389 --- /dev/null +++ b/dbms/tests/queries/0_stateless/01079_parallel_alter_modify_zookeeper.sh @@ -0,0 +1,109 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +. $CURDIR/../shell_config.sh + +REPLICAS=5 + +for i in `seq $REPLICAS`; do + $CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS concurrent_alter_mt_$i" +done + +for i in `seq $REPLICAS`; do + $CLICKHOUSE_CLIENT --query "CREATE TABLE concurrent_alter_mt_$i (key UInt64, value1 UInt64, value2 String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/concurrent_alter_mt', '$i') ORDER BY key SETTINGS max_replicated_mutations_in_queue=1000, number_of_free_entries_in_pool_to_execute_mutation=0" +done + +$CLICKHOUSE_CLIENT --query "INSERT INTO concurrent_alter_mt_1 SELECT number, number + 10, toString(number) from numbers(10)" +$CLICKHOUSE_CLIENT --query "INSERT INTO concurrent_alter_mt_1 SELECT number, number + 10, toString(number) from numbers(10, 40)" + +for i in `seq $REPLICAS`; do + $CLICKHOUSE_CLIENT --query "SYSTEM SYNC REPLICA concurrent_alter_mt_$i" +done + +for i in `seq $REPLICAS`; do + $CLICKHOUSE_CLIENT --query "SELECT SUM(value1) FROM concurrent_alter_mt_$i" +done + +INITIAL_SUM=`$CLICKHOUSE_CLIENT --query "SELECT SUM(value1) FROM concurrent_alter_mt_1"` + +# This is just a garbage thread with conflicting alters; +# it additionally loads the alter "queue". +function garbage_alter_thread() +{ + while true; do + REPLICA=$(($RANDOM % 5 + 1)) + $CLICKHOUSE_CLIENT -n --query "ALTER TABLE concurrent_alter_mt_$REPLICA ADD COLUMN h String DEFAULT '0'; ALTER TABLE concurrent_alter_mt_$REPLICA MODIFY COLUMN h UInt64; ALTER TABLE concurrent_alter_mt_$REPLICA DROP COLUMN h;"; + done +} + + +# These alters mostly require not only a metadata change +# but also a conversion of the data. They are also all compatible +# with each other, so they can be executed concurrently. +function correct_alter_thread() +{ + TYPES=(Float64 String UInt8 UInt32) + while true; do + REPLICA=$(($RANDOM % 5 + 1)) + TYPE=${TYPES[$RANDOM % ${#TYPES[@]} ]} + $CLICKHOUSE_CLIENT --query "ALTER TABLE concurrent_alter_mt_$REPLICA MODIFY COLUMN value1 $TYPE SETTINGS replication_alter_partitions_sync=0"; # additionally we don't wait for anything, for heavier concurrency + sleep 0.$RANDOM + done +} + +# This thread adds some data to the table. After we finish we can check that +# all our data has the same type. +# Insert queries will sometimes fail because of wrong types. 
+function insert_thread() +{ + + VALUES=(7.0 7 '7') + while true; do + REPLICA=$(($RANDOM % 5 + 1)) + VALUE=${VALUES[$RANDOM % ${#VALUES[@]} ]} + $CLICKHOUSE_CLIENT --query "INSERT INTO concurrent_alter_mt_$REPLICA VALUES($RANDOM, $VALUE, toString($VALUE))" + sleep 0.$RANDOM + done +} + + + +echo "Starting alters" +export -f garbage_alter_thread; +export -f correct_alter_thread; +export -f insert_thread; + + +TIMEOUT=120 + + +timeout $TIMEOUT bash -c correct_alter_thread 2> /dev/null & +timeout $TIMEOUT bash -c correct_alter_thread 2> /dev/null & +timeout $TIMEOUT bash -c correct_alter_thread 2> /dev/null & +timeout $TIMEOUT bash -c insert_thread 2> /dev/null & +timeout $TIMEOUT bash -c insert_thread 2> /dev/null & +timeout $TIMEOUT bash -c insert_thread 2> /dev/null & +timeout $TIMEOUT bash -c insert_thread 2> /dev/null & +timeout $TIMEOUT bash -c insert_thread 2> /dev/null & +timeout $TIMEOUT bash -c insert_thread 2> /dev/null & +timeout $TIMEOUT bash -c insert_thread 2> /dev/null & +timeout $TIMEOUT bash -c insert_thread 2> /dev/null & +timeout $TIMEOUT bash -c insert_thread 2> /dev/null & +wait + +echo "Finishing alters" + +# This alter will finish all previous alters +$CLICKHOUSE_CLIENT --query "ALTER TABLE concurrent_alter_mt_1 MODIFY COLUMN value1 String SETTINGS replication_alter_partitions_sync=2" 2>/dev/null +while [ $? -ne 0 ]; do + $CLICKHOUSE_CLIENT --query "ALTER TABLE concurrent_alter_mt_1 MODIFY COLUMN value1 String SETTINGS replication_alter_partitions_sync=2" 2>/dev/null +done + + +for i in `seq $REPLICAS`; do + $CLICKHOUSE_CLIENT --query "SELECT SUM(toUInt64(value1)) > $INITIAL_SUM FROM concurrent_alter_mt_$i" + $CLICKHOUSE_CLIENT --query "SELECT COUNT() FROM system.mutations WHERE is_done=0" # all mutations have to be done + #$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS concurrent_alter_mt_$i" +done
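
For readers tracing the concurrency scheme in this patch: the ALTER path records the version of every /block_numbers/<partition> znode before taking its block-number locks and adds a zkutil::makeCheckRequest for each recorded version to the commit multi-op, while commitPart() bumps the inserted partition's node with zkutil::makeSetRequest(..., "", -1), so a racing insert fails the ALTER's tryMulti and forces it to re-read the versions and retry. Below is a minimal, self-contained sketch of that optimistic version check; KeeperMock and every other name in it are hypothetical stand-ins for illustration, not ClickHouse's zkutil API.

// Hypothetical model of the per-partition version check (not ClickHouse code).
#include <iostream>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Stand-in for the znodes under <table_path>/block_numbers/<partition_id>;
// only the version counter matters for this sketch.
struct KeeperMock
{
    std::map<std::string, int> node_versions;

    // Models makeSetRequest(path, "", -1) from commitPart(): a successful insert
    // bumps the partition node's version regardless of its contents.
    void touch(const std::string & path) { ++node_versions[path]; }

    // Models tryMulti() over a batch of makeCheckRequest ops: the commit succeeds
    // only if every recorded version is still current.
    bool commitIfUnchanged(const std::vector<std::pair<std::string, int>> & checks) const
    {
        for (const auto & [path, expected_version] : checks)
            if (node_versions.at(path) != expected_version)
                return false;
        return true;
    }
};

int main()
{
    KeeperMock keeper;
    keeper.node_versions = {{"/t/block_numbers/202001", 7}, {"/t/block_numbers/202002", 3}};

    // ALTER: remember the current version of each partition's block_numbers node.
    std::vector<std::pair<std::string, int>> checks;
    for (const auto & [path, version] : keeper.node_versions)
        checks.emplace_back(path, version);

    // A concurrent INSERT into partition 202001 lands before the ALTER commits...
    keeper.touch("/t/block_numbers/202001");

    // ...so the first attempt fails and the ALTER re-reads versions and retries.
    std::cout << "first attempt: " << (keeper.commitIfUnchanged(checks) ? "committed" : "retry") << '\n';

    checks.clear();
    for (const auto & [path, version] : keeper.node_versions)
        checks.emplace_back(path, version);
    std::cout << "second attempt: " << (keeper.commitIfUnchanged(checks) ? "committed" : "retry") << '\n';
}

The per-partition nodes also appear to be what lets the queue-side check (canExecuteGetEntry) compare a part's data version against the block number recorded for exactly that partition, something the single alter_intention_counter znode removed by this patch could not express.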