diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp
index 8393ea85112..a0eb7a5fb48 100644
--- a/src/Common/ProfileEvents.cpp
+++ b/src/Common/ProfileEvents.cpp
@@ -203,6 +203,11 @@
     \
     M(CannotWriteToWriteBufferDiscard, "Number of stack traces dropped by query profiler or signal handler because pipe is full or cannot write to pipe.") \
     M(QueryProfilerSignalOverruns, "Number of times we drop processing of a signal due to overrun plus the number of signals that OS has not delivered due to overrun.") \
+    \
+    M(CreatedLogEntryForMerge, "Successfully created log entry to merge parts in ReplicatedMergeTree.") \
+    M(NotCreatedLogEntryForMerge, "Log entry to merge parts in ReplicatedMergeTree is not created due to concurrent log update by another replica.") \
+    M(CreatedLogEntryForMutation, "Successfully created log entry to mutate parts in ReplicatedMergeTree.") \
+    M(NotCreatedLogEntryForMutation, "Log entry to mutate parts in ReplicatedMergeTree is not created due to concurrent log update by another replica.") \
 
 namespace ProfileEvents
 {
diff --git a/src/Interpreters/ClientInfo.h b/src/Interpreters/ClientInfo.h
index 7a4df63c17a..704fba3b3ef 100644
--- a/src/Interpreters/ClientInfo.h
+++ b/src/Interpreters/ClientInfo.h
@@ -47,7 +47,8 @@ public:
     String current_user;
     String current_query_id;
     Poco::Net::SocketAddress current_address;
-    /// Use current user and password when sending query to replica leader
+
+    /// This field is only used in the foreign "Arcadia" build.
     String current_password;
 
     /// When query_kind == INITIAL_QUERY, these values are equal to current.
diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp
index cb780443e03..02060534aef 100644
--- a/src/Interpreters/Context.cpp
+++ b/src/Interpreters/Context.cpp
@@ -660,9 +660,13 @@ void Context::setUser(const String & name, const String & password, const Poco::Net::SocketAddress & address)
     auto lock = getLock();
 
     client_info.current_user = name;
-    client_info.current_password = password;
     client_info.current_address = address;
 
+#if defined(ARCADIA_BUILD)
+    /// This is a harmful field that is used only in the foreign "Arcadia" build.
+    client_info.current_password = password;
+#endif
+
     auto new_user_id = getAccessControlManager().find(name);
     std::shared_ptr<const ContextAccess> new_access;
     if (new_user_id)
diff --git a/src/Interpreters/SystemLog.cpp b/src/Interpreters/SystemLog.cpp
index d79edde7052..b432cd8803b 100644
--- a/src/Interpreters/SystemLog.cpp
+++ b/src/Interpreters/SystemLog.cpp
@@ -95,7 +95,6 @@ SystemLogs::SystemLogs(Context & global_context, const Poco::Util::AbstractConfiguration & config)
     if (asynchronous_metric_log)
         logs.emplace_back(asynchronous_metric_log.get());
 
-
     try
     {
         for (auto & log : logs)
diff --git a/src/Common/ZooKeeper/LeaderElection.h b/src/Storages/MergeTree/LeaderElection.h
similarity index 65%
rename from src/Common/ZooKeeper/LeaderElection.h
rename to src/Storages/MergeTree/LeaderElection.h
index f8a4d56dc76..4d3a533d139 100644
--- a/src/Common/ZooKeeper/LeaderElection.h
+++ b/src/Storages/MergeTree/LeaderElection.h
@@ -1,11 +1,11 @@
 #pragma once
 
-#include "ZooKeeper.h"
-#include "KeeperException.h"
 #include
 #include
 #include
 #include
+#include <Common/ZooKeeper/ZooKeeper.h>
+#include <Common/ZooKeeper/KeeperException.h>
 #include
 
@@ -23,7 +23,18 @@ namespace CurrentMetrics
 namespace zkutil
 {
 
-/** Implements leader election algorithm described here: http://zookeeper.apache.org/doc/r3.4.5/recipes.html#sc_leaderElection
+/** Initially this was used to implement the leader election algorithm described here:
+  * http://zookeeper.apache.org/doc/r3.4.5/recipes.html#sc_leaderElection
+  *
+  * But then we decided to get rid of leader election, so every replica can become a leader.
+  * For now, every replica can become a leader if there is no leader among replicas with an old version.
+  *
+  * It's tempting to remove this class altogether, but we have to keep it
+  * to maintain compatibility when replicas with different versions work on the same cluster
+  * (this is allowed for a short time period during a cluster update).
+  *
+  * Replicas with new versions create ephemeral sequential nodes with values like "replica_name (multiple leaders Ok)".
+  * If the first node belongs to a replica with a new version, then all replicas with new versions become leaders.
  */
 class LeaderElection
 {
@@ -42,7 +53,7 @@ public:
         ZooKeeper & zookeeper_,
         LeadershipHandler handler_,
         const std::string & identifier_)
-        : pool(pool_), path(path_), zookeeper(zookeeper_), handler(handler_), identifier(identifier_)
+        : pool(pool_), path(path_), zookeeper(zookeeper_), handler(handler_), identifier(identifier_ + suffix)
        , log_name("LeaderElection (" + path + ")")
        , log(&Poco::Logger::get(log_name))
     {
@@ -65,6 +76,7 @@ public:
     }
 
 private:
+    static inline constexpr auto suffix = " (multiple leaders Ok)";
     DB::BackgroundSchedulePool & pool;
     DB::BackgroundSchedulePool::TaskHolder task;
     std::string path;
@@ -106,18 +118,27 @@ private:
     {
         Strings children = zookeeper.getChildren(path);
         std::sort(children.begin(), children.end());
-        auto it = std::lower_bound(children.begin(), children.end(), node_name);
-        if (it == children.end() || *it != node_name)
+
+        auto my_node_it = std::lower_bound(children.begin(), children.end(), node_name);
+        if (my_node_it == children.end() || *my_node_it != node_name)
             throw Poco::Exception("Assertion failed in LeaderElection");
 
-        if (it == children.begin())
+        String value = zookeeper.get(path + "/" + children.front());
+
+#if !defined(ARCADIA_BUILD) /// C++20; Replicated tables are unused in Arcadia.
+        if (value.ends_with(suffix))
         {
             ProfileEvents::increment(ProfileEvents::LeaderElectionAcquiredLeadership);
             handler();
             return;
         }
+#endif
 
+        if (my_node_it == children.begin())
+            throw Poco::Exception("Assertion failed in LeaderElection");
 
-        if (!zookeeper.existsWatch(path + "/" + *(it - 1), nullptr, task->getWatchCallback()))
+        /// Watch for the node in front of us.
+        --my_node_it;
+        if (!zookeeper.existsWatch(path + "/" + *my_node_it, nullptr, task->getWatchCallback()))
             task->schedule();
 
         success = true;
diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp
index d861173d8a0..f867a39581f 100644
--- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp
+++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp
@@ -298,7 +298,7 @@ bool MergeTreeDataMergerMutator::selectPartsToMerge(
     if (parts_to_merge.empty())
     {
         if (out_disable_reason)
-            *out_disable_reason = "There are no need to merge parts according to merge selector algorithm";
+            *out_disable_reason = "There is no need to merge parts according to merge selector algorithm";
         return false;
     }
diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp
index 1bc132eaba4..0870c0fdf72 100644
--- a/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp
+++ b/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp
@@ -4,6 +4,7 @@
 #include
 #include
+#include <Common/ZooKeeper/KeeperException.h>
 #include
 
 
@@ -85,8 +86,14 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
 
     int children_count = stat.numChildren;
 
-    /// We will wait for 1.1 times more records to accumulate than necessary.
-    if (static_cast<double>(children_count) < storage_settings->min_replicated_logs_to_keep * 1.1)
+    /// We will wait for 1.05 to 1.15 times more records to accumulate than necessary.
+    /// Randomization is needed to spread the time when multiple replicas come here.
+    /// Numbers are arbitrary.
+    std::uniform_real_distribution distr(1.05, 1.15);
+    double ratio = distr(rng);
+    size_t min_replicated_logs_to_keep = storage_settings->min_replicated_logs_to_keep * ratio;
+
+    if (static_cast<double>(children_count) < min_replicated_logs_to_keep)
         return;
 
     Strings replicas = zookeeper->getChildren(storage.zookeeper_path + "/replicas", &stat);
@@ -214,10 +221,15 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
     if (entries.empty())
         return;
 
-    markLostReplicas(host_versions_lost_replicas, log_pointers_candidate_lost_replicas, replicas.size() - num_replicas_were_marked_is_lost, zookeeper);
+    markLostReplicas(
+        host_versions_lost_replicas,
+        log_pointers_candidate_lost_replicas,
+        replicas.size() - num_replicas_were_marked_is_lost,
+        zookeeper);
 
     Coordination::Requests ops;
-    for (size_t i = 0; i < entries.size(); ++i)
+    size_t i = 0;
+    for (; i < entries.size(); ++i)
     {
         ops.emplace_back(zkutil::makeRemoveRequest(storage.zookeeper_path + "/log/" + entries[i], -1));
 
@@ -229,12 +241,25 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
             /// Simultaneously with clearing the log, we check to see if replica was added since we received replicas list.
             ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas", stat.version));
-            zookeeper->multi(ops);
+
+            try
+            {
+                zookeeper->multi(ops);
+            }
+            catch (const zkutil::KeeperMultiException & e)
+            {
+                /// Another replica already deleted the same node concurrently.
+                if (e.code == Coordination::Error::ZNONODE)
+                    break;
+
+                throw;
+            }
 
             ops.clear();
         }
     }
 
-    LOG_DEBUG(log, "Removed {} old log entries: {} - {}", entries.size(), entries.front(), entries.back());
+    if (i != 0)
+        LOG_DEBUG(log, "Removed {} old log entries: {} - {}", i, entries[0], entries[i - 1]);
 }
 
@@ -250,8 +275,10 @@ void ReplicatedMergeTreeCleanupThread::markLostReplicas(const std::unordered_map
         String replica = pair.first;
         Coordination::Requests ops;
         /// If host changed version we can not mark replicas, because replica started to be active.
-        ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas/" + replica + "/host", host_versions_lost_replicas.at(replica)));
-        ops.emplace_back(zkutil::makeSetRequest(storage.zookeeper_path + "/replicas/" + replica + "/is_lost", "1", -1));
+        ops.emplace_back(zkutil::makeCheckRequest(
+            storage.zookeeper_path + "/replicas/" + replica + "/host", host_versions_lost_replicas.at(replica)));
+        ops.emplace_back(zkutil::makeSetRequest(
+            storage.zookeeper_path + "/replicas/" + replica + "/is_lost", "1", -1));
         candidate_lost_replicas.push_back(replica);
         requests.push_back(ops);
     }
@@ -299,14 +326,17 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
     /// Use ZooKeeper's first node (last according to time) timestamp as "current" time.
     Int64 current_time = timed_blocks.front().ctime;
-    Int64 time_threshold = std::max(static_cast<Int64>(0), current_time - static_cast<Int64>(1000 * storage_settings->replicated_deduplication_window_seconds));
+    Int64 time_threshold = std::max(
+        static_cast<Int64>(0),
+        current_time - static_cast<Int64>(1000 * storage_settings->replicated_deduplication_window_seconds));
 
     /// Virtual node, all nodes that are "greater" than this one will be deleted
     NodeWithStat block_threshold{{}, time_threshold};
 
     size_t current_deduplication_window = std::min(timed_blocks.size(), storage_settings->replicated_deduplication_window);
     auto first_outdated_block_fixed_threshold = timed_blocks.begin() + current_deduplication_window;
-    auto first_outdated_block_time_threshold = std::upper_bound(timed_blocks.begin(), timed_blocks.end(), block_threshold, NodeWithStat::greaterByTime);
+    auto first_outdated_block_time_threshold = std::upper_bound(
+        timed_blocks.begin(), timed_blocks.end(), block_threshold, NodeWithStat::greaterByTime);
     auto first_outdated_block = std::min(first_outdated_block_fixed_threshold, first_outdated_block_time_threshold);
 
     zkutil::AsyncResponses<Coordination::RemoveResponse> try_remove_futures;
@@ -326,13 +356,16 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
             zookeeper->removeRecursive(path);
             cached_block_stats.erase(first_outdated_block->node);
         }
-        else if (rc != Coordination::Error::ZOK)
-            LOG_WARNING(log, "Error while deleting ZooKeeper path `{}`: {}, ignoring.", path, Coordination::errorMessage(rc));
-        else
+        else if (rc == Coordination::Error::ZOK || rc == Coordination::Error::ZNONODE)
         {
+            /// No node is Ok. Another replica is removing nodes concurrently.
             /// Successfully removed blocks have to be removed from cache
             cached_block_stats.erase(first_outdated_block->node);
         }
+        else
+        {
+            LOG_WARNING(log, "Error while deleting ZooKeeper path `{}`: {}, ignoring.", path, Coordination::errorMessage(rc));
+        }
 
         first_outdated_block++;
     }
@@ -453,8 +486,20 @@ void ReplicatedMergeTreeCleanupThread::clearOldMutations()
         {
             /// Simultaneously with clearing the log, we check to see if replica was added since we received replicas list.
             ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas", replicas_stat.version));
-            zookeeper->multi(ops);
-            LOG_DEBUG(log, "Removed {} old mutation entries: {} - {}", (i + 1 - batch_start_i), entries[batch_start_i], entries[i]);
+            try
+            {
+                zookeeper->multi(ops);
+            }
+            catch (const zkutil::KeeperMultiException & e)
+            {
+                /// Another replica already deleted the same node concurrently.
+                if (e.code == Coordination::Error::ZNONODE)
+                    break;
+
+                throw;
+            }
+            LOG_DEBUG(log, "Removed {} old mutation entries: {} - {}",
+                i + 1 - batch_start_i, entries[batch_start_i], entries[i]);
             batch_start_i = i + 1;
             ops.clear();
         }
     }
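[Editor's sketch, not part of the patch.] The clearOldLogs() change above replaces the fixed 1.1 factor with a per-replica factor drawn from [1.05, 1.15], so replicas do not all decide to trim the shared replication log at the same moment. A minimal standalone illustration of that jitter follows; the generator (std::mt19937_64 instead of the patch's pcg64 seeded with randomSeed()) and the concrete numbers are assumptions for the example only.

    #include <iostream>
    #include <random>

    int main()
    {
        std::mt19937_64 rng{std::random_device{}()};

        /// Each replica draws its own threshold factor in [1.05, 1.15].
        std::uniform_real_distribution<double> distr(1.05, 1.15);
        const double ratio = distr(rng);

        const size_t min_replicated_logs_to_keep = 100;  /// hypothetical value of the table setting
        const size_t children_count = 112;               /// hypothetical number of entries under /log

        const double threshold = min_replicated_logs_to_keep * ratio;
        if (children_count < threshold)
            std::cout << "below threshold " << threshold << ", skip cleanup for now\n";
        else
            std::cout << "above threshold " << threshold << ", clear old log entries\n";
    }
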
diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h b/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h
index a787f99d907..f4191482d64 100644
--- a/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h
+++ b/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h
@@ -4,6 +4,7 @@
 #include
 #include
 #include
+#include <Common/randomSeed.h>
 #include
 #include
 
@@ -36,7 +37,7 @@ private:
     String log_name;
     Poco::Logger * log;
     BackgroundSchedulePool::TaskHolder task;
-    pcg64 rng;
+    pcg64 rng{randomSeed()};
 
     void run();
     void iterate();
diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp
index 7cb4a149ec5..adb8ad4a69d 100644
--- a/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/src/Storages/StorageReplicatedMergeTree.cpp
@@ -71,6 +71,10 @@ namespace ProfileEvents
     extern const Event ReplicatedPartFetches;
     extern const Event DataAfterMergeDiffersFromReplica;
     extern const Event DataAfterMutationDiffersFromReplica;
+    extern const Event CreatedLogEntryForMerge;
+    extern const Event NotCreatedLogEntryForMerge;
+    extern const Event CreatedLogEntryForMutation;
+    extern const Event NotCreatedLogEntryForMutation;
 }
 
 namespace CurrentMetrics
@@ -2602,10 +2606,12 @@ StorageReplicatedMergeTree::CreateMergeEntryResult StorageReplicatedMergeTree::createLogEntryToMergeParts(
         String path_created = dynamic_cast<const Coordination::CreateResponse &>(*responses.front()).path_created;
         entry.znode_name = path_created.substr(path_created.find_last_of('/') + 1);
 
+        ProfileEvents::increment(ProfileEvents::CreatedLogEntryForMerge);
         LOG_TRACE(log, "Created log entry {} for merge {}", path_created, merged_name);
     }
     else if (code == Coordination::Error::ZBADVERSION)
     {
+        ProfileEvents::increment(ProfileEvents::NotCreatedLogEntryForMerge);
         LOG_TRACE(log, "Log entry is not created for merge {} because log was updated", merged_name);
         return CreateMergeEntryResult::LogUpdated;
     }
@@ -2666,12 +2672,14 @@ StorageReplicatedMergeTree::CreateMergeEntryResult StorageReplicatedMergeTree::createLogEntryToMutatePart(
 
     if (code == Coordination::Error::ZBADVERSION)
     {
+        ProfileEvents::increment(ProfileEvents::NotCreatedLogEntryForMutation);
         LOG_TRACE(log, "Log entry is not created for mutation {} because log was updated", new_part_name);
         return CreateMergeEntryResult::LogUpdated;
     }
 
     zkutil::KeeperMultiException::check(code, ops, responses);
 
+    ProfileEvents::increment(ProfileEvents::CreatedLogEntryForMutation);
     LOG_TRACE(log, "Created log entry for mutation {}", new_part_name);
     return CreateMergeEntryResult::Ok;
 }
diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h
index aae0b9c81b8..bd0dff035be 100644
--- a/src/Storages/StorageReplicatedMergeTree.h
+++ b/src/Storages/StorageReplicatedMergeTree.h
@@ -19,12 +19,12 @@
 #include
 #include
 #include
+#include <Storages/MergeTree/LeaderElection.h>
 #include
 #include
 #include
 #include
 #include
 #include
-#include <Common/ZooKeeper/LeaderElection.h>
 #include
 #include
 
@@ -222,6 +222,7 @@ private:
     zkutil::EphemeralNodeHolderPtr replica_is_active_node;
 
     /** Is this replica "leading". The leader replica selects the parts to merge.
+      * It can be false only when old ClickHouse versions are working on the same cluster, because now we allow multiple leaders.
       */
     std::atomic<bool> is_leader {false};
     zkutil::LeaderElectionPtr leader_election;
@@ -497,6 +498,7 @@ private:
     bool waitForReplicaToProcessLogEntry(const String & replica_name, const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active = true);
 
     /// Choose leader replica, send requst to it and wait.
+    /// Only makes sense when old ClickHouse versions are working on the same cluster, because now we allow multiple leaders.
     void sendRequestToLeaderReplica(const ASTPtr & query, const Context & query_context);
 
     /// Throw an exception if the table is readonly.
diff --git a/tests/integration/test_quorum_inserts/test.py b/tests/integration/test_quorum_inserts/test.py
index 27901842692..f490c13ca27 100644
--- a/tests/integration/test_quorum_inserts/test.py
+++ b/tests/integration/test_quorum_inserts/test.py
@@ -256,7 +256,7 @@ def test_insert_quorum_with_ttl(started_cluster):
                          "(a Int8, d Date) " \
                          "Engine = ReplicatedMergeTree('/clickhouse/tables/{table}', '{replica}') " \
                          "PARTITION BY d ORDER BY a " \
-                         "TTL d + INTERVAL 5 second " \
+                         "TTL d + INTERVAL 5 second DELETE WHERE toYear(d) = 2011 " \
                          "SETTINGS merge_with_ttl_timeout=2 "
 
     print("Create Replicated table with two replicas")
@@ -284,11 +284,14 @@ def test_insert_quorum_with_ttl(started_cluster):
     zero.query("INSERT INTO test_insert_quorum_with_ttl(a,d) VALUES(1, '2011-01-01')",
                settings={'insert_quorum_timeout' : 5000})
 
-
-    assert TSV("1\t2011-01-01\n") == TSV(first.query("SELECT * FROM test_insert_quorum_with_ttl", settings={'select_sequential_consistency' : 0}))
-    assert TSV("1\t2011-01-01\n") == TSV(first.query("SELECT * FROM test_insert_quorum_with_ttl", settings={'select_sequential_consistency' : 1}))
-
     print("Inserts should resume.")
     zero.query("INSERT INTO test_insert_quorum_with_ttl(a, d) VALUES(2, '2012-02-02')")
 
+    first.query("OPTIMIZE TABLE test_insert_quorum_with_ttl")
+    first.query("SYSTEM SYNC REPLICA test_insert_quorum_with_ttl")
+    zero.query("SYSTEM SYNC REPLICA test_insert_quorum_with_ttl")
+
+    assert TSV("2\t2012-02-02\n") == TSV(first.query("SELECT * FROM test_insert_quorum_with_ttl", settings={'select_sequential_consistency' : 0}))
+    assert TSV("2\t2012-02-02\n") == TSV(first.query("SELECT * FROM test_insert_quorum_with_ttl", settings={'select_sequential_consistency' : 1}))
+
     execute_on_all_cluster("DROP TABLE IF EXISTS test_insert_quorum_with_ttl")
diff --git a/tests/queries/0_stateless/00620_optimize_on_nonleader_replica_zookeeper.sql b/tests/queries/0_stateless/00620_optimize_on_nonleader_replica_zookeeper.sql
index 9622a5bd3c2..f488502b13b 100644
--- a/tests/queries/0_stateless/00620_optimize_on_nonleader_replica_zookeeper.sql
+++ b/tests/queries/0_stateless/00620_optimize_on_nonleader_replica_zookeeper.sql
@@ -1,3 +1,5 @@
+-- The test is mostly outdated, as now every replica is a leader and can do OPTIMIZE locally.
+
 DROP TABLE IF EXISTS rename1;
 DROP TABLE IF EXISTS rename2;
 DROP TABLE IF EXISTS rename3;
@@ -14,7 +16,9 @@ SELECT * FROM rename1;
 
 RENAME TABLE rename2 TO rename3;
 INSERT INTO rename1 VALUES (0, 1, 2);
+SYSTEM SYNC REPLICA rename3; -- Make "rename3" see all data parts.
 OPTIMIZE TABLE rename3;
+SYSTEM SYNC REPLICA rename1; -- Make "rename1" see and process all scheduled merges.
 SELECT * FROM rename1;
 
 DROP TABLE IF EXISTS rename1;
diff --git a/tests/queries/0_stateless/01307_multiple_leaders.sh b/tests/queries/0_stateless/01307_multiple_leaders.sh
index e19a10bcecb..a43aa074c43 100755
--- a/tests/queries/0_stateless/01307_multiple_leaders.sh
+++ b/tests/queries/0_stateless/01307_multiple_leaders.sh
@@ -5,35 +5,28 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
 
 set -e
 
-$CLICKHOUSE_CLIENT -n --query "
-DROP TABLE IF EXISTS r0;
-DROP TABLE IF EXISTS r1;
+NUM_REPLICAS=2
+DATA_SIZE=200
 
-CREATE TABLE r0 (x UInt64) ENGINE = ReplicatedMergeTree('/test/table', 'r0') ORDER BY x SETTINGS min_bytes_for_wide_part = '10M';
-CREATE TABLE r1 (x UInt64) ENGINE = ReplicatedMergeTree('/test/table', 'r1') ORDER BY x SETTINGS min_bytes_for_wide_part = '10M';
-"
+SEQ=$(seq 0 $(($NUM_REPLICAS - 1)))
+
+for REPLICA in $SEQ; do $CLICKHOUSE_CLIENT -n --query "DROP TABLE IF EXISTS r$REPLICA"; done
+for REPLICA in $SEQ; do $CLICKHOUSE_CLIENT -n --query "CREATE TABLE r$REPLICA (x UInt64) ENGINE = ReplicatedMergeTree('/test/table', 'r$REPLICA') ORDER BY x SETTINGS min_bytes_for_wide_part = '10M';"; done
 
 function thread()
 {
     REPLICA=$1
     ITERATIONS=$2
 
-    $CLICKHOUSE_CLIENT --max_block_size 1 --min_insert_block_size_rows 0 --min_insert_block_size_bytes 0 --query "INSERT INTO r$REPLICA SELECT number * 2 + $REPLICA FROM numbers($ITERATIONS)"
+    $CLICKHOUSE_CLIENT --max_block_size 1 --min_insert_block_size_rows 0 --min_insert_block_size_bytes 0 --query "INSERT INTO r$REPLICA SELECT number * $NUM_REPLICAS + $REPLICA FROM numbers($ITERATIONS)"
 }
 
-
-thread 0 200 &
-thread 1 200 &
+for REPLICA in $SEQ; do
+    thread $REPLICA $DATA_SIZE &
+done
 
 wait
 
-$CLICKHOUSE_CLIENT -n --query "
-SYSTEM SYNC REPLICA r0;
-SYSTEM SYNC REPLICA r1;
-
-SELECT count(), sum(x) FROM r0;
-SELECT count(), sum(x) FROM r1;
-
-DROP TABLE r0;
-DROP TABLE r1;
-"
+for REPLICA in $SEQ; do $CLICKHOUSE_CLIENT -n --query "SYSTEM SYNC REPLICA r$REPLICA"; done
+for REPLICA in $SEQ; do $CLICKHOUSE_CLIENT -n --query "SELECT count(), sum(x) FROM r$REPLICA"; done
+for REPLICA in $SEQ; do $CLICKHOUSE_CLIENT -n --query "DROP TABLE r$REPLICA"; done
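[Editor's sketch, not part of the patch.] A standalone illustration of the compatibility rule documented in LeaderElection.h above: every replica registers an ephemeral sequential node under the election path, replicas with the new version append the " (multiple leaders Ok)" suffix to the node value, and leadership is decided from the value of the first node in sequential order. The helper names and node values below are assumptions for the example; the real class reads the children from ZooKeeper and falls back to watching the preceding node while an old-version replica still holds the classic single leadership.

    #include <iostream>
    #include <string>
    #include <vector>

    static const std::string suffix = " (multiple leaders Ok)";

    static bool endsWith(const std::string & s, const std::string & tail)
    {
        return s.size() >= tail.size() && s.compare(s.size() - tail.size(), tail.size(), tail) == 0;
    }

    /// values_in_seq_order: election node values ordered by their sequential number.
    /// Returns true if new-version replicas may all act as leaders right now.
    static bool multipleLeadersAllowed(const std::vector<std::string> & values_in_seq_order)
    {
        /// If the oldest registration belongs to a new-version replica, every new-version
        /// replica becomes a leader; otherwise an old-version replica still owns the classic
        /// single leadership and new-version replicas keep waiting and watching.
        return !values_in_seq_order.empty() && endsWith(values_in_seq_order.front(), suffix);
    }

    int main()
    {
        const std::vector<std::string> mixed   = {"replica_old", "replica_2" + suffix};
        const std::vector<std::string> all_new = {"replica_1" + suffix, "replica_2" + suffix};

        std::cout << multipleLeadersAllowed(mixed) << '\n';    /// 0: the old replica remains the only leader
        std::cout << multipleLeadersAllowed(all_new) << '\n';  /// 1: every replica may assign merges
    }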