Support for simultaneous read from local and remote parallel replica (#37204)

Nikita Mikhaylov 2022-06-02 11:46:33 +02:00 committed by GitHub
parent 2b2232c264
commit d34e051c69
16 changed files with 468 additions and 176 deletions

View File

@@ -418,6 +418,8 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config,
if (address.is_local)
info.local_addresses.push_back(address);
info.all_addresses.push_back(address);
auto pool = ConnectionPoolFactory::instance().get(
settings.distributed_connections_pool_size,
address.host_name, address.port,
@@ -485,6 +487,7 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config,
}
Addresses shard_local_addresses;
Addresses shard_all_addresses;
ConnectionPoolPtrs all_replicas_pools;
all_replicas_pools.reserve(replica_addresses.size());
@@ -502,6 +505,7 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config,
all_replicas_pools.emplace_back(replica_pool);
if (replica.is_local)
shard_local_addresses.push_back(replica);
shard_all_addresses.push_back(replica);
}
ConnectionPoolWithFailoverPtr shard_pool = std::make_shared<ConnectionPoolWithFailover>(
@@ -516,6 +520,7 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config,
current_shard_num,
weight,
std::move(shard_local_addresses),
std::move(shard_all_addresses),
std::move(shard_pool),
std::move(all_replicas_pools),
internal_replication
@@ -571,6 +576,7 @@ Cluster::Cluster(
addresses_with_failover.emplace_back(current);
Addresses shard_local_addresses;
Addresses all_addresses;
ConnectionPoolPtrs all_replicas;
all_replicas.reserve(current.size());
@@ -585,6 +591,7 @@ Cluster::Cluster(
all_replicas.emplace_back(replica_pool);
if (replica.is_local && !treat_local_as_remote)
shard_local_addresses.push_back(replica);
all_addresses.push_back(replica);
}
ConnectionPoolWithFailoverPtr shard_pool = std::make_shared<ConnectionPoolWithFailover>(
@@ -597,6 +604,7 @@ Cluster::Cluster(
current_shard_num,
default_weight,
std::move(shard_local_addresses),
std::move(all_addresses),
std::move(shard_pool),
std::move(all_replicas),
false // has_internal_replication
@@ -680,6 +688,8 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti
if (address.is_local)
info.local_addresses.push_back(address);
info.all_addresses.push_back(address);
auto pool = ConnectionPoolFactory::instance().get(
settings.distributed_connections_pool_size,
address.host_name,

View File

@@ -202,6 +202,7 @@ public:
UInt32 shard_num = 0;
UInt32 weight = 1;
Addresses local_addresses;
Addresses all_addresses;
/// nullptr if there are no remote addresses
ConnectionPoolWithFailoverPtr pool;
/// Connection pool for each replica, contains nullptr for local replicas
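The new `all_addresses` field mirrors `per_replica_pools` index-for-index, local replicas included. A minimal stand-alone sketch (toy types and hostnames, not ClickHouse's real ones) of why that alignment matters for the parallel-replica path:

```cpp
#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

// Toy stand-in for Cluster::Address.
struct Address { std::string host_name; bool is_local = false; };

int main()
{
    // all_addresses keeps every replica, local ones included, so index i here
    // stays aligned with per_replica_pools[i].
    std::vector<Address> all_addresses = {
        {"replica-0.localhost", true},   // read via a local QueryPlan instead
        {"replica-1.example", false},
        {"replica-2.example", false},
    };

    // Mirrors the loop in ReadFromParallelRemoteReplicasStep::initializePipeline:
    // one pipe per non-local replica, numbered by its stable position.
    for (size_t replica_num = 0; replica_num < all_addresses.size(); ++replica_num)
    {
        if (all_addresses[replica_num].is_local)
            continue;
        std::cout << "pipe for remote replica #" << replica_num
                  << " (" << all_addresses[replica_num].host_name << ")\n";
    }
}
```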

View File

@@ -1,65 +0,0 @@
#pragma once
#include <Client/ConnectionPool.h>
#include <Interpreters/Cluster.h>
#include <Parsers/IAST.h>
namespace DB
{
struct Settings;
class Cluster;
class Throttler;
struct SelectQueryInfo;
class Pipe;
using Pipes = std::vector<Pipe>;
class QueryPlan;
using QueryPlanPtr = std::unique_ptr<QueryPlan>;
struct StorageID;
namespace ClusterProxy
{
/// Base class for the implementation of the details of distributed query
/// execution that are specific to the query type.
class IStreamFactory
{
public:
virtual ~IStreamFactory() = default;
struct Shard
{
/// Query and header may be changed depending on shard.
ASTPtr query;
Block header;
size_t shard_num = 0;
size_t num_replicas = 0;
ConnectionPoolWithFailoverPtr pool;
ConnectionPoolPtrs per_replica_pools;
/// If we connect to replicas lazily.
/// (When there is a local replica with big delay).
bool lazy = false;
UInt32 local_delay = 0;
};
using Shards = std::vector<Shard>;
virtual void createForShard(
const Cluster::ShardInfo & shard_info,
const ASTPtr & query_ast,
const StorageID & main_table,
const ASTPtr & table_func_ptr,
ContextPtr context,
std::vector<QueryPlanPtr> & local_plans,
Shards & remote_shards,
UInt32 shard_count) = 0;
};
}
}
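The whole header is removed: with a single concrete implementation left, the virtual indirection no longer earns its keep, and callers take `SelectStreamFactory` directly (see below). A toy sketch of the shape of the refactor, using invented stand-in names:

```cpp
#include <memory>

// Before: an abstract factory with one virtual entry point that filled
// output parameters (local_plans, remote_shards).
struct IStreamFactoryLike
{
    virtual ~IStreamFactoryLike() = default;
    virtual void createForShard(/* ... out-params ... */) = 0;
};

// After: the concrete factory is used directly, so it can grow a second
// entry point that returns its result, like the ShardPlans pair below.
struct QueryPlanLike {};
struct ShardPlansLike
{
    std::unique_ptr<QueryPlanLike> local_plan;   // set when a usable local replica exists
    std::unique_ptr<QueryPlanLike> remote_plan;  // set when remote replicas exist
};

struct SelectStreamFactoryLike
{
    void createForShard(/* ... out-params ... */) {}
    ShardPlansLike createForShardWithParallelReplicas(/* ... */) { return {}; }
};

int main()
{
    SelectStreamFactoryLike factory;             // no virtual dispatch required
    ShardPlansLike plans = factory.createForShardWithParallelReplicas();
    (void)plans;
}
```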

View File

@@ -1,4 +1,5 @@
#include <Interpreters/ClusterProxy/SelectStreamFactory.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/VirtualColumnUtils.h>
@@ -10,14 +11,15 @@
#include <Interpreters/RequiredSourceColumnsVisitor.h>
#include <DataTypes/ObjectUtils.h>
#include <Client/IConnections.h>
#include <Common/logger_useful.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/ReadFromRemote.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/DistributedCreateLocalPlan.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
namespace ProfileEvents
{
extern const Event DistributedConnectionMissingTable;
@@ -63,7 +65,8 @@ void SelectStreamFactory::createForShard(
auto emplace_local_stream = [&]()
{
local_plans.emplace_back(createLocalPlan(query_ast, header, context, processed_stage, shard_info.shard_num, shard_count, /*coordinator=*/nullptr));
local_plans.emplace_back(createLocalPlan(
query_ast, header, context, processed_stage, shard_info.shard_num, shard_count, /*replica_num=*/0, /*replica_count=*/0, /*coordinator=*/nullptr));
};
auto emplace_remote_stream = [&](bool lazy = false, UInt32 local_delay = 0)
@@ -71,10 +74,7 @@ void SelectStreamFactory::createForShard(
remote_shards.emplace_back(Shard{
.query = query_ast,
.header = header,
.shard_num = shard_info.shard_num,
.num_replicas = shard_info.getAllNodeCount(),
.pool = shard_info.pool,
.per_replica_pools = shard_info.per_replica_pools,
.shard_info = shard_info,
.lazy = lazy,
.local_delay = local_delay,
});
@@ -173,5 +173,97 @@ void SelectStreamFactory::createForShard(
emplace_remote_stream();
}
SelectStreamFactory::ShardPlans SelectStreamFactory::createForShardWithParallelReplicas(
const Cluster::ShardInfo & shard_info,
const ASTPtr & query_ast,
const StorageID & main_table,
const ASTPtr & table_function_ptr,
const ThrottlerPtr & throttler,
ContextPtr context,
UInt32 shard_count)
{
SelectStreamFactory::ShardPlans result;
if (auto it = objects_by_shard.find(shard_info.shard_num); it != objects_by_shard.end())
replaceMissedSubcolumnsByConstants(storage_snapshot->object_columns, it->second, query_ast);
const auto & settings = context->getSettingsRef();
auto is_local_replica_obsolete = [&]()
{
auto resolved_id = context->resolveStorageID(main_table);
auto main_table_storage = DatabaseCatalog::instance().tryGetTable(resolved_id, context);
const auto * replicated_storage = dynamic_cast<const StorageReplicatedMergeTree *>(main_table_storage.get());
if (!replicated_storage)
return false;
UInt64 max_allowed_delay = settings.max_replica_delay_for_distributed_queries;
if (!max_allowed_delay)
return false;
UInt32 local_delay = replicated_storage->getAbsoluteDelay();
return local_delay >= max_allowed_delay;
};
size_t next_replica_number = 0;
size_t all_replicas_count = shard_info.getRemoteNodeCount();
auto coordinator = std::make_shared<ParallelReplicasReadingCoordinator>();
auto remote_plan = std::make_unique<QueryPlan>();
if (settings.prefer_localhost_replica && shard_info.isLocal())
{
/// We don't need more than one local replica in parallel reading
if (!is_local_replica_obsolete())
{
++all_replicas_count;
result.local_plan = createLocalPlan(
query_ast, header, context, processed_stage, shard_info.shard_num, shard_count, next_replica_number, all_replicas_count, coordinator);
++next_replica_number;
}
}
Scalars scalars = context->hasQueryContext() ? context->getQueryContext()->getScalars() : Scalars{};
scalars.emplace(
"_shard_count", Block{{DataTypeUInt32().createColumnConst(1, shard_count), std::make_shared<DataTypeUInt32>(), "_shard_count"}});
auto external_tables = context->getExternalTables();
auto shard = Shard{
.query = query_ast,
.header = header,
.shard_info = shard_info,
.lazy = false,
.local_delay = 0,
};
if (shard_info.hasRemoteConnections())
{
auto read_from_remote = std::make_unique<ReadFromParallelRemoteReplicasStep>(
coordinator,
shard,
header,
processed_stage,
main_table,
table_function_ptr,
context,
throttler,
std::move(scalars),
std::move(external_tables),
&Poco::Logger::get("ReadFromParallelRemoteReplicasStep"),
shard_count);
remote_plan->addStep(std::move(read_from_remote));
result.remote_plan = std::move(remote_plan);
}
return result;
}
}
}
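A worked example of the replica counting above (one reading of the hunk, not authoritative): with two remote replicas and a fresh local one, the local plan runs as replica 0 of 3.

```cpp
#include <cstddef>
#include <iostream>

int main()
{
    // Assumed inputs, mirroring createForShardWithParallelReplicas:
    size_t remote_replicas = 2;     // shard_info.getRemoteNodeCount()
    bool local_usable = true;       // prefer_localhost_replica && !is_local_replica_obsolete()

    size_t all_replicas_count = remote_replicas;
    size_t next_replica_number = 0;

    if (local_usable)
    {
        ++all_replicas_count;       // the local replica joins the coordinated read
        std::cout << "local plan reads as replica " << next_replica_number
                  << " of " << all_replicas_count << "\n";
        ++next_replica_number;
    }
    // The remote replicas are handled by one ReadFromParallelRemoteReplicasStep,
    // which numbers them by their position in shard_info.all_addresses.
}
```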

View File

@@ -1,22 +1,56 @@
#pragma once
#include <Core/QueryProcessingStage.h>
#include <Interpreters/ClusterProxy/IStreamFactory.h>
#include <Interpreters/StorageID.h>
#include <Storages/IStorage_fwd.h>
#include <Storages/StorageSnapshot.h>
#include <Client/ConnectionPool.h>
#include <Interpreters/Cluster.h>
#include <Parsers/IAST.h>
namespace DB
{
struct Settings;
class Cluster;
class Throttler;
struct SelectQueryInfo;
class Pipe;
using Pipes = std::vector<Pipe>;
class QueryPlan;
using QueryPlanPtr = std::unique_ptr<QueryPlan>;
struct StorageID;
namespace ClusterProxy
{
using ColumnsDescriptionByShardNum = std::unordered_map<UInt32, ColumnsDescription>;
class SelectStreamFactory final : public IStreamFactory
class SelectStreamFactory
{
public:
struct Shard
{
/// Query and header may be changed depending on shard.
ASTPtr query;
Block header;
Cluster::ShardInfo shard_info;
/// If we connect to replicas lazily.
/// (When there is a local replica with a big delay.)
bool lazy = false;
UInt32 local_delay = 0;
};
using Shards = std::vector<Shard>;
SelectStreamFactory(
const Block & header_,
const ColumnsDescriptionByShardNum & objects_by_shard_,
@@ -31,7 +65,26 @@ public:
ContextPtr context,
std::vector<QueryPlanPtr> & local_plans,
Shards & remote_shards,
UInt32 shard_count) override;
UInt32 shard_count);
struct ShardPlans
{
/// If a shard has local replicas, this won't be nullptr
std::unique_ptr<QueryPlan> local_plan;
/// Contains several steps to read from all remote replicas
std::unique_ptr<QueryPlan> remote_plan;
};
ShardPlans createForShardWithParallelReplicas(
const Cluster::ShardInfo & shard_info,
const ASTPtr & query_ast,
const StorageID & main_table,
const ASTPtr & table_function_ptr,
const ThrottlerPtr & throttler,
ContextPtr context,
UInt32 shard_count
);
private:
const Block header;

View File

@@ -20,6 +20,7 @@ namespace DB
namespace ErrorCodes
{
extern const int TOO_LARGE_DISTRIBUTED_DEPTH;
extern const int LOGICAL_ERROR;
}
namespace ClusterProxy
@@ -106,21 +107,19 @@ void executeQuery(
QueryProcessingStage::Enum processed_stage,
const StorageID & main_table,
const ASTPtr & table_func_ptr,
IStreamFactory & stream_factory, Poco::Logger * log,
SelectStreamFactory & stream_factory, Poco::Logger * log,
const ASTPtr & query_ast, ContextPtr context, const SelectQueryInfo & query_info,
const ExpressionActionsPtr & sharding_key_expr,
const std::string & sharding_key_column_name,
const ClusterPtr & not_optimized_cluster)
{
assert(log);
const Settings & settings = context->getSettingsRef();
if (settings.max_distributed_depth && context->getClientInfo().distributed_depth >= settings.max_distributed_depth)
throw Exception("Maximum distributed depth exceeded", ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH);
std::vector<QueryPlanPtr> plans;
IStreamFactory::Shards remote_shards;
SelectStreamFactory::Shards remote_shards;
auto new_context = updateSettingsForCluster(*query_info.getCluster(), context, settings, log);
@@ -213,6 +212,91 @@ void executeQuery(
query_plan.unitePlans(std::move(union_step), std::move(plans));
}
void executeQueryWithParallelReplicas(
QueryPlan & query_plan,
const StorageID & main_table,
const ASTPtr & table_func_ptr,
SelectStreamFactory & stream_factory,
const ASTPtr & query_ast, ContextPtr context, const SelectQueryInfo & query_info,
const ExpressionActionsPtr & sharding_key_expr,
const std::string & sharding_key_column_name,
const ClusterPtr & not_optimized_cluster)
{
const Settings & settings = context->getSettingsRef();
ThrottlerPtr user_level_throttler;
if (auto * process_list_element = context->getProcessListElement())
user_level_throttler = process_list_element->getUserNetworkThrottler();
/// Network bandwidth limit, if needed.
ThrottlerPtr throttler;
if (settings.max_network_bandwidth || settings.max_network_bytes)
{
throttler = std::make_shared<Throttler>(
settings.max_network_bandwidth,
settings.max_network_bytes,
"Limit for bytes to send or receive over network exceeded.",
user_level_throttler);
}
else
throttler = user_level_throttler;
std::vector<QueryPlanPtr> plans;
size_t shards = query_info.getCluster()->getShardCount();
for (const auto & shard_info : query_info.getCluster()->getShardsInfo())
{
ASTPtr query_ast_for_shard;
if (query_info.optimized_cluster && settings.optimize_skip_unused_shards_rewrite_in && shards > 1)
{
query_ast_for_shard = query_ast->clone();
OptimizeShardingKeyRewriteInVisitor::Data visitor_data{
sharding_key_expr,
sharding_key_expr->getSampleBlock().getByPosition(0).type,
sharding_key_column_name,
shard_info,
not_optimized_cluster->getSlotToShard(),
};
OptimizeShardingKeyRewriteInVisitor visitor(visitor_data);
visitor.visit(query_ast_for_shard);
}
else
query_ast_for_shard = query_ast;
auto shard_plans = stream_factory.createForShardWithParallelReplicas(shard_info,
query_ast_for_shard, main_table, table_func_ptr, throttler, context, shards);
if (!shard_plans.local_plan && !shard_plans.remote_plan)
throw Exception(ErrorCodes::LOGICAL_ERROR, "No plans were generated for reading from shard. This is a bug");
if (shard_plans.local_plan)
plans.emplace_back(std::move(shard_plans.local_plan));
if (shard_plans.remote_plan)
plans.emplace_back(std::move(shard_plans.remote_plan));
}
if (plans.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "No plans were generated for reading from Distributed. This is a bug");
if (plans.size() == 1)
{
query_plan = std::move(*plans.front());
return;
}
DataStreams input_streams;
input_streams.reserve(plans.size());
for (const auto & plan : plans)
input_streams.emplace_back(plan->getCurrentDataStream());
auto union_step = std::make_unique<UnionStep>(std::move(input_streams));
query_plan.unitePlans(std::move(union_step), std::move(plans));
}
}
}
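So each shard contributes at most two sub-plans, and everything meets in one `UnionStep`. A toy illustration (stand-in types) of the plan layout for a two-shard cluster:

```cpp
#include <iostream>
#include <memory>
#include <string>
#include <vector>

// Toy stand-in for QueryPlan.
struct Plan { std::string name; };

int main()
{
    std::vector<std::unique_ptr<Plan>> plans;
    for (int shard = 0; shard < 2; ++shard)
    {
        // One plan for the fresh local replica, if the shard has one ...
        plans.push_back(std::make_unique<Plan>(
            Plan{"shard " + std::to_string(shard) + ": local replica"}));
        // ... and one plan fanning out to all remote replicas of the shard.
        plans.push_back(std::make_unique<Plan>(
            Plan{"shard " + std::to_string(shard) + ": remote replicas"}));
    }

    std::cout << "UnionStep over " << plans.size() << " inputs:\n";
    for (const auto & plan : plans)
        std::cout << "  " << plan->name << "\n";
}
```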

View File

@@ -23,7 +23,7 @@ struct StorageID;
namespace ClusterProxy
{
class IStreamFactory;
class SelectStreamFactory;
/// Update settings for Distributed query.
///
@@ -46,7 +46,18 @@ void executeQuery(
QueryProcessingStage::Enum processed_stage,
const StorageID & main_table,
const ASTPtr & table_func_ptr,
IStreamFactory & stream_factory, Poco::Logger * log,
SelectStreamFactory & stream_factory, Poco::Logger * log,
const ASTPtr & query_ast, ContextPtr context, const SelectQueryInfo & query_info,
const ExpressionActionsPtr & sharding_key_expr,
const std::string & sharding_key_column_name,
const ClusterPtr & not_optimized_cluster);
void executeQueryWithParallelReplicas(
QueryPlan & query_plan,
const StorageID & main_table,
const ASTPtr & table_func_ptr,
SelectStreamFactory & stream_factory,
const ASTPtr & query_ast, ContextPtr context, const SelectQueryInfo & query_info,
const ExpressionActionsPtr & sharding_key_expr,
const std::string & sharding_key_column_name,

View File

@@ -1728,12 +1728,11 @@ void InterpreterSelectQuery::setMergeTreeReadTaskCallbackAndClientInfo(MergeTree
context->setMergeTreeReadTaskCallback(std::move(callback));
}
void InterpreterSelectQuery::setProperClientInfo()
void InterpreterSelectQuery::setProperClientInfo(size_t replica_num, size_t replica_count)
{
context->getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
assert(options.shard_count.has_value() && options.shard_num.has_value());
context->getClientInfo().count_participating_replicas = *options.shard_count;
context->getClientInfo().number_of_current_replica = *options.shard_num;
context->getClientInfo().count_participating_replicas = replica_count;
context->getClientInfo().number_of_current_replica = replica_num;
}
bool InterpreterSelectQuery::shouldMoveToPrewhere()
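Replica identity now travels explicitly in `ClientInfo` instead of being smuggled through `shard_num`/`shard_count`. A minimal sketch (toy struct, assumed field semantics) of what the change stamps onto the query:

```cpp
#include <cstddef>
#include <iostream>

// Toy stand-in for the two ClientInfo fields touched above.
struct ClientInfoLike
{
    size_t count_participating_replicas = 0;
    size_t number_of_current_replica = 0;
};

int main()
{
    // Effect of setProperClientInfo(/*replica_num=*/0, /*replica_count=*/3):
    ClientInfoLike info;
    info.count_participating_replicas = 3;
    info.number_of_current_replica = 0;

    std::cout << "this query runs as replica " << info.number_of_current_replica
              << " of " << info.count_participating_replicas << "\n";
}
```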

View File

@@ -125,7 +125,7 @@ public:
void setMergeTreeReadTaskCallbackAndClientInfo(MergeTreeReadTaskCallback && callback);
/// It will set replica_num and replica_count in the client_info
void setProperClientInfo();
void setProperClientInfo(size_t replica_num, size_t replica_count);
private:
InterpreterSelectQuery(

View File

@@ -41,6 +41,8 @@ std::unique_ptr<QueryPlan> createLocalPlan(
QueryProcessingStage::Enum processed_stage,
UInt32 shard_num,
UInt32 shard_count,
size_t replica_num,
size_t replica_count,
std::shared_ptr<ParallelReplicasReadingCoordinator> coordinator)
{
checkStackSize();
@@ -56,7 +58,7 @@
.setShardInfo(shard_num, shard_count)
.ignoreASTOptimizations());
interpreter.setProperClientInfo();
interpreter.setProperClientInfo(replica_num, replica_count);
if (coordinator)
{
interpreter.setMergeTreeReadTaskCallbackAndClientInfo([coordinator](PartitionReadRequest request) -> std::optional<PartitionReadResponse>
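The callback cut off above is how the local plan joins the coordinated read: every participant asks one shared coordinator for its next range, so no mark is read twice. A hedged sketch of that request/response loop with simplified stand-in types:

```cpp
#include <iostream>
#include <memory>
#include <optional>

// Simplified stand-ins for the real request/response types.
struct PartitionReadRequest { /* part name, mark ranges, ... */ };
struct PartitionReadResponse { bool denied = false; /* granted ranges ... */ };

// Toy coordinator: grants work until nothing is left, then denies.
struct Coordinator
{
    int remaining_tasks = 2;
    std::optional<PartitionReadResponse> handleRequest(PartitionReadRequest)
    {
        return PartitionReadResponse{/*denied=*/remaining_tasks-- <= 0};
    }
};

int main()
{
    auto coordinator = std::make_shared<Coordinator>();

    // Shape of the callback installed via setMergeTreeReadTaskCallbackAndClientInfo:
    auto callback = [coordinator](PartitionReadRequest request) -> std::optional<PartitionReadResponse>
    {
        return coordinator->handleRequest(request);
    };

    while (auto response = callback(PartitionReadRequest{}))
    {
        if (response->denied)
            break;                     // nothing left for this replica
        std::cout << "read one granted range\n";
    }
}
```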

View File

@@ -15,6 +15,8 @@ std::unique_ptr<QueryPlan> createLocalPlan(
QueryProcessingStage::Enum processed_stage,
UInt32 shard_num,
UInt32 shard_count,
size_t replica_num,
size_t replica_count,
std::shared_ptr<ParallelReplicasReadingCoordinator> coordinator);
}

View File

@@ -16,6 +16,8 @@
#include <Client/ConnectionPool.h>
#include <Client/ConnectionPoolWithFailover.h>
#include <boost/algorithm/string/join.hpp>
namespace DB
{
@@ -62,7 +64,7 @@ static String formattedAST(const ASTPtr & ast)
}
ReadFromRemote::ReadFromRemote(
ClusterProxy::IStreamFactory::Shards shards_,
ClusterProxy::SelectStreamFactory::Shards shards_,
Block header_,
QueryProcessingStage::Enum stage_,
StorageID main_table_,
@@ -87,10 +89,7 @@ ReadFromRemote::ReadFromRemote(
{
}
void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::IStreamFactory::Shard & shard,
std::shared_ptr<ParallelReplicasReadingCoordinator> coordinator,
std::shared_ptr<ConnectionPoolWithFailover> pool,
std::optional<IConnections::ReplicaInfo> replica_info)
void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::SelectStreamFactory::Shard & shard)
{
bool add_agg_info = stage == QueryProcessingStage::WithMergeableState;
bool add_totals = false;
@@ -103,10 +102,7 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::IStreamFacto
}
auto lazily_create_stream = [
replica_info = replica_info,
pool = pool ? pool : shard.pool,
coordinator = coordinator,
shard_num = shard.shard_num, shard_count = shard_count, query = shard.query, header = shard.header,
shard = shard, shard_count = shard_count, query = shard.query, header = shard.header,
context = context, throttler = throttler,
main_table = main_table, table_func_ptr = table_func_ptr,
scalars = scalars, external_tables = external_tables,
@@ -122,15 +118,15 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::IStreamFacto
try
{
if (table_func_ptr)
try_results = pool->getManyForTableFunction(timeouts, &current_settings, PoolMode::GET_MANY);
try_results = shard.shard_info.pool->getManyForTableFunction(timeouts, &current_settings, PoolMode::GET_MANY);
else
try_results = pool->getManyChecked(timeouts, &current_settings, PoolMode::GET_MANY, main_table.getQualifiedName());
try_results = shard.shard_info.pool->getManyChecked(timeouts, &current_settings, PoolMode::GET_MANY, main_table.getQualifiedName());
}
catch (const Exception & ex)
{
if (ex.code() == ErrorCodes::ALL_CONNECTION_TRIES_FAILED)
LOG_WARNING(&Poco::Logger::get("ClusterProxy::SelectStreamFactory"),
"Connections to remote replicas of local shard {} failed, will use stale local replica", shard_num);
"Connections to remote replicas of local shard {} failed, will use stale local replica", shard.shard_info.shard_num);
else
throw;
}
@@ -144,7 +140,7 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::IStreamFacto
if (try_results.empty() || local_delay < max_remote_delay)
{
auto plan = createLocalPlan(query, header, context, stage, shard_num, shard_count, coordinator);
auto plan = createLocalPlan(query, header, context, stage, shard.shard_info.shard_num, shard_count, 0, 0, /*coordinator=*/nullptr);
return QueryPipelineBuilder::getPipe(std::move(*plan->buildQueryPipeline(
QueryPlanOptimizationSettings::fromContext(context),
@@ -160,10 +156,9 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::IStreamFacto
String query_string = formattedAST(query);
scalars["_shard_num"]
= Block{{DataTypeUInt32().createColumnConst(1, shard_num), std::make_shared<DataTypeUInt32>(), "_shard_num"}};
= Block{{DataTypeUInt32().createColumnConst(1, shard.shard_info.shard_num), std::make_shared<DataTypeUInt32>(), "_shard_num"}};
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
pool, std::move(connections), query_string, header, context, throttler, scalars, external_tables, stage,
RemoteQueryExecutor::Extension{.parallel_reading_coordinator = std::move(coordinator), .replica_info = replica_info});
shard.shard_info.pool, std::move(connections), query_string, header, context, throttler, scalars, external_tables, stage);
return createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read);
}
@@ -174,10 +169,7 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::IStreamFacto
addConvertingActions(pipes.back(), output_stream->header);
}
void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::IStreamFactory::Shard & shard,
std::shared_ptr<ParallelReplicasReadingCoordinator> coordinator,
std::shared_ptr<ConnectionPoolWithFailover> pool,
std::optional<IConnections::ReplicaInfo> replica_info)
void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFactory::Shard & shard)
{
bool add_agg_info = stage == QueryProcessingStage::WithMergeableState;
bool add_totals = false;
@@ -192,20 +184,15 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::IStreamFactory::
String query_string = formattedAST(shard.query);
scalars["_shard_num"]
= Block{{DataTypeUInt32().createColumnConst(1, shard.shard_num), std::make_shared<DataTypeUInt32>(), "_shard_num"}};
= Block{{DataTypeUInt32().createColumnConst(1, shard.shard_info.shard_num), std::make_shared<DataTypeUInt32>(), "_shard_num"}};
std::shared_ptr<RemoteQueryExecutor> remote_query_executor;
remote_query_executor = std::make_shared<RemoteQueryExecutor>(
pool ? pool : shard.pool, query_string, shard.header, context, throttler, scalars, external_tables, stage,
RemoteQueryExecutor::Extension{.parallel_reading_coordinator = std::move(coordinator), .replica_info = std::move(replica_info)});
shard.shard_info.pool, query_string, shard.header, context, throttler, scalars, external_tables, stage);
remote_query_executor->setLogger(log);
/// In case of parallel reading from replicas we have a connection pool per replica.
/// Setting PoolMode will make no sense.
if (!pool)
remote_query_executor->setPoolMode(PoolMode::GET_MANY);
remote_query_executor->setPoolMode(PoolMode::GET_MANY);
if (!table_func_ptr)
remote_query_executor->setMainTable(main_table);
@@ -219,52 +206,119 @@ void ReadFromRemote::initializePipeline(QueryPipelineBuilder & pipeline, const B
{
Pipes pipes;
const auto & settings = context->getSettingsRef();
const bool enable_sample_offset_parallel_processing = settings.max_parallel_replicas > 1 && !settings.allow_experimental_parallel_reading_from_replicas;
/// We have to create a pipe for each replica
/// FIXME: The second condition is only for tests to work, because hedged connections are enabled by default.
if (settings.max_parallel_replicas > 1 && !enable_sample_offset_parallel_processing && !context->getSettingsRef().use_hedged_requests)
for (const auto & shard : shards)
{
const Settings & current_settings = context->getSettingsRef();
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings);
for (const auto & shard : shards)
{
auto coordinator = std::make_shared<ParallelReplicasReadingCoordinator>();
for (size_t replica_num = 0; replica_num < shard.num_replicas; ++replica_num)
{
IConnections::ReplicaInfo replica_info
{
.all_replicas_count = shard.num_replicas,
.number_of_current_replica = replica_num
};
auto pool = shard.per_replica_pools[replica_num];
auto pool_with_failover = std::make_shared<ConnectionPoolWithFailover>(
ConnectionPoolPtrs{pool}, current_settings.load_balancing);
if (shard.lazy)
addLazyPipe(pipes, shard, coordinator, pool_with_failover, replica_info);
else
addPipe(pipes, shard, coordinator, pool_with_failover, replica_info);
}
}
}
else
{
for (const auto & shard : shards)
{
if (shard.lazy)
addLazyPipe(pipes, shard, /*coordinator=*/nullptr, /*pool*/{}, /*replica_info*/std::nullopt);
else
addPipe(pipes, shard, /*coordinator=*/nullptr, /*pool*/{}, /*replica_info*/std::nullopt);
}
if (shard.lazy)
addLazyPipe(pipes, shard);
else
addPipe(pipes, shard);
}
auto pipe = Pipe::unitePipes(std::move(pipes));
pipeline.init(std::move(pipe));
}
ReadFromParallelRemoteReplicasStep::ReadFromParallelRemoteReplicasStep(
ParallelReplicasReadingCoordinatorPtr coordinator_,
ClusterProxy::SelectStreamFactory::Shard shard_,
Block header_,
QueryProcessingStage::Enum stage_,
StorageID main_table_,
ASTPtr table_func_ptr_,
ContextPtr context_,
ThrottlerPtr throttler_,
Scalars scalars_,
Tables external_tables_,
Poco::Logger * log_,
UInt32 shard_count_)
: ISourceStep(DataStream{.header = std::move(header_)})
, coordinator(std::move(coordinator_))
, shard(std::move(shard_))
, stage(std::move(stage_))
, main_table(std::move(main_table_))
, table_func_ptr(table_func_ptr_)
, context(context_)
, throttler(throttler_)
, scalars(scalars_)
, external_tables{external_tables_}
, log(log_)
, shard_count(shard_count_)
{
std::vector<String> description;
for (const auto & address : shard.shard_info.all_addresses)
if (!address.is_local)
description.push_back(fmt::format("Replica: {}", address.host_name));
setStepDescription(boost::algorithm::join(description, ", "));
}
void ReadFromParallelRemoteReplicasStep::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
Pipes pipes;
const Settings & current_settings = context->getSettingsRef();
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings);
for (size_t replica_num = 0; replica_num < shard.shard_info.getAllNodeCount(); ++replica_num)
{
if (shard.shard_info.all_addresses[replica_num].is_local)
continue;
IConnections::ReplicaInfo replica_info
{
.all_replicas_count = shard.shard_info.getAllNodeCount(),
.number_of_current_replica = replica_num
};
auto pool = shard.shard_info.per_replica_pools[replica_num];
assert(pool);
auto pool_with_failover = std::make_shared<ConnectionPoolWithFailover>(
ConnectionPoolPtrs{pool}, current_settings.load_balancing);
addPipeForSingeReplica(pipes, pool_with_failover, replica_info);
}
auto pipe = Pipe::unitePipes(std::move(pipes));
pipeline.init(std::move(pipe));
}
void ReadFromParallelRemoteReplicasStep::addPipeForSingeReplica(Pipes & pipes, std::shared_ptr<ConnectionPoolWithFailover> pool, IConnections::ReplicaInfo replica_info)
{
bool add_agg_info = stage == QueryProcessingStage::WithMergeableState;
bool add_totals = false;
bool add_extremes = false;
bool async_read = context->getSettingsRef().async_socket_for_remote;
if (stage == QueryProcessingStage::Complete)
{
add_totals = shard.query->as<ASTSelectQuery &>().group_by_with_totals;
add_extremes = context->getSettingsRef().extremes;
}
String query_string = formattedAST(shard.query);
scalars["_shard_num"]
= Block{{DataTypeUInt32().createColumnConst(1, shard.shard_info.shard_num), std::make_shared<DataTypeUInt32>(), "_shard_num"}};
std::shared_ptr<RemoteQueryExecutor> remote_query_executor;
remote_query_executor = std::make_shared<RemoteQueryExecutor>(
pool, query_string, shard.header, context, throttler, scalars, external_tables, stage,
RemoteQueryExecutor::Extension{.parallel_reading_coordinator = coordinator, .replica_info = std::move(replica_info)});
remote_query_executor->setLogger(log);
if (!table_func_ptr)
remote_query_executor->setMainTable(main_table);
pipes.emplace_back(createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read));
pipes.back().addInterpreterContext(context);
addConvertingActions(pipes.back(), output_stream->header);
}
}
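Note the single-entry `ConnectionPoolWithFailover` built above: wrapping one per-replica pool means there is nothing to fail over to, which (as this hunk reads) deliberately pins each pipe to one specific replica. A toy sketch of the idea:

```cpp
#include <iostream>
#include <memory>
#include <string>
#include <vector>

// Toy stand-ins for ConnectionPool and ConnectionPoolWithFailover.
struct Pool { std::string replica; };
struct FailoverPool
{
    std::vector<std::shared_ptr<Pool>> nested;
    // With exactly one nested pool, "failover" always yields the same replica.
    std::shared_ptr<Pool> get() const { return nested.front(); }
};

int main()
{
    auto replica_pool = std::make_shared<Pool>(Pool{"replica-2.example"});

    // Mirrors: std::make_shared<ConnectionPoolWithFailover>(ConnectionPoolPtrs{pool}, load_balancing)
    FailoverPool pinned{{replica_pool}};
    std::cout << "pipe pinned to " << pinned.get()->replica << "\n";
}
```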

View File

@@ -4,7 +4,7 @@
#include <Client/IConnections.h>
#include <Storages/IStorage_fwd.h>
#include <Interpreters/StorageID.h>
#include <Interpreters/ClusterProxy/IStreamFactory.h>
#include <Interpreters/ClusterProxy/SelectStreamFactory.h>
#include <Storages/MergeTree/ParallelReplicasReadingCoordinator.h>
namespace DB
@@ -22,7 +22,7 @@ class ReadFromRemote final : public ISourceStep
{
public:
ReadFromRemote(
ClusterProxy::IStreamFactory::Shards shards_,
ClusterProxy::SelectStreamFactory::Shards shards_,
Block header_,
QueryProcessingStage::Enum stage_,
StorageID main_table_,
@@ -45,7 +45,7 @@ private:
PerShard
};
ClusterProxy::IStreamFactory::Shards shards;
ClusterProxy::SelectStreamFactory::Shards shards;
QueryProcessingStage::Enum stage;
StorageID main_table;
@@ -60,16 +60,52 @@ private:
Poco::Logger * log;
UInt32 shard_count;
void addLazyPipe(Pipes & pipes, const ClusterProxy::IStreamFactory::Shard & shard,
std::shared_ptr<ParallelReplicasReadingCoordinator> coordinator,
std::shared_ptr<ConnectionPoolWithFailover> pool,
std::optional<IConnections::ReplicaInfo> replica_info);
void addPipe(Pipes & pipes, const ClusterProxy::IStreamFactory::Shard & shard,
std::shared_ptr<ParallelReplicasReadingCoordinator> coordinator,
std::shared_ptr<ConnectionPoolWithFailover> pool,
std::optional<IConnections::ReplicaInfo> replica_info);
void addLazyPipe(Pipes & pipes, const ClusterProxy::SelectStreamFactory::Shard & shard);
void addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFactory::Shard & shard);
};
void addPipeForReplica();
class ReadFromParallelRemoteReplicasStep : public ISourceStep
{
public:
ReadFromParallelRemoteReplicasStep(
ParallelReplicasReadingCoordinatorPtr coordinator_,
ClusterProxy::SelectStreamFactory::Shard shard,
Block header_,
QueryProcessingStage::Enum stage_,
StorageID main_table_,
ASTPtr table_func_ptr_,
ContextPtr context_,
ThrottlerPtr throttler_,
Scalars scalars_,
Tables external_tables_,
Poco::Logger * log_,
UInt32 shard_count_);
String getName() const override { return "ReadFromRemoteParallelReplicas"; }
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
private:
void addPipeForSingeReplica(Pipes & pipes, std::shared_ptr<ConnectionPoolWithFailover> pool, IConnections::ReplicaInfo replica_info);
ParallelReplicasReadingCoordinatorPtr coordinator;
ClusterProxy::SelectStreamFactory::Shard shard;
QueryProcessingStage::Enum stage;
StorageID main_table;
ASTPtr table_func_ptr;
ContextPtr context;
ThrottlerPtr throttler;
Scalars scalars;
Tables external_tables;
Poco::Logger * log;
UInt32 shard_count{0};
};
}

View File

@@ -17,4 +17,6 @@ private:
std::unique_ptr<Impl> pimpl;
};
using ParallelReplicasReadingCoordinatorPtr = std::shared_ptr<ParallelReplicasReadingCoordinator>;
}

View File

@@ -707,13 +707,25 @@ void StorageDistributed::read(
storage_snapshot,
processed_stage);
ClusterProxy::executeQuery(
query_plan, header, processed_stage,
main_table, remote_table_function_ptr,
select_stream_factory, log, modified_query_ast,
local_context, query_info,
sharding_key_expr, sharding_key_column_name,
query_info.cluster);
auto settings = local_context->getSettingsRef();
bool parallel_replicas = settings.max_parallel_replicas > 1 && settings.allow_experimental_parallel_reading_from_replicas && !settings.use_hedged_requests;
if (parallel_replicas)
ClusterProxy::executeQueryWithParallelReplicas(
query_plan, main_table, remote_table_function_ptr,
select_stream_factory, modified_query_ast,
local_context, query_info,
sharding_key_expr, sharding_key_column_name,
query_info.cluster);
else
ClusterProxy::executeQuery(
query_plan, header, processed_stage,
main_table, remote_table_function_ptr,
select_stream_factory, log, modified_query_ast,
local_context, query_info,
sharding_key_expr, sharding_key_column_name,
query_info.cluster);
/// This is a bug: it is possible only when there are no shards to query, and that case is handled earlier.
if (!query_plan.isInitialized())
@@ -1523,4 +1535,3 @@ void registerStorageDistributed(StorageFactory & factory)
}
}

View File

@@ -10,7 +10,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# All replicas are localhost; disable the `prefer_localhost_replica` option to test the network interface
# Currently this feature does not work with hedged requests
# Enabling the `enable_sample_offset_parallel_processing` feature could lead to intersecting marks; some of them would be thrown away, producing an incorrect SELECT result
SETTINGS="--max_parallel_replicas=3 --use_hedged_requests=false --async_socket_for_remote=false --allow_experimental_parallel_reading_from_replicas=true"
SETTINGS="--max_parallel_replicas=3 --use_hedged_requests=false --allow_experimental_parallel_reading_from_replicas=true"
# Prepare tables
$CLICKHOUSE_CLIENT $SETTINGS -nm -q '''