Sending a query plan in a separate packet. Use query context to build functions for ActionsDAG.

This commit is contained in:
Nikolai Kochetov 2024-09-18 15:38:45 +00:00
parent 02fafdb48f
commit 1065d9e542
36 changed files with 220 additions and 107 deletions

View File

@ -471,7 +471,7 @@ private:
entry->disconnect();
RemoteQueryExecutor executor(
*entry, {.text = query, .stage = query_processing_stage}, {}, global_context, nullptr, Scalars(), Tables());
*entry, query, {}, global_context, nullptr, Scalars(), Tables(), query_processing_stage);
if (!query_id.empty())
executor.setQueryId(query_id);

View File

@ -205,9 +205,10 @@ std::vector<String> Client::loadWarningMessages()
std::vector<String> messages;
connection->sendQuery(connection_parameters.timeouts,
{.text = "SELECT * FROM viewIfPermitted(SELECT message FROM system.warnings ELSE null('message String'))", .stage = QueryProcessingStage::Complete},
"SELECT * FROM viewIfPermitted(SELECT message FROM system.warnings ELSE null('message String'))",
{} /* query_parameters */,
"" /* query_id */,
QueryProcessingStage::Complete,
&client_context->getSettingsRef(),
&client_context->getClientInfo(), false, {});
while (true)

View File

@ -1044,9 +1044,10 @@ void ClientBase::processOrdinaryQuery(const String & query_to_execute, ASTPtr pa
try {
connection->sendQuery(
connection_parameters.timeouts,
QueryToSend{.text = query, .stage = query_processing_stage},
query,
query_parameters,
client_context->getCurrentQueryId(),
query_processing_stage,
&client_context->getSettingsRef(),
&client_context->getClientInfo(),
true,
@ -1501,9 +1502,10 @@ void ClientBase::processInsertQuery(const String & query_to_execute, ASTPtr pars
connection->sendQuery(
connection_parameters.timeouts,
QueryToSend{.text = query, .stage = query_processing_stage},
query,
query_parameters,
client_context->getCurrentQueryId(),
query_processing_stage,
&client_context->getSettingsRef(),
&client_context->getClientInfo(),
true,

View File

@ -749,9 +749,10 @@ TablesStatusResponse Connection::getTablesStatus(const ConnectionTimeouts & time
void Connection::sendQuery(
const ConnectionTimeouts & timeouts,
const QueryToSend & query,
const String & query,
const NameToNameMap & query_parameters,
const String & query_id_,
UInt64 stage,
const Settings * settings,
const ClientInfo * client_info,
bool with_pending_data,
@ -759,7 +760,7 @@ void Connection::sendQuery(
{
OpenTelemetry::SpanHolder span("Connection::sendQuery()", OpenTelemetry::SpanKind::CLIENT);
span.addAttribute("clickhouse.query_id", query_id_);
span.addAttribute("clickhouse.query", query.text);
span.addAttribute("clickhouse.query", query);
span.addAttribute("target", [this] () { return this->getHost() + ":" + std::to_string(this->getPort()); });
ClientInfo new_client_info;
@ -845,7 +846,7 @@ void Connection::sendQuery(
if (nonce.has_value())
data += std::to_string(nonce.value());
data += cluster_secret;
data += query.text;
data += query;
data += query_id;
data += client_info->initial_user;
/// TODO: add source/target host/ip-address
@ -861,13 +862,10 @@ void Connection::sendQuery(
writeStringBinary("", *out);
}
writeVarUInt(query.stage, *out);
writeVarUInt(stage, *out);
writeVarUInt(static_cast<bool>(compression), *out);
writeStringBinary(query.text, *out);
if (query.plan)
query.plan->serialize(*out);
writeStringBinary(query, *out);
if (server_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_PARAMETERS)
{
@ -897,6 +895,12 @@ void Connection::sendQuery(
}
void Connection::sendQueryPlan(const QueryPlan & query_plan)
{
writeVarUInt(Protocol::Client::QueryPlan, *out);
query_plan.serialize(*out);
}
void Connection::sendCancel()
{
/// If we already disconnected.

View File

@ -101,14 +101,17 @@ public:
void sendQuery(
const ConnectionTimeouts & timeouts,
const QueryToSend & query,
const String & query,
const NameToNameMap& query_parameters,
const String & query_id_/* = "" */,
UInt64 stage/* = QueryProcessingStage::Complete */,
const Settings * settings/* = nullptr */,
const ClientInfo * client_info/* = nullptr */,
bool with_pending_data/* = false */,
std::function<void(const Progress &)> process_progress_callback) override;
void sendQueryPlan(const QueryPlan & query_plan) override;
void sendCancel() override;
void sendData(const Block & block, const String & name/* = "" */, bool scalar/* = false */) override;

View File

@ -99,6 +99,23 @@ void HedgedConnections::sendScalarsData(Scalars & data)
pipeline_for_new_replicas.add(send_scalars_data);
}
void HedgedConnections::sendQueryPlan(const QueryPlan & query_plan)
{
std::lock_guard lock(cancel_mutex);
if (!sent_query)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot send query plan: query not yet sent.");
auto send_query_plan = [&query_plan](ReplicaState & replica) { replica.connection->sendQueryPlan(query_plan); };
for (auto & offset_state : offset_states)
for (auto & replica : offset_state.replicas)
if (replica.connection)
send_query_plan(replica);
pipeline_for_new_replicas.add(send_query_plan);
}
void HedgedConnections::sendExternalTablesData(std::vector<ExternalTablesData> & data)
{
std::lock_guard lock(cancel_mutex);
@ -142,8 +159,9 @@ void HedgedConnections::sendIgnoredPartUUIDs(const std::vector<UUID> & uuids)
void HedgedConnections::sendQuery(
const ConnectionTimeouts & timeouts,
const QueryToSend & query,
const String & query,
const String & query_id,
UInt64 stage,
ClientInfo & client_info,
bool with_pending_data)
{
@ -172,7 +190,7 @@ void HedgedConnections::sendQuery(
hedged_connections_factory.skipReplicasWithTwoLevelAggregationIncompatibility();
}
auto send_query = [this, timeouts, query, query_id, client_info, with_pending_data](ReplicaState & replica)
auto send_query = [this, timeouts, query, query_id, stage, client_info, with_pending_data](ReplicaState & replica)
{
Settings modified_settings = settings;
@ -202,7 +220,7 @@ void HedgedConnections::sendQuery(
modified_settings.set("allow_experimental_analyzer", static_cast<bool>(modified_settings.allow_experimental_analyzer));
replica.connection->sendQuery(
timeouts, query, /* query_parameters */ {}, query_id, &modified_settings, &client_info, with_pending_data, {});
timeouts, query, /* query_parameters */ {}, query_id, stage, &modified_settings, &client_info, with_pending_data, {});
replica.change_replica_timeout.setRelative(timeouts.receive_data_timeout);
replica.packet_receiver->setTimeout(hedged_connections_factory.getConnectionTimeouts().receive_timeout);
};

View File

@ -86,11 +86,14 @@ public:
void sendQuery(
const ConnectionTimeouts & timeouts,
const QueryToSend & query,
const String & query,
const String & query_id,
UInt64 stage,
ClientInfo & client_info,
bool with_pending_data) override;
void sendQueryPlan(const QueryPlan & query_plan) override;
void sendReadTaskResponse(const String &) override
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "sendReadTaskResponse in not supported with HedgedConnections");

View File

@ -19,11 +19,14 @@ public:
/// Send request to replicas.
virtual void sendQuery(
const ConnectionTimeouts & timeouts,
const QueryToSend & query,
const String & query,
const String & query_id,
UInt64 stage,
ClientInfo & client_info,
bool with_pending_data) = 0;
virtual void sendQueryPlan(const QueryPlan & query_plan) = 0;
virtual void sendReadTaskResponse(const String &) = 0;
virtual void sendMergeTreeReadTaskResponse(const ParallelReadResponse & response) = 0;

View File

@ -64,13 +64,6 @@ using ExternalTablesData = std::vector<ExternalTableDataPtr>;
class QueryPlan;
struct QueryToSend
{
std::string text;
UInt64 stage;
std::shared_ptr<QueryPlan> plan{};
};
class IServerConnection : boost::noncopyable
{
public:
@ -103,14 +96,17 @@ public:
/// If last flag is true, you need to call sendExternalTablesData after.
virtual void sendQuery(
const ConnectionTimeouts & timeouts,
const QueryToSend & query,
const String & query,
const NameToNameMap & query_parameters,
const String & query_id_,
UInt64 stage,
const Settings * settings,
const ClientInfo * client_info,
bool with_pending_data,
std::function<void(const Progress &)> process_progress_callback) = 0;
virtual void sendQueryPlan(const QueryPlan & query_plan) = 0;
virtual void sendCancel() = 0;
/// Send block of data; if name is specified, server will write it to external (temporary) table of that name.

View File

@ -88,9 +88,10 @@ void LocalConnection::sendProfileEvents()
void LocalConnection::sendQuery(
const ConnectionTimeouts &,
const QueryToSend & query,
const String & query,
const NameToNameMap & query_parameters,
const String & query_id,
UInt64 stage,
const Settings *,
const ClientInfo * client_info,
bool,
@ -126,9 +127,9 @@ void LocalConnection::sendQuery(
state.emplace();
state->query_id = query_id;
state->query = query.text;
state->query = query;
state->query_scope_holder = std::make_unique<CurrentThread::QueryScope>(query_context);
state->stage = QueryProcessingStage::Enum(query.stage);
state->stage = QueryProcessingStage::Enum(stage);
state->profile_queue = std::make_shared<InternalProfileEventsQueue>(std::numeric_limits<int>::max());
CurrentThread::attachInternalProfileEventsQueue(state->profile_queue);
@ -281,6 +282,11 @@ void LocalConnection::sendQuery(
}
}
void LocalConnection::sendQueryPlan(const QueryPlan &)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented");
}
void LocalConnection::sendData(const Block & block, const String &, bool)
{
if (!block)

View File

@ -107,14 +107,17 @@ public:
void sendQuery(
const ConnectionTimeouts & timeouts,
const QueryToSend & query,
const String & query,
const NameToNameMap & query_parameters,
const String & query_id/* = "" */,
UInt64 stage/* = QueryProcessingStage::Complete */,
const Settings * settings/* = nullptr */,
const ClientInfo * client_info/* = nullptr */,
bool with_pending_data/* = false */,
std::function<void(const Progress &)> process_progress_callback) override;
void sendQueryPlan(const QueryPlan &) override;
void sendCancel() override;
void sendData(const Block & block, const String & name/* = "" */, bool scalar/* = false */) override;

View File

@ -90,6 +90,21 @@ void MultiplexedConnections::sendScalarsData(Scalars & data)
}
}
void MultiplexedConnections::sendQueryPlan(const QueryPlan & query_plan)
{
std::lock_guard lock(cancel_mutex);
if (!sent_query)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot send scalars data: query not yet sent.");
for (ReplicaState & state : replica_states)
{
Connection * connection = state.connection;
if (connection != nullptr)
connection->sendQueryPlan(query_plan);
}
}
void MultiplexedConnections::sendExternalTablesData(std::vector<ExternalTablesData> & data)
{
std::lock_guard lock(cancel_mutex);
@ -114,8 +129,9 @@ void MultiplexedConnections::sendExternalTablesData(std::vector<ExternalTablesDa
void MultiplexedConnections::sendQuery(
const ConnectionTimeouts & timeouts,
const QueryToSend & query,
const String & query,
const String & query_id,
UInt64 stage,
ClientInfo & client_info,
bool with_pending_data)
{
@ -170,19 +186,21 @@ void MultiplexedConnections::sendQuery(
modified_settings.parallel_replica_offset = i;
replica_states[i].connection->sendQuery(
timeouts, query, /* query_parameters */ {}, query_id, &modified_settings, &client_info, with_pending_data, {});
timeouts, query, /* query_parameters */ {}, query_id, stage, &modified_settings, &client_info, with_pending_data, {});
}
}
else
{
/// Use single replica.
replica_states[0].connection->sendQuery(
timeouts, query, /* query_parameters */ {}, query_id, &modified_settings, &client_info, with_pending_data, {});
timeouts, query, /* query_parameters */ {}, query_id, stage, &modified_settings, &client_info, with_pending_data, {});
}
sent_query = true;
}
void MultiplexedConnections::sendIgnoredPartUUIDs(const std::vector<UUID> & uuids)
{
std::lock_guard lock(cancel_mutex);

View File

@ -32,11 +32,14 @@ public:
void sendQuery(
const ConnectionTimeouts & timeouts,
const QueryToSend & query,
const String & query,
const String & query_id,
UInt64 stage,
ClientInfo & client_info,
bool with_pending_data) override;
void sendQueryPlan(const QueryPlan & query_plan) override;
void sendReadTaskResponse(const String &) override;
void sendMergeTreeReadTaskResponse(const ParallelReadResponse & response) override;

View File

@ -164,7 +164,7 @@ void Suggest::load(IServerConnection & connection,
void Suggest::fetch(IServerConnection & connection, const ConnectionTimeouts & timeouts, const std::string & query, const ClientInfo & client_info)
{
connection.sendQuery(
timeouts, QueryToSend{.text = query, .stage = QueryProcessingStage::Complete}, {} /* query_parameters */, "" /* query_id */, nullptr, &client_info, false, {});
timeouts, query, {} /* query_parameters */, "" /* query_id */, QueryProcessingStage::Complete, nullptr, &client_info, false, {});
while (true)
{

View File

@ -166,7 +166,10 @@ namespace Protocol
SSHChallengeRequest = 11, /// Request SSH signature challenge
SSHChallengeResponse = 12, /// Reply to SSH signature challenge
MAX = SSHChallengeResponse,
QueryPlan = 13, /// Query plan
MAX = QueryPlan,
};
inline const char * toString(UInt64 packet)

View File

@ -176,7 +176,7 @@ QueryPipeline ClickHouseDictionarySource::createStreamForQuery(const String & qu
else
{
pipeline = QueryPipeline(std::make_shared<RemoteSource>(
std::make_shared<RemoteQueryExecutor>(pool, QueryToSend{.text = query, .stage = QueryProcessingStage::Complete}, empty_sample_block, context_copy), false, false, false));
std::make_shared<RemoteQueryExecutor>(pool, query, empty_sample_block, context_copy), false, false, false));
}
return pipeline;
@ -199,7 +199,7 @@ std::string ClickHouseDictionarySource::doInvalidateQuery(const std::string & re
/// We pass empty block to RemoteQueryExecutor, because we don't know the structure of the result.
Block invalidate_sample_block;
QueryPipeline pipeline(std::make_shared<RemoteSource>(
std::make_shared<RemoteQueryExecutor>(pool, QueryToSend{.text = request, .stage = QueryProcessingStage::Complete}, invalidate_sample_block, context_copy), false, false, false));
std::make_shared<RemoteQueryExecutor>(pool, request, invalidate_sample_block, context_copy), false, false, false));
return readInvalidateQuery(std::move(pipeline));
}
}

View File

@ -3382,7 +3382,7 @@ void ActionsDAG::serialize(WriteBuffer & out, SerializedSetsRegistry & registry)
writeVarUInt(node_to_id.at(output), out);
}
ActionsDAG ActionsDAG::deserialize(ReadBuffer & in, DeserializedSetsRegistry & registry)
ActionsDAG ActionsDAG::deserialize(ReadBuffer & in, DeserializedSetsRegistry & registry, const ContextPtr & context)
{
size_t nodes_size;
readVarUInt(nodes_size, in);
@ -3480,7 +3480,7 @@ ActionsDAG ActionsDAG::deserialize(ReadBuffer & in, DeserializedSetsRegistry & r
{
ExecutableFunctionCapture::Capture capture;
deserializeCapture(capture, in);
auto capture_dag = ActionsDAG::deserialize(in, registry);
auto capture_dag = ActionsDAG::deserialize(in, registry, context);
node.function_base = std::make_shared<FunctionCapture>(
std::make_shared<ActionsDAG>(std::move(capture_dag)),
@ -3493,7 +3493,7 @@ ActionsDAG ActionsDAG::deserialize(ReadBuffer & in, DeserializedSetsRegistry & r
}
else
{
auto function = FunctionFactory::instance().get(function_name, Context::getGlobalContextInstance());
auto function = FunctionFactory::instance().get(function_name, context);
node.function_base = function->build(arguments);
node.function = node.function_base->prepare(arguments);

View File

@ -134,7 +134,7 @@ public:
std::string dumpDAG() const;
void serialize(WriteBuffer & out, SerializedSetsRegistry & registry) const;
static ActionsDAG deserialize(ReadBuffer & in, DeserializedSetsRegistry & registry);
static ActionsDAG deserialize(ReadBuffer & in, DeserializedSetsRegistry & registry, const ContextPtr & context);
const Node & addInput(std::string name, DataTypePtr type);
const Node & addInput(ColumnWithTypeAndName column);

View File

@ -53,7 +53,7 @@ public:
ASTPtr query;
QueryTreeNodePtr query_tree;
std::shared_ptr<QueryPlan> query_plan;
std::shared_ptr<const QueryPlan> query_plan;
/// Used to check the table existence on remote node
StorageID main_table;

View File

@ -112,6 +112,7 @@ namespace ErrorCodes
extern const int SYNTAX_ERROR;
extern const int SUPPORT_IS_DISABLED;
extern const int INCORRECT_QUERY;
extern const int INCORRECT_DATA;
}
namespace FailPoints
@ -1533,14 +1534,14 @@ static std::pair<ASTPtr, BlockIO> executeQueryImpl(
/// Load external tables if they were provided
context->initializeExternalTablesIfSet();
if (!query_plan)
throw Exception(ErrorCodes::INCORRECT_DATA, "Expected query plan packet for QueryPlan stage");
/// reset Input callbacks if query is not INSERT SELECT
context->resetInputCallbacks();
StreamLocalLimits limits;
std::shared_ptr<const EnabledQuota> quota;
//std::unique_ptr<IInterpreter> interpreter;
auto logger = getLogger("executeQuery");
@ -1732,7 +1733,7 @@ std::pair<ASTPtr, BlockIO> executeQuery(
ASTPtr ast;
BlockIO res;
if (query_plan)
if (stage == QueryProcessingStage::QueryPlan)
std::tie(ast, res) = executeQueryImpl(context, flags, query, query_plan);
else
std::tie(ast, res) = executeQueryImpl(query.data(), query.data() + query.size(), context, flags, stage, nullptr);

View File

@ -104,7 +104,7 @@ void ExpressionStep::serialize(Serialization & ctx) const
std::unique_ptr<IQueryPlanStep> ExpressionStep::deserialize(Deserialization & ctx)
{
ActionsDAG actions_dag = ActionsDAG::deserialize(ctx.in, ctx.registry);
ActionsDAG actions_dag = ActionsDAG::deserialize(ctx.in, ctx.registry, ctx.context);
if (ctx.input_streams.size() != 1)
throw Exception(ErrorCodes::INCORRECT_DATA, "ExpressionStep must have one input stream");

View File

@ -154,7 +154,7 @@ std::unique_ptr<IQueryPlanStep> FilterStep::deserialize(Deserialization & ctx)
String filter_column_name;
readStringBinary(filter_column_name, ctx.in);
ActionsDAG actions_dag = ActionsDAG::deserialize(ctx.in, ctx.registry);
ActionsDAG actions_dag = ActionsDAG::deserialize(ctx.in, ctx.registry, ctx.context);
return std::make_unique<FilterStep>(ctx.input_streams.front(), std::move(actions_dag), std::move(filter_column_name), remove_filter_column);
}

View File

@ -61,7 +61,7 @@ public:
const DataStream & getCurrentDataStream() const; /// Checks that (isInitialized() && !isCompleted())
void serialize(WriteBuffer & out) const;
static QueryPlanAndSets deserialize(ReadBuffer & in);
static QueryPlanAndSets deserialize(ReadBuffer & in, const ContextPtr & context);
static void resolveReadFromTable(QueryPlan & plan, const ContextPtr & context);
static QueryPlan resolveStorages(QueryPlanAndSets plan_and_sets, const ContextPtr & context);

View File

@ -208,15 +208,13 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::SelectStream
for (auto & try_result : try_results)
connections.emplace_back(std::move(try_result.entry));
QueryToSend query_to_send;
query_to_send.text = formattedAST(query);
query_to_send.plan = my_shard.query_plan;
query_to_send.stage = query_to_send.plan ? QueryProcessingStage::QueryPlan : my_stage;
String query_string = formattedAST(query);
auto stage_to_use = my_shard.query_plan ? QueryProcessingStage::QueryPlan : my_stage;
my_scalars["_shard_num"]
= Block{{DataTypeUInt32().createColumnConst(1, my_shard.shard_info.shard_num), std::make_shared<DataTypeUInt32>(), "_shard_num"}};
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
std::move(connections), query_to_send, header, my_context, my_throttler, my_scalars, my_external_tables);
std::move(connections), query_string, header, my_context, my_throttler, my_scalars, my_external_tables, stage_to_use);
auto pipe = createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read, async_query_sending);
QueryPipelineBuilder builder;
@ -280,25 +278,26 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact
select_query.setExpression(ASTSelectQuery::Expression::WHERE, std::move(shard_filter));
}
const String query_string = formattedAST(query);
if (!priority_func_factory.has_value())
priority_func_factory = GetPriorityForLoadBalancing(LoadBalancing::ROUND_ROBIN, randomSeed());
GetPriorityForLoadBalancing::Func priority_func
= priority_func_factory->getPriorityFunc(LoadBalancing::ROUND_ROBIN, 0, shard.shard_info.pool->getPoolSize());
QueryToSend query_to_send;
query_to_send.text = formattedAST(query);
query_to_send.plan = shard.query_plan;
query_to_send.stage = query_to_send.plan ? QueryProcessingStage::QueryPlan : stage;
auto stage_to_use = shard.query_plan ? QueryProcessingStage::QueryPlan : stage;
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
shard.shard_info.pool,
query_to_send,
query_string,
shard.header,
context,
throttler,
scalars,
external_tables,
stage_to_use,
shard.query_plan,
std::nullopt,
priority_func);
remote_query_executor->setLogger(log);
@ -315,13 +314,11 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact
else
{
QueryToSend query_to_send;
query_to_send.text = formattedAST(shard.query);
query_to_send.plan = shard.query_plan;
query_to_send.stage = query_to_send.plan ? QueryProcessingStage::QueryPlan : stage;
const String query_string = formattedAST(shard.query);
auto stage_to_use = shard.query_plan ? QueryProcessingStage::QueryPlan : stage;
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
shard.shard_info.pool, query_to_send, shard.header, context, throttler, scalars, external_tables);
shard.shard_info.pool, query_string, shard.header, context, throttler, scalars, external_tables, stage_to_use, shard.query_plan);
remote_query_executor->setLogger(log);
if (context->canUseTaskBasedParallelReplicas())
@ -483,12 +480,13 @@ void ReadFromParallelRemoteReplicasStep::addPipeForSingeReplica(
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
pool,
QueryToSend{.text = query_string, .stage = stage},
query_string,
output_stream->header,
context,
throttler,
scalars,
external_tables,
stage,
RemoteQueryExecutor::Extension{.parallel_reading_coordinator = coordinator, .replica_info = std::move(replica_info)});
remote_query_executor->setLogger(log);

View File

@ -148,7 +148,7 @@ static void serializeSets(SerializedSetsRegistry & registry, WriteBuffer & out)
}
}
QueryPlanAndSets deserializeSets(QueryPlan plan, DeserializedSetsRegistry & registry, ReadBuffer & in)
QueryPlanAndSets deserializeSets(QueryPlan plan, DeserializedSetsRegistry & registry, ReadBuffer & in, const ContextPtr & context)
{
UInt64 num_sets;
readVarUInt(num_sets, in);
@ -201,7 +201,7 @@ QueryPlanAndSets deserializeSets(QueryPlan plan, DeserializedSetsRegistry & regi
}
else if (kind == UInt8(SetSerializationKind::SubqueryPlan))
{
auto plan_for_set = QueryPlan::deserialize(in);
auto plan_for_set = QueryPlan::deserialize(in, context);
res.sets_from_subquery.emplace_back(QueryPlanAndSets::SetFromSubquery{
{hash, std::move(columns)},
@ -278,7 +278,7 @@ void QueryPlan::serialize(WriteBuffer & out) const
serializeSets(registry, out);
}
QueryPlanAndSets QueryPlan::deserialize(ReadBuffer & in)
QueryPlanAndSets QueryPlan::deserialize(ReadBuffer & in, const ContextPtr & context)
{
QueryPlanStepRegistry & step_registry = QueryPlanStepRegistry::instance();
@ -330,7 +330,7 @@ QueryPlanAndSets QueryPlan::deserialize(ReadBuffer & in)
for (const auto & child : frame.children)
input_streams.push_back(child->step->getOutputStream());
IQueryPlanStep::Deserialization ctx{in, sets_registry, input_streams, &output_stream, settings};
IQueryPlanStep::Deserialization ctx{in, sets_registry, context, input_streams, &output_stream, settings};
auto step = step_registry.createStep(step_name, ctx);
if (step->hasOutputStream())
@ -349,7 +349,7 @@ QueryPlanAndSets QueryPlan::deserialize(ReadBuffer & in)
stack.pop();
}
return deserializeSets(std::move(plan), sets_registry, in);
return deserializeSets(std::move(plan), sets_registry, in, context);
}
static std::shared_ptr<TableNode> resolveTable(const Identifier & identifier, const ContextPtr & context)

View File

@ -1,5 +1,6 @@
#pragma once
#include <Processors/QueryPlan/IQueryPlanStep.h>
#include <Interpreters/Context_fwd.h>
namespace DB
{
@ -19,6 +20,7 @@ struct IQueryPlanStep::Deserialization
{
ReadBuffer & in;
DeserializedSetsRegistry & registry;
const ContextPtr & context;
const DataStreams & input_streams;
const DataStream * output_stream;

View File

@ -202,7 +202,7 @@ std::unique_ptr<IQueryPlanStep> TotalsHavingStep::deserialize(Deserialization &
{
readStringBinary(filter_column_name, ctx.in);
actions_dag = ActionsDAG::deserialize(ctx.in, ctx.registry);
actions_dag = ActionsDAG::deserialize(ctx.in, ctx.registry, ctx.context);
}
return std::make_unique<TotalsHavingStep>(

View File

@ -52,7 +52,7 @@ RemoteInserter::RemoteInserter(
* Header is needed to know, what structure is required for blocks to be passed to 'write' method.
*/
connection.sendQuery(
timeouts, {.text = query, .stage = QueryProcessingStage::Complete}, /* query_parameters */ {}, "", &settings, &modified_client_info, false, {});
timeouts, query, /* query_parameters */ {}, "", QueryProcessingStage::Complete, &settings, &modified_client_info, false, {});
while (true)
{

View File

@ -46,18 +46,22 @@ namespace ErrorCodes
}
RemoteQueryExecutor::RemoteQueryExecutor(
const QueryToSend & query_,
const String & query_,
const Block & header_,
ContextPtr context_,
const Scalars & scalars_,
const Tables & external_tables_,
QueryProcessingStage::Enum stage_,
std::shared_ptr<const QueryPlan> query_plan_,
std::optional<Extension> extension_,
GetPriorityForLoadBalancing::Func priority_func_)
: header(header_)
, query(query_)
, query_plan(std::move(query_plan_))
, context(context_)
, scalars(scalars_)
, external_tables(external_tables_)
, stage(stage_)
, extension(extension_)
, priority_func(priority_func_)
{
@ -65,14 +69,15 @@ RemoteQueryExecutor::RemoteQueryExecutor(
RemoteQueryExecutor::RemoteQueryExecutor(
ConnectionPoolPtr pool,
const QueryToSend & query_,
const String & query_,
const Block & header_,
ContextPtr context_,
ThrottlerPtr throttler,
const Scalars & scalars_,
const Tables & external_tables_,
QueryProcessingStage::Enum stage_,
std::optional<Extension> extension_)
: RemoteQueryExecutor(query_, header_, context_, scalars_, external_tables_, extension_)
: RemoteQueryExecutor(query_, header_, context_, scalars_, external_tables_, stage_, nullptr, extension_)
{
create_connections = [this, pool, throttler, extension_](AsyncCallback)
{
@ -121,14 +126,15 @@ RemoteQueryExecutor::RemoteQueryExecutor(
RemoteQueryExecutor::RemoteQueryExecutor(
Connection & connection,
const QueryToSend & query_,
const String & query_,
const Block & header_,
ContextPtr context_,
ThrottlerPtr throttler,
const Scalars & scalars_,
const Tables & external_tables_,
QueryProcessingStage::Enum stage_,
std::optional<Extension> extension_)
: RemoteQueryExecutor(query_, header_, context_, scalars_, external_tables_, extension_)
: RemoteQueryExecutor(query_, header_, context_, scalars_, external_tables_, stage_, nullptr, extension_)
{
create_connections = [this, &connection, throttler, extension_](AsyncCallback)
{
@ -141,14 +147,15 @@ RemoteQueryExecutor::RemoteQueryExecutor(
RemoteQueryExecutor::RemoteQueryExecutor(
std::shared_ptr<Connection> connection_ptr,
const QueryToSend & query_,
const String & query_,
const Block & header_,
ContextPtr context_,
ThrottlerPtr throttler,
const Scalars & scalars_,
const Tables & external_tables_,
QueryProcessingStage::Enum stage_,
std::optional<Extension> extension_)
: RemoteQueryExecutor(query_, header_, context_, scalars_, external_tables_, extension_)
: RemoteQueryExecutor(query_, header_, context_, scalars_, external_tables_, stage_, nullptr, extension_)
{
create_connections = [this, connection_ptr, throttler, extension_](AsyncCallback)
{
@ -161,14 +168,16 @@ RemoteQueryExecutor::RemoteQueryExecutor(
RemoteQueryExecutor::RemoteQueryExecutor(
std::vector<IConnectionPool::Entry> && connections_,
const QueryToSend & query_,
const String & query_,
const Block & header_,
ContextPtr context_,
const ThrottlerPtr & throttler,
const Scalars & scalars_,
const Tables & external_tables_,
QueryProcessingStage::Enum stage_,
std::shared_ptr<const QueryPlan> query_plan_,
std::optional<Extension> extension_)
: RemoteQueryExecutor(query_, header_, context_, scalars_, external_tables_, extension_)
: RemoteQueryExecutor(query_, header_, context_, scalars_, external_tables_, stage_, std::move(query_plan_), extension_)
{
create_connections = [this, connections_, throttler, extension_](AsyncCallback) mutable
{
@ -181,15 +190,17 @@ RemoteQueryExecutor::RemoteQueryExecutor(
RemoteQueryExecutor::RemoteQueryExecutor(
const ConnectionPoolWithFailoverPtr & pool,
const QueryToSend & query_,
const String & query_,
const Block & header_,
ContextPtr context_,
const ThrottlerPtr & throttler,
const Scalars & scalars_,
const Tables & external_tables_,
QueryProcessingStage::Enum stage_,
std::shared_ptr<const QueryPlan> query_plan_,
std::optional<Extension> extension_,
GetPriorityForLoadBalancing::Func priority_func_)
: RemoteQueryExecutor(query_, header_, context_, scalars_, external_tables_, extension_, priority_func_)
: RemoteQueryExecutor(query_, header_, context_, scalars_, external_tables_, stage_, std::move(query_plan_), extension_, priority_func_)
{
create_connections = [this, pool, throttler](AsyncCallback async_callback)->std::unique_ptr<IConnections>
{
@ -383,13 +394,17 @@ void RemoteQueryExecutor::sendQueryUnlocked(ClientInfo::QueryKind query_kind, As
if (!duplicated_part_uuids.empty())
connections->sendIgnoredPartUUIDs(duplicated_part_uuids);
connections->sendQuery(timeouts, query, query_id, modified_client_info, true);
connections->sendQuery(timeouts, query, query_id, stage, modified_client_info, true);
established = false;
sent_query = true;
if (settings.enable_scalar_subquery_optimization)
sendScalars();
if (query_plan)
connections->sendQueryPlan(*query_plan);
sendExternalTables();
}

View File

@ -54,56 +54,64 @@ public:
/// Takes a connection pool for a node (not cluster)
RemoteQueryExecutor(
ConnectionPoolPtr pool,
const QueryToSend & query_,
const String & query_,
const Block & header_,
ContextPtr context_,
ThrottlerPtr throttler = nullptr,
const Scalars & scalars_ = Scalars(),
const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete,
std::optional<Extension> extension_ = std::nullopt);
/// Takes already set connection.
RemoteQueryExecutor(
Connection & connection,
const QueryToSend & query_,
const String & query_,
const Block & header_,
ContextPtr context_,
ThrottlerPtr throttler_ = nullptr,
const Scalars & scalars_ = Scalars(),
const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete,
std::optional<Extension> extension_ = std::nullopt);
/// Takes already set connection.
RemoteQueryExecutor(
std::shared_ptr<Connection> connection,
const QueryToSend & query_,
const String & query_,
const Block & header_,
ContextPtr context_,
ThrottlerPtr throttler_ = nullptr,
const Scalars & scalars_ = Scalars(),
const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete,
std::optional<Extension> extension_ = std::nullopt);
/// Accepts several connections already taken from pool.
RemoteQueryExecutor(
std::vector<IConnectionPool::Entry> && connections_,
const QueryToSend & query_,
const String & query_,
const Block & header_,
ContextPtr context_,
const ThrottlerPtr & throttler = nullptr,
const Scalars & scalars_ = Scalars(),
const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete,
std::shared_ptr<const QueryPlan> query_plan_ = nullptr,
std::optional<Extension> extension_ = std::nullopt);
/// Takes a pool and gets one or several connections from it.
RemoteQueryExecutor(
const ConnectionPoolWithFailoverPtr & pool,
const QueryToSend & query_,
const String & query_,
const Block & header_,
ContextPtr context_,
const ThrottlerPtr & throttler = nullptr,
const Scalars & scalars_ = Scalars(),
const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete,
std::shared_ptr<const QueryPlan> query_plan_ = nullptr,
std::optional<Extension> extension_ = std::nullopt,
GetPriorityForLoadBalancing::Func priority_func = {});
@ -221,11 +229,13 @@ public:
private:
RemoteQueryExecutor(
const QueryToSend & query_,
const String & query_,
const Block & header_,
ContextPtr context_,
const Scalars & scalars_,
const Tables & external_tables_,
QueryProcessingStage::Enum stage_,
std::shared_ptr<const QueryPlan> query_plan_,
std::optional<Extension> extension_,
GetPriorityForLoadBalancing::Func priority_func = {});
@ -237,7 +247,8 @@ private:
std::unique_ptr<IConnections> connections;
std::unique_ptr<ReadContext> read_context;
const QueryToSend query;
const String query;
std::shared_ptr<const QueryPlan> query_plan;
String query_id;
ContextPtr context;
@ -248,6 +259,7 @@ private:
Scalars scalars;
/// Temporary tables needed to be sent to remote servers
Tables external_tables;
QueryProcessingStage::Enum stage;
std::optional<Extension> extension;
/// Initiator identifier for distributed task processing

View File

@ -1743,13 +1743,15 @@ bool TCPHandler::receivePacket()
receiveQuery();
return true;
case Protocol::Client::Data:
case Protocol::Client::Scalar:
if (state.skipping_data)
return receiveUnexpectedData(false);
if (state.empty())
receiveUnexpectedData(true);
return receiveData(packet_type == Protocol::Client::Scalar);
return receiveData(/*scalar=*/ true);
case Protocol::Client::QueryPlan:
receiveQueryPlan();
return true;
case Protocol::Client::Data:
return receiveData(/*scalar=*/ false);
case Protocol::Client::Ping:
writeVarUInt(Protocol::Server::Pong, *out);
@ -1900,9 +1902,6 @@ void TCPHandler::receiveQuery()
readStringBinary(state.query, *in);
if (state.stage == QueryProcessingStage::QueryPlan)
state.plan_and_sets = std::make_shared<QueryPlanAndSets>(QueryPlan::deserialize(*in));
Settings passed_params;
if (client_tcp_protocol_version >= DBMS_MIN_PROTOCOL_VERSION_WITH_PARAMETERS)
passed_params.read(*in, settings_format);
@ -2078,8 +2077,26 @@ void TCPHandler::receiveUnexpectedQuery()
throw NetException(ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT, "Unexpected packet Query received from client");
}
void TCPHandler::receiveQueryPlan()
{
bool unexpected_packet = state.empty() || state.stage != QueryProcessingStage::QueryPlan || state.plan_and_sets || !query_context || state.read_all_data;
auto context = unexpected_packet ? Context::getGlobalContextInstance() : query_context;
auto plan_and_sets = QueryPlan::deserialize(*in, context);
if (unexpected_packet)
throw NetException(ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT, "Unexpected packet QueryPlan received from client");
state.plan_and_sets = std::make_shared<QueryPlanAndSets>(std::move(plan_and_sets));
}
bool TCPHandler::receiveData(bool scalar)
{
if (state.skipping_data)
return receiveUnexpectedData(false);
if (state.empty())
return receiveUnexpectedData(true);
initBlockInput();
/// The name of the temporary table for writing data, default to empty string

View File

@ -266,6 +266,7 @@ private:
void receiveAddendum();
bool receivePacket();
void receiveQuery();
void receiveQueryPlan();
void receiveIgnoredPartUUIDs();
String receiveReadTaskResponseAssumeLocked();
std::optional<ParallelReadResponse> receivePartitionMergeTreeReadTaskResponseAssumeLocked();

View File

@ -183,12 +183,14 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const
{
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
std::vector<IConnectionPool::Entry>{try_result},
QueryToSend{.text = queryToString(query_to_send), .stage = processed_stage},
queryToString(query_to_send),
getOutputStream().header,
new_context,
/*throttler=*/nullptr,
scalars,
Tables(),
processed_stage,
nullptr,
extension);
remote_query_executor->setLogger(log);

View File

@ -1015,7 +1015,7 @@ std::optional<QueryPipeline> StorageDistributed::distributedWriteBetweenDistribu
/// INSERT SELECT query returns empty block
auto remote_query_executor
= std::make_shared<RemoteQueryExecutor>(std::move(connections), QueryToSend{.text = std::move(new_query_str), .stage = QueryProcessingStage::Complete}, Block{}, query_context);
= std::make_shared<RemoteQueryExecutor>(std::move(connections), new_query_str, Block{}, query_context);
QueryPipeline remote_pipeline(std::make_shared<RemoteSource>(remote_query_executor, false, settings.async_socket_for_remote, settings.async_query_sending_for_remote));
remote_pipeline.complete(std::make_shared<EmptySink>(remote_query_executor->getHeader()));
@ -1125,12 +1125,14 @@ std::optional<QueryPipeline> StorageDistributed::distributedWriteFromClusterStor
{
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
std::vector<IConnectionPool::Entry>{try_result},
QueryToSend{.text = std::move(new_query_str), .stage = QueryProcessingStage::Complete},
new_query_str,
Block{},
query_context,
/*throttler=*/nullptr,
Scalars{},
Tables{},
QueryProcessingStage::Complete,
nullptr,
extension);
QueryPipeline remote_pipeline(std::make_shared<RemoteSource>(remote_query_executor, false, settings.async_socket_for_remote, settings.async_query_sending_for_remote));

View File

@ -5715,12 +5715,13 @@ std::optional<QueryPipeline> StorageReplicatedMergeTree::distributedWriteFromClu
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
connection,
QueryToSend{.text = std::move(query_str), .stage = QueryProcessingStage::Complete},
query_str,
Block{},
query_context,
/*throttler=*/nullptr,
Scalars{},
Tables{},
QueryProcessingStage::Complete,
extension);
QueryPipeline remote_pipeline(std::make_shared<RemoteSource>(remote_query_executor, false, settings.async_socket_for_remote, settings.async_query_sending_for_remote));

View File

@ -81,7 +81,7 @@ ColumnsDescription getStructureOfRemoteTableInShard(
};
/// Execute remote query without restrictions (because it's not real user query, but part of implementation)
RemoteQueryExecutor executor(shard_info.pool, {.text = std::move(query), .stage = QueryProcessingStage::Complete}, sample_block, new_context);
RemoteQueryExecutor executor(shard_info.pool, query, sample_block, new_context);
executor.setPoolMode(PoolMode::GET_ONE);
if (!table_func_ptr)
executor.setMainTable(table_id);
@ -193,8 +193,7 @@ ColumnsDescriptionByShardNum getExtendedObjectsOfRemoteTables(
auto execute_query_on_shard = [&](const auto & shard_info)
{
/// Execute remote query without restrictions (because it's not real user query, but part of implementation)
RemoteQueryExecutor executor(shard_info.pool, {.text = std::move(query), .stage = QueryProcessingStage::Complete}, sample_block, new_context);
RemoteQueryExecutor executor(shard_info.pool, query, sample_block, new_context);
executor.setPoolMode(PoolMode::GET_ONE);
executor.setMainTable(remote_table_id);