From 97fef77ed1e530470571925ed004284178e8b790 Mon Sep 17 00:00:00 2001 From: Mikhail Filimonov Date: Fri, 18 Sep 2020 12:57:33 +0200 Subject: [PATCH] execute_merges_on_single_replica --- src/Storages/MergeTree/MergeTreeSettings.h | 1 + ...ReplicatedMergeTreeMergeStrategyPicker.cpp | 152 ++++++++++++++++++ .../ReplicatedMergeTreeMergeStrategyPicker.h | 66 ++++++++ .../MergeTree/ReplicatedMergeTreeQueue.cpp | 25 ++- .../MergeTree/ReplicatedMergeTreeQueue.h | 4 +- src/Storages/StorageReplicatedMergeTree.cpp | 23 ++- src/Storages/StorageReplicatedMergeTree.h | 8 +- src/Storages/ya.make | 1 + ...execute_merges_on_single_replica.reference | 77 +++++++++ ...01532_execute_merges_on_single_replica.sql | 127 +++++++++++++++ 10 files changed, 478 insertions(+), 6 deletions(-) create mode 100644 src/Storages/MergeTree/ReplicatedMergeTreeMergeStrategyPicker.cpp create mode 100644 src/Storages/MergeTree/ReplicatedMergeTreeMergeStrategyPicker.h create mode 100644 tests/queries/0_stateless/01532_execute_merges_on_single_replica.reference create mode 100644 tests/queries/0_stateless/01532_execute_merges_on_single_replica.sql diff --git a/src/Storages/MergeTree/MergeTreeSettings.h b/src/Storages/MergeTree/MergeTreeSettings.h index 6f0401e061b..c3e5c27e177 100644 --- a/src/Storages/MergeTree/MergeTreeSettings.h +++ b/src/Storages/MergeTree/MergeTreeSettings.h @@ -62,6 +62,7 @@ struct Settings; M(UInt64, min_replicated_logs_to_keep, 10, "Keep about this number of last records in ZooKeeper log, even if they are obsolete. It doesn't affect work of tables: used only to diagnose ZooKeeper log before cleaning.", 0) \ M(Seconds, prefer_fetch_merged_part_time_threshold, 3600, "If time passed after replication log entry creation exceeds this threshold and sum size of parts is greater than \"prefer_fetch_merged_part_size_threshold\", prefer fetching merged part from replica instead of doing merge locally. To speed up very long merges.", 0) \ M(UInt64, prefer_fetch_merged_part_size_threshold, 10ULL * 1024 * 1024 * 1024, "If sum size of parts exceeds this threshold and time passed after replication log entry creation is greater than \"prefer_fetch_merged_part_time_threshold\", prefer fetching merged part from replica instead of doing merge locally. To speed up very long merges.", 0) \ + M(Seconds, execute_merges_on_single_replica_time_threshold, 0, "When greater than zero only a single replica starts the merge immediately, others wait up to that amount of time to download the result instead of doing merges locally. If the chosen replica doesn't finish the merge during that amount of time, fallback to standard behavior happens.", 0) \ M(Seconds, try_fetch_recompressed_part_timeout, 7200, "Recompression works slow in most cases, so we don't start merge with recompression until this timeout and trying to fetch recompressed part from replica which assigned this merge with recompression.", 0) \ M(Bool, always_fetch_merged_part, 0, "If true, replica never merge parts and always download merged parts from other replicas.", 0) \ M(UInt64, max_suspicious_broken_parts, 10, "Max broken parts, if more - deny automatic deletion.", 0) \ diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeMergeStrategyPicker.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeMergeStrategyPicker.cpp new file mode 100644 index 00000000000..1653d723b77 --- /dev/null +++ b/src/Storages/MergeTree/ReplicatedMergeTreeMergeStrategyPicker.cpp @@ -0,0 +1,152 @@ +#include +#include +#include + + +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +// minimum interval (seconds) between checks if chosen replica finished the merge. +static const auto RECHECK_MERGE_READYNESS_INTERVAL_SECONDS = 1; + +// don't refresh state too often (to limit number of zookeeper ops) +static const auto REFRESH_STATE_MINIMUM_INTERVAL_SECONDS = 5; + +// refresh the state automatically it it was not refreshed for a longer time +static const auto REFRESH_STATE_MAXIMUM_INTERVAL_SECONDS = 30; + + +ReplicatedMergeTreeMergeStrategyPicker::ReplicatedMergeTreeMergeStrategyPicker(StorageReplicatedMergeTree & storage_) + : storage(storage_) +{} + + +bool ReplicatedMergeTreeMergeStrategyPicker::isMergeFinishedByReplica(const String & replica, const ReplicatedMergeTreeLogEntryData & entry) +{ + // those have only seconds resolution, so recheck period is quite rough + auto reference_timestamp = entry.last_postpone_time; + if (reference_timestamp == 0) + reference_timestamp = entry.create_time; + + // we don't want to check zookeeper too frequent + if (time(nullptr) - reference_timestamp >= RECHECK_MERGE_READYNESS_INTERVAL_SECONDS) + { + return storage.checkReplicaHavePart(replica, entry.new_part_name); + } + + return false; +} + + +// that will return the same replica name for ReplicatedMergeTreeLogEntry on all the replicas (if the replica set is the same). +// that way each replica knows who is responsible for doing a certain merge. + +// in some corner cases (added / removed / deactivated replica) +// nodes can pick different replicas to execute merge and wait for it (or to execute the same merge together) +// but that doesn't have a significant impact (in one case it will wait for the execute_merges_on_single_replica_time_threshold, +// in another just 2 replicas will do the merge) +std::optional ReplicatedMergeTreeMergeStrategyPicker::pickReplicaToExecuteMerge(const ReplicatedMergeTreeLogEntryData & entry) +{ + time_t threshold = execute_merges_on_single_replica_time_threshold; + if ( + threshold == 0 // feature turned off + || entry.type != ReplicatedMergeTreeLogEntry::MERGE_PARTS // it is not a merge log entry + ) + return std::nullopt; + + auto now = time(nullptr); + + if (entry.create_time + threshold <= now) // too much time waited + return std::nullopt; + + if (now - last_refresh_time > REFRESH_STATE_MAXIMUM_INTERVAL_SECONDS) // last state refresh was too long ago, need to sync up the replicas list + refreshState(); + + auto hash = getEntryHash(entry); + + std::lock_guard lock(mutex); + + auto num_replicas = active_replicas.size(); + + if (num_replicas==0) + return std::nullopt; + + auto replica_index = static_cast(hash % num_replicas); + + if (replica_index == current_replica_index) + return std::nullopt; + + return active_replicas.at(replica_index); +} + + +void ReplicatedMergeTreeMergeStrategyPicker::refreshState() +{ + auto threshold = storage.getSettings()->execute_merges_on_single_replica_time_threshold.totalSeconds(); + + auto now = time(nullptr); + + // the setting is not changed, and last state refresh was done recently + if (threshold == execute_merges_on_single_replica_time_threshold + && now - last_refresh_time < REFRESH_STATE_MINIMUM_INTERVAL_SECONDS) + return; + + if (threshold == 0) + { + // we can reset the settings w/o lock (it's atomic) + execute_merges_on_single_replica_time_threshold = threshold; + return; + } + + auto zookeeper = storage.getZooKeeper(); + auto all_replicas = zookeeper->getChildren(storage.zookeeper_path + "/replicas"); + + // TODO: do we need that sort or we can rely that zookeeper will return them in deterministic order? + std::sort(all_replicas.begin(), all_replicas.end()); + + std::vector active_replicas_tmp; + int current_replica_index_tmp = -1; + + for (const String & replica : all_replicas) + { + if (zookeeper->exists(storage.zookeeper_path + "/replicas/" + replica + "/is_active")) + { + active_replicas_tmp.push_back(replica); + if (replica == storage.replica_name) + { + current_replica_index_tmp = active_replicas_tmp.size() - 1; + } + } + } + + if (current_replica_index_tmp < 0 || active_replicas_tmp.size() < 2) + { + LOG_ERROR(storage.log, "Can't find current replica in the active replicas list, or too few active replicas to use execute_merges_on_single_replica_time_threshold!"); + // we can reset the settings w/o lock (it's atomic) + execute_merges_on_single_replica_time_threshold = 0; + return; + } + + std::lock_guard lock(mutex); + execute_merges_on_single_replica_time_threshold = threshold; + last_refresh_time = now; + current_replica_index = current_replica_index_tmp; + active_replicas = active_replicas_tmp; +} + + +uint64_t ReplicatedMergeTreeMergeStrategyPicker::getEntryHash(const ReplicatedMergeTreeLogEntryData & entry) const +{ + auto hash_data = storage.zookeeper_path + entry.new_part_name; + return CityHash_v1_0_2::CityHash64(hash_data.c_str(), hash_data.length()); +} + + +} diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeMergeStrategyPicker.h b/src/Storages/MergeTree/ReplicatedMergeTreeMergeStrategyPicker.h new file mode 100644 index 00000000000..cbec7fcb85d --- /dev/null +++ b/src/Storages/MergeTree/ReplicatedMergeTreeMergeStrategyPicker.h @@ -0,0 +1,66 @@ +#pragma once + +#include +// #include +// #include +#include +#include +#include +#include +#include + +namespace DB +{ + +class StorageReplicatedMergeTree; +struct ReplicatedMergeTreeLogEntryData; + +/// In some use cases merging can be more expensive than fetching +/// (so instead of doing exactly the same merge cluster-wise you can do merge once and fetch ready part) +/// Fetches may be desirable for other operational reasons (backup replica without lot of CPU resources). +/// +/// That class allow to take a decisions about preferred strategy for a concreate merge. +/// +/// Since that code is used in shouldExecuteLogEntry we need to be able to: +/// 1) make decision fast +/// 2) avoid excessive zookeeper operations +/// +/// Because of that we need to cache some important things, +/// like list of active replicas (to limit the number of zookeeper operations) +/// +/// That means we need to refresh the state of that object regularly +class ReplicatedMergeTreeMergeStrategyPicker: public boost::noncopyable +{ +public: + ReplicatedMergeTreeMergeStrategyPicker(StorageReplicatedMergeTree & storage_); + + /// triggers refreshing the cached state (list of replicas etc.) + /// used when we get new merge event from the zookeeper queue ( see queueUpdatingTask() etc ) + void refreshState(); + + /// returns the replica name in the case when feature is active + /// and it's not current replica should do the merge + /// used in shouldExecuteLogEntry and in tryExecuteMerge + std::optional pickReplicaToExecuteMerge(const ReplicatedMergeTreeLogEntryData & entry); + + /// checks (in zookeeper) if the picked replica finished the merge + bool isMergeFinishedByReplica(const String & replica, const ReplicatedMergeTreeLogEntryData & entry); + +private: + StorageReplicatedMergeTree & storage; + + /// calculate entry hash based on zookeeper path and new part name + uint64_t getEntryHash(const ReplicatedMergeTreeLogEntryData & entry) const; + + std::atomic execute_merges_on_single_replica_time_threshold = 0; + std::atomic last_refresh_time = 0; + + std::mutex mutex; + // those 2 member accessed under the mutex, only when + // execute_merges_on_single_replica_time_threshold enabled + int current_replica_index = -1; + std::vector active_replicas; + +}; + +} diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 7a7f5ee3feb..f9dc3ebe808 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include @@ -20,8 +21,9 @@ namespace ErrorCodes } -ReplicatedMergeTreeQueue::ReplicatedMergeTreeQueue(StorageReplicatedMergeTree & storage_) +ReplicatedMergeTreeQueue::ReplicatedMergeTreeQueue(StorageReplicatedMergeTree & storage_, ReplicatedMergeTreeMergeStrategyPicker & merge_strategy_picker_) : storage(storage_) + , merge_strategy_picker(merge_strategy_picker_) , format_version(storage.format_version) , current_parts(format_version) , virtual_parts(format_version) @@ -120,6 +122,8 @@ bool ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper) updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, {}); + merge_strategy_picker.refreshState(); + LOG_TRACE(log, "Loaded queue"); return updated; } @@ -587,7 +591,15 @@ int32_t ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper } if (!copied_entries.empty()) + { LOG_DEBUG(log, "Pulled {} entries to queue.", copied_entries.size()); + + /// to limit the number of zookeeper operations MergeStrategyPicker state is updated only + /// when new merges appear. + auto operations_in_queue = countMergesAndPartMutations(); + if (operations_in_queue.merges > 0) + merge_strategy_picker.refreshState(); + } } storage.background_executor.triggerTask(); @@ -1088,6 +1100,17 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( return false; } + auto replica_to_execute_merge = merge_strategy_picker.pickReplicaToExecuteMerge(entry); + + if (replica_to_execute_merge && !merge_strategy_picker.isMergeFinishedByReplica(replica_to_execute_merge.value(), entry)) + { + String reason = "Not executing merge for the part " + entry.new_part_name + + ", waiting for " + replica_to_execute_merge.value() + " to execute merge."; + LOG_DEBUG(log, reason); + out_postpone_reason = reason; + return false; + } + UInt64 max_source_parts_size = entry.type == LogEntry::MERGE_PARTS ? merger_mutator.getMaxSourcePartsSizeForMerge() : merger_mutator.getMaxSourcePartSizeForMutation(); /** If there are enough free threads in background pool to do large merges (maximal size of merge is allowed), diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index 60f667d26b8..6d5fab744a5 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -22,6 +22,7 @@ class StorageReplicatedMergeTree; class MergeTreeDataMergerMutator; class ReplicatedMergeTreeMergePredicate; +class ReplicatedMergeTreeMergeStrategyPicker; class ReplicatedMergeTreeQueue @@ -57,6 +58,7 @@ private: using InsertsByTime = std::set; StorageReplicatedMergeTree & storage; + ReplicatedMergeTreeMergeStrategyPicker & merge_strategy_picker; MergeTreeDataFormatVersion format_version; String zookeeper_path; @@ -275,7 +277,7 @@ private: size_t current_multi_batch_size = 1; public: - ReplicatedMergeTreeQueue(StorageReplicatedMergeTree & storage_); + ReplicatedMergeTreeQueue(StorageReplicatedMergeTree & storage_, ReplicatedMergeTreeMergeStrategyPicker & merge_strategy_picker_); ~ReplicatedMergeTreeQueue(); diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index f41515331f5..e497f365110 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -139,7 +139,6 @@ static const auto MERGE_SELECTING_SLEEP_MS = 5 * 1000; static const auto MUTATIONS_FINALIZING_SLEEP_MS = 1 * 1000; static const auto MUTATIONS_FINALIZING_IDLE_SLEEP_MS = 5 * 1000; - void StorageReplicatedMergeTree::setZooKeeper(zkutil::ZooKeeperPtr zookeeper) { std::lock_guard lock(current_zookeeper_mutex); @@ -202,7 +201,8 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( , reader(*this) , writer(*this) , merger_mutator(*this, global_context.getSettingsRef().background_pool_size) - , queue(*this) + , merge_strategy_picker(*this) + , queue(*this, merge_strategy_picker) , fetcher(*this) , background_executor(*this, global_context) , background_moves_executor(*this, global_context) @@ -1361,6 +1361,16 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry) return false; } + /// In some use cases merging can be more expensive than fetching + /// and it may be better to spread merges tasks across the replicas + /// instead of doing exactly the same merge cluster-wise + auto replica_to_execute_merge = merge_strategy_picker.pickReplicaToExecuteMerge(entry); + if (replica_to_execute_merge) + { + LOG_DEBUG(log, "Prefer fetching part {} from replica {} due execute_merges_on_single_replica_time_threshold", entry.new_part_name, replica_to_execute_merge.value()); + return false; + } + DataPartsVector parts; bool have_all_parts = true; for (const String & name : entry.source_parts) @@ -3011,6 +3021,11 @@ void StorageReplicatedMergeTree::exitLeaderElection() leader_election = nullptr; } +bool StorageReplicatedMergeTree::checkReplicaHavePart(const String & replica, const String & part_name) +{ + auto zookeeper = getZooKeeper(); + return zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/parts/" + part_name); +} String StorageReplicatedMergeTree::findReplicaHavingPart(const String & part_name, bool active) { @@ -3026,7 +3041,7 @@ String StorageReplicatedMergeTree::findReplicaHavingPart(const String & part_nam if (replica == replica_name) continue; - if (zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/parts/" + part_name) && + if (checkReplicaHavePart(replica, part_name) && (!active || zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active"))) return replica; @@ -4016,6 +4031,8 @@ void StorageReplicatedMergeTree::alter( StorageInMemoryMetadata future_metadata = getInMemoryMetadata(); commands.apply(future_metadata, query_context); + merge_strategy_picker.refreshState(); + changeSettings(future_metadata.settings_changes, table_lock_holder); DatabaseCatalog::instance().getDatabase(table_id.database_name)->alterTable(query_context, table_id, future_metadata); diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 39da562b346..3b4a80bd3bf 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -210,7 +211,6 @@ public: bool canExecuteFetch(const ReplicatedMergeTreeLogEntry & entry, String & disable_reason) const; private: - /// Get a sequential consistent view of current parts. ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock getMaxAddedBlocks() const; @@ -222,11 +222,13 @@ private: friend class ReplicatedMergeTreeCleanupThread; friend class ReplicatedMergeTreeAlterThread; friend class ReplicatedMergeTreeRestartingThread; + friend class ReplicatedMergeTreeMergeStrategyPicker; friend struct ReplicatedMergeTreeLogEntry; friend class ScopedPartitionMergeLock; friend class ReplicatedMergeTreeQueue; friend class MergeTreeData; + using MergeStrategyPicker = ReplicatedMergeTreeMergeStrategyPicker; using LogEntry = ReplicatedMergeTreeLogEntry; using LogEntryPtr = LogEntry::Ptr; @@ -262,6 +264,8 @@ private: MergeTreeDataWriter writer; MergeTreeDataMergerMutator merger_mutator; + MergeStrategyPicker merge_strategy_picker; + /** The queue of what needs to be done on this replica to catch up with everyone. It is taken from ZooKeeper (/replicas/me/queue/). * In ZK entries in chronological order. Here it is not necessary. */ @@ -478,6 +482,8 @@ private: */ String findReplicaHavingPart(const String & part_name, bool active); + bool checkReplicaHavePart(const String & replica, const String & part_name); + /** Find replica having specified part or any part that covers it. * If active = true, consider only active replicas. * If found, returns replica name and set 'entry->actual_new_part_name' to name of found largest covering part. diff --git a/src/Storages/ya.make b/src/Storages/ya.make index d6d55d6db81..27aa9e3ac3f 100644 --- a/src/Storages/ya.make +++ b/src/Storages/ya.make @@ -94,6 +94,7 @@ SRCS( MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp MergeTree/ReplicatedMergeTreeCleanupThread.cpp MergeTree/ReplicatedMergeTreeLogEntry.cpp + MergeTree/ReplicatedMergeTreeMergeStrategyPicker.cpp MergeTree/ReplicatedMergeTreeMutationEntry.cpp MergeTree/ReplicatedMergeTreePartCheckThread.cpp MergeTree/ReplicatedMergeTreePartHeader.cpp diff --git a/tests/queries/0_stateless/01532_execute_merges_on_single_replica.reference b/tests/queries/0_stateless/01532_execute_merges_on_single_replica.reference new file mode 100644 index 00000000000..171ce28cbd7 --- /dev/null +++ b/tests/queries/0_stateless/01532_execute_merges_on_single_replica.reference @@ -0,0 +1,77 @@ +############################ +### emulate normal feature operation - merges are distributed between replicas +############################ +### emulate execute_merges_on_single_replica_time_threshold timeout +############################ +### timeout not exceeded, r1 waits for r2 +Row 1: +────── +table: execute_on_single_replica_r1 +type: MERGE_PARTS +new_part_name: all_0_0_5 +has_postpones: 1 +postpone_reason: Not executing merge for the part all_0_0_5, waiting for r2 to execute merge. + +Row 2: +────── +table: execute_on_single_replica_r2 +type: MERGE_PARTS +new_part_name: all_0_0_5 +has_postpones: 0 +postpone_reason: +############################ +### timeout exceeded, r1 failed to get the merged part from r2 and did the merge by its own +Row 1: +────── +table: execute_on_single_replica_r2 +type: MERGE_PARTS +new_part_name: all_0_0_5 +has_postpones: 0 +postpone_reason: +############################ +### queue unfreeze +############################ +### disable the feature +############################ +### part_log +Row 1: +────── +part_name: all_0_0_1 +mergers: ['execute_on_single_replica_r1'] +fetchers: ['execute_on_single_replica_r2'] + +Row 2: +────── +part_name: all_0_0_2 +mergers: ['execute_on_single_replica_r1'] +fetchers: ['execute_on_single_replica_r2'] + +Row 3: +────── +part_name: all_0_0_3 +mergers: ['execute_on_single_replica_r2'] +fetchers: ['execute_on_single_replica_r1'] + +Row 4: +────── +part_name: all_0_0_4 +mergers: ['execute_on_single_replica_r2'] +fetchers: ['execute_on_single_replica_r1'] + +Row 5: +────── +part_name: all_0_0_5 +mergers: ['execute_on_single_replica_r1','execute_on_single_replica_r2'] +fetchers: [] + +Row 6: +────── +part_name: all_0_0_6 +mergers: ['execute_on_single_replica_r1','execute_on_single_replica_r2'] +fetchers: [] + +Row 7: +────── +part_name: all_0_0_7 +mergers: ['execute_on_single_replica_r1','execute_on_single_replica_r2'] +fetchers: [] diff --git a/tests/queries/0_stateless/01532_execute_merges_on_single_replica.sql b/tests/queries/0_stateless/01532_execute_merges_on_single_replica.sql new file mode 100644 index 00000000000..c22f9d38eae --- /dev/null +++ b/tests/queries/0_stateless/01532_execute_merges_on_single_replica.sql @@ -0,0 +1,127 @@ +DROP TABLE IF EXISTS execute_on_single_replica_r1 NO DELAY; +DROP TABLE IF EXISTS execute_on_single_replica_r2 NO DELAY; + +/* that test requires fixed zookeeper path */ +CREATE TABLE execute_on_single_replica_r1 (x UInt64) ENGINE=ReplicatedMergeTree('/clickhouse/tables/test_01532/execute_on_single_replica', 'r1') ORDER BY tuple() SETTINGS execute_merges_on_single_replica_time_threshold=10; +CREATE TABLE execute_on_single_replica_r2 (x UInt64) ENGINE=ReplicatedMergeTree('/clickhouse/tables/test_01532/execute_on_single_replica', 'r2') ORDER BY tuple() SETTINGS execute_merges_on_single_replica_time_threshold=10; + +INSERT INTO execute_on_single_replica_r1 VALUES (1); +SYSTEM SYNC REPLICA execute_on_single_replica_r2; + +SET optimize_throw_if_noop=1; + +SELECT '############################'; +SELECT '### emulate normal feature operation - merges are distributed between replicas'; + +/* all_0_0_1 - will be merged by r1, and downloaded by r2 */ +OPTIMIZE TABLE execute_on_single_replica_r1 FINAL; +SYSTEM SYNC REPLICA execute_on_single_replica_r2; + +/* all_0_0_2 - will be merged by r1, and downloaded by r2 */ +OPTIMIZE TABLE execute_on_single_replica_r2 FINAL; +SYSTEM SYNC REPLICA execute_on_single_replica_r1; + +/* all_0_0_3 - will be merged by r2, and downloaded by r1 */ +OPTIMIZE TABLE execute_on_single_replica_r1 FINAL; +SYSTEM SYNC REPLICA execute_on_single_replica_r2; + +/* all_0_0_4 - will be merged by r2, and downloaded by r1 */ +OPTIMIZE TABLE execute_on_single_replica_r2 FINAL; +SYSTEM SYNC REPLICA execute_on_single_replica_r1; + +SELECT '############################'; +SELECT '### emulate execute_merges_on_single_replica_time_threshold timeout'; + +SYSTEM STOP REPLICATION QUEUES execute_on_single_replica_r2; + +/* all_0_0_5 - should be merged by r2, but it has replication queue stopped, so r1 do the merge */ +OPTIMIZE TABLE execute_on_single_replica_r1 FINAL SETTINGS replication_alter_partitions_sync=0; + +/* if we will check immediately we can find the log entry unchecked */ +SELECT * FROM numbers(4) where sleepEachRow(1); + +SELECT '############################'; +SELECT '### timeout not exceeded, r1 waits for r2'; + +/* we can now check that r1 waits for r2 */ +SELECT + table, + type, + new_part_name, + num_postponed > 0 AS has_postpones, + postpone_reason +FROM system.replication_queue +WHERE table LIKE 'execute\\_on\\_single\\_replica\\_r%' +AND database = currentDatabase() +ORDER BY table +FORMAT Vertical; + +/* we have execute_merges_on_single_replica_time_threshold exceeded */ +SELECT * FROM numbers(10) where sleepEachRow(1); + +SELECT '############################'; +SELECT '### timeout exceeded, r1 failed to get the merged part from r2 and did the merge by its own'; + +SELECT + table, + type, + new_part_name, + num_postponed > 0 AS has_postpones, + postpone_reason +FROM system.replication_queue +WHERE table LIKE 'execute\\_on\\_single\\_replica\\_r%' +AND database = currentDatabase() +ORDER BY table +FORMAT Vertical; + +SYSTEM START REPLICATION QUEUES execute_on_single_replica_r2; +SYSTEM SYNC REPLICA execute_on_single_replica_r2; + +SELECT '############################'; +SELECT '### queue unfreeze'; + +SELECT + table, + type, + new_part_name, + num_postponed > 0 AS has_postpones, + postpone_reason +FROM system.replication_queue +WHERE table LIKE 'execute\\_on\\_single\\_replica\\_r%' +AND database = currentDatabase() +ORDER BY table +FORMAT Vertical; + +SELECT '############################'; +SELECT '### disable the feature'; + +ALTER TABLE execute_on_single_replica_r1 MODIFY SETTING execute_merges_on_single_replica_time_threshold=0; +ALTER TABLE execute_on_single_replica_r2 MODIFY SETTING execute_merges_on_single_replica_time_threshold=0; + +/* all_0_0_6 - we disabled the feature, both replicas will merge */ +OPTIMIZE TABLE execute_on_single_replica_r2 FINAL; +/* all_0_0_7 - same */ +OPTIMIZE TABLE execute_on_single_replica_r1 FINAL; + +SYSTEM SYNC REPLICA execute_on_single_replica_r1; +SYSTEM SYNC REPLICA execute_on_single_replica_r2; + +SYSTEM FLUSH LOGS; + +SELECT '############################'; +SELECT '### part_log'; +SELECT + part_name, + arraySort(groupArrayIf(table, event_type = 'MergeParts')) AS mergers, + arraySort(groupArrayIf(table, event_type = 'DownloadPart')) AS fetchers +FROM system.part_log +WHERE (event_time > (now() - 40)) + AND (table LIKE 'execute\\_on\\_single\\_replica\\_r%') + AND (part_name NOT LIKE '%\\_0') + AND (database = currentDatabase()) +GROUP BY part_name +ORDER BY part_name +FORMAT Vertical; + +DROP TABLE execute_on_single_replica_r1 NO DELAY; +DROP TABLE execute_on_single_replica_r2 NO DELAY; \ No newline at end of file