From b8b55a5b9911c2e0b551274d507a78cf4dbe739e Mon Sep 17 00:00:00 2001
From: Alexey Milovidov
Date: Fri, 12 Jun 2020 22:04:18 +0300
Subject: [PATCH 01/19] Move LeaderElection to Storage/MergeTree

---
 src/{Common/ZooKeeper => Storages/MergeTree}/LeaderElection.h | 4 ++--
 src/Storages/StorageReplicatedMergeTree.h                     | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)
 rename src/{Common/ZooKeeper => Storages/MergeTree}/LeaderElection.h (97%)

diff --git a/src/Common/ZooKeeper/LeaderElection.h b/src/Storages/MergeTree/LeaderElection.h
similarity index 97%
rename from src/Common/ZooKeeper/LeaderElection.h
rename to src/Storages/MergeTree/LeaderElection.h
index f8a4d56dc76..c94e3e27e5a 100644
--- a/src/Common/ZooKeeper/LeaderElection.h
+++ b/src/Storages/MergeTree/LeaderElection.h
@@ -1,11 +1,11 @@
 #pragma once
 
-#include "ZooKeeper.h"
-#include "KeeperException.h"
 #include
 #include
 #include
 #include
+#include <Common/ZooKeeper/ZooKeeper.h>
+#include <Common/ZooKeeper/KeeperException.h>
 #include

diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h
index ec38eb7e842..382cf7ac469 100644
--- a/src/Storages/StorageReplicatedMergeTree.h
+++ b/src/Storages/StorageReplicatedMergeTree.h
@@ -19,12 +19,12 @@
 #include
 #include
 #include
+#include <Storages/MergeTree/LeaderElection.h>
 #include
 #include
 #include
 #include
 #include
-#include <Common/ZooKeeper/LeaderElection.h>
 #include
 #include

From 85c0706901de09a6a327d3d953a509a6ef2376b3 Mon Sep 17 00:00:00 2001
From: Alexey Milovidov
Date: Fri, 12 Jun 2020 22:19:01 +0300
Subject: [PATCH 02/19] Step 2: allow multiple leaders

---
 src/Storages/MergeTree/LeaderElection.h | 45 ++++++++++++------------
 1 file changed, 21 insertions(+), 24 deletions(-)

diff --git a/src/Storages/MergeTree/LeaderElection.h b/src/Storages/MergeTree/LeaderElection.h
index c94e3e27e5a..680ffe4992f 100644
--- a/src/Storages/MergeTree/LeaderElection.h
+++ b/src/Storages/MergeTree/LeaderElection.h
@@ -23,7 +23,16 @@ namespace CurrentMetrics
 namespace zkutil
 {
 
-/** Implements leader election algorithm described here: http://zookeeper.apache.org/doc/r3.4.5/recipes.html#sc_leaderElection
+/** Initially was used to implement leader election algorithm described here:
+  * http://zookeeper.apache.org/doc/r3.4.5/recipes.html#sc_leaderElection
+  *
+  * But then we decided to get rid of leader election, so every replica can become leader.
+  * For now, every replica can become leader if there is no leader among replicas with old version.
+  *
+  * Replicas with old versions participate in leader election with ephemeral sequential nodes.
+  * If the node is first, then replica is leader.
+  * Replicas with new versions creates persistent sequential nodes.
+  * If the first node is persistent, then all replicas with new versions become leaders.
   */
 class LeaderElection
 {
@@ -67,16 +76,13 @@ public:
 private:
     DB::BackgroundSchedulePool & pool;
     DB::BackgroundSchedulePool::TaskHolder task;
-    std::string path;
+    const std::string path;
     ZooKeeper & zookeeper;
     LeadershipHandler handler;
     std::string identifier;
     std::string log_name;
     Poco::Logger * log;
 
-    EphemeralNodeHolderPtr node;
-    std::string node_name;
-
     std::atomic<bool> shutdown_called {false};
 
     CurrentMetrics::Increment metric_increment{CurrentMetrics::LeaderElection};
 
@@ -84,43 +90,35 @@ private:
     void createNode()
     {
         shutdown_called = false;
-        node = EphemeralNodeHolder::createSequential(path + "/leader_election-", zookeeper, identifier);
-
-        std::string node_path = node->getPath();
-        node_name = node_path.substr(node_path.find_last_of('/') + 1);
-
+        zookeeper.create(path + "/leader_election-", identifier, CreateMode::PersistentSequential);
         task->activateAndSchedule();
     }
 
     void releaseNode()
     {
         shutdown();
-        node = nullptr;
     }
 
     void threadFunction()
     {
-        bool success = false;
-
         try
         {
             Strings children = zookeeper.getChildren(path);
-            std::sort(children.begin(), children.end());
-            auto it = std::lower_bound(children.begin(), children.end(), node_name);
-            if (it == children.end() || *it != node_name)
+            if (children.empty())
                 throw Poco::Exception("Assertion failed in LeaderElection");
 
-            if (it == children.begin())
+            std::sort(children.begin(), children.end());
+
+            Coordination::Stat stat;
+            zookeeper.get(path + "/" + children.front(), &stat);
+
+            if (!stat.ephemeralOwner)
             {
+                /// It is sequential node - we can become leader.
                 ProfileEvents::increment(ProfileEvents::LeaderElectionAcquiredLeadership);
                 handler();
                 return;
             }
-
-            if (!zookeeper.existsWatch(path + "/" + *(it - 1), nullptr, task->getWatchCallback()))
-                task->schedule();
-
-            success = true;
         }
         catch (const KeeperException & e)
         {
@@ -134,8 +132,7 @@ private:
             DB::tryLogCurrentException(log);
         }
 
-        if (!success)
-            task->scheduleAfter(10 * 1000);
+        task->scheduleAfter(10 * 1000);
    }
 };
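[Illustrative sketch, not part of the patch series.] The rule patch 02 introduces can be read in isolation: a new-version replica may act as leader exactly when the lexicographically first election node is persistent. Below is a minimal standalone rendering of that check, written against a hypothetical client type whose getChildren()/getStat() stand in for the zkutil::ZooKeeper calls used above:

    #include <algorithm>
    #include <cstdint>
    #include <string>
    #include <vector>

    struct Stat { int64_t ephemeral_owner = 0; };  /// 0 means the node is persistent

    template <typename Client>
    bool mayActAsLeader(Client & zk, const std::string & election_path)
    {
        std::vector<std::string> children = zk.getChildren(election_path);
        if (children.empty())
            return false;  /// nobody has registered yet
        std::sort(children.begin(), children.end());
        /// A persistent first node was created by a new-version replica,
        /// which is the signal that every new-version replica may lead.
        return zk.getStat(election_path + "/" + children.front()).ephemeral_owner == 0;
    }

Old-version replicas still create ephemeral sequential nodes, so as long as one of them holds the first slot, exactly one leader is elected the classic way.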
From ab00e343054eb1dee01a0c0d87da5bc2875dfc28 Mon Sep 17 00:00:00 2001
From: Alexey Milovidov
Date: Fri, 12 Jun 2020 22:38:36 +0300
Subject: [PATCH 03/19] Miscellaneous

---
 src/Storages/MergeTree/LeaderElection.h | 7 +------
 1 file changed, 1 insertion(+), 6 deletions(-)

diff --git a/src/Storages/MergeTree/LeaderElection.h b/src/Storages/MergeTree/LeaderElection.h
index 680ffe4992f..725ab61e877 100644
--- a/src/Storages/MergeTree/LeaderElection.h
+++ b/src/Storages/MergeTree/LeaderElection.h
@@ -70,7 +70,7 @@ public:
 
     ~LeaderElection()
     {
-        releaseNode();
+        shutdown();
     }
 
 private:
@@ -94,11 +94,6 @@ private:
         task->activateAndSchedule();
     }
 
-    void releaseNode()
-    {
-        shutdown();
-    }
-
     void threadFunction()
     {
         try

From cfef7ba6920560442543ffb40fb5be89693f3a0c Mon Sep 17 00:00:00 2001
From: Alexey Milovidov
Date: Fri, 12 Jun 2020 23:23:15 +0300
Subject: [PATCH 04/19] Whitespace

---
 src/Interpreters/SystemLog.cpp | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/Interpreters/SystemLog.cpp b/src/Interpreters/SystemLog.cpp
index d79edde7052..b432cd8803b 100644
--- a/src/Interpreters/SystemLog.cpp
+++ b/src/Interpreters/SystemLog.cpp
@@ -95,7 +95,6 @@ SystemLogs::SystemLogs(Context & global_context, const Poco::Util::AbstractConfi
     if (asynchronous_metric_log)
         logs.emplace_back(asynchronous_metric_log.get());
 
-
     try
     {
         for (auto & log : logs)
From 6ff671b092b65b9b016248ebfcc936d4bd5684d7 Mon Sep 17 00:00:00 2001
From: Alexey Milovidov
Date: Fri, 12 Jun 2020 23:32:31 +0300
Subject: [PATCH 05/19] Improvement

---
 src/Storages/MergeTree/LeaderElection.h | 17 ++++++++++++++++-
 1 file changed, 16 insertions(+), 1 deletion(-)

diff --git a/src/Storages/MergeTree/LeaderElection.h b/src/Storages/MergeTree/LeaderElection.h
index 725ab61e877..36209d6c003 100644
--- a/src/Storages/MergeTree/LeaderElection.h
+++ b/src/Storages/MergeTree/LeaderElection.h
@@ -29,8 +29,12 @@ namespace zkutil
   * But then we decided to get rid of leader election, so every replica can become leader.
   * For now, every replica can become leader if there is no leader among replicas with old version.
   *
+  * It's tempting to remove this class at all, but we have to maintain it,
+  * to maintain compatibility when replicas with different versions work on the same cluster
+  * (this is allowed for short time period during cluster update).
+  *
   * Replicas with old versions participate in leader election with ephemeral sequential nodes.
-  * If the node is first, then replica is leader.
+  * If the node is first, then replica is the leader.
   * Replicas with new versions creates persistent sequential nodes.
   * If the first node is persistent, then all replicas with new versions become leaders.
   */
@@ -90,6 +94,17 @@ private:
     void createNode()
     {
         shutdown_called = false;
+
+        /// If there is at least one persistent node, we don't have to create another.
+        Strings children = zookeeper.getChildren(path);
+        for (const auto & child : children)
+        {
+            Coordination::Stat stat;
+            zookeeper.get(path + "/" + child, &stat);
+            if (!stat.ephemeralOwner)
+                return;
+        }
+
         zookeeper.create(path + "/leader_election-", identifier, CreateMode::PersistentSequential);
         task->activateAndSchedule();
     }

From 21897f2abd451e5c80a9e9cf14bf7ce6f0d83bfe Mon Sep 17 00:00:00 2001
From: Alexey Milovidov
Date: Fri, 12 Jun 2020 23:38:43 +0300
Subject: [PATCH 06/19] Instrument

---
 src/Common/ProfileEvents.cpp                | 5 +++++
 src/Storages/StorageReplicatedMergeTree.cpp | 8 ++++++++
 2 files changed, 13 insertions(+)

diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp
index 8393ea85112..a0eb7a5fb48 100644
--- a/src/Common/ProfileEvents.cpp
+++ b/src/Common/ProfileEvents.cpp
@@ -203,6 +203,11 @@
     \
     M(CannotWriteToWriteBufferDiscard, "Number of stack traces dropped by query profiler or signal handler because pipe is full or cannot write to pipe.") \
     M(QueryProfilerSignalOverruns, "Number of times we drop processing of a signal due to overrun plus the number of signals that OS has not delivered due to overrun.") \
+    \
+    M(CreatedLogEntryForMerge, "Successfully created log entry to merge parts in ReplicatedMergeTree.") \
+    M(NotCreatedLogEntryForMerge, "Log entry to merge parts in ReplicatedMergeTree is not created due to concurrent log update by another replica.") \
+    M(CreatedLogEntryForMutation, "Successfully created log entry to mutate parts in ReplicatedMergeTree.") \
+    M(NotCreatedLogEntryForMutation, "Log entry to mutate parts in ReplicatedMergeTree is not created due to concurrent log update by another replica.") \
 
 namespace ProfileEvents
 {

diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp
index 57535466558..eb395ff55c0 100644
--- a/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/src/Storages/StorageReplicatedMergeTree.cpp
@@ -71,6 +71,10 @@ namespace ProfileEvents
     extern const Event ReplicatedPartFetches;
     extern const Event DataAfterMergeDiffersFromReplica;
     extern const Event DataAfterMutationDiffersFromReplica;
+    extern const Event CreatedLogEntryForMerge;
+    extern const Event NotCreatedLogEntryForMerge;
+    extern const Event CreatedLogEntryForMutation;
+    extern const Event NotCreatedLogEntryForMutation;
 }
 
 namespace CurrentMetrics
@@ -2579,10 +2583,12 @@ StorageReplicatedMergeTree::CreateMergeEntryResult StorageReplicatedMergeTree::c
         String path_created = dynamic_cast(*responses.front()).path_created;
         entry.znode_name = path_created.substr(path_created.find_last_of('/') + 1);
 
+        ProfileEvents::increment(ProfileEvents::CreatedLogEntryForMerge);
         LOG_TRACE(log, "Created log entry {} for merge {}", path_created, merged_name);
     }
     else if (code == Coordination::Error::ZBADVERSION)
     {
+        ProfileEvents::increment(ProfileEvents::NotCreatedLogEntryForMerge);
         LOG_TRACE(log, "Log entry is not created for merge {} because log was updated", merged_name);
         return CreateMergeEntryResult::LogUpdated;
     }
@@ -2643,12 +2649,14 @@ StorageReplicatedMergeTree::CreateMergeEntryResult StorageReplicatedMergeTree::c
 
     if (code == Coordination::Error::ZBADVERSION)
     {
+        ProfileEvents::increment(ProfileEvents::NotCreatedLogEntryForMutation);
         LOG_TRACE(log, "Log entry is not created for mutation {} because log was updated", new_part_name);
         return CreateMergeEntryResult::LogUpdated;
     }
 
     zkutil::KeeperMultiException::check(code, ops, responses);
 
+    ProfileEvents::increment(ProfileEvents::CreatedLogEntryForMutation);
     LOG_TRACE(log, "Created log entry for mutation {}", new_part_name);
     return CreateMergeEntryResult::Ok;
 }
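[Illustrative sketch.] The counters added in patch 06 follow the usual ProfileEvents pattern: events are defined once in the M(name, documentation) list in ProfileEvents.cpp, referenced in other translation units as extern const Event, and bumped with ProfileEvents::increment; they are then visible in the system.events table. Roughly, in simplified form (the real implementation in src/Common/ProfileEvents.{h,cpp} additionally supports per-thread and per-query counter sets):

    #include <atomic>
    #include <cstddef>

    namespace ProfileEvents
    {
        using Event = size_t;

        /// One atomic slot per entry of the M(...) list, e.g. CreatedLogEntryForMerge.
        inline std::atomic<size_t> counters[1024];

        inline void increment(Event event, size_t amount = 1)
        {
            counters[event].fetch_add(amount, std::memory_order_relaxed);
        }
    }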
From 6f0db5ef108b1f13351e4792d427043417187728 Mon Sep 17 00:00:00 2001
From: Alexey Milovidov
Date: Fri, 12 Jun 2020 23:42:31 +0300
Subject: [PATCH 07/19] Fix error

---
 src/Storages/MergeTree/LeaderElection.h | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/src/Storages/MergeTree/LeaderElection.h b/src/Storages/MergeTree/LeaderElection.h
index 36209d6c003..ef6b68bbe15 100644
--- a/src/Storages/MergeTree/LeaderElection.h
+++ b/src/Storages/MergeTree/LeaderElection.h
@@ -102,7 +102,11 @@ private:
             Coordination::Stat stat;
             zookeeper.get(path + "/" + child, &stat);
             if (!stat.ephemeralOwner)
+            {
+                ProfileEvents::increment(ProfileEvents::LeaderElectionAcquiredLeadership);
+                handler();
                 return;
+            }
         }
 
         zookeeper.create(path + "/leader_election-", identifier, CreateMode::PersistentSequential);
@@ -124,7 +128,7 @@ private:
 
             if (!stat.ephemeralOwner)
             {
-                /// It is sequential node - we can become leader.
+                /// It is persistent node - we can become leader.
                 ProfileEvents::increment(ProfileEvents::LeaderElectionAcquiredLeadership);
                 handler();
                 return;
             }
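[Illustrative sketch.] The error fixed here: patch 05's early return in createNode noticed an existing persistent node but skipped the leadership callback, so the replica never actually started acting as leader. The invariant, in condensed hypothetical form (the std::function parameters stand in for the real members of LeaderElection):

    #include <functional>

    void enterElection(
        const std::function<bool()> & persistent_node_exists,
        const std::function<void()> & on_become_leader,
        const std::function<void()> & create_node_and_schedule)
    {
        if (persistent_node_exists())
        {
            on_become_leader();  /// the fix: this path previously just returned
            return;
        }
        create_node_and_schedule();
    }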
From 18f8861fa0df4ac2d1d57c5fc16dca651f4081df Mon Sep 17 00:00:00 2001
From: Alexey Milovidov
Date: Sat, 13 Jun 2020 00:19:08 +0300
Subject: [PATCH 08/19] Better test

---
 .../0_stateless/01307_multiple_leaders.sh     | 33 ++++++++-----------
 1 file changed, 13 insertions(+), 20 deletions(-)

diff --git a/tests/queries/0_stateless/01307_multiple_leaders.sh b/tests/queries/0_stateless/01307_multiple_leaders.sh
index 0bf5e0b13bf..b16feaeb591 100755
--- a/tests/queries/0_stateless/01307_multiple_leaders.sh
+++ b/tests/queries/0_stateless/01307_multiple_leaders.sh
@@ -5,35 +5,28 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
 
 set -e
 
-$CLICKHOUSE_CLIENT -n --query "
-DROP TABLE IF EXISTS r0;
-DROP TABLE IF EXISTS r1;
+NUM_REPLICAS=2
+DATA_SIZE=1000
 
-CREATE TABLE r0 (x UInt64) ENGINE = ReplicatedMergeTree('/test/table', 'r0') ORDER BY x SETTINGS min_bytes_for_wide_part = '10M';
-CREATE TABLE r1 (x UInt64) ENGINE = ReplicatedMergeTree('/test/table', 'r1') ORDER BY x SETTINGS min_bytes_for_wide_part = '10M';
-"
+SEQ=$(seq 0 $(($NUM_REPLICAS - 1)))
+
+for REPLICA in $SEQ; do $CLICKHOUSE_CLIENT -n --query "DROP TABLE IF EXISTS r$REPLICA"; done
+for REPLICA in $SEQ; do $CLICKHOUSE_CLIENT -n --query "CREATE TABLE r$REPLICA (x UInt64) ENGINE = ReplicatedMergeTree('/test/table', 'r$REPLICA') ORDER BY x SETTINGS min_bytes_for_wide_part = '10M';"; done
 
 function thread()
 {
     REPLICA=$1
     ITERATIONS=$2
 
     $CLICKHOUSE_CLIENT --max_block_size 1 --min_insert_block_size_rows 0 --min_insert_block_size_bytes 0 --query "INSERT INTO r$REPLICA SELECT number * $NUM_REPLICAS + $REPLICA FROM numbers($ITERATIONS)"
 }
 
-
-thread 0 1000 &
-thread 1 1000 &
+for REPLICA in $SEQ; do
+    thread $REPLICA $DATA_SIZE &
+done
 
 wait
 
-$CLICKHOUSE_CLIENT -n --query "
-SYSTEM SYNC REPLICA r0;
-SYSTEM SYNC REPLICA r1;
-
-SELECT count(), sum(x) FROM r0;
-SELECT count(), sum(x) FROM r1;
-
-DROP TABLE r0;
-DROP TABLE r1;
-"
+for REPLICA in $SEQ; do $CLICKHOUSE_CLIENT -n --query "SYSTEM SYNC REPLICA r$REPLICA"; done
+for REPLICA in $SEQ; do $CLICKHOUSE_CLIENT -n --query "SELECT count(), sum(x) FROM r$REPLICA"; done
+for REPLICA in $SEQ; do $CLICKHOUSE_CLIENT -n --query "DROP TABLE r$REPLICA"; done
From 5866401f6078010e51f31b6b2bed367c0bccca49 Mon Sep 17 00:00:00 2001
From: Alexey Milovidov
Date: Mon, 15 Jun 2020 03:59:12 +0300
Subject: [PATCH 09/19] Leader election both backward and forward compatible

---
 src/Storages/MergeTree/LeaderElection.h | 68 ++++++++++++++-----------
 1 file changed, 39 insertions(+), 29 deletions(-)

diff --git a/src/Storages/MergeTree/LeaderElection.h b/src/Storages/MergeTree/LeaderElection.h
index ef6b68bbe15..4d3a533d139 100644
--- a/src/Storages/MergeTree/LeaderElection.h
+++ b/src/Storages/MergeTree/LeaderElection.h
@@ -33,10 +33,8 @@ namespace zkutil
   * to maintain compatibility when replicas with different versions work on the same cluster
   * (this is allowed for short time period during cluster update).
   *
-  * Replicas with old versions participate in leader election with ephemeral sequential nodes.
-  * If the node is first, then replica is the leader.
-  * Replicas with new versions creates persistent sequential nodes.
-  * If the first node is persistent, then all replicas with new versions become leaders.
+  * Replicas with new versions creates ephemeral sequential nodes with values like "replica_name (multiple leaders Ok)".
+  * If the first node belongs to a replica with new version, then all replicas with new versions become leaders.
   */
 class LeaderElection
 {
@@ -55,7 +53,7 @@ public:
         ZooKeeper & zookeeper_,
         LeadershipHandler handler_,
         const std::string & identifier_)
-        : pool(pool_), path(path_), zookeeper(zookeeper_), handler(handler_), identifier(identifier_)
+        : pool(pool_), path(path_), zookeeper(zookeeper_), handler(handler_), identifier(identifier_ + suffix)
         , log_name("LeaderElection (" + path + ")")
         , log(&Poco::Logger::get(log_name))
     {
@@ -74,19 +72,23 @@ public:
 
     ~LeaderElection()
     {
-        shutdown();
+        releaseNode();
     }
 
 private:
+    static inline constexpr auto suffix = " (multiple leaders Ok)";
     DB::BackgroundSchedulePool & pool;
     DB::BackgroundSchedulePool::TaskHolder task;
-    const std::string path;
+    std::string path;
     ZooKeeper & zookeeper;
     LeadershipHandler handler;
     std::string identifier;
     std::string log_name;
     Poco::Logger * log;
 
+    EphemeralNodeHolderPtr node;
+    std::string node_name;
+
     std::atomic<bool> shutdown_called {false};
 
     CurrentMetrics::Increment metric_increment{CurrentMetrics::LeaderElection};
 
@@ -94,45 +96,52 @@ private:
     void createNode()
     {
         shutdown_called = false;
+        node = EphemeralNodeHolder::createSequential(path + "/leader_election-", zookeeper, identifier);
 
-        /// If there is at least one persistent node, we don't have to create another.
-        Strings children = zookeeper.getChildren(path);
-        for (const auto & child : children)
-        {
-            Coordination::Stat stat;
-            zookeeper.get(path + "/" + child, &stat);
-            if (!stat.ephemeralOwner)
-            {
-                ProfileEvents::increment(ProfileEvents::LeaderElectionAcquiredLeadership);
-                handler();
-                return;
-            }
-        }
+        std::string node_path = node->getPath();
+        node_name = node_path.substr(node_path.find_last_of('/') + 1);
 
-        zookeeper.create(path + "/leader_election-", identifier, CreateMode::PersistentSequential);
         task->activateAndSchedule();
     }
 
+    void releaseNode()
+    {
+        shutdown();
+        node = nullptr;
+    }
+
     void threadFunction()
     {
+        bool success = false;
+
         try
        {
             Strings children = zookeeper.getChildren(path);
-            if (children.empty())
-                throw Poco::Exception("Assertion failed in LeaderElection");
-
             std::sort(children.begin(), children.end());
 
-            Coordination::Stat stat;
-            zookeeper.get(path + "/" + children.front(), &stat);
+            auto my_node_it = std::lower_bound(children.begin(), children.end(), node_name);
+            if (my_node_it == children.end() || *my_node_it != node_name)
+                throw Poco::Exception("Assertion failed in LeaderElection");
 
-            if (!stat.ephemeralOwner)
+            String value = zookeeper.get(path + "/" + children.front());
+
+#if !defined(ARCADIA_BUILD) /// C++20; Replicated tables are unused in Arcadia.
+            if (value.ends_with(suffix))
             {
                 ProfileEvents::increment(ProfileEvents::LeaderElectionAcquiredLeadership);
                 handler();
                 return;
             }
+#endif
 
+            if (my_node_it == children.begin())
+                throw Poco::Exception("Assertion failed in LeaderElection");
+
+            /// Watch for the node in front of us.
+            --my_node_it;
+            if (!zookeeper.existsWatch(path + "/" + *my_node_it, nullptr, task->getWatchCallback()))
+                task->schedule();
+
+            success = true;
         }
         catch (const KeeperException & e)
         {
@@ -146,7 +155,8 @@ private:
             DB::tryLogCurrentException(log);
         }
 
-        task->scheduleAfter(10 * 1000);
+        if (!success)
+            task->scheduleAfter(10 * 1000);
     }
 };
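[Illustrative sketch.] Patch 09 swaps the persistent-node signal for a value suffix on an ephemeral node, so a crashed replica's registration disappears with its session, while old-version replicas (which never look at node values) still see an ordinary ephemeral sequential node. The check it adds, extracted against a hypothetical client type (std::string::ends_with is C++20, hence the ARCADIA_BUILD guard above):

    #include <algorithm>
    #include <string>
    #include <vector>

    static constexpr const char * marker = " (multiple leaders Ok)";

    /// True if the first election node was created by a new-version replica,
    /// in which case every new-version replica may act as leader.
    template <typename Client>
    bool firstNodeAllowsMultipleLeaders(Client & zk, const std::string & path)
    {
        std::vector<std::string> children = zk.getChildren(path);
        if (children.empty())
            return false;
        std::sort(children.begin(), children.end());
        return zk.get(path + "/" + children.front()).ends_with(marker);
    }

If the first node still belongs to an old-version replica, the code falls back to the classic recipe: watch the predecessor node and wait.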
From 689b6901f8ac076bcb11249a9a69303b2817e679 Mon Sep 17 00:00:00 2001
From: Alexey Milovidov
Date: Mon, 15 Jun 2020 04:04:42 +0300
Subject: [PATCH 10/19] Fix typo

---
 src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp
index d861173d8a0..f867a39581f 100644
--- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp
+++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp
@@ -298,7 +298,7 @@ bool MergeTreeDataMergerMutator::selectPartsToMerge(
     if (parts_to_merge.empty())
     {
         if (out_disable_reason)
-            *out_disable_reason = "There are no need to merge parts according to merge selector algorithm";
+            *out_disable_reason = "There is no need to merge parts according to merge selector algorithm";
         return false;
     }

From b51cbbdf15ea74218792d73168e8596b538f9802 Mon Sep 17 00:00:00 2001
From: Alexey Milovidov
Date: Mon, 15 Jun 2020 04:08:56 +0300
Subject: [PATCH 11/19] Update test

---
 .../00620_optimize_on_nonleader_replica_zookeeper.sql | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/tests/queries/0_stateless/00620_optimize_on_nonleader_replica_zookeeper.sql b/tests/queries/0_stateless/00620_optimize_on_nonleader_replica_zookeeper.sql
index 9622a5bd3c2..f488502b13b 100644
--- a/tests/queries/0_stateless/00620_optimize_on_nonleader_replica_zookeeper.sql
+++ b/tests/queries/0_stateless/00620_optimize_on_nonleader_replica_zookeeper.sql
@@ -1,3 +1,5 @@
+-- The test is mostly outdated as now every replica is leader and can do OPTIMIZE locally.
+
 DROP TABLE IF EXISTS rename1;
 DROP TABLE IF EXISTS rename2;
 DROP TABLE IF EXISTS rename3;
@@ -14,7 +16,9 @@ SELECT * FROM rename1;
 RENAME TABLE rename2 TO rename3;
 
 INSERT INTO rename1 VALUES (0, 1, 2);
+SYSTEM SYNC REPLICA rename3; -- Make "rename3" to see all data parts.
 OPTIMIZE TABLE rename3;
+SYSTEM SYNC REPLICA rename1; -- Make "rename1" to see and process all scheduled merges.
 SELECT * FROM rename1;
 
 DROP TABLE IF EXISTS rename1;
From 66ccb2f6b121b1e85cb035d6e6a256617722d4d3 Mon Sep 17 00:00:00 2001
From: Alexey Milovidov
Date: Mon, 15 Jun 2020 04:12:01 +0300
Subject: [PATCH 12/19] Remove "current_password" because it is harmful

---
 src/Interpreters/ClientInfo.h               | 2 --
 src/Interpreters/Context.cpp                | 1 -
 src/Storages/StorageReplicatedMergeTree.cpp | 2 +-
 src/Storages/StorageReplicatedMergeTree.h   | 2 ++
 4 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/src/Interpreters/ClientInfo.h b/src/Interpreters/ClientInfo.h
index 7a4df63c17a..294eb47e3e9 100644
--- a/src/Interpreters/ClientInfo.h
+++ b/src/Interpreters/ClientInfo.h
@@ -47,8 +47,6 @@ public:
     String current_user;
     String current_query_id;
     Poco::Net::SocketAddress current_address;
-    /// Use current user and password when sending query to replica leader
-    String current_password;
 
     /// When query_kind == INITIAL_QUERY, these values are equal to current.
     String initial_user;

diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp
index cb780443e03..bd99039c36d 100644
--- a/src/Interpreters/Context.cpp
+++ b/src/Interpreters/Context.cpp
@@ -660,7 +660,6 @@ void Context::setUser(const String & name, const String & password, const Poco::
     auto lock = getLock();
 
     client_info.current_user = name;
-    client_info.current_password = password;
     client_info.current_address = address;
 
     auto new_user_id = getAccessControlManager().find(name);

diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp
index ac762b3e05f..055e709cbc3 100644
--- a/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/src/Storages/StorageReplicatedMergeTree.cpp
@@ -4416,7 +4416,7 @@ void StorageReplicatedMergeTree::sendRequestToLeaderReplica(const ASTPtr & query
     const auto & query_settings = query_context.getSettingsRef();
     const auto & query_client_info = query_context.getClientInfo();
     String user = query_client_info.current_user;
-    String password = query_client_info.current_password;
+    String password;
 
     if (auto address = findClusterAddress(leader_address); address)
     {

diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h
index 5083abf7ef9..b2bd546b478 100644
--- a/src/Storages/StorageReplicatedMergeTree.h
+++ b/src/Storages/StorageReplicatedMergeTree.h
@@ -222,6 +222,7 @@ private:
     zkutil::EphemeralNodeHolderPtr replica_is_active_node;
 
     /** Is this replica "leading". The leader replica selects the parts to merge.
+      * It can be false only when old ClickHouse versions are working on the same cluster, because now we allow multiple leaders.
       */
     std::atomic<bool> is_leader {false};
     zkutil::LeaderElectionPtr leader_election;
@@ -497,6 +498,7 @@ private:
     bool waitForReplicaToProcessLogEntry(const String & replica_name, const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active = true);
 
     /// Choose leader replica, send requst to it and wait.
+    /// Only makes sense when old ClickHouse versions are working on the same cluster, because now we allow multiple leaders.
     void sendRequestToLeaderReplica(const ASTPtr & query, const Context & query_context);
 
     /// Throw an exception if the table is readonly.

From d2c66f96881bcdc18248711a2db3ed6e953437c3 Mon Sep 17 00:00:00 2001
From: Alexey Milovidov
Date: Mon, 15 Jun 2020 05:12:06 +0300
Subject: [PATCH 13/19] Less noise in cleanup thread

---
 .../MergeTree/ReplicatedMergeTreeCleanupThread.cpp | 75 +++++++++++++++----
 .../MergeTree/ReplicatedMergeTreeCleanupThread.h   |  3 +-
 2 files changed, 62 insertions(+), 16 deletions(-)

diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp
index 1bc132eaba4..0870c0fdf72 100644
--- a/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp
+++ b/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp
@@ -4,6 +4,7 @@
 #include
 #include
+#include <random>
 
 #include
 
@@ -85,8 +86,14 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
 
     int children_count = stat.numChildren;
 
-    /// We will wait for 1.1 times more records to accumulate than necessary.
-    if (static_cast<double>(children_count) < storage_settings->min_replicated_logs_to_keep * 1.1)
+    /// We will wait for 1.05 to 1.15 times more records to accumulate than necessary.
+    /// Randomization is needed to spread the time when multiple replicas come here.
+    /// Numbers are arbitrary.
+    std::uniform_real_distribution<double> distr(1.05, 1.15);
+    double ratio = distr(rng);
+    size_t min_replicated_logs_to_keep = storage_settings->min_replicated_logs_to_keep * ratio;
+
+    if (static_cast<double>(children_count) < min_replicated_logs_to_keep)
         return;
 
     Strings replicas = zookeeper->getChildren(storage.zookeeper_path + "/replicas", &stat);
@@ -214,10 +221,15 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
     if (entries.empty())
         return;
 
-    markLostReplicas(host_versions_lost_replicas, log_pointers_candidate_lost_replicas, replicas.size() - num_replicas_were_marked_is_lost, zookeeper);
+    markLostReplicas(
+        host_versions_lost_replicas,
+        log_pointers_candidate_lost_replicas,
+        replicas.size() - num_replicas_were_marked_is_lost,
+        zookeeper);
 
     Coordination::Requests ops;
-    for (size_t i = 0; i < entries.size(); ++i)
+    size_t i = 0;
+    for (; i < entries.size(); ++i)
     {
         ops.emplace_back(zkutil::makeRemoveRequest(storage.zookeeper_path + "/log/" + entries[i], -1));
 
@@ -229,12 +241,25 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
             /// Simultaneously with clearing the log, we check to see if replica was added since we received replicas list.
             ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas", stat.version));
-            zookeeper->multi(ops);
+
+            try
+            {
+                zookeeper->multi(ops);
+            }
+            catch (const zkutil::KeeperMultiException & e)
+            {
+                /// Another replica already deleted the same node concurrently.
+                if (e.code == Coordination::Error::ZNONODE)
+                    break;
+
+                throw;
+            }
             ops.clear();
         }
     }
 
-    LOG_DEBUG(log, "Removed {} old log entries: {} - {}", entries.size(), entries.front(), entries.back());
+    if (i != 0)
+        LOG_DEBUG(log, "Removed {} old log entries: {} - {}", i, entries[0], entries[i - 1]);
 }
 
@@ -250,8 +275,10 @@ void ReplicatedMergeTreeCleanupThread::markLostReplicas(const std::unordered_map
         String replica = pair.first;
         Coordination::Requests ops;
         /// If host changed version we can not mark replicas, because replica started to be active.
-        ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas/" + replica + "/host", host_versions_lost_replicas.at(replica)));
-        ops.emplace_back(zkutil::makeSetRequest(storage.zookeeper_path + "/replicas/" + replica + "/is_lost", "1", -1));
+        ops.emplace_back(zkutil::makeCheckRequest(
+            storage.zookeeper_path + "/replicas/" + replica + "/host", host_versions_lost_replicas.at(replica)));
+        ops.emplace_back(zkutil::makeSetRequest(
+            storage.zookeeper_path + "/replicas/" + replica + "/is_lost", "1", -1));
         candidate_lost_replicas.push_back(replica);
         requests.push_back(ops);
     }
@@ -299,14 +326,17 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
     /// Use ZooKeeper's first node (last according to time) timestamp as "current" time.
     Int64 current_time = timed_blocks.front().ctime;
-    Int64 time_threshold = std::max(static_cast<Int64>(0), current_time - static_cast<Int64>(1000 * storage_settings->replicated_deduplication_window_seconds));
+    Int64 time_threshold = std::max(
+        static_cast<Int64>(0),
+        current_time - static_cast<Int64>(1000 * storage_settings->replicated_deduplication_window_seconds));
 
     /// Virtual node, all nodes that are "greater" than this one will be deleted
     NodeWithStat block_threshold{{}, time_threshold};
 
     size_t current_deduplication_window = std::min(timed_blocks.size(), storage_settings->replicated_deduplication_window);
     auto first_outdated_block_fixed_threshold = timed_blocks.begin() + current_deduplication_window;
-    auto first_outdated_block_time_threshold = std::upper_bound(timed_blocks.begin(), timed_blocks.end(), block_threshold, NodeWithStat::greaterByTime);
+    auto first_outdated_block_time_threshold = std::upper_bound(
+        timed_blocks.begin(), timed_blocks.end(), block_threshold, NodeWithStat::greaterByTime);
     auto first_outdated_block = std::min(first_outdated_block_fixed_threshold, first_outdated_block_time_threshold);
 
     zkutil::AsyncResponses try_remove_futures;
@@ -326,13 +356,16 @@
             zookeeper->removeRecursive(path);
             cached_block_stats.erase(first_outdated_block->node);
         }
-        else if (rc != Coordination::Error::ZOK)
-            LOG_WARNING(log, "Error while deleting ZooKeeper path `{}`: {}, ignoring.", path, Coordination::errorMessage(rc));
-        else
+        else if (rc == Coordination::Error::ZOK || rc == Coordination::Error::ZNONODE)
         {
+            /// No node is Ok. Another replica is removing nodes concurrently.
             /// Successfully removed blocks have to be removed from cache
             cached_block_stats.erase(first_outdated_block->node);
         }
+        else
+        {
+            LOG_WARNING(log, "Error while deleting ZooKeeper path `{}`: {}, ignoring.", path, Coordination::errorMessage(rc));
+        }
         first_outdated_block++;
     }
@@ -453,8 +486,20 @@ void ReplicatedMergeTreeCleanupThread::clearOldMutations()
         {
             /// Simultaneously with clearing the log, we check to see if replica was added since we received replicas list.
             ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas", replicas_stat.version));
-            zookeeper->multi(ops);
-            LOG_DEBUG(log, "Removed {} old mutation entries: {} - {}", (i + 1 - batch_start_i), entries[batch_start_i], entries[i]);
+
+            try
+            {
+                zookeeper->multi(ops);
+            }
+            catch (const zkutil::KeeperMultiException & e)
+            {
+                /// Another replica already deleted the same node concurrently.
+                if (e.code == Coordination::Error::ZNONODE)
+                    break;
+
+                throw;
+            }
+
+            LOG_DEBUG(log, "Removed {} old mutation entries: {} - {}",
+                i + 1 - batch_start_i, entries[batch_start_i], entries[i]);
             batch_start_i = i + 1;
             ops.clear();
         }

diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h b/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h
index a787f99d907..f4191482d64 100644
--- a/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h
+++ b/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h
@@ -4,6 +4,7 @@
 #include
 #include
 #include
+#include <Common/randomSeed.h>
 #include
 #include
 
@@ -36,7 +37,7 @@ private:
     String log_name;
     Poco::Logger * log;
     BackgroundSchedulePool::TaskHolder task;
-    pcg64 rng;
+    pcg64 rng{randomSeed()};
 
     void run();
     void iterate();
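[Illustrative sketch.] The randomized threshold above is a small thundering-herd fix: with the fixed 1.1 factor, all replicas decided to trim the log at exactly the same backlog size and then raced to delete the same nodes. Self-contained form of the idea (std::mt19937_64 stands in for the pcg64 member that the patch seeds with randomSeed()):

    #include <random>

    /// Each replica waits for a slightly different backlog before trimming,
    /// so concurrent replicas rarely start cleanup at the same moment.
    size_t jitteredThreshold(size_t min_logs_to_keep, std::mt19937_64 & rng)
    {
        std::uniform_real_distribution<double> distr(1.05, 1.15);
        return static_cast<size_t>(min_logs_to_keep * distr(rng));
    }

Together with treating ZNONODE from a concurrent deleter as success, this turns simultaneous cleanup across replicas from a logged error into a benign race.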
From bbe5f4c9090d7dda7d79281f24b75e39a384aae0 Mon Sep 17 00:00:00 2001
From: Alexey Milovidov
Date: Mon, 15 Jun 2020 05:13:41 +0300
Subject: [PATCH 14/19] Revert "Remove "current_password" because it is harmful"

This reverts commit 66ccb2f6b121b1e85cb035d6e6a256617722d4d3.
---
 src/Interpreters/ClientInfo.h               | 2 ++
 src/Interpreters/Context.cpp                | 1 +
 src/Storages/StorageReplicatedMergeTree.cpp | 2 +-
 src/Storages/StorageReplicatedMergeTree.h   | 2 --
 4 files changed, 4 insertions(+), 3 deletions(-)

diff --git a/src/Interpreters/ClientInfo.h b/src/Interpreters/ClientInfo.h
index 294eb47e3e9..7a4df63c17a 100644
--- a/src/Interpreters/ClientInfo.h
+++ b/src/Interpreters/ClientInfo.h
@@ -47,6 +47,8 @@ public:
     String current_user;
     String current_query_id;
     Poco::Net::SocketAddress current_address;
+    /// Use current user and password when sending query to replica leader
+    String current_password;
 
     /// When query_kind == INITIAL_QUERY, these values are equal to current.
     String initial_user;

diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp
index bd99039c36d..cb780443e03 100644
--- a/src/Interpreters/Context.cpp
+++ b/src/Interpreters/Context.cpp
@@ -660,6 +660,7 @@ void Context::setUser(const String & name, const String & password, const Poco::
     auto lock = getLock();
 
     client_info.current_user = name;
+    client_info.current_password = password;
     client_info.current_address = address;
 
     auto new_user_id = getAccessControlManager().find(name);

diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp
index 055e709cbc3..ac762b3e05f 100644
--- a/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/src/Storages/StorageReplicatedMergeTree.cpp
@@ -4416,7 +4416,7 @@ void StorageReplicatedMergeTree::sendRequestToLeaderReplica(const ASTPtr & query
     const auto & query_settings = query_context.getSettingsRef();
     const auto & query_client_info = query_context.getClientInfo();
     String user = query_client_info.current_user;
-    String password;
+    String password = query_client_info.current_password;
 
     if (auto address = findClusterAddress(leader_address); address)
     {

diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h
index b2bd546b478..5083abf7ef9 100644
--- a/src/Storages/StorageReplicatedMergeTree.h
+++ b/src/Storages/StorageReplicatedMergeTree.h
@@ -222,7 +222,6 @@ private:
     zkutil::EphemeralNodeHolderPtr replica_is_active_node;
 
     /** Is this replica "leading". The leader replica selects the parts to merge.
-      * It can be false only when old ClickHouse versions are working on the same cluster, because now we allow multiple leaders.
       */
     std::atomic<bool> is_leader {false};
     zkutil::LeaderElectionPtr leader_election;
@@ -498,7 +497,6 @@ private:
     bool waitForReplicaToProcessLogEntry(const String & replica_name, const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active = true);
 
     /// Choose leader replica, send requst to it and wait.
-    /// Only makes sense when old ClickHouse versions are working on the same cluster, because now we allow multiple leaders.
     void sendRequestToLeaderReplica(const ASTPtr & query, const Context & query_context);
 
     /// Throw an exception if the table is readonly.

From d2c66f96881bcdc18248711a2db3ed6e953437c3 Mon Sep 17 00:00:00 2001
From: Alexey Milovidov
Date: Mon, 15 Jun 2020 05:14:59 +0300
Subject: [PATCH 15/19] Added comments

---
 src/Storages/StorageReplicatedMergeTree.h | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h
index 5083abf7ef9..b2bd546b478 100644
--- a/src/Storages/StorageReplicatedMergeTree.h
+++ b/src/Storages/StorageReplicatedMergeTree.h
@@ -222,6 +222,7 @@ private:
     zkutil::EphemeralNodeHolderPtr replica_is_active_node;
 
     /** Is this replica "leading". The leader replica selects the parts to merge.
+      * It can be false only when old ClickHouse versions are working on the same cluster, because now we allow multiple leaders.
       */
     std::atomic<bool> is_leader {false};
     zkutil::LeaderElectionPtr leader_election;
@@ -497,6 +498,7 @@ private:
     bool waitForReplicaToProcessLogEntry(const String & replica_name, const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active = true);
 
     /// Choose leader replica, send requst to it and wait.
+    /// Only makes sense when old ClickHouse versions are working on the same cluster, because now we allow multiple leaders.
     void sendRequestToLeaderReplica(const ASTPtr & query, const Context & query_context);
 
     /// Throw an exception if the table is readonly.

From 1ab599b0a0598be53720765bebc0565d222addaa Mon Sep 17 00:00:00 2001
From: Alexey Milovidov
Date: Mon, 15 Jun 2020 05:17:08 +0300
Subject: [PATCH 16/19] Remove "current_password" but keep it for Arcadians

---
 src/Interpreters/ClientInfo.h | 3 ++-
 src/Interpreters/Context.cpp  | 6 +++++-
 2 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/src/Interpreters/ClientInfo.h b/src/Interpreters/ClientInfo.h
index 7a4df63c17a..704fba3b3ef 100644
--- a/src/Interpreters/ClientInfo.h
+++ b/src/Interpreters/ClientInfo.h
@@ -47,7 +47,8 @@ public:
     String current_user;
     String current_query_id;
     Poco::Net::SocketAddress current_address;
-    /// Use current user and password when sending query to replica leader
+
+    /// This field is only used in foreign "Arcadia" build.
     String current_password;
 
     /// When query_kind == INITIAL_QUERY, these values are equal to current.

diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp
index cb780443e03..02060534aef 100644
--- a/src/Interpreters/Context.cpp
+++ b/src/Interpreters/Context.cpp
@@ -660,9 +660,13 @@ void Context::setUser(const String & name, const String & password, const Poco::
     auto lock = getLock();
 
     client_info.current_user = name;
-    client_info.current_password = password;
     client_info.current_address = address;
 
+#if defined(ARCADIA_BUILD)
+    /// This is harmful field that is used only in foreign "Arcadia" build.
+    client_info.current_password = password;
+#endif
+
     auto new_user_id = getAccessControlManager().find(name);
     std::shared_ptr new_access;
     if (new_user_id)

From bd330cfeb61dafed0c8c09e583235476eba7c279 Mon Sep 17 00:00:00 2001
From: Alexey Milovidov
Date: Tue, 16 Jun 2020 05:56:50 +0300
Subject: [PATCH 17/19] Update test

---
 tests/integration/test_quorum_inserts/test.py | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/tests/integration/test_quorum_inserts/test.py b/tests/integration/test_quorum_inserts/test.py
index 27901842692..607fe93f1ef 100644
--- a/tests/integration/test_quorum_inserts/test.py
+++ b/tests/integration/test_quorum_inserts/test.py
@@ -284,11 +284,14 @@ def test_insert_quorum_with_ttl(started_cluster):
     zero.query("INSERT INTO test_insert_quorum_with_ttl(a,d) VALUES(1, '2011-01-01')",
                settings={'insert_quorum_timeout' : 5000})
 
-
-    assert TSV("1\t2011-01-01\n") == TSV(first.query("SELECT * FROM test_insert_quorum_with_ttl", settings={'select_sequential_consistency' : 0}))
-    assert TSV("1\t2011-01-01\n") == TSV(first.query("SELECT * FROM test_insert_quorum_with_ttl", settings={'select_sequential_consistency' : 1}))
-
     print("Inserts should resume.")
     zero.query("INSERT INTO test_insert_quorum_with_ttl(a, d) VALUES(2, '2012-02-02')")
 
+    first.query("OPTIMIZE TABLE test_insert_quorum_with_ttl")
+    first.query("SYSTEM SYNC REPLICA test_insert_quorum_with_ttl")
+    zero.query("SYSTEM SYNC REPLICA test_insert_quorum_with_ttl")
+
+    assert TSV("2\t2012-02-02\n") == TSV(first.query("SELECT * FROM test_insert_quorum_with_ttl", settings={'select_sequential_consistency' : 0}))
+    assert TSV("2\t2012-02-02\n") == TSV(first.query("SELECT * FROM test_insert_quorum_with_ttl", settings={'select_sequential_consistency' : 1}))
+
     execute_on_all_cluster("DROP TABLE IF EXISTS test_insert_quorum_with_ttl")

From bf4c82dbd88f19ac9458fd23057afcf80f1622c8 Mon Sep 17 00:00:00 2001
From: Alexey Milovidov
Date: Tue, 16 Jun 2020 22:04:19 +0300
Subject: [PATCH 18/19] Update test

---
 tests/integration/test_quorum_inserts/test.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/integration/test_quorum_inserts/test.py b/tests/integration/test_quorum_inserts/test.py
index 607fe93f1ef..ba0ab044643 100644
--- a/tests/integration/test_quorum_inserts/test.py
+++ b/tests/integration/test_quorum_inserts/test.py
@@ -256,7 +256,7 @@ def test_insert_quorum_with_ttl(started_cluster):
         "(a Int8, d Date) " \
         "Engine = ReplicatedMergeTree('/clickhouse/tables/{table}', '{replica}') " \
         "PARTITION BY d ORDER BY a " \
-        "TTL d + INTERVAL 5 second " \
+        "TTL d + INTERVAL 5 second DELETE WHERE toYear(d) = 2011" \
         "SETTINGS merge_with_ttl_timeout=2 "
 
     print("Create Replicated table with two replicas")
From 2024d2de5426109bd3abdca360099a942d00d925 Mon Sep 17 00:00:00 2001
From: Alexey Milovidov
Date: Tue, 16 Jun 2020 22:10:25 +0300
Subject: [PATCH 19/19] Update test

---
 tests/integration/test_quorum_inserts/test.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/integration/test_quorum_inserts/test.py b/tests/integration/test_quorum_inserts/test.py
index ba0ab044643..f490c13ca27 100644
--- a/tests/integration/test_quorum_inserts/test.py
+++ b/tests/integration/test_quorum_inserts/test.py
@@ -256,7 +256,7 @@ def test_insert_quorum_with_ttl(started_cluster):
         "(a Int8, d Date) " \
         "Engine = ReplicatedMergeTree('/clickhouse/tables/{table}', '{replica}') " \
         "PARTITION BY d ORDER BY a " \
-        "TTL d + INTERVAL 5 second DELETE WHERE toYear(d) = 2011" \
+        "TTL d + INTERVAL 5 second DELETE WHERE toYear(d) = 2011 " \
         "SETTINGS merge_with_ttl_timeout=2 "
 
     print("Create Replicated table with two replicas")