mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
Merge pull request #26381 from ClickHouse/separate-step-for-distributed
Add separate step to read from remote.
This commit is contained in:
commit
149c5a0e9b
@ -18,6 +18,8 @@ using Pipes = std::vector<Pipe>;
|
||||
class QueryPlan;
|
||||
using QueryPlanPtr = std::unique_ptr<QueryPlan>;
|
||||
|
||||
struct StorageID;
|
||||
|
||||
namespace ClusterProxy
|
||||
{
|
||||
|
||||
@ -28,15 +30,31 @@ class IStreamFactory
|
||||
public:
|
||||
virtual ~IStreamFactory() = default;
|
||||
|
||||
struct Shard
|
||||
{
|
||||
/// Query and header may be changed depending on shard.
|
||||
ASTPtr query;
|
||||
Block header;
|
||||
|
||||
size_t shard_num = 0;
|
||||
ConnectionPoolWithFailoverPtr pool;
|
||||
|
||||
/// If we connect to replicas lazily.
|
||||
/// (When there is a local replica with big delay).
|
||||
bool lazy = false;
|
||||
UInt32 local_delay = 0;
|
||||
};
|
||||
|
||||
using Shards = std::vector<Shard>;
|
||||
|
||||
virtual void createForShard(
|
||||
const Cluster::ShardInfo & shard_info,
|
||||
const ASTPtr & query_ast,
|
||||
ContextPtr context, const ThrottlerPtr & throttler,
|
||||
const SelectQueryInfo & query_info,
|
||||
std::vector<QueryPlanPtr> & res,
|
||||
Pipes & remote_pipes,
|
||||
Pipes & delayed_pipes,
|
||||
Poco::Logger * log) = 0;
|
||||
const StorageID & main_table,
|
||||
const ASTPtr & table_func_ptr,
|
||||
ContextPtr context,
|
||||
std::vector<QueryPlanPtr> & local_plans,
|
||||
Shards & remote_shards) = 0;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -1,6 +1,5 @@
|
||||
#include <Interpreters/ClusterProxy/SelectStreamFactory.h>
|
||||
#include <Interpreters/InterpreterSelectQuery.h>
|
||||
#include <DataStreams/RemoteBlockInputStream.h>
|
||||
#include <Storages/StorageReplicatedMergeTree.h>
|
||||
#include <Storages/VirtualColumnUtils.h>
|
||||
#include <Common/Exception.h>
|
||||
@ -11,10 +10,6 @@
|
||||
#include <Interpreters/RequiredSourceColumnsVisitor.h>
|
||||
|
||||
#include <common/logger_useful.h>
|
||||
#include <Processors/Pipe.h>
|
||||
#include <Processors/Sources/RemoteSource.h>
|
||||
#include <Processors/Sources/DelayedSource.h>
|
||||
#include <Processors/Transforms/ExpressionTransform.h>
|
||||
#include <Processors/QueryPlan/QueryPlan.h>
|
||||
#include <Processors/QueryPlan/ExpressionStep.h>
|
||||
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
|
||||
@ -32,7 +27,6 @@ namespace DB
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int ALL_CONNECTION_TRIES_FAILED;
|
||||
extern const int ALL_REPLICAS_ARE_STALE;
|
||||
}
|
||||
|
||||
@ -42,35 +36,13 @@ namespace ClusterProxy
|
||||
SelectStreamFactory::SelectStreamFactory(
|
||||
const Block & header_,
|
||||
QueryProcessingStage::Enum processed_stage_,
|
||||
StorageID main_table_,
|
||||
const Scalars & scalars_,
|
||||
bool has_virtual_shard_num_column_,
|
||||
const Tables & external_tables_)
|
||||
bool has_virtual_shard_num_column_)
|
||||
: header(header_),
|
||||
processed_stage{processed_stage_},
|
||||
main_table(std::move(main_table_)),
|
||||
table_func_ptr{nullptr},
|
||||
scalars{scalars_},
|
||||
has_virtual_shard_num_column(has_virtual_shard_num_column_),
|
||||
external_tables{external_tables_}
|
||||
has_virtual_shard_num_column(has_virtual_shard_num_column_)
|
||||
{
|
||||
}
|
||||
|
||||
SelectStreamFactory::SelectStreamFactory(
|
||||
const Block & header_,
|
||||
QueryProcessingStage::Enum processed_stage_,
|
||||
ASTPtr table_func_ptr_,
|
||||
const Scalars & scalars_,
|
||||
bool has_virtual_shard_num_column_,
|
||||
const Tables & external_tables_)
|
||||
: header(header_),
|
||||
processed_stage{processed_stage_},
|
||||
table_func_ptr{table_func_ptr_},
|
||||
scalars{scalars_},
|
||||
has_virtual_shard_num_column(has_virtual_shard_num_column_),
|
||||
external_tables{external_tables_}
|
||||
{
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
@ -152,18 +124,6 @@ void addConvertingActions(QueryPlan & plan, const Block & header)
|
||||
plan.addStep(std::move(converting));
|
||||
}
|
||||
|
||||
void addConvertingActions(Pipe & pipe, const Block & header)
|
||||
{
|
||||
if (blocksHaveEqualStructure(pipe.getHeader(), header))
|
||||
return;
|
||||
|
||||
auto convert_actions = std::make_shared<ExpressionActions>(getConvertingDAG(pipe.getHeader(), header));
|
||||
pipe.addSimpleTransform([&](const Block & cur_header, Pipe::StreamType) -> ProcessorPtr
|
||||
{
|
||||
return std::make_shared<ExpressionTransform>(cur_header, convert_actions);
|
||||
});
|
||||
}
|
||||
|
||||
std::unique_ptr<QueryPlan> createLocalPlan(
|
||||
const ASTPtr & query_ast,
|
||||
const Block & header,
|
||||
@ -182,37 +142,17 @@ std::unique_ptr<QueryPlan> createLocalPlan(
|
||||
return query_plan;
|
||||
}
|
||||
|
||||
String formattedAST(const ASTPtr & ast)
|
||||
{
|
||||
if (!ast)
|
||||
return {};
|
||||
WriteBufferFromOwnString buf;
|
||||
formatAST(*ast, buf, false, true);
|
||||
return buf.str();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
void SelectStreamFactory::createForShard(
|
||||
const Cluster::ShardInfo & shard_info,
|
||||
const ASTPtr & query_ast,
|
||||
ContextPtr context, const ThrottlerPtr & throttler,
|
||||
const SelectQueryInfo &,
|
||||
std::vector<QueryPlanPtr> & plans,
|
||||
Pipes & remote_pipes,
|
||||
Pipes & delayed_pipes,
|
||||
Poco::Logger * log)
|
||||
const StorageID & main_table,
|
||||
const ASTPtr & table_func_ptr,
|
||||
ContextPtr context,
|
||||
std::vector<QueryPlanPtr> & local_plans,
|
||||
Shards & remote_shards)
|
||||
{
|
||||
bool add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState;
|
||||
bool add_totals = false;
|
||||
bool add_extremes = false;
|
||||
bool async_read = context->getSettingsRef().async_socket_for_remote;
|
||||
if (processed_stage == QueryProcessingStage::Complete)
|
||||
{
|
||||
add_totals = query_ast->as<ASTSelectQuery &>().group_by_with_totals;
|
||||
add_extremes = context->getSettingsRef().extremes;
|
||||
}
|
||||
|
||||
auto modified_query_ast = query_ast->clone();
|
||||
auto modified_header = header;
|
||||
if (has_virtual_shard_num_column)
|
||||
@ -231,25 +171,19 @@ void SelectStreamFactory::createForShard(
|
||||
|
||||
auto emplace_local_stream = [&]()
|
||||
{
|
||||
plans.emplace_back(createLocalPlan(modified_query_ast, modified_header, context, processed_stage));
|
||||
addConvertingActions(*plans.back(), header);
|
||||
local_plans.emplace_back(createLocalPlan(modified_query_ast, modified_header, context, processed_stage));
|
||||
addConvertingActions(*local_plans.back(), header);
|
||||
};
|
||||
|
||||
String modified_query = formattedAST(modified_query_ast);
|
||||
|
||||
auto emplace_remote_stream = [&]()
|
||||
{
|
||||
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
|
||||
shard_info.pool, modified_query, modified_header, context, throttler, scalars, external_tables, processed_stage);
|
||||
remote_query_executor->setLogger(log);
|
||||
|
||||
remote_query_executor->setPoolMode(PoolMode::GET_MANY);
|
||||
if (!table_func_ptr)
|
||||
remote_query_executor->setMainTable(main_table);
|
||||
|
||||
remote_pipes.emplace_back(createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read));
|
||||
remote_pipes.back().addInterpreterContext(context);
|
||||
addConvertingActions(remote_pipes.back(), header);
|
||||
remote_shards.emplace_back(Shard{
|
||||
.query = modified_query_ast,
|
||||
.header = modified_header,
|
||||
.shard_num = shard_info.shard_num,
|
||||
.pool = shard_info.pool,
|
||||
.lazy = false
|
||||
});
|
||||
};
|
||||
|
||||
const auto & settings = context->getSettingsRef();
|
||||
@ -340,65 +274,14 @@ void SelectStreamFactory::createForShard(
|
||||
/// Try our luck with remote replicas, but if they are stale too, then fallback to local replica.
|
||||
/// Do it lazily to avoid connecting in the main thread.
|
||||
|
||||
auto lazily_create_stream = [
|
||||
pool = shard_info.pool, shard_num = shard_info.shard_num, modified_query, header = modified_header, modified_query_ast,
|
||||
context, throttler,
|
||||
main_table = main_table, table_func_ptr = table_func_ptr, scalars = scalars, external_tables = external_tables,
|
||||
stage = processed_stage, local_delay, add_agg_info, add_totals, add_extremes, async_read]()
|
||||
-> Pipe
|
||||
{
|
||||
auto current_settings = context->getSettingsRef();
|
||||
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(
|
||||
current_settings).getSaturated(
|
||||
current_settings.max_execution_time);
|
||||
std::vector<ConnectionPoolWithFailover::TryResult> try_results;
|
||||
try
|
||||
{
|
||||
if (table_func_ptr)
|
||||
try_results = pool->getManyForTableFunction(timeouts, ¤t_settings, PoolMode::GET_MANY);
|
||||
else
|
||||
try_results = pool->getManyChecked(timeouts, ¤t_settings, PoolMode::GET_MANY, main_table.getQualifiedName());
|
||||
}
|
||||
catch (const Exception & ex)
|
||||
{
|
||||
if (ex.code() == ErrorCodes::ALL_CONNECTION_TRIES_FAILED)
|
||||
LOG_WARNING(&Poco::Logger::get("ClusterProxy::SelectStreamFactory"),
|
||||
"Connections to remote replicas of local shard {} failed, will use stale local replica", shard_num);
|
||||
else
|
||||
throw;
|
||||
}
|
||||
|
||||
double max_remote_delay = 0.0;
|
||||
for (const auto & try_result : try_results)
|
||||
{
|
||||
if (!try_result.is_up_to_date)
|
||||
max_remote_delay = std::max(try_result.staleness, max_remote_delay);
|
||||
}
|
||||
|
||||
if (try_results.empty() || local_delay < max_remote_delay)
|
||||
{
|
||||
auto plan = createLocalPlan(modified_query_ast, header, context, stage);
|
||||
return QueryPipeline::getPipe(std::move(*plan->buildQueryPipeline(
|
||||
QueryPlanOptimizationSettings::fromContext(context),
|
||||
BuildQueryPipelineSettings::fromContext(context))));
|
||||
}
|
||||
else
|
||||
{
|
||||
std::vector<IConnectionPool::Entry> connections;
|
||||
connections.reserve(try_results.size());
|
||||
for (auto & try_result : try_results)
|
||||
connections.emplace_back(std::move(try_result.entry));
|
||||
|
||||
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
|
||||
std::move(connections), modified_query, header, context, throttler, scalars, external_tables, stage);
|
||||
|
||||
return createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read);
|
||||
}
|
||||
};
|
||||
|
||||
delayed_pipes.emplace_back(createDelayedPipe(modified_header, lazily_create_stream, add_totals, add_extremes));
|
||||
delayed_pipes.back().addInterpreterContext(context);
|
||||
addConvertingActions(delayed_pipes.back(), header);
|
||||
remote_shards.emplace_back(Shard{
|
||||
.query = modified_query_ast,
|
||||
.header = modified_header,
|
||||
.shard_num = shard_info.shard_num,
|
||||
.pool = shard_info.pool,
|
||||
.lazy = true,
|
||||
.local_delay = local_delay
|
||||
});
|
||||
}
|
||||
else
|
||||
emplace_remote_stream();
|
||||
|
@ -14,42 +14,25 @@ namespace ClusterProxy
|
||||
class SelectStreamFactory final : public IStreamFactory
|
||||
{
|
||||
public:
|
||||
/// Database in a query.
|
||||
SelectStreamFactory(
|
||||
const Block & header_,
|
||||
QueryProcessingStage::Enum processed_stage_,
|
||||
StorageID main_table_,
|
||||
const Scalars & scalars_,
|
||||
bool has_virtual_shard_num_column_,
|
||||
const Tables & external_tables);
|
||||
|
||||
/// TableFunction in a query.
|
||||
SelectStreamFactory(
|
||||
const Block & header_,
|
||||
QueryProcessingStage::Enum processed_stage_,
|
||||
ASTPtr table_func_ptr_,
|
||||
const Scalars & scalars_,
|
||||
bool has_virtual_shard_num_column_,
|
||||
const Tables & external_tables_);
|
||||
bool has_virtual_shard_num_column_);
|
||||
|
||||
void createForShard(
|
||||
const Cluster::ShardInfo & shard_info,
|
||||
const ASTPtr & query_ast,
|
||||
ContextPtr context, const ThrottlerPtr & throttler,
|
||||
const SelectQueryInfo & query_info,
|
||||
std::vector<QueryPlanPtr> & plans,
|
||||
Pipes & remote_pipes,
|
||||
Pipes & delayed_pipes,
|
||||
Poco::Logger * log) override;
|
||||
const StorageID & main_table,
|
||||
const ASTPtr & table_func_ptr,
|
||||
ContextPtr context,
|
||||
std::vector<QueryPlanPtr> & local_plans,
|
||||
Shards & remote_shards) override;
|
||||
|
||||
private:
|
||||
const Block header;
|
||||
QueryProcessingStage::Enum processed_stage;
|
||||
StorageID main_table = StorageID::createEmpty();
|
||||
ASTPtr table_func_ptr;
|
||||
Scalars scalars;
|
||||
|
||||
bool has_virtual_shard_num_column = false;
|
||||
Tables external_tables;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -8,7 +8,7 @@
|
||||
#include <Interpreters/OptimizeShardingKeyRewriteInVisitor.h>
|
||||
#include <Processors/Pipe.h>
|
||||
#include <Processors/QueryPlan/QueryPlan.h>
|
||||
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
|
||||
#include <Processors/QueryPlan/ReadFromRemote.h>
|
||||
#include <Processors/QueryPlan/UnionStep.h>
|
||||
#include <Storages/SelectQueryInfo.h>
|
||||
|
||||
@ -101,6 +101,10 @@ ContextMutablePtr updateSettingsForCluster(const Cluster & cluster, ContextPtr c
|
||||
|
||||
void executeQuery(
|
||||
QueryPlan & query_plan,
|
||||
const Block & header,
|
||||
QueryProcessingStage::Enum processed_stage,
|
||||
const StorageID & main_table,
|
||||
const ASTPtr & table_func_ptr,
|
||||
IStreamFactory & stream_factory, Poco::Logger * log,
|
||||
const ASTPtr & query_ast, ContextPtr context, const SelectQueryInfo & query_info,
|
||||
const ExpressionActionsPtr & sharding_key_expr,
|
||||
@ -115,8 +119,7 @@ void executeQuery(
|
||||
throw Exception("Maximum distributed depth exceeded", ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH);
|
||||
|
||||
std::vector<QueryPlanPtr> plans;
|
||||
Pipes remote_pipes;
|
||||
Pipes delayed_pipes;
|
||||
IStreamFactory::Shards remote_shards;
|
||||
|
||||
auto new_context = updateSettingsForCluster(*query_info.getCluster(), context, settings, log);
|
||||
|
||||
@ -161,29 +164,33 @@ void executeQuery(
|
||||
query_ast_for_shard = query_ast;
|
||||
|
||||
stream_factory.createForShard(shard_info,
|
||||
query_ast_for_shard,
|
||||
new_context, throttler, query_info, plans,
|
||||
remote_pipes, delayed_pipes, log);
|
||||
query_ast_for_shard, main_table, table_func_ptr,
|
||||
new_context, plans, remote_shards);
|
||||
}
|
||||
|
||||
if (!remote_pipes.empty())
|
||||
if (!remote_shards.empty())
|
||||
{
|
||||
const Scalars & scalars = context->hasQueryContext() ? context->getQueryContext()->getScalars() : Scalars{};
|
||||
auto external_tables = context->getExternalTables();
|
||||
|
||||
auto plan = std::make_unique<QueryPlan>();
|
||||
auto read_from_remote = std::make_unique<ReadFromPreparedSource>(Pipe::unitePipes(std::move(remote_pipes)));
|
||||
auto read_from_remote = std::make_unique<ReadFromRemote>(
|
||||
std::move(remote_shards),
|
||||
header,
|
||||
processed_stage,
|
||||
main_table,
|
||||
table_func_ptr,
|
||||
new_context,
|
||||
throttler,
|
||||
scalars,
|
||||
std::move(external_tables),
|
||||
log);
|
||||
|
||||
read_from_remote->setStepDescription("Read from remote replica");
|
||||
plan->addStep(std::move(read_from_remote));
|
||||
plans.emplace_back(std::move(plan));
|
||||
}
|
||||
|
||||
if (!delayed_pipes.empty())
|
||||
{
|
||||
auto plan = std::make_unique<QueryPlan>();
|
||||
auto read_from_remote = std::make_unique<ReadFromPreparedSource>(Pipe::unitePipes(std::move(delayed_pipes)));
|
||||
read_from_remote->setStepDescription("Read from delayed local replica");
|
||||
plan->addStep(std::move(read_from_remote));
|
||||
plans.emplace_back(std::move(plan));
|
||||
}
|
||||
|
||||
if (plans.empty())
|
||||
return;
|
||||
|
||||
|
@ -1,6 +1,7 @@
|
||||
#pragma once
|
||||
|
||||
#include <Interpreters/Context_fwd.h>
|
||||
#include <Core/QueryProcessingStage.h>
|
||||
#include <Parsers/IAST.h>
|
||||
|
||||
namespace DB
|
||||
@ -17,6 +18,8 @@ class QueryPlan;
|
||||
class ExpressionActions;
|
||||
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
|
||||
|
||||
struct StorageID;
|
||||
|
||||
namespace ClusterProxy
|
||||
{
|
||||
|
||||
@ -38,6 +41,10 @@ ContextMutablePtr updateSettingsForCluster(const Cluster & cluster, ContextPtr c
|
||||
/// (currently SELECT, DESCRIBE).
|
||||
void executeQuery(
|
||||
QueryPlan & query_plan,
|
||||
const Block & header,
|
||||
QueryProcessingStage::Enum processed_stage,
|
||||
const StorageID & main_table,
|
||||
const ASTPtr & table_func_ptr,
|
||||
IStreamFactory & stream_factory, Poco::Logger * log,
|
||||
const ASTPtr & query_ast, ContextPtr context, const SelectQueryInfo & query_info,
|
||||
const ExpressionActionsPtr & sharding_key_expr,
|
||||
|
228
src/Processors/QueryPlan/ReadFromRemote.cpp
Normal file
228
src/Processors/QueryPlan/ReadFromRemote.cpp
Normal file
@ -0,0 +1,228 @@
|
||||
#include <Processors/QueryPlan/ReadFromRemote.h>
|
||||
#include <Processors/QueryPlan/ExpressionStep.h>
|
||||
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
|
||||
#include <DataStreams/RemoteQueryExecutor.h>
|
||||
#include <Parsers/ASTSelectQuery.h>
|
||||
#include <Parsers/formatAST.h>
|
||||
#include <Processors/Sources/RemoteSource.h>
|
||||
#include <Processors/Sources/DelayedSource.h>
|
||||
#include <Processors/Transforms/ExpressionTransform.h>
|
||||
#include <Interpreters/ActionsDAG.h>
|
||||
#include <Interpreters/InterpreterSelectQuery.h>
|
||||
#include <IO/ConnectionTimeoutsContext.h>
|
||||
#include <Common/checkStackSize.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int ALL_CONNECTION_TRIES_FAILED;
|
||||
}
|
||||
|
||||
static ActionsDAGPtr getConvertingDAG(const Block & block, const Block & header)
|
||||
{
|
||||
/// Convert header structure to expected.
|
||||
/// Also we ignore constants from result and replace it with constants from header.
|
||||
/// It is needed for functions like `now64()` or `randConstant()` because their values may be different.
|
||||
return ActionsDAG::makeConvertingActions(
|
||||
block.getColumnsWithTypeAndName(),
|
||||
header.getColumnsWithTypeAndName(),
|
||||
ActionsDAG::MatchColumnsMode::Name,
|
||||
true);
|
||||
}
|
||||
|
||||
void addConvertingActions(QueryPlan & plan, const Block & header)
|
||||
{
|
||||
if (blocksHaveEqualStructure(plan.getCurrentDataStream().header, header))
|
||||
return;
|
||||
|
||||
auto convert_actions_dag = getConvertingDAG(plan.getCurrentDataStream().header, header);
|
||||
auto converting = std::make_unique<ExpressionStep>(plan.getCurrentDataStream(), convert_actions_dag);
|
||||
plan.addStep(std::move(converting));
|
||||
}
|
||||
|
||||
static void addConvertingActions(Pipe & pipe, const Block & header)
|
||||
{
|
||||
if (blocksHaveEqualStructure(pipe.getHeader(), header))
|
||||
return;
|
||||
|
||||
auto convert_actions = std::make_shared<ExpressionActions>(getConvertingDAG(pipe.getHeader(), header));
|
||||
pipe.addSimpleTransform([&](const Block & cur_header, Pipe::StreamType) -> ProcessorPtr
|
||||
{
|
||||
return std::make_shared<ExpressionTransform>(cur_header, convert_actions);
|
||||
});
|
||||
}
|
||||
|
||||
static String formattedAST(const ASTPtr & ast)
|
||||
{
|
||||
if (!ast)
|
||||
return {};
|
||||
WriteBufferFromOwnString buf;
|
||||
formatAST(*ast, buf, false, true);
|
||||
return buf.str();
|
||||
}
|
||||
|
||||
static std::unique_ptr<QueryPlan> createLocalPlan(
|
||||
const ASTPtr & query_ast,
|
||||
const Block & header,
|
||||
ContextPtr context,
|
||||
QueryProcessingStage::Enum processed_stage)
|
||||
{
|
||||
checkStackSize();
|
||||
|
||||
auto query_plan = std::make_unique<QueryPlan>();
|
||||
|
||||
InterpreterSelectQuery interpreter(query_ast, context, SelectQueryOptions(processed_stage));
|
||||
interpreter.buildQueryPlan(*query_plan);
|
||||
|
||||
addConvertingActions(*query_plan, header);
|
||||
|
||||
return query_plan;
|
||||
}
|
||||
|
||||
|
||||
ReadFromRemote::ReadFromRemote(
|
||||
ClusterProxy::IStreamFactory::Shards shards_,
|
||||
Block header_,
|
||||
QueryProcessingStage::Enum stage_,
|
||||
StorageID main_table_,
|
||||
ASTPtr table_func_ptr_,
|
||||
ContextPtr context_,
|
||||
ThrottlerPtr throttler_,
|
||||
Scalars scalars_,
|
||||
Tables external_tables_,
|
||||
Poco::Logger * log_)
|
||||
: ISourceStep(DataStream{.header = std::move(header_)})
|
||||
, shards(std::move(shards_))
|
||||
, stage(stage_)
|
||||
, main_table(std::move(main_table_))
|
||||
, table_func_ptr(std::move(table_func_ptr_))
|
||||
, context(std::move(context_))
|
||||
, throttler(std::move(throttler_))
|
||||
, scalars(std::move(scalars_))
|
||||
, external_tables(std::move(external_tables_))
|
||||
, log(log_)
|
||||
{
|
||||
}
|
||||
|
||||
void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::IStreamFactory::Shard & shard)
|
||||
{
|
||||
bool add_agg_info = stage == QueryProcessingStage::WithMergeableState;
|
||||
bool add_totals = false;
|
||||
bool add_extremes = false;
|
||||
bool async_read = context->getSettingsRef().async_socket_for_remote;
|
||||
if (stage == QueryProcessingStage::Complete)
|
||||
{
|
||||
add_totals = shard.query->as<ASTSelectQuery &>().group_by_with_totals;
|
||||
add_extremes = context->getSettingsRef().extremes;
|
||||
}
|
||||
|
||||
auto lazily_create_stream = [
|
||||
pool = shard.pool, shard_num = shard.shard_num, query = shard.query, header = shard.header,
|
||||
context = context, throttler = throttler,
|
||||
main_table = main_table, table_func_ptr = table_func_ptr,
|
||||
scalars = scalars, external_tables = external_tables,
|
||||
stage = stage, local_delay = shard.local_delay,
|
||||
add_agg_info, add_totals, add_extremes, async_read]()
|
||||
-> Pipe
|
||||
{
|
||||
auto current_settings = context->getSettingsRef();
|
||||
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(
|
||||
current_settings).getSaturated(
|
||||
current_settings.max_execution_time);
|
||||
std::vector<ConnectionPoolWithFailover::TryResult> try_results;
|
||||
try
|
||||
{
|
||||
if (table_func_ptr)
|
||||
try_results = pool->getManyForTableFunction(timeouts, ¤t_settings, PoolMode::GET_MANY);
|
||||
else
|
||||
try_results = pool->getManyChecked(timeouts, ¤t_settings, PoolMode::GET_MANY, main_table.getQualifiedName());
|
||||
}
|
||||
catch (const Exception & ex)
|
||||
{
|
||||
if (ex.code() == ErrorCodes::ALL_CONNECTION_TRIES_FAILED)
|
||||
LOG_WARNING(&Poco::Logger::get("ClusterProxy::SelectStreamFactory"),
|
||||
"Connections to remote replicas of local shard {} failed, will use stale local replica", shard_num);
|
||||
else
|
||||
throw;
|
||||
}
|
||||
|
||||
double max_remote_delay = 0.0;
|
||||
for (const auto & try_result : try_results)
|
||||
{
|
||||
if (!try_result.is_up_to_date)
|
||||
max_remote_delay = std::max(try_result.staleness, max_remote_delay);
|
||||
}
|
||||
|
||||
if (try_results.empty() || local_delay < max_remote_delay)
|
||||
{
|
||||
auto plan = createLocalPlan(query, header, context, stage);
|
||||
return QueryPipeline::getPipe(std::move(*plan->buildQueryPipeline(
|
||||
QueryPlanOptimizationSettings::fromContext(context),
|
||||
BuildQueryPipelineSettings::fromContext(context))));
|
||||
}
|
||||
else
|
||||
{
|
||||
std::vector<IConnectionPool::Entry> connections;
|
||||
connections.reserve(try_results.size());
|
||||
for (auto & try_result : try_results)
|
||||
connections.emplace_back(std::move(try_result.entry));
|
||||
|
||||
String query_string = formattedAST(query);
|
||||
|
||||
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
|
||||
std::move(connections), query_string, header, context, throttler, scalars, external_tables, stage);
|
||||
|
||||
return createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read);
|
||||
}
|
||||
};
|
||||
|
||||
pipes.emplace_back(createDelayedPipe(shard.header, lazily_create_stream, add_totals, add_extremes));
|
||||
pipes.back().addInterpreterContext(context);
|
||||
addConvertingActions(pipes.back(), output_stream->header);
|
||||
}
|
||||
|
||||
void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::IStreamFactory::Shard & shard)
|
||||
{
|
||||
bool add_agg_info = stage == QueryProcessingStage::WithMergeableState;
|
||||
bool add_totals = false;
|
||||
bool add_extremes = false;
|
||||
bool async_read = context->getSettingsRef().async_socket_for_remote;
|
||||
if (stage == QueryProcessingStage::Complete)
|
||||
{
|
||||
add_totals = shard.query->as<ASTSelectQuery &>().group_by_with_totals;
|
||||
add_extremes = context->getSettingsRef().extremes;
|
||||
}
|
||||
|
||||
String query_string = formattedAST(shard.query);
|
||||
|
||||
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
|
||||
shard.pool, query_string, shard.header, context, throttler, scalars, external_tables, stage);
|
||||
remote_query_executor->setLogger(log);
|
||||
|
||||
remote_query_executor->setPoolMode(PoolMode::GET_MANY);
|
||||
if (!table_func_ptr)
|
||||
remote_query_executor->setMainTable(main_table);
|
||||
|
||||
pipes.emplace_back(createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read));
|
||||
pipes.back().addInterpreterContext(context);
|
||||
addConvertingActions(pipes.back(), output_stream->header);
|
||||
}
|
||||
|
||||
void ReadFromRemote::initializePipeline(QueryPipeline & pipeline, const BuildQueryPipelineSettings &)
|
||||
{
|
||||
Pipes pipes;
|
||||
for (const auto & shard : shards)
|
||||
{
|
||||
if (shard.lazy)
|
||||
addLazyPipe(pipes, shard);
|
||||
else
|
||||
addPipe(pipes, shard);
|
||||
}
|
||||
|
||||
auto pipe = Pipe::unitePipes(std::move(pipes));
|
||||
pipeline.init(std::move(pipe));
|
||||
}
|
||||
|
||||
}
|
57
src/Processors/QueryPlan/ReadFromRemote.h
Normal file
57
src/Processors/QueryPlan/ReadFromRemote.h
Normal file
@ -0,0 +1,57 @@
|
||||
#pragma once
|
||||
#include <Processors/QueryPlan/ISourceStep.h>
|
||||
#include <Core/QueryProcessingStage.h>
|
||||
#include <Storages/IStorage_fwd.h>
|
||||
#include <Interpreters/StorageID.h>
|
||||
#include <Interpreters/ClusterProxy/IStreamFactory.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class ConnectionPoolWithFailover;
|
||||
using ConnectionPoolWithFailoverPtr = std::shared_ptr<ConnectionPoolWithFailover>;
|
||||
|
||||
class Throttler;
|
||||
using ThrottlerPtr = std::shared_ptr<Throttler>;
|
||||
|
||||
/// Reading step from remote servers.
|
||||
/// Unite query results from several shards.
|
||||
class ReadFromRemote final : public ISourceStep
|
||||
{
|
||||
public:
|
||||
ReadFromRemote(
|
||||
ClusterProxy::IStreamFactory::Shards shards_,
|
||||
Block header_,
|
||||
QueryProcessingStage::Enum stage_,
|
||||
StorageID main_table_,
|
||||
ASTPtr table_func_ptr_,
|
||||
ContextPtr context_,
|
||||
ThrottlerPtr throttler_,
|
||||
Scalars scalars_,
|
||||
Tables external_tables_,
|
||||
Poco::Logger * log_);
|
||||
|
||||
String getName() const override { return "ReadFromRemote"; }
|
||||
|
||||
void initializePipeline(QueryPipeline & pipeline, const BuildQueryPipelineSettings &) override;
|
||||
|
||||
private:
|
||||
ClusterProxy::IStreamFactory::Shards shards;
|
||||
QueryProcessingStage::Enum stage;
|
||||
|
||||
StorageID main_table;
|
||||
ASTPtr table_func_ptr;
|
||||
|
||||
ContextPtr context;
|
||||
|
||||
ThrottlerPtr throttler;
|
||||
Scalars scalars;
|
||||
Tables external_tables;
|
||||
|
||||
Poco::Logger * log;
|
||||
|
||||
void addLazyPipe(Pipes & pipes, const ClusterProxy::IStreamFactory::Shard & shard);
|
||||
void addPipe(Pipes & pipes, const ClusterProxy::IStreamFactory::Shard & shard);
|
||||
};
|
||||
|
||||
}
|
@ -126,6 +126,7 @@ SRCS(
|
||||
QueryPlan/QueryPlan.cpp
|
||||
QueryPlan/ReadFromMergeTree.cpp
|
||||
QueryPlan/ReadFromPreparedSource.cpp
|
||||
QueryPlan/ReadFromRemote.cpp
|
||||
QueryPlan/ReadNothingStep.cpp
|
||||
QueryPlan/RollupStep.cpp
|
||||
QueryPlan/SettingQuotaAndLimitsStep.cpp
|
||||
|
@ -602,25 +602,25 @@ void StorageDistributed::read(
|
||||
return;
|
||||
}
|
||||
|
||||
const Scalars & scalars = local_context->hasQueryContext() ? local_context->getQueryContext()->getScalars() : Scalars{};
|
||||
|
||||
bool has_virtual_shard_num_column = std::find(column_names.begin(), column_names.end(), "_shard_num") != column_names.end();
|
||||
if (has_virtual_shard_num_column && !isVirtualColumn("_shard_num", metadata_snapshot))
|
||||
has_virtual_shard_num_column = false;
|
||||
|
||||
ClusterProxy::SelectStreamFactory select_stream_factory = remote_table_function_ptr
|
||||
? ClusterProxy::SelectStreamFactory(
|
||||
header, processed_stage, remote_table_function_ptr, scalars, has_virtual_shard_num_column, local_context->getExternalTables())
|
||||
: ClusterProxy::SelectStreamFactory(
|
||||
StorageID main_table = StorageID::createEmpty();
|
||||
if (!remote_table_function_ptr)
|
||||
main_table = StorageID{remote_database, remote_table};
|
||||
|
||||
ClusterProxy::SelectStreamFactory select_stream_factory =
|
||||
ClusterProxy::SelectStreamFactory(
|
||||
header,
|
||||
processed_stage,
|
||||
StorageID{remote_database, remote_table},
|
||||
scalars,
|
||||
has_virtual_shard_num_column,
|
||||
local_context->getExternalTables());
|
||||
has_virtual_shard_num_column);
|
||||
|
||||
ClusterProxy::executeQuery(query_plan, select_stream_factory, log,
|
||||
modified_query_ast, local_context, query_info,
|
||||
ClusterProxy::executeQuery(
|
||||
query_plan, header, processed_stage,
|
||||
main_table, remote_table_function_ptr,
|
||||
select_stream_factory, log, modified_query_ast,
|
||||
local_context, query_info,
|
||||
sharding_key_expr, sharding_key_column_name,
|
||||
query_info.cluster);
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user