From 59eb3aa9a9662b6a2930453f3909a271c4a8198f Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Fri, 20 Aug 2021 15:59:57 +0300 Subject: [PATCH 1/4] avoid too long waiting for inactive replicas --- src/Core/Settings.h | 3 +- .../PartMovesBetweenShardsOrchestrator.cpp | 10 +- src/Storages/StorageReplicatedMergeTree.cpp | 137 ++++++++++++------ src/Storages/StorageReplicatedMergeTree.h | 10 +- tests/config/users.d/timeouts.xml | 4 + 5 files changed, 109 insertions(+), 55 deletions(-) diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 30f3c69ff4a..f154b1ccbd5 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -97,7 +97,7 @@ class IColumn; M(Bool, optimize_move_to_prewhere_if_final, false, "If query has `FINAL`, the optimization `move_to_prewhere` is not always correct and it is enabled only if both settings `optimize_move_to_prewhere` and `optimize_move_to_prewhere_if_final` are turned on", 0) \ \ M(UInt64, replication_alter_partitions_sync, 1, "Wait for actions to manipulate the partitions. 0 - do not wait, 1 - wait for execution only of itself, 2 - wait for everyone.", 0) \ - M(UInt64, replication_alter_columns_timeout, 60, "Wait for actions to change the table structure within the specified number of seconds. 0 - wait unlimited time.", 0) \ + M(Int64, replication_wait_for_inactive_replica_timeout, 120, "Wait for inactive replica to execute ALTER/OPTIMIZE. Time in seconds, 0 - wait unlimited time, negative - do not wait.", 0) \ \ M(LoadBalancing, load_balancing, LoadBalancing::RANDOM, "Which replicas (among healthy replicas) to preferably send a query to (on the first attempt) for distributed processing.", 0) \ M(UInt64, load_balancing_first_offset, 0, "Which replica to preferably send a query when FIRST_OR_RANDOM load balancing strategy is used.", 0) \ @@ -508,6 +508,7 @@ class IColumn; M(Bool, allow_experimental_window_functions, true, "Obsolete setting, does nothing.", 0) \ M(HandleKafkaErrorMode, handle_kafka_error_mode, HandleKafkaErrorMode::DEFAULT, "Obsolete setting, does nothing.", 0) \ M(Bool, database_replicated_ddl_output, true, "Obsolete setting, does nothing.", 0) \ + M(UInt64, replication_alter_columns_timeout, 60, "Obsolete setting, does nothing.", 0) \ /** The section above is for obsolete settings. Do not add anything there. */ diff --git a/src/Storages/MergeTree/PartMovesBetweenShardsOrchestrator.cpp b/src/Storages/MergeTree/PartMovesBetweenShardsOrchestrator.cpp index 4c187109ac6..7297266bd30 100644 --- a/src/Storages/MergeTree/PartMovesBetweenShardsOrchestrator.cpp +++ b/src/Storages/MergeTree/PartMovesBetweenShardsOrchestrator.cpp @@ -194,7 +194,7 @@ void PartMovesBetweenShardsOrchestrator::stepEntry(const Entry & entry, zkutil:: /// This wait in background schedule pool is useless. It'd be /// better to have some notification which will call `step` /// function when all replicated will finish. TODO. - storage.waitForAllReplicasToProcessLogEntry(log_entry, true); + storage.waitForAllReplicasToProcessLogEntry(log_entry, 0); } { @@ -231,7 +231,7 @@ void PartMovesBetweenShardsOrchestrator::stepEntry(const Entry & entry, zkutil:: String log_znode_path = dynamic_cast(*responses.back()).path_created; log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); - storage.waitForAllTableReplicasToProcessLogEntry(entry.to_shard, log_entry, true); + storage.waitForAllTableReplicasToProcessLogEntry(entry.to_shard, log_entry, 0); } { @@ -269,7 +269,7 @@ void PartMovesBetweenShardsOrchestrator::stepEntry(const Entry & entry, zkutil:: String log_znode_path = dynamic_cast(*responses.back()).path_created; log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); - storage.waitForAllTableReplicasToProcessLogEntry(entry.to_shard, log_entry, true); + storage.waitForAllTableReplicasToProcessLogEntry(entry.to_shard, log_entry, 0); } { @@ -318,7 +318,7 @@ void PartMovesBetweenShardsOrchestrator::stepEntry(const Entry & entry, zkutil:: String log_znode_path = dynamic_cast(*responses.back()).path_created; log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); - storage.waitForAllTableReplicasToProcessLogEntry(entry.to_shard, log_entry, true); + storage.waitForAllTableReplicasToProcessLogEntry(entry.to_shard, log_entry, 0); } { @@ -348,7 +348,7 @@ void PartMovesBetweenShardsOrchestrator::stepEntry(const Entry & entry, zkutil:: { ReplicatedMergeTreeLogEntry log_entry; if (storage.dropPartImpl(zk, entry.part_name, log_entry, false, false)) - storage.waitForAllReplicasToProcessLogEntry(log_entry, true); + storage.waitForAllReplicasToProcessLogEntry(log_entry, 0); } { diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 98ce2ac73e1..165a7998cfa 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -4719,11 +4719,15 @@ bool StorageReplicatedMergeTree::optimize( } } - if (query_context->getSettingsRef().replication_alter_partitions_sync != 0) + table_lock.reset(); + + Int64 wait_for_inactive_timeout = query_context->getSettingsRef().replication_wait_for_inactive_replica_timeout; + for (auto & merge_entry : merge_entries) { - /// NOTE Table lock must not be held while waiting. Some combination of R-W-R locks from different threads will yield to deadlock. - for (auto & merge_entry : merge_entries) - waitForAllReplicasToProcessLogEntry(merge_entry, false); + if (query_context->getSettingsRef().replication_alter_partitions_sync == 1) + waitForReplicaToProcessLogEntry(replica_name, merge_entry, wait_for_inactive_timeout); + else if (query_context->getSettingsRef().replication_alter_partitions_sync == 2) + waitForAllReplicasToProcessLogEntry(merge_entry, wait_for_inactive_timeout); } return true; @@ -5036,15 +5040,16 @@ void StorageReplicatedMergeTree::alter( table_lock_holder.reset(); std::vector unwaited; + Int64 wait_for_inactive_timeout = query_context->getSettingsRef().replication_wait_for_inactive_replica_timeout; if (query_context->getSettingsRef().replication_alter_partitions_sync == 2) { LOG_DEBUG(log, "Updated shared metadata nodes in ZooKeeper. Waiting for replicas to apply changes."); - unwaited = waitForAllReplicasToProcessLogEntry(*alter_entry, false); + unwaited = tryWaitForAllReplicasToProcessLogEntry(*alter_entry, wait_for_inactive_timeout); } else if (query_context->getSettingsRef().replication_alter_partitions_sync == 1) { LOG_DEBUG(log, "Updated shared metadata nodes in ZooKeeper. Waiting for replicas to apply changes."); - waitForReplicaToProcessLogEntry(replica_name, *alter_entry); + waitForReplicaToProcessLogEntry(replica_name, *alter_entry, wait_for_inactive_timeout); } if (!unwaited.empty()) @@ -5200,10 +5205,11 @@ void StorageReplicatedMergeTree::dropPart(const String & part_name, bool detach, dropPartImpl(zookeeper, part_name, entry, detach, /*throw_if_noop=*/ true); /// If necessary, wait until the operation is performed on itself or on all replicas. + Int64 wait_for_inactive_timeout = query_context->getSettingsRef().replication_wait_for_inactive_replica_timeout; if (query_context->getSettingsRef().replication_alter_partitions_sync == 1) - waitForReplicaToProcessLogEntry(replica_name, entry); + waitForReplicaToProcessLogEntry(replica_name, entry, wait_for_inactive_timeout); else if (query_context->getSettingsRef().replication_alter_partitions_sync == 2) - waitForAllReplicasToProcessLogEntry(entry); + waitForAllReplicasToProcessLogEntry(entry, wait_for_inactive_timeout); } void StorageReplicatedMergeTree::dropPartition(const ASTPtr & partition, bool detach, ContextPtr query_context) @@ -5221,10 +5227,11 @@ void StorageReplicatedMergeTree::dropPartition(const ASTPtr & partition, bool de if (did_drop) { /// If necessary, wait until the operation is performed on itself or on all replicas. + Int64 wait_for_inactive_timeout = query_context->getSettingsRef().replication_wait_for_inactive_replica_timeout; if (query_context->getSettingsRef().replication_alter_partitions_sync == 1) - waitForReplicaToProcessLogEntry(replica_name, entry); + waitForReplicaToProcessLogEntry(replica_name, entry, wait_for_inactive_timeout); else if (query_context->getSettingsRef().replication_alter_partitions_sync == 2) - waitForAllReplicasToProcessLogEntry(entry); + waitForAllReplicasToProcessLogEntry(entry, wait_for_inactive_timeout); cleanLastPartNode(partition_id); } @@ -5244,12 +5251,22 @@ void StorageReplicatedMergeTree::truncate( Strings partitions = zookeeper->getChildren(fs::path(zookeeper_path) / "block_numbers"); + std::vector> entries_to_wait; + entries_to_wait.reserve(partitions.size()); for (String & partition_id : partitions) { - LogEntry entry; + auto entry = std::make_unique(); + if (dropAllPartsInPartition(*zookeeper, partition_id, *entry, query_context, false)) + entries_to_wait.push_back(std::move(entry)); + } - if (dropAllPartsInPartition(*zookeeper, partition_id, entry, query_context, false)) - waitForAllReplicasToProcessLogEntry(entry); + Int64 wait_for_inactive_timeout = query_context->getSettingsRef().replication_wait_for_inactive_replica_timeout; + for (const auto & entry : entries_to_wait) + { + if (query_context->getSettingsRef().replication_alter_partitions_sync == 1) + waitForReplicaToProcessLogEntry(replica_name, *entry, wait_for_inactive_timeout); + else if (query_context->getSettingsRef().replication_alter_partitions_sync == 2) + waitForAllReplicasToProcessLogEntry(*entry, wait_for_inactive_timeout); } } @@ -5408,19 +5425,20 @@ StorageReplicatedMergeTree::allocateBlockNumber( } -Strings StorageReplicatedMergeTree::waitForAllTableReplicasToProcessLogEntry( - const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active) +Strings StorageReplicatedMergeTree::tryWaitForAllTableReplicasToProcessLogEntry( + const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout) { LOG_DEBUG(log, "Waiting for all replicas to process {}", entry.znode_name); auto zookeeper = getZooKeeper(); Strings replicas = zookeeper->getChildren(fs::path(table_zookeeper_path) / "replicas"); Strings unwaited; + bool wait_for_inactive = 0 <= wait_for_inactive_timeout; for (const String & replica : replicas) { - if (wait_for_non_active || zookeeper->exists(fs::path(table_zookeeper_path) / "replicas" / replica / "is_active")) + if (wait_for_inactive || zookeeper->exists(fs::path(table_zookeeper_path) / "replicas" / replica / "is_active")) { - if (!waitForTableReplicaToProcessLogEntry(table_zookeeper_path, replica, entry, wait_for_non_active)) + if (!waitForTableReplicaToProcessLogEntry(table_zookeeper_path, replica, entry, wait_for_inactive_timeout)) unwaited.push_back(replica); } else @@ -5433,16 +5451,31 @@ Strings StorageReplicatedMergeTree::waitForAllTableReplicasToProcessLogEntry( return unwaited; } - -Strings StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry( - const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active) +void StorageReplicatedMergeTree::waitForAllTableReplicasToProcessLogEntry( + const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout) { - return waitForAllTableReplicasToProcessLogEntry(zookeeper_path, entry, wait_for_non_active); + Strings unfinished_replicas = tryWaitForAllTableReplicasToProcessLogEntry(table_zookeeper_path, entry, wait_for_inactive_timeout); + if (unfinished_replicas.empty()) + return; + + throw Exception(ErrorCodes::UNFINISHED, "Timeout exceeded while waiting for replicas {} to process entry {}. " + "Probably some replicas are inactive", fmt::join(unfinished_replicas, ", "), entry.znode_name); } +Strings StorageReplicatedMergeTree::tryWaitForAllReplicasToProcessLogEntry( + const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout) +{ + return tryWaitForAllTableReplicasToProcessLogEntry(zookeeper_path, entry, wait_for_inactive_timeout); +} + +void StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry( + const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout) +{ + waitForAllTableReplicasToProcessLogEntry(zookeeper_path, entry, wait_for_inactive_timeout); +} bool StorageReplicatedMergeTree::waitForTableReplicaToProcessLogEntry( - const String & table_zookeeper_path, const String & replica, const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active) + const String & table_zookeeper_path, const String & replica, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout) { String entry_str = entry.toString(); String log_node_name; @@ -5460,18 +5493,24 @@ bool StorageReplicatedMergeTree::waitForTableReplicaToProcessLogEntry( */ bool waiting_itself = replica == replica_name; + bool wait_for_inactive = 0 <= wait_for_inactive_timeout; + Stopwatch time_waiting; const auto & stop_waiting = [&]() { bool stop_waiting_itself = waiting_itself && partial_shutdown_called; - bool stop_waiting_non_active = !wait_for_non_active && !getZooKeeper()->exists(fs::path(table_zookeeper_path) / "replicas" / replica / "is_active"); - return is_dropped || stop_waiting_itself || stop_waiting_non_active; + bool timeout_exceeded = 0 < wait_for_inactive_timeout && wait_for_inactive_timeout < time_waiting.elapsedSeconds(); + bool stop_waiting_inactive = (!wait_for_inactive || timeout_exceeded) + && !getZooKeeper()->exists(fs::path(table_zookeeper_path) / "replicas" / replica / "is_active"); + return is_dropped || stop_waiting_itself || stop_waiting_inactive; }; /// Don't recheck ZooKeeper too often constexpr auto event_wait_timeout_ms = 3000; - if (startsWith(entry.znode_name, "log-")) + if (!startsWith(entry.znode_name, "log-")) + throw Exception("Logical error: unexpected name of log node: " + entry.znode_name, ErrorCodes::LOGICAL_ERROR); + { /// Take the number from the node name `log-xxxxxxxxxx`. UInt64 log_index = parse(entry.znode_name.substr(entry.znode_name.size() - 10)); @@ -5480,13 +5519,17 @@ bool StorageReplicatedMergeTree::waitForTableReplicaToProcessLogEntry( LOG_DEBUG(log, "Waiting for {} to pull {} to queue", replica, log_node_name); /// Let's wait until entry gets into the replica queue. + bool pulled_to_queue = false; while (!stop_waiting()) { zkutil::EventPtr event = std::make_shared(); String log_pointer = getZooKeeper()->get(fs::path(table_zookeeper_path) / "replicas" / replica / "log_pointer", nullptr, event); if (!log_pointer.empty() && parse(log_pointer) > log_index) + { + pulled_to_queue = true; break; + } /// Wait with timeout because we can be already shut down, but not dropped. /// So log_pointer node will exist, but we will never update it because all background threads already stopped. @@ -5494,9 +5537,10 @@ bool StorageReplicatedMergeTree::waitForTableReplicaToProcessLogEntry( /// but the query will never finish because the drop already shut down the table. event->tryWait(event_wait_timeout_ms); } + + if (!pulled_to_queue) + return false; } - else - throw Exception("Logical error: unexpected name of log node: " + entry.znode_name, ErrorCodes::LOGICAL_ERROR); LOG_DEBUG(log, "Looking for node corresponding to {} in {} queue", log_node_name, replica); @@ -5535,9 +5579,9 @@ bool StorageReplicatedMergeTree::waitForTableReplicaToProcessLogEntry( bool StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry( - const String & replica, const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active) + const String & replica, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout) { - return waitForTableReplicaToProcessLogEntry(zookeeper_path, replica, entry, wait_for_non_active); + return waitForTableReplicaToProcessLogEntry(zookeeper_path, replica, entry, wait_for_inactive_timeout); } @@ -6548,13 +6592,15 @@ void StorageReplicatedMergeTree::replacePartitionFrom( parts_to_remove.clear(); cleanup_thread.wakeup(); + lock2.reset(); + lock1.reset(); + /// If necessary, wait until the operation is performed on all replicas. - if (query_context->getSettingsRef().replication_alter_partitions_sync > 1) - { - lock2.reset(); - lock1.reset(); - waitForAllReplicasToProcessLogEntry(entry); - } + Int64 wait_for_inactive_timeout = query_context->getSettingsRef().replication_wait_for_inactive_replica_timeout; + if (query_context->getSettingsRef().replication_alter_partitions_sync == 1) + waitForReplicaToProcessLogEntry(replica_name, entry, wait_for_inactive_timeout); + else if (query_context->getSettingsRef().replication_alter_partitions_sync == 2) + waitForAllReplicasToProcessLogEntry(entry, wait_for_inactive_timeout); } void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, ContextPtr query_context) @@ -6732,12 +6778,13 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta parts_to_remove.clear(); cleanup_thread.wakeup(); + lock2.reset(); - if (query_context->getSettingsRef().replication_alter_partitions_sync > 1) - { - lock2.reset(); - dest_table_storage->waitForAllReplicasToProcessLogEntry(entry); - } + Int64 wait_for_inactive_timeout = query_context->getSettingsRef().replication_wait_for_inactive_replica_timeout; + if (query_context->getSettingsRef().replication_alter_partitions_sync == 1) + waitForReplicaToProcessLogEntry(replica_name, entry, wait_for_inactive_timeout); + else if (query_context->getSettingsRef().replication_alter_partitions_sync == 2) + waitForAllReplicasToProcessLogEntry(entry, wait_for_inactive_timeout); /// Create DROP_RANGE for the source table alter_partition_version_path = zookeeper_path + "/alter_partition_version"; @@ -6764,11 +6811,11 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta log_znode_path = dynamic_cast(*op_results.front()).path_created; entry_delete.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); - if (query_context->getSettingsRef().replication_alter_partitions_sync > 1) - { - lock1.reset(); - waitForAllReplicasToProcessLogEntry(entry_delete); - } + lock1.reset(); + if (query_context->getSettingsRef().replication_alter_partitions_sync == 1) + waitForReplicaToProcessLogEntry(replica_name, entry_delete, wait_for_inactive_timeout); + else if (query_context->getSettingsRef().replication_alter_partitions_sync == 2) + waitForAllReplicasToProcessLogEntry(entry_delete, wait_for_inactive_timeout); /// Cleaning possibly stored information about parts from /quorum/last_part node in ZooKeeper. cleanLastPartNode(partition_id); diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 3d2727d7bb9..7a733c42427 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -638,14 +638,16 @@ private: * * One method for convenient use on current table, another for waiting on foreign shards. */ - Strings waitForAllTableReplicasToProcessLogEntry(const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active = true); - Strings waitForAllReplicasToProcessLogEntry(const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active = true); + void waitForAllTableReplicasToProcessLogEntry(const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout); + void waitForAllReplicasToProcessLogEntry(const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout); + Strings tryWaitForAllReplicasToProcessLogEntry(const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout); + Strings tryWaitForAllTableReplicasToProcessLogEntry(const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout); /** Wait until the specified replica executes the specified action from the log. * NOTE: See comment about locks above. */ - bool waitForTableReplicaToProcessLogEntry(const String & table_zookeeper_path, const String & replica_name, const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active = true); - bool waitForReplicaToProcessLogEntry(const String & replica_name, const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active = true); + bool waitForTableReplicaToProcessLogEntry(const String & table_zookeeper_path, const String & replica_name, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout = 0); + bool waitForReplicaToProcessLogEntry(const String & replica_name, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout = 0); /// Throw an exception if the table is readonly. void assertNotReadonly() const; diff --git a/tests/config/users.d/timeouts.xml b/tests/config/users.d/timeouts.xml index 60b24cfdef8..6a1099c4404 100644 --- a/tests/config/users.d/timeouts.xml +++ b/tests/config/users.d/timeouts.xml @@ -6,6 +6,10 @@ 60 60000 + + 10 + + 30 From 9ef0b00803e67bf18051f118abd80d5fd66e082f Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Fri, 20 Aug 2021 17:12:42 +0300 Subject: [PATCH 2/4] fix config --- tests/config/config.d/merge_tree_settings.xml | 6 ++++++ tests/config/install.sh | 1 + tests/config/users.d/timeouts.xml | 2 -- 3 files changed, 7 insertions(+), 2 deletions(-) create mode 100644 tests/config/config.d/merge_tree_settings.xml diff --git a/tests/config/config.d/merge_tree_settings.xml b/tests/config/config.d/merge_tree_settings.xml new file mode 100644 index 00000000000..8c57dc3acfd --- /dev/null +++ b/tests/config/config.d/merge_tree_settings.xml @@ -0,0 +1,6 @@ + + + + 10 + + diff --git a/tests/config/install.sh b/tests/config/install.sh index 571dff34018..ea8c3ec56c8 100755 --- a/tests/config/install.sh +++ b/tests/config/install.sh @@ -28,6 +28,7 @@ ln -sf $SRC_PATH/config.d/clusters.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/graphite.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/database_atomic.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/max_concurrent_queries.xml $DEST_SERVER_PATH/config.d/ +ln -sf $SRC_PATH/config.d/merge_tree_settings.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/test_cluster_with_incorrect_pw.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/keeper_port.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/logging_no_rotate.xml $DEST_SERVER_PATH/config.d/ diff --git a/tests/config/users.d/timeouts.xml b/tests/config/users.d/timeouts.xml index 6a1099c4404..7d58315bd94 100644 --- a/tests/config/users.d/timeouts.xml +++ b/tests/config/users.d/timeouts.xml @@ -6,8 +6,6 @@ 60 60000 - - 10 30 From cc9c2fd63b05be686b66c365d7a713c7909a1b05 Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Mon, 23 Aug 2021 15:57:50 +0300 Subject: [PATCH 3/4] make code better --- src/Core/Settings.h | 2 +- .../PartMovesBetweenShardsOrchestrator.cpp | 10 +- src/Storages/StorageReplicatedMergeTree.cpp | 123 ++++++------------ src/Storages/StorageReplicatedMergeTree.h | 23 ++-- ...ical_result_after_merge_zookeeper_long.sql | 1 + ..._execute_merges_on_single_replica_long.sql | 1 + 6 files changed, 60 insertions(+), 100 deletions(-) diff --git a/src/Core/Settings.h b/src/Core/Settings.h index f154b1ccbd5..e2e662a64d6 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -97,7 +97,7 @@ class IColumn; M(Bool, optimize_move_to_prewhere_if_final, false, "If query has `FINAL`, the optimization `move_to_prewhere` is not always correct and it is enabled only if both settings `optimize_move_to_prewhere` and `optimize_move_to_prewhere_if_final` are turned on", 0) \ \ M(UInt64, replication_alter_partitions_sync, 1, "Wait for actions to manipulate the partitions. 0 - do not wait, 1 - wait for execution only of itself, 2 - wait for everyone.", 0) \ - M(Int64, replication_wait_for_inactive_replica_timeout, 120, "Wait for inactive replica to execute ALTER/OPTIMIZE. Time in seconds, 0 - wait unlimited time, negative - do not wait.", 0) \ + M(Int64, replication_wait_for_inactive_replica_timeout, 120, "Wait for inactive replica to execute ALTER/OPTIMIZE. Time in seconds, 0 - do not wait, negative - wait for unlimited time.", 0) \ \ M(LoadBalancing, load_balancing, LoadBalancing::RANDOM, "Which replicas (among healthy replicas) to preferably send a query to (on the first attempt) for distributed processing.", 0) \ M(UInt64, load_balancing_first_offset, 0, "Which replica to preferably send a query when FIRST_OR_RANDOM load balancing strategy is used.", 0) \ diff --git a/src/Storages/MergeTree/PartMovesBetweenShardsOrchestrator.cpp b/src/Storages/MergeTree/PartMovesBetweenShardsOrchestrator.cpp index 7297266bd30..ca51353b1bc 100644 --- a/src/Storages/MergeTree/PartMovesBetweenShardsOrchestrator.cpp +++ b/src/Storages/MergeTree/PartMovesBetweenShardsOrchestrator.cpp @@ -194,7 +194,7 @@ void PartMovesBetweenShardsOrchestrator::stepEntry(const Entry & entry, zkutil:: /// This wait in background schedule pool is useless. It'd be /// better to have some notification which will call `step` /// function when all replicated will finish. TODO. - storage.waitForAllReplicasToProcessLogEntry(log_entry, 0); + storage.waitForAllReplicasToProcessLogEntry(zookeeper_path, log_entry, 0); } { @@ -231,7 +231,7 @@ void PartMovesBetweenShardsOrchestrator::stepEntry(const Entry & entry, zkutil:: String log_znode_path = dynamic_cast(*responses.back()).path_created; log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); - storage.waitForAllTableReplicasToProcessLogEntry(entry.to_shard, log_entry, 0); + storage.waitForAllReplicasToProcessLogEntry(entry.to_shard, log_entry, 0); } { @@ -269,7 +269,7 @@ void PartMovesBetweenShardsOrchestrator::stepEntry(const Entry & entry, zkutil:: String log_znode_path = dynamic_cast(*responses.back()).path_created; log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); - storage.waitForAllTableReplicasToProcessLogEntry(entry.to_shard, log_entry, 0); + storage.waitForAllReplicasToProcessLogEntry(entry.to_shard, log_entry, 0); } { @@ -318,7 +318,7 @@ void PartMovesBetweenShardsOrchestrator::stepEntry(const Entry & entry, zkutil:: String log_znode_path = dynamic_cast(*responses.back()).path_created; log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); - storage.waitForAllTableReplicasToProcessLogEntry(entry.to_shard, log_entry, 0); + storage.waitForAllReplicasToProcessLogEntry(entry.to_shard, log_entry, 0); } { @@ -348,7 +348,7 @@ void PartMovesBetweenShardsOrchestrator::stepEntry(const Entry & entry, zkutil:: { ReplicatedMergeTreeLogEntry log_entry; if (storage.dropPartImpl(zk, entry.part_name, log_entry, false, false)) - storage.waitForAllReplicasToProcessLogEntry(log_entry, 0); + storage.waitForAllReplicasToProcessLogEntry(zookeeper_path, log_entry, 0); } { diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 165a7998cfa..a88e38873a5 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -4721,14 +4721,8 @@ bool StorageReplicatedMergeTree::optimize( table_lock.reset(); - Int64 wait_for_inactive_timeout = query_context->getSettingsRef().replication_wait_for_inactive_replica_timeout; for (auto & merge_entry : merge_entries) - { - if (query_context->getSettingsRef().replication_alter_partitions_sync == 1) - waitForReplicaToProcessLogEntry(replica_name, merge_entry, wait_for_inactive_timeout); - else if (query_context->getSettingsRef().replication_alter_partitions_sync == 2) - waitForAllReplicasToProcessLogEntry(merge_entry, wait_for_inactive_timeout); - } + waitForLogEntryToBeProcessedIfNecessary(merge_entry, query_context); return true; } @@ -5039,21 +5033,8 @@ void StorageReplicatedMergeTree::alter( table_lock_holder.reset(); - std::vector unwaited; - Int64 wait_for_inactive_timeout = query_context->getSettingsRef().replication_wait_for_inactive_replica_timeout; - if (query_context->getSettingsRef().replication_alter_partitions_sync == 2) - { - LOG_DEBUG(log, "Updated shared metadata nodes in ZooKeeper. Waiting for replicas to apply changes."); - unwaited = tryWaitForAllReplicasToProcessLogEntry(*alter_entry, wait_for_inactive_timeout); - } - else if (query_context->getSettingsRef().replication_alter_partitions_sync == 1) - { - LOG_DEBUG(log, "Updated shared metadata nodes in ZooKeeper. Waiting for replicas to apply changes."); - waitForReplicaToProcessLogEntry(replica_name, *alter_entry, wait_for_inactive_timeout); - } - - if (!unwaited.empty()) - throw Exception("Some replicas doesn't finish metadata alter", ErrorCodes::UNFINISHED); + LOG_DEBUG(log, "Updated shared metadata nodes in ZooKeeper. Waiting for replicas to apply changes."); + waitForLogEntryToBeProcessedIfNecessary(*alter_entry, query_context, "Some replicas doesn't finish metadata alter: "); if (mutation_znode) { @@ -5204,12 +5185,7 @@ void StorageReplicatedMergeTree::dropPart(const String & part_name, bool detach, dropPartImpl(zookeeper, part_name, entry, detach, /*throw_if_noop=*/ true); - /// If necessary, wait until the operation is performed on itself or on all replicas. - Int64 wait_for_inactive_timeout = query_context->getSettingsRef().replication_wait_for_inactive_replica_timeout; - if (query_context->getSettingsRef().replication_alter_partitions_sync == 1) - waitForReplicaToProcessLogEntry(replica_name, entry, wait_for_inactive_timeout); - else if (query_context->getSettingsRef().replication_alter_partitions_sync == 2) - waitForAllReplicasToProcessLogEntry(entry, wait_for_inactive_timeout); + waitForLogEntryToBeProcessedIfNecessary(entry, query_context); } void StorageReplicatedMergeTree::dropPartition(const ASTPtr & partition, bool detach, ContextPtr query_context) @@ -5226,13 +5202,7 @@ void StorageReplicatedMergeTree::dropPartition(const ASTPtr & partition, bool de if (did_drop) { - /// If necessary, wait until the operation is performed on itself or on all replicas. - Int64 wait_for_inactive_timeout = query_context->getSettingsRef().replication_wait_for_inactive_replica_timeout; - if (query_context->getSettingsRef().replication_alter_partitions_sync == 1) - waitForReplicaToProcessLogEntry(replica_name, entry, wait_for_inactive_timeout); - else if (query_context->getSettingsRef().replication_alter_partitions_sync == 2) - waitForAllReplicasToProcessLogEntry(entry, wait_for_inactive_timeout); - + waitForLogEntryToBeProcessedIfNecessary(entry, query_context); cleanLastPartNode(partition_id); } } @@ -5260,14 +5230,8 @@ void StorageReplicatedMergeTree::truncate( entries_to_wait.push_back(std::move(entry)); } - Int64 wait_for_inactive_timeout = query_context->getSettingsRef().replication_wait_for_inactive_replica_timeout; for (const auto & entry : entries_to_wait) - { - if (query_context->getSettingsRef().replication_alter_partitions_sync == 1) - waitForReplicaToProcessLogEntry(replica_name, *entry, wait_for_inactive_timeout); - else if (query_context->getSettingsRef().replication_alter_partitions_sync == 2) - waitForAllReplicasToProcessLogEntry(*entry, wait_for_inactive_timeout); - } + waitForLogEntryToBeProcessedIfNecessary(*entry, query_context); } @@ -5425,7 +5389,7 @@ StorageReplicatedMergeTree::allocateBlockNumber( } -Strings StorageReplicatedMergeTree::tryWaitForAllTableReplicasToProcessLogEntry( +Strings StorageReplicatedMergeTree::tryWaitForAllReplicasToProcessLogEntry( const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout) { LOG_DEBUG(log, "Waiting for all replicas to process {}", entry.znode_name); @@ -5433,12 +5397,12 @@ Strings StorageReplicatedMergeTree::tryWaitForAllTableReplicasToProcessLogEntry( auto zookeeper = getZooKeeper(); Strings replicas = zookeeper->getChildren(fs::path(table_zookeeper_path) / "replicas"); Strings unwaited; - bool wait_for_inactive = 0 <= wait_for_inactive_timeout; + bool wait_for_inactive = wait_for_inactive_timeout != 0; for (const String & replica : replicas) { if (wait_for_inactive || zookeeper->exists(fs::path(table_zookeeper_path) / "replicas" / replica / "is_active")) { - if (!waitForTableReplicaToProcessLogEntry(table_zookeeper_path, replica, entry, wait_for_inactive_timeout)) + if (!tryWaitForReplicaToProcessLogEntry(table_zookeeper_path, replica, entry, wait_for_inactive_timeout)) unwaited.push_back(replica); } else @@ -5451,30 +5415,37 @@ Strings StorageReplicatedMergeTree::tryWaitForAllTableReplicasToProcessLogEntry( return unwaited; } -void StorageReplicatedMergeTree::waitForAllTableReplicasToProcessLogEntry( - const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout) +void StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry( + const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout, const String & error_context) { - Strings unfinished_replicas = tryWaitForAllTableReplicasToProcessLogEntry(table_zookeeper_path, entry, wait_for_inactive_timeout); + Strings unfinished_replicas = tryWaitForAllReplicasToProcessLogEntry(table_zookeeper_path, entry, wait_for_inactive_timeout); if (unfinished_replicas.empty()) return; - throw Exception(ErrorCodes::UNFINISHED, "Timeout exceeded while waiting for replicas {} to process entry {}. " - "Probably some replicas are inactive", fmt::join(unfinished_replicas, ", "), entry.znode_name); + throw Exception(ErrorCodes::UNFINISHED, "{}Timeout exceeded while waiting for replicas {} to process entry {}. " + "Probably some replicas are inactive", error_context, fmt::join(unfinished_replicas, ", "), entry.znode_name); } -Strings StorageReplicatedMergeTree::tryWaitForAllReplicasToProcessLogEntry( - const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout) +void StorageReplicatedMergeTree::waitForLogEntryToBeProcessedIfNecessary(const ReplicatedMergeTreeLogEntryData & entry, ContextPtr query_context, const String & error_context) { - return tryWaitForAllTableReplicasToProcessLogEntry(zookeeper_path, entry, wait_for_inactive_timeout); + /// If necessary, wait until the operation is performed on itself or on all replicas. + Int64 wait_for_inactive_timeout = query_context->getSettingsRef().replication_wait_for_inactive_replica_timeout; + if (query_context->getSettingsRef().replication_alter_partitions_sync == 1) + { + bool finished = tryWaitForReplicaToProcessLogEntry(zookeeper_path, replica_name, entry, wait_for_inactive_timeout); + if (!finished) + { + throw Exception(ErrorCodes::UNFINISHED, "{}Log entry {} is not precessed on local replica, " + "most likely because the replica was shut down.", error_context, entry.znode_name); + } + } + else if (query_context->getSettingsRef().replication_alter_partitions_sync == 2) + { + waitForAllReplicasToProcessLogEntry(zookeeper_path, entry, wait_for_inactive_timeout, error_context); + } } -void StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry( - const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout) -{ - waitForAllTableReplicasToProcessLogEntry(zookeeper_path, entry, wait_for_inactive_timeout); -} - -bool StorageReplicatedMergeTree::waitForTableReplicaToProcessLogEntry( +bool StorageReplicatedMergeTree::tryWaitForReplicaToProcessLogEntry( const String & table_zookeeper_path, const String & replica, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout) { String entry_str = entry.toString(); @@ -5493,13 +5464,16 @@ bool StorageReplicatedMergeTree::waitForTableReplicaToProcessLogEntry( */ bool waiting_itself = replica == replica_name; - bool wait_for_inactive = 0 <= wait_for_inactive_timeout; + /// Do not wait if timeout is zero + bool wait_for_inactive = wait_for_inactive_timeout != 0; + /// Wait for unlimited time if timeout is negative + bool check_timeout = wait_for_inactive_timeout > 0; Stopwatch time_waiting; const auto & stop_waiting = [&]() { bool stop_waiting_itself = waiting_itself && partial_shutdown_called; - bool timeout_exceeded = 0 < wait_for_inactive_timeout && wait_for_inactive_timeout < time_waiting.elapsedSeconds(); + bool timeout_exceeded = check_timeout && wait_for_inactive_timeout < time_waiting.elapsedSeconds(); bool stop_waiting_inactive = (!wait_for_inactive || timeout_exceeded) && !getZooKeeper()->exists(fs::path(table_zookeeper_path) / "replicas" / replica / "is_active"); return is_dropped || stop_waiting_itself || stop_waiting_inactive; @@ -5578,13 +5552,6 @@ bool StorageReplicatedMergeTree::waitForTableReplicaToProcessLogEntry( } -bool StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry( - const String & replica, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout) -{ - return waitForTableReplicaToProcessLogEntry(zookeeper_path, replica, entry, wait_for_inactive_timeout); -} - - void StorageReplicatedMergeTree::getStatus(Status & res, bool with_zk_fields) { auto zookeeper = tryGetZooKeeper(); @@ -6595,12 +6562,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom( lock2.reset(); lock1.reset(); - /// If necessary, wait until the operation is performed on all replicas. - Int64 wait_for_inactive_timeout = query_context->getSettingsRef().replication_wait_for_inactive_replica_timeout; - if (query_context->getSettingsRef().replication_alter_partitions_sync == 1) - waitForReplicaToProcessLogEntry(replica_name, entry, wait_for_inactive_timeout); - else if (query_context->getSettingsRef().replication_alter_partitions_sync == 2) - waitForAllReplicasToProcessLogEntry(entry, wait_for_inactive_timeout); + waitForLogEntryToBeProcessedIfNecessary(entry, query_context); } void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, ContextPtr query_context) @@ -6780,11 +6742,7 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta cleanup_thread.wakeup(); lock2.reset(); - Int64 wait_for_inactive_timeout = query_context->getSettingsRef().replication_wait_for_inactive_replica_timeout; - if (query_context->getSettingsRef().replication_alter_partitions_sync == 1) - waitForReplicaToProcessLogEntry(replica_name, entry, wait_for_inactive_timeout); - else if (query_context->getSettingsRef().replication_alter_partitions_sync == 2) - waitForAllReplicasToProcessLogEntry(entry, wait_for_inactive_timeout); + dest_table_storage->waitForLogEntryToBeProcessedIfNecessary(entry, query_context); /// Create DROP_RANGE for the source table alter_partition_version_path = zookeeper_path + "/alter_partition_version"; @@ -6812,10 +6770,7 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta entry_delete.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); lock1.reset(); - if (query_context->getSettingsRef().replication_alter_partitions_sync == 1) - waitForReplicaToProcessLogEntry(replica_name, entry_delete, wait_for_inactive_timeout); - else if (query_context->getSettingsRef().replication_alter_partitions_sync == 2) - waitForAllReplicasToProcessLogEntry(entry_delete, wait_for_inactive_timeout); + waitForLogEntryToBeProcessedIfNecessary(entry_delete, query_context); /// Cleaning possibly stored information about parts from /quorum/last_part node in ZooKeeper. cleanLastPartNode(partition_id); diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 7a733c42427..91f055d6aec 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -630,24 +630,27 @@ private: const String & zookeeper_block_id_path = "", const String & zookeeper_path_prefix = "") const; /** Wait until all replicas, including this, execute the specified action from the log. - * If replicas are added at the same time, it can not wait the added replica . + * If replicas are added at the same time, it can not wait the added replica. + * + * Waits for inactive replicas no more than wait_for_inactive_timeout. + * Returns list of inactive replicas that have not executed entry or throws exception. * * NOTE: This method must be called without table lock held. * Because it effectively waits for other thread that usually has to also acquire a lock to proceed and this yields deadlock. - * TODO: There are wrong usages of this method that are not fixed yet. - * - * One method for convenient use on current table, another for waiting on foreign shards. */ - void waitForAllTableReplicasToProcessLogEntry(const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout); - void waitForAllReplicasToProcessLogEntry(const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout); - Strings tryWaitForAllReplicasToProcessLogEntry(const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout); - Strings tryWaitForAllTableReplicasToProcessLogEntry(const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout); + void waitForAllReplicasToProcessLogEntry(const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry, + Int64 wait_for_inactive_timeout, const String & error_context = {}); + Strings tryWaitForAllReplicasToProcessLogEntry(const String & table_zookeeper_path, const ReplicatedMergeTreeLogEntryData & entry, + Int64 wait_for_inactive_timeout); /** Wait until the specified replica executes the specified action from the log. * NOTE: See comment about locks above. */ - bool waitForTableReplicaToProcessLogEntry(const String & table_zookeeper_path, const String & replica_name, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout = 0); - bool waitForReplicaToProcessLogEntry(const String & replica_name, const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout = 0); + bool tryWaitForReplicaToProcessLogEntry(const String & table_zookeeper_path, const String & replica_name, + const ReplicatedMergeTreeLogEntryData & entry, Int64 wait_for_inactive_timeout = 0); + + /// Depending on settings, do nothing or wait for this replica or all replicas process log entry. + void waitForLogEntryToBeProcessedIfNecessary(const ReplicatedMergeTreeLogEntryData & entry, ContextPtr query_context, const String & error_context = {}); /// Throw an exception if the table is readonly. void assertNotReadonly() const; diff --git a/tests/queries/0_stateless/00721_force_by_identical_result_after_merge_zookeeper_long.sql b/tests/queries/0_stateless/00721_force_by_identical_result_after_merge_zookeeper_long.sql index 50f51510d61..07fba5d39b4 100644 --- a/tests/queries/0_stateless/00721_force_by_identical_result_after_merge_zookeeper_long.sql +++ b/tests/queries/0_stateless/00721_force_by_identical_result_after_merge_zookeeper_long.sql @@ -12,6 +12,7 @@ SYSTEM SYNC REPLICA byte_identical_r2; ALTER TABLE byte_identical_r1 ADD COLUMN y UInt64 DEFAULT rand(); SYSTEM SYNC REPLICA byte_identical_r1; SYSTEM SYNC REPLICA byte_identical_r2; +SET replication_alter_partitions_sync=2; OPTIMIZE TABLE byte_identical_r1 PARTITION tuple() FINAL; SELECT x, t1.y - t2.y FROM byte_identical_r1 t1 SEMI LEFT JOIN byte_identical_r2 t2 USING x ORDER BY x; diff --git a/tests/queries/0_stateless/01532_execute_merges_on_single_replica_long.sql b/tests/queries/0_stateless/01532_execute_merges_on_single_replica_long.sql index 1acae560c93..85a2e893f37 100644 --- a/tests/queries/0_stateless/01532_execute_merges_on_single_replica_long.sql +++ b/tests/queries/0_stateless/01532_execute_merges_on_single_replica_long.sql @@ -98,6 +98,7 @@ SELECT '*** disable the feature'; ALTER TABLE execute_on_single_replica_r1 MODIFY SETTING execute_merges_on_single_replica_time_threshold=0; ALTER TABLE execute_on_single_replica_r2 MODIFY SETTING execute_merges_on_single_replica_time_threshold=0; +SET replication_alter_partitions_sync=2; /* all_0_0_6 - we disabled the feature, both replicas will merge */ OPTIMIZE TABLE execute_on_single_replica_r2 FINAL; /* all_0_0_7 - same */ From 4a4a0b482329c796d4b2d6fa979d021be85910bd Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Mon, 23 Aug 2021 16:32:07 +0300 Subject: [PATCH 4/4] fix --- .../MergeTree/PartMovesBetweenShardsOrchestrator.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/Storages/MergeTree/PartMovesBetweenShardsOrchestrator.cpp b/src/Storages/MergeTree/PartMovesBetweenShardsOrchestrator.cpp index ca51353b1bc..c227febbbc2 100644 --- a/src/Storages/MergeTree/PartMovesBetweenShardsOrchestrator.cpp +++ b/src/Storages/MergeTree/PartMovesBetweenShardsOrchestrator.cpp @@ -194,7 +194,7 @@ void PartMovesBetweenShardsOrchestrator::stepEntry(const Entry & entry, zkutil:: /// This wait in background schedule pool is useless. It'd be /// better to have some notification which will call `step` /// function when all replicated will finish. TODO. - storage.waitForAllReplicasToProcessLogEntry(zookeeper_path, log_entry, 0); + storage.waitForAllReplicasToProcessLogEntry(zookeeper_path, log_entry, -1); } { @@ -231,7 +231,7 @@ void PartMovesBetweenShardsOrchestrator::stepEntry(const Entry & entry, zkutil:: String log_znode_path = dynamic_cast(*responses.back()).path_created; log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); - storage.waitForAllReplicasToProcessLogEntry(entry.to_shard, log_entry, 0); + storage.waitForAllReplicasToProcessLogEntry(entry.to_shard, log_entry, -1); } { @@ -269,7 +269,7 @@ void PartMovesBetweenShardsOrchestrator::stepEntry(const Entry & entry, zkutil:: String log_znode_path = dynamic_cast(*responses.back()).path_created; log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); - storage.waitForAllReplicasToProcessLogEntry(entry.to_shard, log_entry, 0); + storage.waitForAllReplicasToProcessLogEntry(entry.to_shard, log_entry, -1); } { @@ -318,7 +318,7 @@ void PartMovesBetweenShardsOrchestrator::stepEntry(const Entry & entry, zkutil:: String log_znode_path = dynamic_cast(*responses.back()).path_created; log_entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1); - storage.waitForAllReplicasToProcessLogEntry(entry.to_shard, log_entry, 0); + storage.waitForAllReplicasToProcessLogEntry(entry.to_shard, log_entry, -1); } { @@ -348,7 +348,7 @@ void PartMovesBetweenShardsOrchestrator::stepEntry(const Entry & entry, zkutil:: { ReplicatedMergeTreeLogEntry log_entry; if (storage.dropPartImpl(zk, entry.part_name, log_entry, false, false)) - storage.waitForAllReplicasToProcessLogEntry(zookeeper_path, log_entry, 0); + storage.waitForAllReplicasToProcessLogEntry(zookeeper_path, log_entry, -1); } {