Merge pull request #59501 from ClickHouse/pr-better-replicas-failover-2

Parallel replicas: better initial replicas failover (2)
This commit is contained in:
Igor Nikonov 2024-02-09 15:50:56 +01:00 committed by GitHub
commit effaaceb26
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 148 additions and 75 deletions

View File

@ -19,7 +19,6 @@
#include <Storages/MergeTree/RequestResponse.h>
#include <atomic>
#include <optional>
#include "config.h"

View File

@ -27,6 +27,9 @@ class IConnectionPool : private boost::noncopyable
public:
using Entry = PoolBase<Connection>::Entry;
IConnectionPool() = default;
IConnectionPool(String host_, UInt16 port_) : host(host_), port(port_), address(host + ":" + toString(port_)) {}
virtual ~IConnectionPool() = default;
/// Selects the connection to work.
@ -36,7 +39,15 @@ public:
const Settings & settings,
bool force_connected = true) = 0;
const std::string & getHost() const { return host; }
UInt16 getPort() const { return port; }
const String & getAddress() const { return address; }
virtual Priority getPriority() const { return Priority{1}; }
protected:
const String host;
const UInt16 port = 0;
const String address;
};
using ConnectionPoolPtr = std::shared_ptr<IConnectionPool>;
@ -63,10 +74,9 @@ public:
Protocol::Compression compression_,
Protocol::Secure secure_,
Priority priority_ = Priority{1})
: Base(max_connections_,
: IConnectionPool(host_, port_),
Base(max_connections_,
getLogger("ConnectionPool (" + host_ + ":" + toString(port_) + ")")),
host(host_),
port(port_),
default_database(default_database_),
user(user_),
password(password_),
@ -99,10 +109,6 @@ public:
return entry;
}
const std::string & getHost() const
{
return host;
}
std::string getDescription() const
{
return host + ":" + toString(port);
@ -125,8 +131,6 @@ protected:
}
private:
String host;
UInt16 port;
String default_database;
String user;
String password;

View File

@ -1,7 +1,5 @@
#pragma once
#include <compare>
#include <Client/Connection.h>
#include <Storages/MergeTree/RequestResponse.h>

View File

@ -593,6 +593,7 @@
M(711, FILECACHE_ACCESS_DENIED) \
M(712, TOO_MANY_MATERIALIZED_VIEWS) \
M(713, BROKEN_PROJECTION) \
M(714, UNEXPECTED_CLUSTER) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \

View File

@ -629,6 +629,8 @@ The server successfully detected this situation and will download merged part fr
M(InterfacePostgreSQLReceiveBytes, "Number of bytes received through PostgreSQL interfaces") \
\
M(ParallelReplicasUsedCount, "Number of replicas used to execute a query with task-based parallel replicas") \
M(ParallelReplicasAvailableCount, "Number of replicas available to execute a query with task-based parallel replicas") \
M(ParallelReplicasUnavailableCount, "Number of replicas which was chosen, but found to be unavailable during query execution with task-based parallel replicas") \
#ifdef APPLY_FOR_EXTERNAL_EVENTS
#define APPLY_FOR_EVENTS(M) APPLY_FOR_BUILTIN_EVENTS(M) APPLY_FOR_EXTERNAL_EVENTS(M)

View File

@ -32,6 +32,7 @@ namespace ErrorCodes
extern const int TOO_LARGE_DISTRIBUTED_DEPTH;
extern const int LOGICAL_ERROR;
extern const int CLUSTER_DOESNT_EXIST;
extern const int UNEXPECTED_CLUSTER;
}
namespace ClusterProxy
@ -374,12 +375,12 @@ void executeQueryWithParallelReplicas(
shard_num = column->getUInt(0);
}
ClusterPtr new_cluster;
const auto shard_count = not_optimized_cluster->getShardCount();
ClusterPtr new_cluster = not_optimized_cluster;
/// if got valid shard_num from query initiator, then parallel replicas scope is the specified shard
/// shards are numbered in order of appearance in the cluster config
if (shard_num > 0)
{
const auto shard_count = not_optimized_cluster->getShardCount();
if (shard_num > shard_count)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
@ -395,17 +396,18 @@ void executeQueryWithParallelReplicas(
// get cluster for shard specified by shard_num
// shard_num is 1-based, but getClusterWithSingleShard expects 0-based index
auto single_shard_cluster = not_optimized_cluster->getClusterWithSingleShard(shard_num - 1);
// convert cluster to representation expected by parallel replicas
new_cluster = single_shard_cluster->getClusterWithReplicasAsShards(settings, settings.max_parallel_replicas);
new_cluster = not_optimized_cluster->getClusterWithSingleShard(shard_num - 1);
}
else
{
new_cluster = not_optimized_cluster->getClusterWithReplicasAsShards(settings, settings.max_parallel_replicas);
if (not_optimized_cluster->getShardCount() > 1)
throw DB::Exception(
ErrorCodes::UNEXPECTED_CLUSTER,
"`cluster_for_parallel_replicas` setting refers to cluster with several shards. Expected a cluster with one shard");
}
auto coordinator
= std::make_shared<ParallelReplicasReadingCoordinator>(new_cluster->getShardCount(), settings.parallel_replicas_mark_segment_size);
auto coordinator = std::make_shared<ParallelReplicasReadingCoordinator>(
new_cluster->getShardsInfo().begin()->getAllNodeCount(), settings.parallel_replicas_mark_segment_size);
auto external_tables = new_context->getExternalTables();
auto read_from_remote = std::make_unique<ReadFromParallelRemoteReplicasStep>(
query_ast,

View File

@ -12,7 +12,7 @@
#include <Processors/Sources/DelayedSource.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Interpreters/ActionsDAG.h>
#include "Common/logger_useful.h"
#include <Common/logger_useful.h>
#include <Common/checkStackSize.h>
#include <Core/QueryProcessingStage.h>
#include <Client/ConnectionPool.h>
@ -375,10 +375,11 @@ ReadFromParallelRemoteReplicasStep::ReadFromParallelRemoteReplicasStep(
, storage_limits(std::move(storage_limits_))
, log(log_)
{
std::vector<String> description;
chassert(cluster->getShardCount() == 1);
for (const auto & address : cluster->getShardsAddresses())
description.push_back(fmt::format("Replica: {}", address[0].host_name));
std::vector<String> description;
for (const auto & pool : cluster->getShardsInfo().front().per_replica_pools)
description.push_back(fmt::format("Replica: {}", pool->getHost()));
setStepDescription(boost::algorithm::join(description, ", "));
}
@ -399,51 +400,44 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder
const Settings & current_settings = context->getSettingsRef();
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings);
const auto & shard = cluster->getShardsInfo().at(0);
size_t all_replicas_count = current_settings.max_parallel_replicas;
if (all_replicas_count > cluster->getShardsInfo().size())
if (all_replicas_count > shard.getAllNodeCount())
{
LOG_INFO(getLogger("ReadFromParallelRemoteReplicasStep"),
"The number of replicas requested ({}) is bigger than the real number available in the cluster ({}). "\
"Will use the latter number to execute the query.", current_settings.max_parallel_replicas, cluster->getShardsInfo().size());
all_replicas_count = cluster->getShardsInfo().size();
LOG_INFO(
getLogger("ReadFromParallelRemoteReplicasStep"),
"The number of replicas requested ({}) is bigger than the real number available in the cluster ({}). "
"Will use the latter number to execute the query.",
current_settings.max_parallel_replicas,
shard.getAllNodeCount());
all_replicas_count = shard.getAllNodeCount();
}
/// Find local shard. It might happen that there is no local shard, but that's fine
for (const auto & shard: cluster->getShardsInfo())
{
if (shard.isLocal())
{
IConnections::ReplicaInfo replica_info
{
.all_replicas_count = all_replicas_count,
/// `shard_num` will be equal to the number of the given replica in the cluster (set by `Cluster::getClusterWithReplicasAsShards`).
/// we should use this number specifically because efficiency of data distribution by consistent hash depends on it.
.number_of_current_replica = shard.shard_num - 1,
};
addPipeForSingeReplica(pipes, shard.pool, replica_info);
}
std::vector<ConnectionPoolWithFailover::Base::ShuffledPool> shuffled_pool;
if (all_replicas_count < shard.getAllNodeCount())
{
shuffled_pool = shard.pool->getShuffledPools(current_settings);
shuffled_pool.resize(all_replicas_count);
}
else
{
/// try to preserve replicas order if all replicas in cluster are used for query execution
/// it's important for data locality during query execution
auto priority_func = [](size_t i) { return Priority{static_cast<Int64>(i)}; };
shuffled_pool = shard.pool->getShuffledPools(current_settings, priority_func);
}
auto current_shard = cluster->getShardsInfo().begin();
while (pipes.size() != all_replicas_count)
for (size_t i=0; i < all_replicas_count; ++i)
{
if (current_shard->isLocal())
{
++current_shard;
continue;
}
IConnections::ReplicaInfo replica_info
{
.all_replicas_count = all_replicas_count,
/// `shard_num` will be equal to the number of the given replica in the cluster (set by `Cluster::getClusterWithReplicasAsShards`).
/// we should use this number specifically because efficiency of data distribution by consistent hash depends on it.
.number_of_current_replica = current_shard->shard_num - 1,
.number_of_current_replica = i,
};
addPipeForSingeReplica(pipes, current_shard->pool, replica_info);
++current_shard;
addPipeForSingeReplica(pipes, shuffled_pool[i].pool, replica_info);
}
auto pipe = Pipe::unitePipes(std::move(pipes));
@ -456,7 +450,8 @@ void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder
}
void ReadFromParallelRemoteReplicasStep::addPipeForSingeReplica(Pipes & pipes, std::shared_ptr<ConnectionPoolWithFailover> pool, IConnections::ReplicaInfo replica_info)
void ReadFromParallelRemoteReplicasStep::addPipeForSingeReplica(
Pipes & pipes, const ConnectionPoolPtr & pool, IConnections::ReplicaInfo replica_info)
{
bool add_agg_info = stage == QueryProcessingStage::WithMergeableState;
bool add_totals = false;
@ -476,7 +471,14 @@ void ReadFromParallelRemoteReplicasStep::addPipeForSingeReplica(Pipes & pipes, s
assert(output_stream);
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
pool, query_string, output_stream->header, context, throttler, scalars, external_tables, stage,
pool,
query_string,
output_stream->header,
context,
throttler,
scalars,
external_tables,
stage,
RemoteQueryExecutor::Extension{.parallel_reading_coordinator = coordinator, .replica_info = std::move(replica_info)});
remote_query_executor->setLogger(log);

View File

@ -9,10 +9,6 @@
namespace DB
{
class ConnectionPoolWithFailover;
using ConnectionPoolWithFailoverPtr = std::shared_ptr<ConnectionPoolWithFailover>;
class Throttler;
using ThrottlerPtr = std::shared_ptr<Throttler>;
@ -91,8 +87,7 @@ public:
void enforceAggregationInOrder();
private:
void addPipeForSingeReplica(Pipes & pipes, std::shared_ptr<ConnectionPoolWithFailover> pool, IConnections::ReplicaInfo replica_info);
void addPipeForSingeReplica(Pipes & pipes, const ConnectionPoolPtr & pool, IConnections::ReplicaInfo replica_info);
ClusterPtr cluster;
ASTPtr query_ast;

View File

@ -4,7 +4,7 @@
#include <Columns/ColumnConst.h>
#include <Common/CurrentThread.h>
#include "Core/Protocol.h"
#include <Core/Protocol.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
@ -17,6 +17,7 @@
#include <Interpreters/Context.h>
#include <Interpreters/InternalTextLogsQueue.h>
#include <IO/ConnectionTimeouts.h>
#include <Client/ConnectionEstablisher.h>
#include <Client/MultiplexedConnections.h>
#include <Client/HedgedConnections.h>
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
@ -29,6 +30,7 @@ namespace ProfileEvents
extern const Event SuspendSendingQueryToShard;
extern const Event ReadTaskRequestsReceived;
extern const Event MergeTreeReadTaskRequestsReceived;
extern const Event ParallelReplicasAvailableCount;
}
namespace DB
@ -62,6 +64,55 @@ RemoteQueryExecutor::RemoteQueryExecutor(
{
}
RemoteQueryExecutor::RemoteQueryExecutor(
ConnectionPoolPtr pool,
const String & query_,
const Block & header_,
ContextPtr context_,
ThrottlerPtr throttler,
const Scalars & scalars_,
const Tables & external_tables_,
QueryProcessingStage::Enum stage_,
std::optional<Extension> extension_)
: RemoteQueryExecutor(query_, header_, context_, scalars_, external_tables_, stage_, extension_)
{
create_connections = [this, pool, throttler, extension_](AsyncCallback)
{
const Settings & current_settings = context->getSettingsRef();
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings);
ConnectionPoolWithFailover::TryResult result;
std::string fail_message;
if (main_table)
{
auto table_name = main_table.getQualifiedName();
ConnectionEstablisher connection_establisher(pool, &timeouts, current_settings, log, &table_name);
connection_establisher.run(result, fail_message);
}
else
{
ConnectionEstablisher connection_establisher(pool, &timeouts, current_settings, log, nullptr);
connection_establisher.run(result, fail_message);
}
std::vector<IConnectionPool::Entry> connection_entries;
if (!result.entry.isNull() && result.is_usable)
{
if (extension_ && extension_->parallel_reading_coordinator)
ProfileEvents::increment(ProfileEvents::ParallelReplicasAvailableCount);
connection_entries.emplace_back(std::move(result.entry));
}
auto res = std::make_unique<MultiplexedConnections>(std::move(connection_entries), current_settings, throttler);
if (extension_ && extension_->replica_info)
res->setReplicaInfo(*extension_->replica_info);
return res;
};
}
RemoteQueryExecutor::RemoteQueryExecutor(
Connection & connection,
const String & query_,

View File

@ -52,6 +52,18 @@ public:
std::optional<IConnections::ReplicaInfo> replica_info = {};
};
/// Takes a connection pool for a node (not cluster)
RemoteQueryExecutor(
ConnectionPoolPtr pool,
const String & query_,
const Block & header_,
ContextPtr context_,
ThrottlerPtr throttler = nullptr,
const Scalars & scalars_ = Scalars(),
const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete,
std::optional<Extension> extension_ = std::nullopt);
/// Takes already set connection.
RemoteQueryExecutor(
Connection & connection,

View File

@ -97,11 +97,9 @@ extern const Event ParallelReplicasCollectingOwnedSegmentsMicroseconds;
extern const Event ParallelReplicasReadAssignedMarks;
extern const Event ParallelReplicasReadUnassignedMarks;
extern const Event ParallelReplicasReadAssignedForStealingMarks;
}
namespace ProfileEvents
{
extern const Event ParallelReplicasUsedCount;
extern const Event ParallelReplicasUsedCount;
extern const Event ParallelReplicasUnavailableCount;
}
namespace DB
@ -1025,6 +1023,8 @@ ParallelReadResponse ParallelReplicasReadingCoordinator::handleRequest(ParallelR
void ParallelReplicasReadingCoordinator::markReplicaAsUnavailable(size_t replica_number)
{
ProfileEvents::increment(ProfileEvents::ParallelReplicasUnavailableCount);
std::lock_guard lock(mutex);
if (!pimpl)

View File

@ -2,14 +2,13 @@ DROP TABLE IF EXISTS test_parallel_replicas_unavailable_shards;
CREATE TABLE test_parallel_replicas_unavailable_shards (n UInt64) ENGINE=MergeTree() ORDER BY tuple();
INSERT INTO test_parallel_replicas_unavailable_shards SELECT * FROM numbers(10);
SYSTEM FLUSH LOGS;
SET allow_experimental_parallel_reading_from_replicas=2, max_parallel_replicas=11, cluster_for_parallel_replicas='parallel_replicas', parallel_replicas_for_non_replicated_merge_tree=1;
SET send_logs_level='error';
SELECT count() FROM test_parallel_replicas_unavailable_shards WHERE NOT ignore(*);
SELECT count() FROM test_parallel_replicas_unavailable_shards WHERE NOT ignore(*) SETTINGS log_comment = '02769_7b513191-5082-4073-8568-53b86a49da79';
SYSTEM FLUSH LOGS;
SELECT count() > 0 FROM system.text_log WHERE yesterday() <= event_date AND message LIKE '%Replica number 10 is unavailable%';
SET allow_experimental_parallel_reading_from_replicas=0;
SELECT ProfileEvents['ParallelReplicasUnavailableCount'] FROM system.query_log WHERE yesterday() <= event_date AND query_id in (select query_id from system.query_log where log_comment = '02769_7b513191-5082-4073-8568-53b86a49da79' and current_database = currentDatabase()) and type = 'QueryFinish' and query_id == initial_query_id;
DROP TABLE test_parallel_replicas_unavailable_shards;

View File

@ -12,16 +12,16 @@ SELECT count() FROM pr_2 INNER JOIN filtered_groups ON pr_2.a = filtered_groups.
WITH filtered_groups AS (SELECT a FROM pr_1 WHERE a >= 10000)
SELECT count() FROM pr_2 INNER JOIN filtered_groups ON pr_2.a = filtered_groups.a
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_two_shards', max_parallel_replicas = 3;
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', max_parallel_replicas = 3;
-- Testing that it is disabled for allow_experimental_analyzer=0. With analyzer it will be supported (with correct result)
WITH filtered_groups AS (SELECT a FROM pr_1 WHERE a >= 10000)
SELECT count() FROM pr_2 INNER JOIN filtered_groups ON pr_2.a = filtered_groups.a
SETTINGS allow_experimental_analyzer = 0, allow_experimental_parallel_reading_from_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_two_shards', max_parallel_replicas = 3; -- { serverError SUPPORT_IS_DISABLED }
SETTINGS allow_experimental_analyzer = 0, allow_experimental_parallel_reading_from_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', max_parallel_replicas = 3; -- { serverError SUPPORT_IS_DISABLED }
-- Sanitizer
SELECT count() FROM pr_2 JOIN numbers(10) as pr_1 ON pr_2.a = pr_1.number
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_two_shards', max_parallel_replicas = 3;
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_one_shard_three_replicas_localhost', max_parallel_replicas = 3;
DROP TABLE IF EXISTS pr_1;
DROP TABLE IF EXISTS pr_2;

View File

@ -0,0 +1,8 @@
DROP TABLE IF EXISTS test_unexpected_cluster;
CREATE TABLE test_unexpected_cluster (n UInt64) ENGINE=MergeTree() ORDER BY tuple();
INSERT INTO test_unexpected_cluster SELECT * FROM numbers(10);
SET allow_experimental_parallel_reading_from_replicas=2, max_parallel_replicas=2, cluster_for_parallel_replicas='test_cluster_two_shards', parallel_replicas_for_non_replicated_merge_tree=1;
SELECT count() FROM test_unexpected_cluster WHERE NOT ignore(*); -- { serverError UNEXPECTED_CLUSTER }
DROP TABLE test_unexpected_cluster;