Merge pull request #11639 from ClickHouse/remove-leader-election-2

Remove leader election, step 2: allow multiple leaders
alexey-milovidov 2020-06-17 09:25:23 +03:00 committed by GitHub
commit 598fd75caf
13 changed files with 140 additions and 54 deletions

View File

@ -203,6 +203,11 @@
\
M(CannotWriteToWriteBufferDiscard, "Number of stack traces dropped by query profiler or signal handler because pipe is full or cannot write to pipe.") \
M(QueryProfilerSignalOverruns, "Number of times we drop processing of a signal due to overrun plus the number of signals that OS has not delivered due to overrun.") \
\
M(CreatedLogEntryForMerge, "Successfully created log entry to merge parts in ReplicatedMergeTree.") \
M(NotCreatedLogEntryForMerge, "Log entry to merge parts in ReplicatedMergeTree is not created due to concurrent log update by another replica.") \
M(CreatedLogEntryForMutation, "Successfully created log entry to mutate parts in ReplicatedMergeTree.") \
M(NotCreatedLogEntryForMutation, "Log entry to mutate parts in ReplicatedMergeTree is not created due to concurrent log update by another replica.") \
namespace ProfileEvents
{

View File

@ -47,7 +47,8 @@ public:
String current_user;
String current_query_id;
Poco::Net::SocketAddress current_address;
/// Use current user and password when sending query to replica leader
/// This field is only used in the foreign "Arcadia" build.
String current_password;
/// When query_kind == INITIAL_QUERY, these values are equal to current.

View File

@ -660,9 +660,13 @@ void Context::setUser(const String & name, const String & password, const Poco::
auto lock = getLock();
client_info.current_user = name;
client_info.current_password = password;
client_info.current_address = address;
#if defined(ARCADIA_BUILD)
/// This is a harmful field that is used only in the foreign "Arcadia" build.
client_info.current_password = password;
#endif
auto new_user_id = getAccessControlManager().find<User>(name);
std::shared_ptr<const ContextAccess> new_access;
if (new_user_id)

View File

@ -95,7 +95,6 @@ SystemLogs::SystemLogs(Context & global_context, const Poco::Util::AbstractConfi
if (asynchronous_metric_log)
logs.emplace_back(asynchronous_metric_log.get());
try
{
for (auto & log : logs)

View File

@ -1,11 +1,11 @@
#pragma once
#include "ZooKeeper.h"
#include "KeeperException.h"
#include <functional>
#include <memory>
#include <common/logger_useful.h>
#include <Common/CurrentMetrics.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Core/BackgroundSchedulePool.h>
@ -23,7 +23,18 @@ namespace CurrentMetrics
namespace zkutil
{
/** Implements leader election algorithm described here: http://zookeeper.apache.org/doc/r3.4.5/recipes.html#sc_leaderElection
/** Initially was used to implement leader election algorithm described here:
* http://zookeeper.apache.org/doc/r3.4.5/recipes.html#sc_leaderElection
*
* But then we decided to get rid of leader election, so every replica can become leader.
* For now, every replica can become leader if there is no leader among replicas with an old version.
*
* It's tempting to remove this class altogether, but we have to keep it
* to maintain compatibility when replicas with different versions work on the same cluster
* (this is allowed for a short time period during a cluster update).
*
* Replicas with new versions create ephemeral sequential nodes with values like "replica_name (multiple leaders Ok)".
* If the first node belongs to a replica with a new version, then all replicas with new versions become leaders.
*/
class LeaderElection
{
@ -42,7 +53,7 @@ public:
ZooKeeper & zookeeper_,
LeadershipHandler handler_,
const std::string & identifier_)
: pool(pool_), path(path_), zookeeper(zookeeper_), handler(handler_), identifier(identifier_)
: pool(pool_), path(path_), zookeeper(zookeeper_), handler(handler_), identifier(identifier_ + suffix)
, log_name("LeaderElection (" + path + ")")
, log(&Poco::Logger::get(log_name))
{
@ -65,6 +76,7 @@ public:
}
private:
static inline constexpr auto suffix = " (multiple leaders Ok)";
DB::BackgroundSchedulePool & pool;
DB::BackgroundSchedulePool::TaskHolder task;
std::string path;
@ -106,18 +118,27 @@ private:
{
Strings children = zookeeper.getChildren(path);
std::sort(children.begin(), children.end());
auto it = std::lower_bound(children.begin(), children.end(), node_name);
if (it == children.end() || *it != node_name)
auto my_node_it = std::lower_bound(children.begin(), children.end(), node_name);
if (my_node_it == children.end() || *my_node_it != node_name)
throw Poco::Exception("Assertion failed in LeaderElection");
if (it == children.begin())
String value = zookeeper.get(path + "/" + children.front());
#if !defined(ARCADIA_BUILD) /// C++20; Replicated tables are unused in Arcadia.
if (value.ends_with(suffix))
{
ProfileEvents::increment(ProfileEvents::LeaderElectionAcquiredLeadership);
handler();
return;
}
#endif
if (my_node_it == children.begin())
throw Poco::Exception("Assertion failed in LeaderElection");
if (!zookeeper.existsWatch(path + "/" + *(it - 1), nullptr, task->getWatchCallback()))
/// Watch for the node in front of us.
--my_node_it;
if (!zookeeper.existsWatch(path + "/" + *my_node_it, nullptr, task->getWatchCallback()))
task->schedule();
success = true;
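
To make the new election scheme above easier to follow, here is a standalone sketch of the decision it implements. This is not the zkutil API: the helper names (should_be_leader, first_node_value) are invented for illustration and ZooKeeper is replaced by plain strings. The point is only that a new-version replica becomes a leader whenever the lexicographically first ephemeral node carries the "(multiple leaders Ok)" suffix, and otherwise defers to the single old-version leader while watching the node in front of it.

#include <algorithm>
#include <iostream>
#include <stdexcept>
#include <string>
#include <string_view>
#include <vector>

static constexpr std::string_view suffix = " (multiple leaders Ok)";

static bool ends_with(const std::string & s, std::string_view tail)
{
    return s.size() >= tail.size() && s.compare(s.size() - tail.size(), tail.size(), tail) == 0;
}

/// children: sequential node names under the election path;
/// first_node_value: value stored in the lexicographically first node.
static bool should_be_leader(std::vector<std::string> children,
                             const std::string & my_node_name,
                             const std::string & first_node_value)
{
    std::sort(children.begin(), children.end());
    auto me = std::lower_bound(children.begin(), children.end(), my_node_name);
    if (me == children.end() || *me != my_node_name)
        throw std::runtime_error("own node disappeared");

    /// New-version replicas write values ending with the suffix. If the oldest node
    /// belongs to such a replica, every new-version replica acts as a leader.
    if (ends_with(first_node_value, suffix))
        return true;

    /// The first node belongs to an old-version replica: it remains the only leader,
    /// and this replica would instead watch the node in front of it.
    return false;
}

int main()
{
    std::vector<std::string> children{"n-0000000002", "n-0000000001"};
    std::cout << should_be_leader(children, "n-0000000002", std::string("replica_1") + std::string(suffix)) << '\n';  // 1
    std::cout << should_be_leader(children, "n-0000000002", "replica_1") << '\n';  // 0: an old-version replica holds leadership
}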

View File

@ -298,7 +298,7 @@ bool MergeTreeDataMergerMutator::selectPartsToMerge(
if (parts_to_merge.empty())
{
if (out_disable_reason)
*out_disable_reason = "There are no need to merge parts according to merge selector algorithm";
*out_disable_reason = "There is no need to merge parts according to merge selector algorithm";
return false;
}

View File

@ -4,6 +4,7 @@
#include <Interpreters/Context.h>
#include <random>
#include <pcg_random.hpp>
#include <unordered_set>
@ -85,8 +86,14 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
int children_count = stat.numChildren;
/// We will wait for 1.1 times more records to accumulate than necessary.
if (static_cast<double>(children_count) < storage_settings->min_replicated_logs_to_keep * 1.1)
/// We will wait for 1.05 to 1.15 times more records to accumulate than necessary.
/// Randomization is needed to spread the time when multiple replicas come here.
/// Numbers are arbitrary.
std::uniform_real_distribution<double> distr(1.05, 1.15);
double ratio = distr(rng);
size_t min_replicated_logs_to_keep = storage_settings->min_replicated_logs_to_keep * ratio;
if (static_cast<double>(children_count) < min_replicated_logs_to_keep)
return;
Strings replicas = zookeeper->getChildren(storage.zookeeper_path + "/replicas", &stat);
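
The randomized threshold in this hunk deserves a tiny self-contained illustration. The helper below is hypothetical (it is not the ClickHouse code, and std::mt19937_64 stands in for the pcg64 rng{randomSeed()} member added in the header): each replica draws its own factor in [1.05, 1.15], so when several leaders run the cleanup thread they do not all cross the min_replicated_logs_to_keep threshold at the same instant.

#include <cstddef>
#include <iostream>
#include <random>

/// Returns true when enough log entries have accumulated for this replica to start cleaning.
static bool should_clean_logs(size_t children_count, size_t min_replicated_logs_to_keep, std::mt19937_64 & rng)
{
    /// Numbers are arbitrary: wait for 1.05x to 1.15x of the configured minimum.
    std::uniform_real_distribution<double> distr(1.05, 1.15);
    const auto threshold = static_cast<size_t>(min_replicated_logs_to_keep * distr(rng));
    return children_count >= threshold;
}

int main()
{
    std::mt19937_64 rng(std::random_device{}());
    std::cout << should_clean_logs(95, 100, rng) << '\n';   // always 0: below every possible threshold
    std::cout << should_clean_logs(120, 100, rng) << '\n';  // always 1: above the 1.15x upper bound
}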
@ -214,10 +221,15 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
if (entries.empty())
return;
markLostReplicas(host_versions_lost_replicas, log_pointers_candidate_lost_replicas, replicas.size() - num_replicas_were_marked_is_lost, zookeeper);
markLostReplicas(
host_versions_lost_replicas,
log_pointers_candidate_lost_replicas,
replicas.size() - num_replicas_were_marked_is_lost,
zookeeper);
Coordination::Requests ops;
for (size_t i = 0; i < entries.size(); ++i)
size_t i = 0;
for (; i < entries.size(); ++i)
{
ops.emplace_back(zkutil::makeRemoveRequest(storage.zookeeper_path + "/log/" + entries[i], -1));
@ -229,12 +241,25 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
/// Simultaneously with clearing the log, we check to see if replica was added since we received replicas list.
ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas", stat.version));
zookeeper->multi(ops);
try
{
zookeeper->multi(ops);
}
catch (const zkutil::KeeperMultiException & e)
{
/// Another replica already deleted the same node concurrently.
if (e.code == Coordination::Error::ZNONODE)
break;
throw;
}
ops.clear();
}
}
LOG_DEBUG(log, "Removed {} old log entries: {} - {}", entries.size(), entries.front(), entries.back());
if (i != 0)
LOG_DEBUG(log, "Removed {} old log entries: {} - {}", i, entries[0], entries[i - 1]);
}
@ -250,8 +275,10 @@ void ReplicatedMergeTreeCleanupThread::markLostReplicas(const std::unordered_map
String replica = pair.first;
Coordination::Requests ops;
/// If host changed version we can not mark replicas, because replica started to be active.
ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas/" + replica + "/host", host_versions_lost_replicas.at(replica)));
ops.emplace_back(zkutil::makeSetRequest(storage.zookeeper_path + "/replicas/" + replica + "/is_lost", "1", -1));
ops.emplace_back(zkutil::makeCheckRequest(
storage.zookeeper_path + "/replicas/" + replica + "/host", host_versions_lost_replicas.at(replica)));
ops.emplace_back(zkutil::makeSetRequest(
storage.zookeeper_path + "/replicas/" + replica + "/is_lost", "1", -1));
candidate_lost_replicas.push_back(replica);
requests.push_back(ops);
}
@ -299,14 +326,17 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
/// Use ZooKeeper's first node (last according to time) timestamp as "current" time.
Int64 current_time = timed_blocks.front().ctime;
Int64 time_threshold = std::max(static_cast<Int64>(0), current_time - static_cast<Int64>(1000 * storage_settings->replicated_deduplication_window_seconds));
Int64 time_threshold = std::max(
static_cast<Int64>(0),
current_time - static_cast<Int64>(1000 * storage_settings->replicated_deduplication_window_seconds));
/// Virtual node, all nodes that are "greater" than this one will be deleted
NodeWithStat block_threshold{{}, time_threshold};
size_t current_deduplication_window = std::min<size_t>(timed_blocks.size(), storage_settings->replicated_deduplication_window);
auto first_outdated_block_fixed_threshold = timed_blocks.begin() + current_deduplication_window;
auto first_outdated_block_time_threshold = std::upper_bound(timed_blocks.begin(), timed_blocks.end(), block_threshold, NodeWithStat::greaterByTime);
auto first_outdated_block_time_threshold = std::upper_bound(
timed_blocks.begin(), timed_blocks.end(), block_threshold, NodeWithStat::greaterByTime);
auto first_outdated_block = std::min(first_outdated_block_fixed_threshold, first_outdated_block_time_threshold);
zkutil::AsyncResponses<Coordination::RemoveResponse> try_remove_futures;
@ -326,13 +356,16 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
zookeeper->removeRecursive(path);
cached_block_stats.erase(first_outdated_block->node);
}
else if (rc != Coordination::Error::ZOK)
LOG_WARNING(log, "Error while deleting ZooKeeper path `{}`: {}, ignoring.", path, Coordination::errorMessage(rc));
else
else if (rc == Coordination::Error::ZOK || rc == Coordination::Error::ZNONODE)
{
/// No node is Ok. Another replica is removing nodes concurrently.
/// Successfully removed blocks have to be removed from cache
cached_block_stats.erase(first_outdated_block->node);
}
else
{
LOG_WARNING(log, "Error while deleting ZooKeeper path `{}`: {}, ignoring.", path, Coordination::errorMessage(rc));
}
first_outdated_block++;
}
@ -453,8 +486,20 @@ void ReplicatedMergeTreeCleanupThread::clearOldMutations()
{
/// Simultaneously with clearing the log, we check to see if replica was added since we received replicas list.
ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas", replicas_stat.version));
zookeeper->multi(ops);
LOG_DEBUG(log, "Removed {} old mutation entries: {} - {}", (i + 1 - batch_start_i), entries[batch_start_i], entries[i]);
try
{
zookeeper->multi(ops);
}
catch (const zkutil::KeeperMultiException & e)
{
/// Another replica already deleted the same node concurrently.
if (e.code == Coordination::Error::ZNONODE)
break;
throw;
}
LOG_DEBUG(log, "Removed {} old mutation entries: {} - {}",
i + 1 - batch_start_i, entries[batch_start_i], entries[i]);
batch_start_i = i + 1;
ops.clear();
}
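
Both try/catch blocks added in this file follow the same pattern: with multiple leaders, another replica may delete the same log or mutation nodes first, so a ZNONODE failure from the batched multi() means "lost the race", the loop stops, and only the entries removed so far are logged. Below is a minimal standalone sketch of that control flow, with a fake remove_batch standing in for zookeeper->multi(ops); none of these names are the real API.

#include <cstddef>
#include <iostream>
#include <stdexcept>
#include <string>
#include <vector>

enum class Error { Ok, NoNode, Other };

/// Stand-in for zookeeper->multi(ops): a real call would report ZNONODE when a
/// concurrent replica already deleted one of the nodes in the batch.
static Error remove_batch(const std::vector<std::string> & /*batch*/, bool already_removed_by_peer)
{
    return already_removed_by_peer ? Error::NoNode : Error::Ok;
}

static size_t clear_old_entries(const std::vector<std::vector<std::string>> & batches)
{
    size_t removed = 0;
    for (size_t i = 0; i < batches.size(); ++i)
    {
        /// Pretend a peer raced us starting from the second batch.
        const Error err = remove_batch(batches[i], /*already_removed_by_peer=*/ i >= 1);
        if (err == Error::NoNode)
            break;      /// concurrent deletion by another replica: stop quietly, do not rethrow
        if (err != Error::Ok)
            throw std::runtime_error("unexpected error");
        removed += batches[i].size();
    }
    if (removed != 0)
        std::cout << "Removed " << removed << " old entries\n";
    return removed;
}

int main()
{
    clear_old_entries({{"log-1", "log-2"}, {"log-3"}});  /// prints: Removed 2 old entries
}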

View File

@ -4,6 +4,7 @@
#include <Common/ZooKeeper/Types.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <common/logger_useful.h>
#include <Common/randomSeed.h>
#include <Core/BackgroundSchedulePool.h>
#include <thread>
@ -36,7 +37,7 @@ private:
String log_name;
Poco::Logger * log;
BackgroundSchedulePool::TaskHolder task;
pcg64 rng;
pcg64 rng{randomSeed()};
void run();
void iterate();

View File

@ -71,6 +71,10 @@ namespace ProfileEvents
extern const Event ReplicatedPartFetches;
extern const Event DataAfterMergeDiffersFromReplica;
extern const Event DataAfterMutationDiffersFromReplica;
extern const Event CreatedLogEntryForMerge;
extern const Event NotCreatedLogEntryForMerge;
extern const Event CreatedLogEntryForMutation;
extern const Event NotCreatedLogEntryForMutation;
}
namespace CurrentMetrics
@ -2602,10 +2606,12 @@ StorageReplicatedMergeTree::CreateMergeEntryResult StorageReplicatedMergeTree::c
String path_created = dynamic_cast<const Coordination::CreateResponse &>(*responses.front()).path_created;
entry.znode_name = path_created.substr(path_created.find_last_of('/') + 1);
ProfileEvents::increment(ProfileEvents::CreatedLogEntryForMerge);
LOG_TRACE(log, "Created log entry {} for merge {}", path_created, merged_name);
}
else if (code == Coordination::Error::ZBADVERSION)
{
ProfileEvents::increment(ProfileEvents::NotCreatedLogEntryForMerge);
LOG_TRACE(log, "Log entry is not created for merge {} because log was updated", merged_name);
return CreateMergeEntryResult::LogUpdated;
}
@ -2666,12 +2672,14 @@ StorageReplicatedMergeTree::CreateMergeEntryResult StorageReplicatedMergeTree::c
if (code == Coordination::Error::ZBADVERSION)
{
ProfileEvents::increment(ProfileEvents::NotCreatedLogEntryForMutation);
LOG_TRACE(log, "Log entry is not created for mutation {} because log was updated", new_part_name);
return CreateMergeEntryResult::LogUpdated;
}
zkutil::KeeperMultiException::check(code, ops, responses);
ProfileEvents::increment(ProfileEvents::CreatedLogEntryForMutation);
LOG_TRACE(log, "Created log entry for mutation {}", new_part_name);
return CreateMergeEntryResult::Ok;
}
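
The counters added above instrument an optimistic-concurrency pattern: now that every replica can be a leader, two of them may try to append a merge or mutation entry against the same log version, and the loser gets ZBADVERSION and retries with fresh state. A self-contained sketch of that pattern follows, using invented names and plain enums in place of the Coordination and ProfileEvents types.

#include <atomic>
#include <iostream>
#include <string>

enum class ZkCode { Ok, BadVersion };
enum class CreateMergeEntryResult { Ok, LogUpdated };

std::atomic<unsigned> created_log_entry_for_merge{0};
std::atomic<unsigned> not_created_log_entry_for_merge{0};

/// Stand-in for the multi-op that creates the log entry while checking the log version.
static ZkCode create_entry_checking_log_version(int expected_log_version, int actual_log_version)
{
    return expected_log_version == actual_log_version ? ZkCode::Ok : ZkCode::BadVersion;
}

static CreateMergeEntryResult create_log_entry_to_merge_parts(const std::string & merged_name,
                                                              int expected_log_version, int actual_log_version)
{
    const ZkCode code = create_entry_checking_log_version(expected_log_version, actual_log_version);
    if (code == ZkCode::BadVersion)
    {
        ++not_created_log_entry_for_merge;   /// counted as NotCreatedLogEntryForMerge
        std::cout << "Log entry is not created for merge " << merged_name << " because log was updated\n";
        return CreateMergeEntryResult::LogUpdated;  /// caller refreshes the log and retries
    }
    ++created_log_entry_for_merge;           /// counted as CreatedLogEntryForMerge
    std::cout << "Created log entry for merge " << merged_name << '\n';
    return CreateMergeEntryResult::Ok;
}

int main()
{
    create_log_entry_to_merge_parts("all_0_1_1", /*expected*/ 7, /*actual*/ 7);  /// created
    create_log_entry_to_merge_parts("all_0_1_1", /*expected*/ 7, /*actual*/ 8);  /// lost the race to another leader
}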

View File

@ -19,12 +19,12 @@
#include <Storages/MergeTree/BackgroundProcessingPool.h>
#include <Storages/MergeTree/DataPartsExchange.h>
#include <Storages/MergeTree/ReplicatedMergeTreeAddress.h>
#include <Storages/MergeTree/LeaderElection.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/PartLog.h>
#include <Common/randomSeed.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/LeaderElection.h>
#include <Core/BackgroundSchedulePool.h>
#include <Processors/Pipe.h>
@ -222,6 +222,7 @@ private:
zkutil::EphemeralNodeHolderPtr replica_is_active_node;
/** Is this replica "leading". The leader replica selects the parts to merge.
* It can be false only when old ClickHouse versions are working on the same cluster, because now we allow multiple leaders.
*/
std::atomic<bool> is_leader {false};
zkutil::LeaderElectionPtr leader_election;
@ -497,6 +498,7 @@ private:
bool waitForReplicaToProcessLogEntry(const String & replica_name, const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active = true);
/// Choose a leader replica, send a request to it and wait.
/// Only makes sense when old ClickHouse versions are working on the same cluster, because now we allow multiple leaders.
void sendRequestToLeaderReplica(const ASTPtr & query, const Context & query_context);
/// Throw an exception if the table is readonly.

View File

@ -256,7 +256,7 @@ def test_insert_quorum_with_ttl(started_cluster):
"(a Int8, d Date) " \
"Engine = ReplicatedMergeTree('/clickhouse/tables/{table}', '{replica}') " \
"PARTITION BY d ORDER BY a " \
"TTL d + INTERVAL 5 second " \
"TTL d + INTERVAL 5 second DELETE WHERE toYear(d) = 2011 " \
"SETTINGS merge_with_ttl_timeout=2 "
print("Create Replicated table with two replicas")
@ -284,11 +284,14 @@ def test_insert_quorum_with_ttl(started_cluster):
zero.query("INSERT INTO test_insert_quorum_with_ttl(a,d) VALUES(1, '2011-01-01')",
settings={'insert_quorum_timeout' : 5000})
assert TSV("1\t2011-01-01\n") == TSV(first.query("SELECT * FROM test_insert_quorum_with_ttl", settings={'select_sequential_consistency' : 0}))
assert TSV("1\t2011-01-01\n") == TSV(first.query("SELECT * FROM test_insert_quorum_with_ttl", settings={'select_sequential_consistency' : 1}))
print("Inserts should resume.")
zero.query("INSERT INTO test_insert_quorum_with_ttl(a, d) VALUES(2, '2012-02-02')")
first.query("OPTIMIZE TABLE test_insert_quorum_with_ttl")
first.query("SYSTEM SYNC REPLICA test_insert_quorum_with_ttl")
zero.query("SYSTEM SYNC REPLICA test_insert_quorum_with_ttl")
assert TSV("2\t2012-02-02\n") == TSV(first.query("SELECT * FROM test_insert_quorum_with_ttl", settings={'select_sequential_consistency' : 0}))
assert TSV("2\t2012-02-02\n") == TSV(first.query("SELECT * FROM test_insert_quorum_with_ttl", settings={'select_sequential_consistency' : 1}))
execute_on_all_cluster("DROP TABLE IF EXISTS test_insert_quorum_with_ttl")

View File

@ -1,3 +1,5 @@
-- The test is mostly outdated as now every replica is a leader and can do OPTIMIZE locally.
DROP TABLE IF EXISTS rename1;
DROP TABLE IF EXISTS rename2;
DROP TABLE IF EXISTS rename3;
@ -14,7 +16,9 @@ SELECT * FROM rename1;
RENAME TABLE rename2 TO rename3;
INSERT INTO rename1 VALUES (0, 1, 2);
SYSTEM SYNC REPLICA rename3; -- Make "rename3" see all data parts.
OPTIMIZE TABLE rename3;
SYSTEM SYNC REPLICA rename1; -- Make "rename1" see and process all scheduled merges.
SELECT * FROM rename1;
DROP TABLE IF EXISTS rename1;

View File

@ -5,35 +5,28 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
set -e
$CLICKHOUSE_CLIENT -n --query "
DROP TABLE IF EXISTS r0;
DROP TABLE IF EXISTS r1;
NUM_REPLICAS=2
DATA_SIZE=200
CREATE TABLE r0 (x UInt64) ENGINE = ReplicatedMergeTree('/test/table', 'r0') ORDER BY x SETTINGS min_bytes_for_wide_part = '10M';
CREATE TABLE r1 (x UInt64) ENGINE = ReplicatedMergeTree('/test/table', 'r1') ORDER BY x SETTINGS min_bytes_for_wide_part = '10M';
"
SEQ=$(seq 0 $(($NUM_REPLICAS - 1)))
for REPLICA in $SEQ; do $CLICKHOUSE_CLIENT -n --query "DROP TABLE IF EXISTS r$REPLICA"; done
for REPLICA in $SEQ; do $CLICKHOUSE_CLIENT -n --query "CREATE TABLE r$REPLICA (x UInt64) ENGINE = ReplicatedMergeTree('/test/table', 'r$REPLICA') ORDER BY x SETTINGS min_bytes_for_wide_part = '10M';"; done
function thread()
{
REPLICA=$1
ITERATIONS=$2
$CLICKHOUSE_CLIENT --max_block_size 1 --min_insert_block_size_rows 0 --min_insert_block_size_bytes 0 --query "INSERT INTO r$REPLICA SELECT number * 2 + $REPLICA FROM numbers($ITERATIONS)"
$CLICKHOUSE_CLIENT --max_block_size 1 --min_insert_block_size_rows 0 --min_insert_block_size_bytes 0 --query "INSERT INTO r$REPLICA SELECT number * $NUM_REPLICAS + $REPLICA FROM numbers($ITERATIONS)"
}
thread 0 200 &
thread 1 200 &
for REPLICA in $SEQ; do
thread $REPLICA $DATA_SIZE &
done
wait
$CLICKHOUSE_CLIENT -n --query "
SYSTEM SYNC REPLICA r0;
SYSTEM SYNC REPLICA r1;
SELECT count(), sum(x) FROM r0;
SELECT count(), sum(x) FROM r1;
DROP TABLE r0;
DROP TABLE r1;
"
for REPLICA in $SEQ; do $CLICKHOUSE_CLIENT -n --query "SYSTEM SYNC REPLICA r$REPLICA"; done
for REPLICA in $SEQ; do $CLICKHOUSE_CLIENT -n --query "SELECT count(), sum(x) FROM r$REPLICA"; done
for REPLICA in $SEQ; do $CLICKHOUSE_CLIENT -n --query "DROP TABLE r$REPLICA"; done