Initial steps for QueryPlan serialization.

This commit is contained in:
Nikolai Kochetov 2024-08-28 17:19:32 +00:00
parent 1cdccd527f
commit d51f1eaf9f
17 changed files with 696 additions and 0 deletions

View File

@ -99,6 +99,9 @@ public:
void write(WriteBuffer & out, SettingsWriteFormat format = SettingsWriteFormat::DEFAULT) const;
void read(ReadBuffer & in, SettingsWriteFormat format = SettingsWriteFormat::DEFAULT);
/// Writes the settings yielded by iterating this object in a binary form:
/// a VarUInt count, then for every setting its name, a flags field (always written as 0)
/// and the value in its binary representation.
void writeChangedBinary(WriteBuffer & out) const;
/// Reads settings in the form produced by writeChangedBinary().
/// The flags field of every entry is read and ignored.
void readBinary(ReadBuffer & in);
// A debugging aid.
std::string toString() const;
@ -499,6 +502,46 @@ void BaseSettings<TTraits>::write(WriteBuffer & out, SettingsWriteFormat format)
BaseSettingsHelpers::writeString(std::string_view{}, out);
}
/// Binary form: VarUInt count of entries, then for each entry
/// its name, a flags field (always 0) and the value in binary representation.
/// The entries are exactly the settings yielded by iterating `*this`.
template <typename TTraits>
void BaseSettings<TTraits>::writeChangedBinary(WriteBuffer & out) const
{
    using Flags = BaseSettingsHelpers::Flags;

    /// The count is written before the entries, so walk the settings once just to count them.
    size_t settings_count = 0;
    auto cursor = this->begin();
    while (cursor != this->end())
    {
        ++cursor;
        ++settings_count;
    }
    writeVarUInt(settings_count, out);

    const auto & traits_accessor = Traits::Accessor::instance();
    for (const auto & setting : *this)
    {
        BaseSettingsHelpers::writeString(setting.getName(), out);

        /// No flags are used by this format yet; the field is kept as a placeholder.
        Flags no_flags{0};
        BaseSettingsHelpers::writeFlags(no_flags, out);

        traits_accessor.writeBinary(*this, setting.index, out);
    }
}
/// Reads settings in the form produced by writeChangedBinary():
/// a VarUInt count, then for each entry its name, a flags field and the binary value.
///
/// Names are passed through TTraits::resolveName() before lookup.
/// The flags field is read and ignored.
/// An entry whose name is not a known setting is reported via
/// BaseSettingsHelpers::throwSettingNotFound(): the value cannot be skipped safely
/// because its binary size depends on the (unknown) setting type.
template <typename TTraits>
void BaseSettings<TTraits>::readBinary(ReadBuffer & in)
{
    const auto & accessor = Traits::Accessor::instance();

    size_t num_settings = 0;
    readVarUInt(num_settings, in);

    for (size_t i = 0; i < num_settings; ++i)
    {
        String read_name = BaseSettingsHelpers::readString(in);
        std::string_view name = TTraits::resolveName(read_name);
        size_t index = accessor.find(name);

        std::ignore = BaseSettingsHelpers::readFlags(in);

        /// NOTE(review): Accessor::find() is expected to return static_cast<size_t>(-1) for an unknown name
        /// (the same sentinel the text-format read() of this class checks) - confirm against the accessor.
        /// Using that value as an index would be an out-of-range access.
        if (index == static_cast<size_t>(-1))
            BaseSettingsHelpers::throwSettingNotFound(name);

        accessor.readBinary(*this, index, in);
    }
}
template <typename TTraits>
void BaseSettings<TTraits>::read(ReadBuffer & in, SettingsWriteFormat format)
{

View File

@ -3,6 +3,7 @@
#include <Analyzer/FunctionNode.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeFactory.h>
#include <Columns/ColumnConst.h>
#include <Functions/IFunction.h>
#include <Functions/IFunctionAdaptors.h>
@ -3217,4 +3218,240 @@ const ActionsDAG::Node * FindAliasForInputName::find(const String & name)
return it->second;
}
/// Writes the DAG in a binary form that ActionsDAG::deserialize() reads back.
///
/// Layout: VarUInt node count; then for every node (in `nodes` order):
/// type (UInt8), result name, result type name, children (count + ids),
/// column flags (bit 1 - column is present, bit 2 - is_deterministic_constant),
/// the constant value if a column is present, and the function name for FUNCTION nodes.
/// After the nodes: ids of inputs and ids of outputs (each as count + ids).
///
/// Throws LOGICAL_ERROR if a node has a column which is not ColumnConst.
void ActionsDAG::serialize(WriteBuffer & out) const
{
    const size_t total_nodes = nodes.size();
    writeVarUInt(total_nodes, out);

    /// Number the nodes by their position in the list. All references between nodes are stored as these ids.
    std::unordered_map<const Node *, size_t> ids;
    for (const auto & current : nodes)
    {
        const size_t next_id = ids.size();
        ids.emplace(&current, next_id);
    }

    if (ids.size() != total_nodes)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Duplicate nodes in ActionsDAG");

    /// Writes a list of node pointers as a count followed by the node ids.
    const auto write_node_ids = [&](const auto & node_list)
    {
        writeVarUInt(node_list.size(), out);
        for (const auto * referenced : node_list)
            writeVarUInt(ids.at(referenced), out);
    };

    for (const auto & current : nodes)
    {
        writeIntBinary(static_cast<UInt8>(current.type), out);
        writeStringBinary(current.result_name, out);
        writeStringBinary(current.result_type->getName(), out);

        write_node_ids(current.children);

        /// Bit 1: the node has a column. Bit 2: the column is a deterministic constant.
        const bool column_is_set = current.column != nullptr;
        UInt8 flags = 0;
        if (column_is_set)
        {
            flags |= 1;
            if (current.is_deterministic_constant)
                flags |= 2;
        }
        writeIntBinary(flags, out);

        if (column_is_set)
        {
            /// Only constants can be stored: the value is written as a single Field.
            const auto * as_const = typeid_cast<const ColumnConst *>(current.column.get());
            if (!as_const)
                throw Exception(ErrorCodes::LOGICAL_ERROR,
                    "Cannot serialize non-constant column {}", current.column->getName());

            auto field = as_const->getField();
            current.result_type->getDefaultSerialization()->serializeBinary(field, out, FormatSettings{});
        }

        /// INPUT, COLUMN (already written above), ALIAS and ARRAY_JOIN have no extra payload.
        const bool has_no_payload = current.type == ActionType::INPUT
            || current.type == ActionType::COLUMN
            || current.type == ActionType::ALIAS
            || current.type == ActionType::ARRAY_JOIN;

        if (current.type == ActionType::FUNCTION)
            writeStringBinary(current.function_base->getName(), out);
        else if (!has_no_payload)
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown node type {}", static_cast<size_t>(current.type));
    }

    write_node_ids(inputs);
    write_node_ids(outputs);
}
/// Reconstructs an ActionsDAG from the binary form written by ActionsDAG::serialize().
///
/// The stream is validated while reading; malformed data is reported as INCORRECT_DATA:
///  - unknown action type;
///  - node id (child, input or output) outside of [0, number of nodes);
///  - wrong number of children for INPUT / COLUMN / ALIAS / ARRAY_JOIN;
///  - COLUMN node without a column;
///  - FUNCTION node whose argument refers to a node that is not deserialized yet;
///  - FUNCTION node whose rebuilt result type differs from the stored one;
///  - input list that contains a non-INPUT node or a duplicate, or misses some INPUT node.
///
/// Functions are resolved by name through FunctionFactory with the global context
/// and rebuilt from the types (and constant columns) of their children.
ActionsDAG ActionsDAG::deserialize(ReadBuffer & in)
{
    size_t nodes_size = 0;
    readVarUInt(nodes_size, in);

    /// Create all nodes up front, so that ids can be resolved to addresses while nodes are being filled.
    /// Ids are dense (0 .. nodes_size - 1), so a vector is enough for the lookup.
    std::list<Node> nodes;
    std::vector<Node *> id_to_node;
    for (size_t i = 0; i < nodes_size; ++i)
        id_to_node.push_back(&nodes.emplace_back(Node{}));

    /// Resolves an id read from the stream. Ids come from external data, so check the range explicitly.
    const auto node_by_id = [&id_to_node](size_t id, const char * role) -> Node *
    {
        if (id >= id_to_node.size())
            throw Exception(ErrorCodes::INCORRECT_DATA,
                "Deserialized {} node id {} is out of range, ActionsDAG has {} nodes",
                role, id, id_to_node.size());

        return id_to_node[id];
    };

    for (size_t i = 0; i < nodes_size; ++i)
    {
        Node & node = *id_to_node[i];

        UInt8 action_type{0};
        readIntBinary(action_type, in);

        /// Relies on FUNCTION being the last value of ActionType.
        /// The value comes from the stream, so it is bad data rather than a logical error.
        if (action_type > static_cast<UInt8>(ActionType::FUNCTION))
            throw Exception(ErrorCodes::INCORRECT_DATA, "Unknown action type {}", size_t(action_type));
        node.type = static_cast<ActionType>(action_type);

        readStringBinary(node.result_name, in);

        String result_type_name;
        readStringBinary(result_type_name, in);
        node.result_type = DataTypeFactory::instance().get(result_type_name);

        size_t children_size = 0;
        readVarUInt(children_size, in);
        for (size_t j = 0; j < children_size; ++j)
        {
            size_t child_id = 0;
            readVarUInt(child_id, in);
            node.children.push_back(node_by_id(child_id, "child"));
        }

        /// Bit 1: a constant column follows. Bit 2: the constant is deterministic.
        UInt8 column_flags = 0;
        readIntBinary(column_flags, in);

        /// Deserialize column if it is present
        if (column_flags & 1)
        {
            if ((column_flags & 2) == 0)
                node.is_deterministic_constant = false;

            Field value;
            node.result_type->getDefaultSerialization()->deserializeBinary(value, in, FormatSettings{});
            node.column = node.result_type->createColumnConst(0, value);
        }

        if (node.type == ActionType::INPUT)
        {
            /// nothing to deserialize
            if (!node.children.empty())
                throw Exception(ErrorCodes::INCORRECT_DATA, "Deserialized input can't have children");
        }
        else if (node.type == ActionType::COLUMN)
        {
            /// Column is already deserialized
            if (!node.children.empty())
                throw Exception(ErrorCodes::INCORRECT_DATA, "Deserialized column can't have children");

            if (!node.column)
                throw Exception(ErrorCodes::INCORRECT_DATA,
                    "Deserialized column {} has no value", node.result_name);
        }
        else if (node.type == ActionType::ALIAS)
        {
            /// nothing to deserialize
            if (node.children.size() != 1)
                throw Exception(ErrorCodes::INCORRECT_DATA, "Deserialized alias must have one children");
        }
        else if (node.type == ActionType::FUNCTION)
        {
            String function_name;
            readStringBinary(function_name, in);

            auto function = FunctionFactory::instance().get(function_name, Context::getGlobalContextInstance());

            ColumnsWithTypeAndName arguments;
            arguments.reserve(node.children.size());
            for (const auto * child : node.children)
            {
                /// Nodes are filled in id order. A child which is not filled yet (its id is not less than
                /// the id of this node) has no type, and the function can't be built from it.
                if (!child->result_type)
                    throw Exception(ErrorCodes::INCORRECT_DATA,
                        "Deserialized function {} has an argument which is not deserialized yet",
                        function_name);

                ColumnWithTypeAndName argument;
                argument.column = child->column;
                argument.type = child->result_type;
                argument.name = child->result_name;

                arguments.emplace_back(std::move(argument));
            }

            node.function_base = function->build(arguments);
            node.function = node.function_base->prepare(arguments);
            node.is_function_compiled = false;

            /// The stored type must agree with the type the function produces for these arguments.
            if (!node.function_base->getResultType()->equals(*node.result_type))
                throw Exception(ErrorCodes::INCORRECT_DATA,
                    "Deserialized function {} has invalid type. Expected {}, deserialized {}.",
                    function_name,
                    node.function_base->getResultType()->getName(),
                    node.result_type->getName());
        }
        else if (node.type == ActionType::ARRAY_JOIN)
        {
            /// nothing to deserialize
            if (node.children.size() != 1)
                throw Exception(ErrorCodes::INCORRECT_DATA, "Deserialized array join must have one children");
        }
        else
        {
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown node type {}", static_cast<size_t>(node.type));
        }
    }

    size_t inputs_size = 0;
    readVarUInt(inputs_size, in);
    std::vector<const Node *> inputs;
    std::unordered_set<const Node *> inputs_set;
    inputs.reserve(inputs_size);
    for (size_t i = 0; i < inputs_size; ++i)
    {
        size_t input_id = 0;
        readVarUInt(input_id, in);
        const auto * input = node_by_id(input_id, "input");

        if (input->type != ActionType::INPUT)
            throw Exception(ErrorCodes::INCORRECT_DATA,
                "Deserialized input {} has type {}",
                input->result_name, magic_enum::enum_name(input->type));

        if (!inputs_set.emplace(input).second)
            throw Exception(ErrorCodes::INCORRECT_DATA,
                "Duplicate input {}", input->result_name);

        inputs.push_back(input);
    }

    size_t outputs_size = 0;
    readVarUInt(outputs_size, in);
    std::vector<const Node *> outputs;
    outputs.reserve(outputs_size);
    for (size_t i = 0; i < outputs_size; ++i)
    {
        size_t output_id = 0;
        readVarUInt(output_id, in);
        outputs.push_back(node_by_id(output_id, "output"));
    }

    /// Every INPUT node must be listed in inputs.
    for (const auto & node : nodes)
        if (node.type == ActionType::INPUT && !inputs_set.contains(&node))
            throw Exception(ErrorCodes::INCORRECT_DATA,
                "Deserialized input {} is not in the inputs list",
                node.result_name);

    ActionsDAG dag;
    dag.nodes = std::move(nodes);
    dag.inputs = std::move(inputs);
    dag.outputs = std::move(outputs);

    return dag;
}
}

View File

@ -130,6 +130,9 @@ public:
std::string dumpNames() const;
std::string dumpDAG() const;
/// Writes all nodes (type, result name, result type name, children, constant column, function name)
/// followed by the lists of inputs and outputs. Nodes are referenced by their position in the DAG.
/// Throws if some node has a column which is not ColumnConst.
void serialize(WriteBuffer & out) const;
/// Reads a DAG in the form written by serialize().
/// Functions are looked up by name in FunctionFactory and rebuilt from the types of their arguments.
static ActionsDAG deserialize(ReadBuffer & in);
const Node & addInput(std::string name, DataTypePtr type);
const Node & addInput(ColumnWithTypeAndName column);
const Node & addColumn(ColumnWithTypeAndName column);

View File

@ -1,4 +1,6 @@
#include <Processors/QueryPlan/DistinctStep.h>
#include <Processors/QueryPlan/QueryPlanStepRegistry.h>
#include <Processors/QueryPlan/QueryPlanSerializationSettings.h>
#include <Processors/Transforms/DistinctSortedChunkTransform.h>
#include <Processors/Transforms/DistinctSortedTransform.h>
#include <Processors/Transforms/DistinctTransform.h>
@ -62,6 +64,16 @@ DistinctStep::DistinctStep(
{
}
/// Combines the current limit hint with a new one. Zero means that a limit is not set.
/// If both are set, the smaller one wins; otherwise the one which is set (if any) is kept.
void DistinctStep::updateLimitHint(UInt64 hint)
{
    /// A hint which is not set carries no information - keep whatever we have.
    if (hint == 0)
        return;

    /// Either adopt the new hint (nothing was set before) or tighten the existing one.
    limit_hint = (limit_hint == 0) ? hint : std::min(hint, limit_hint);
}
void DistinctStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
if (!pre_distinct)
@ -181,4 +193,61 @@ void DistinctStep::updateOutputStream()
getTraits(pre_distinct).data_stream_traits);
}
/// Stores the size limits of the DISTINCT set into the serialization settings.
/// DistinctStep::deserialize() restores the limits from the same three settings.
void DistinctStep::serializeSettings(QueryPlanSerializationSettings & settings) const
{
    const auto & limits = set_size_limits;

    settings.max_rows_in_distinct = limits.max_rows;
    settings.max_bytes_in_distinct = limits.max_bytes;
    settings.distinct_overflow_mode = limits.overflow_mode;
}
/// Writes the list of DISTINCT columns: a VarUInt count followed by the column names.
void DistinctStep::serialize(WriteBuffer & out) const
{
    /// limit_hint is intentionally not serialized.
    /// Ideally, the follower gets it from a query plan optimization.
    const auto & names = columns;

    writeVarUInt(names.size(), out);
    for (const auto & name : names)
        writeStringBinary(name, out);
}
/// Creates a DistinctStep from the form written by DistinctStep::serialize().
/// The size limits are taken from `settings` (see serializeSettings), the column names from the buffer.
/// `pre_distinct_` selects between the preliminary and the normal variant of the step.
/// Throws INCORRECT_DATA unless there is exactly one input stream.
std::unique_ptr<IQueryPlanStep> DistinctStep::deserialize(
    ReadBuffer & in, const DataStreams & input_streams_, QueryPlanSerializationSettings & settings, bool pre_distinct_)
{
    if (input_streams_.size() != 1)
        throw Exception(ErrorCodes::INCORRECT_DATA, "DistinctStep must have one input stream");

    size_t num_names;
    readVarUInt(num_names, in);

    Names names(num_names);
    for (auto & name : names)
        readStringBinary(name, in);

    SizeLimits limits;
    limits.max_rows = settings.max_rows_in_distinct;
    limits.max_bytes = settings.max_bytes_in_distinct;
    limits.overflow_mode = settings.distinct_overflow_mode;

    /// The third argument is passed as 0: serialize() does not store limit_hint.
    const auto & input_stream = input_streams_.front();
    return std::make_unique<DistinctStep>(input_stream, limits, 0, names, pre_distinct_, false);
}
/// Factory for the step registered under the name "Distinct" (not preliminary).
std::unique_ptr<IQueryPlanStep> DistinctStep::deserializeNormal(
    ReadBuffer & in, const DataStreams & input_streams_, QueryPlanSerializationSettings & settings)
{
    constexpr bool is_preliminary = false;
    return deserialize(in, input_streams_, settings, is_preliminary);
}
/// Factory for the step registered under the name "PreDistinct" (preliminary distinct).
std::unique_ptr<IQueryPlanStep> DistinctStep::deserializePre(
    ReadBuffer & in, const DataStreams & input_streams_, QueryPlanSerializationSettings & settings)
{
    constexpr bool is_preliminary = true;
    return deserialize(in, input_streams_, settings, is_preliminary);
}
/// Registers both variants of DistinctStep in the registry.
void registerDistinctStep(QueryPlanStepRegistry & registry)
{
    /// Preliminary distinct probably can be a query plan optimization.
    /// It is easier to serialize the two variants under different names,
    /// so that pre-distinct can potentially be removed later.
    registry.registerStep("Distinct", &DistinctStep::deserializeNormal);
    registry.registerStep("PreDistinct", &DistinctStep::deserializePre);
}
}

View File

@ -20,6 +20,8 @@ public:
String getName() const override { return "Distinct"; }
const Names & getColumnNames() const { return columns; }
/// Preliminary and normal distinct are serialized under different names.
/// Both names are registered in registerDistinctStep().
String getSerializationName() const override
{
    if (pre_distinct)
        return "PreDistinct";
    return "Distinct";
}
void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
void describeActions(JSONBuilder::JSONMap & map) const override;
@ -28,6 +30,14 @@ public:
bool isPreliminary() const { return pre_distinct; }
UInt64 getLimitHint() const { return limit_hint; }
/// Combines limit_hint with `hint`: the minimum if both are non-zero, otherwise the non-zero one (if any).
void updateLimitHint(UInt64 hint);
/// Stores the limits of the DISTINCT set (max rows, max bytes, overflow mode) into `settings`.
void serializeSettings(QueryPlanSerializationSettings & settings) const override;
/// Writes the names of the DISTINCT columns. limit_hint is not written.
void serialize(WriteBuffer & out) const override;
/// Common implementation for deserializeNormal / deserializePre; `pre_distinct_` selects the variant.
/// Requires exactly one input stream.
static std::unique_ptr<IQueryPlanStep> deserialize(ReadBuffer & in, const DataStreams & input_streams_, QueryPlanSerializationSettings &, bool pre_distinct_);
/// Factory registered as "Distinct".
static std::unique_ptr<IQueryPlanStep> deserializeNormal(ReadBuffer & in, const DataStreams & input_streams_, QueryPlanSerializationSettings &);
/// Factory registered as "PreDistinct".
static std::unique_ptr<IQueryPlanStep> deserializePre(ReadBuffer & in, const DataStreams & input_streams_, QueryPlanSerializationSettings &);
private:
void updateOutputStream() override;

View File

@ -1,4 +1,5 @@
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/QueryPlanStepRegistry.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Processors/Transforms/JoiningTransform.h>
@ -90,4 +91,23 @@ void ExpressionStep::updateOutputStream()
}
}
/// The only state written for the step is its ActionsDAG.
void ExpressionStep::serialize(WriteBuffer & out) const
{
actions_dag.serialize(out);
}
/// Creates an ExpressionStep from the form written by ExpressionStep::serialize().
/// The ActionsDAG is read from the buffer first; after that the number of input streams is checked.
/// Throws INCORRECT_DATA unless there is exactly one input stream. The settings are not used.
std::unique_ptr<IQueryPlanStep> ExpressionStep::deserialize(ReadBuffer & in, const DataStreams & input_streams_, QueryPlanSerializationSettings &)
{
    ActionsDAG dag = ActionsDAG::deserialize(in);

    const bool has_single_input = input_streams_.size() == 1;
    if (!has_single_input)
        throw Exception(ErrorCodes::INCORRECT_DATA, "ExpressionStep must have one input stream");

    const auto & input_stream = input_streams_.front();
    return std::make_unique<ExpressionStep>(input_stream, std::move(dag));
}
/// Registers ExpressionStep::deserialize as the factory for the step name "Expression".
void registerExpressionStep(QueryPlanStepRegistry & registry)
{
registry.registerStep("Expression", ExpressionStep::deserialize);
}
}

View File

@ -25,6 +25,10 @@ public:
void describeActions(JSONBuilder::JSONMap & map) const override;
/// Writes the ActionsDAG of the step.
void serialize(WriteBuffer & out) const override;
/// Factory registered as "Expression". Reads the ActionsDAG; requires exactly one input stream.
static std::unique_ptr<IQueryPlanStep> deserialize(ReadBuffer & in, const DataStreams & input_streams_, QueryPlanSerializationSettings &);
private:
void updateOutputStream() override;

View File

@ -63,6 +63,8 @@ using DataStreams = std::vector<DataStream>;
class QueryPlan;
using QueryPlanRawPtrs = std::list<QueryPlan *>;
struct QueryPlanSerializationSettings;
/// Single step of query plan.
class IQueryPlanStep
{
@ -70,6 +72,7 @@ public:
virtual ~IQueryPlanStep() = default;
virtual String getName() const = 0;
/// Name written by QueryPlan::serialize() and used to find the step factory on deserialization.
/// By default it is the same as getName(); a step may override it (DistinctStep does).
virtual String getSerializationName() const { return getName(); }
/// Add processors from current step to QueryPipeline.
/// Calling this method, we assume and don't check that:
@ -88,6 +91,9 @@ public:
const std::string & getStepDescription() const { return step_description; }
void setStepDescription(std::string description) { step_description = std::move(description); }
/// Lets the step store the settings it depends on. The default implementation stores nothing.
/// QueryPlan::serialize() calls it on a fresh settings object and then writes that object with writeChangedBinary().
virtual void serializeSettings(QueryPlanSerializationSettings & /*settings*/) const {}
/// Writes the step-specific state. Steps which do not override it can't be serialized:
/// the default implementation throws NOT_IMPLEMENTED.
virtual void serialize(WriteBuffer & /*buf*/) const { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented for {}", getName()); }
struct FormatSettings
{
WriteBuffer & out;

View File

@ -4,6 +4,7 @@
#include <Processors/QueryPlan/TotalsHavingStep.h>
#include <Processors/QueryPlan/SortingStep.h>
#include <Processors/QueryPlan/WindowStep.h>
#include <Processors/QueryPlan/DistinctStep.h>
#include <Common/typeid_cast.h>
namespace DB::QueryPlanOptimizations
@ -63,6 +64,12 @@ size_t tryPushDownLimit(QueryPlan::Node * parent_node, QueryPlan::Nodes &)
if (tryUpdateLimitForSortingSteps(child_node, limit->getLimitForSorting()))
return 0;
if (auto * distinct = typeid_cast<DistinctStep *>(child.get()))
{
distinct->updateLimitHint(limit->getLimitForSorting());
return 0;
}
/// Special case for TotalsHaving. Totals may be incorrect if we push down limit.
if (typeid_cast<const TotalsHavingStep *>(child.get()))
return 0;

View File

@ -1,6 +1,7 @@
#include <stack>
#include <Common/JSONBuilder.h>
#include <DataTypes/DataTypesBinaryEncoding.h>
#include <Interpreters/ActionsDAG.h>
#include <Interpreters/ArrayJoinAction.h>
@ -16,6 +17,8 @@
#include <Processors/QueryPlan/ReadFromMergeTree.h>
#include <Processors/QueryPlan/ITransformingStep.h>
#include <Processors/QueryPlan/QueryPlanVisitor.h>
#include <Processors/QueryPlan/QueryPlanSerializationSettings.h>
#include <Processors/QueryPlan/QueryPlanStepRegistry.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
@ -565,4 +568,148 @@ std::pair<QueryPlan::Nodes, QueryPlanResourceHolder> QueryPlan::detachNodesAndRe
return {std::move(plan.nodes), std::move(plan.resources)};
}
/// Writes the structure of a block: number of columns, then name and encoded type of every column.
static void serializeHeader(const Block & header, WriteBuffer & out)
{
    /// Only names and types are stored.
    /// Constant columns are expected to be restored by the step itself.
    const auto num_columns = header.columns();
    writeVarUInt(num_columns, out);

    for (const auto & entry : header)
    {
        writeStringBinary(entry.name, out);
        encodeDataType(entry.type, out);
    }
}
/// Reads the structure written by serializeHeader(): names and types only, the columns themselves stay empty.
static Block deserializeHeader(ReadBuffer & in)
{
    UInt64 num_columns;
    readVarUInt(num_columns, in);

    ColumnsWithTypeAndName entries(num_columns);
    for (size_t pos = 0; pos < entries.size(); ++pos)
    {
        auto & entry = entries[pos];
        readStringBinary(entry.name, in);
        entry.type = decodeDataType(in);
    }

    return Block(std::move(entries));
}
/// Writes the plan tree starting from the root, without recursion.
///
/// For every node the number of children is written when the node is visited for the first time,
/// then all children are written, and only after that the node itself:
/// serialization name, description, changed settings, step-specific state and the output header
/// (an empty header if the step has no output stream).
void QueryPlan::serialize(WriteBuffer & out) const
{
    checkInitialized();

    struct Frame
    {
        Node * node = {};
        size_t next_child = 0;
    };

    std::stack<Frame> pending;
    pending.push(Frame{.node = root});

    while (!pending.empty())
    {
        auto & top = pending.top();
        auto * current = top.node;
        const auto & children = current->children;

        /// First visit of the node: announce how many children follow.
        if (top.next_child == 0)
            writeVarUInt(children.size(), out);

        /// Descend into the next unvisited child, if any.
        if (top.next_child < children.size())
        {
            auto * child = children[top.next_child];
            ++top.next_child;
            pending.push(Frame{.node = child});
            continue;
        }

        /// All children are written. `current` stays valid after the frame is removed.
        pending.pop();

        const auto & step = current->step;
        writeStringBinary(step->getSerializationName(), out);
        writeStringBinary(step->getStepDescription(), out);

        QueryPlanSerializationSettings settings;
        step->serializeSettings(settings);
        settings.writeChangedBinary(out);

        step->serialize(out);

        if (!step->hasOutputStream())
            serializeHeader({}, out);
        else
            serializeHeader(step->getOutputStream().header, out);
    }
}
/// Reads a plan in the form written by QueryPlan::serialize(), without recursion.
///
/// For every node the number of children is read first, then all children are read,
/// and only after that the node itself: step name, description, settings, step-specific state
/// (through QueryPlanStepRegistry) and the output header, which is checked against the created step.
/// Throws INCORRECT_DATA if a header is stored for a step which has no output stream.
QueryPlan QueryPlan::deserialize(ReadBuffer & in)
{
QueryPlanStepRegistry & step_registry = QueryPlanStepRegistry::instance();
using NodePtr = Node *;
struct Frame
{
/// Where to store the pointer to the node once it is created:
/// either plan.root or an element of `children` of the parent frame.
NodePtr & to_fill;
/// Index of the next child to read. Zero also means that the number of children is not read yet.
size_t next_child = 0;
/// Pointers to already created children; sized when the number of children is read.
std::vector<Node *> children = {};
};
std::stack<Frame> stack;
QueryPlan plan;
stack.push(Frame{.to_fill = plan.root});
while (!stack.empty())
{
auto & frame = stack.top();
/// First visit of the frame: read how many children precede the node in the stream.
if (frame.next_child == 0)
{
UInt64 num_children;
readVarUInt(num_children, in);
frame.children.resize(num_children);
}
/// Read the next child first; it stores its pointer into the corresponding slot of frame.children.
if (frame.next_child < frame.children.size())
{
stack.push(Frame{.to_fill = frame.children[frame.next_child]});
++frame.next_child;
continue;
}
/// All children are created - read the node itself.
std::string step_name;
std::string step_description;
readStringBinary(step_name, in);
readStringBinary(step_description, in);
QueryPlanSerializationSettings settings;
settings.readBinary(in);
/// Input streams of the step are the output streams of its children.
DataStreams input_streams;
input_streams.reserve(frame.children.size());
for (const auto & child : frame.children)
input_streams.push_back(child->step->getOutputStream());
auto step = step_registry.createStep(in, step_name, input_streams, settings);
/// NOTE(review): step_description is read but not applied to the created step here - confirm this is intended.
auto header = deserializeHeader(in);
if (step->hasOutputStream())
{
assertCompatibleHeader(step->getOutputStream().header, header,
fmt::format("deserialization of query plan step {}", step_name));
}
else if (header.columns())
throw Exception(ErrorCodes::INCORRECT_DATA,
"Deserialized step {} has no output stream, but deserialized header is not empty : {}",
step_name, header.dumpStructure());
/// frame.children is moved into the node, so it must not be used after this line.
auto & node = plan.nodes.emplace_back(std::move(step), std::move(frame.children));
/// Publish the node to the parent (or to plan.root) before the frame is removed.
frame.to_fill = &node;
stack.pop();
}
return plan;
}
}

View File

@ -20,6 +20,7 @@ using QueryPlanStepPtr = std::unique_ptr<IQueryPlanStep>;
class QueryPipelineBuilder;
using QueryPipelineBuilderPtr = std::unique_ptr<QueryPipelineBuilder>;
class ReadBuffer;
class WriteBuffer;
class QueryPlan;
@ -54,6 +55,9 @@ public:
bool isCompleted() const; /// Tree is not empty and root hasOutputStream()
const DataStream & getCurrentDataStream() const; /// Checks that (isInitialized() && !isCompleted())
/// Writes the whole plan tree. Every step must implement IQueryPlanStep::serialize(),
/// otherwise the default implementation throws NOT_IMPLEMENTED.
void serialize(WriteBuffer & out) const;
/// Reads a plan written by serialize(). Steps are created through QueryPlanStepRegistry,
/// so the step names found in the data must be registered there.
static QueryPlan deserialize(ReadBuffer & in);
void optimize(const QueryPlanOptimizationSettings & optimization_settings);
QueryPipelineBuilderPtr buildQueryPipeline(

View File

@ -0,0 +1,8 @@
#include <Processors/QueryPlan/QueryPlanSerializationSettings.h>
namespace DB
{
/// Counterpart of DECLARE_SETTINGS_TRAITS in the header, for the same traits name and settings list.
/// NOTE(review): presumably expands to the definitions of the traits members - confirm against Core/BaseSettings.h.
IMPLEMENT_SETTINGS_TRAITS(QueryPlanSerializationSettingsTraits, PLAN_SERIALIZATION_SETTINGS);
}

View File

@ -0,0 +1,23 @@
#pragma once
#include <Core/BaseSettings.h>
#include <Core/SettingsEnums.h>
#include <Core/Defines.h>
namespace DB
{
/// List of settings which are stored together with a serialized query plan step.
/// Each entry is M(type, name, default value, description, flags).
/// Currently these are the limits of the DISTINCT set, written by DistinctStep::serializeSettings().
///
/// The last entry must not end with a line continuation:
/// a trailing backslash splices the next source line into the macro body.
#define PLAN_SERIALIZATION_SETTINGS(M, ALIAS) \
    M(UInt64, max_rows_in_distinct, 0, "Maximum number of elements during execution of DISTINCT.", 0) \
    M(UInt64, max_bytes_in_distinct, 0, "Maximum total size of state (in uncompressed bytes) in memory for the execution of DISTINCT.", 0) \
    M(OverflowMode, distinct_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.", 0)
DECLARE_SETTINGS_TRAITS(QueryPlanSerializationSettingsTraits, PLAN_SERIALIZATION_SETTINGS)
/// Settings which are written together with every serialized query plan step.
/// The set of settings is defined by PLAN_SERIALIZATION_SETTINGS through QueryPlanSerializationSettingsTraits.
/// A step fills them in IQueryPlanStep::serializeSettings() and receives them back in its deserialize function.
struct QueryPlanSerializationSettings : public BaseSettings<QueryPlanSerializationSettingsTraits>
{
QueryPlanSerializationSettings() = default;
};
}

View File

@ -0,0 +1,54 @@
#include <Processors/QueryPlan/QueryPlanStepRegistry.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_IDENTIFIER;
extern const int LOGICAL_ERROR;
}
/// Returns the single registry object; it is a function-local static, created on the first call.
QueryPlanStepRegistry & QueryPlanStepRegistry::instance()
{
static QueryPlanStepRegistry registry;
return registry;
}
/// Adds a factory for the step with the given serialization name.
/// Throws LOGICAL_ERROR if the name is already registered; the existing factory is kept in that case.
void QueryPlanStepRegistry::registerStep(const std::string & name, StepCreateFunction && create_function)
{
    /// try_emplace leaves `create_function` untouched when the name is already present.
    const bool inserted = steps.try_emplace(name, std::move(create_function)).second;
    if (!inserted)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Query plan step '{}' is already registered", name);
}
/// Creates a step by its serialization name.
///
/// `buf` is positioned at the step-specific state, `input_streams` are the output streams of the children
/// and `settings` are the settings which were stored together with the step.
/// Throws UNKNOWN_IDENTIFIER if no factory is registered under `name`.
QueryPlanStepPtr QueryPlanStepRegistry::createStep(
    ReadBuffer & buf,
    const std::string & name,
    const DataStreams & input_streams,
    QueryPlanSerializationSettings & settings) const
{
    const auto it = steps.find(name);
    if (it == steps.end())
        throw Exception(ErrorCodes::UNKNOWN_IDENTIFIER, "Unknown query plan step: {}", name);

    /// Call the stored factory in place: copying a std::function on every created step is not needed.
    return it->second(buf, input_streams, settings);
}
void registerExpressionStep(QueryPlanStepRegistry & registry);
void registerUnionStep(QueryPlanStepRegistry & registry);
void registerDistinctStep(QueryPlanStepRegistry & registry);
void registerPlanSteps()
{
QueryPlanStepRegistry & registry = QueryPlanStepRegistry::instance();
registerExpressionStep(registry);
registerUnionStep(registry);
registerDistinctStep(registry);
}
}

View File

@ -0,0 +1,34 @@
#pragma once
#include <Processors/QueryPlan/IQueryPlanStep.h>
namespace DB
{
/// Maps serialization names of query plan steps to the functions which create the steps from serialized data.
/// Used by QueryPlan::deserialize() through instance().
/// NOTE(review): no synchronization is visible here - presumably all registration happens before the first lookup; confirm.
class QueryPlanStepRegistry
{
public:
/// Creates a step from (serialized step state, input streams, settings stored with the step).
using StepCreateFunction = std::function<QueryPlanStepPtr(ReadBuffer &, const DataStreams &, QueryPlanSerializationSettings &)>;
QueryPlanStepRegistry() = default;
/// Not copyable.
QueryPlanStepRegistry(const QueryPlanStepRegistry &) = delete;
QueryPlanStepRegistry & operator=(const QueryPlanStepRegistry &) = delete;
/// The single registry object shared by the whole program.
static QueryPlanStepRegistry & instance();
/// Registers the factories of all steps which support serialization.
static void registerPlanSteps();
/// Adds a factory under the given name. Throws LOGICAL_ERROR if the name is already registered.
void registerStep(const std::string & name, StepCreateFunction && create_function);
/// Creates a step by name. Throws UNKNOWN_IDENTIFIER if the name is not registered.
QueryPlanStepPtr createStep(
ReadBuffer & buf,
const std::string & name,
const DataStreams & input_streams,
QueryPlanSerializationSettings & settings) const;
private:
/// Serialization name -> factory.
std::unordered_map<std::string, StepCreateFunction> steps;
};
}

View File

@ -1,6 +1,8 @@
#include <type_traits>
#include <Interpreters/ExpressionActions.h>
#include <Processors/QueryPlan/UnionStep.h>
#include <Processors/QueryPlan/QueryPlanStepRegistry.h>
#include <Processors/QueryPlan/QueryPlanSerializationSettings.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
@ -104,4 +106,25 @@ void UnionStep::describePipeline(FormatSettings & settings) const
IQueryPlanStep::describePipeline(processors, settings);
}
// void UnionStep::serializeSettings(QueryPlanSerializationSettings & settings) const
// {
// // settings.max_threads = max_threads;
// }
void UnionStep::serialize(WriteBuffer & out) const
{
(void)out;
}
std::unique_ptr<IQueryPlanStep> UnionStep::deserialize(ReadBuffer & in, const DataStreams & input_streams_, QueryPlanSerializationSettings &)
{
(void)in;
return std::make_unique<UnionStep>(input_streams_);
}
/// Registers UnionStep::deserialize as the factory for the step name "Union".
void registerUnionStep(QueryPlanStepRegistry & registry)
{
registry.registerStep("Union", &UnionStep::deserialize);
}
}

View File

@ -21,6 +21,10 @@ public:
bool canUpdateInputStream() const override { return true; }
// void serializeSettings(QueryPlanSerializationSettings & settings) const override;
/// Writes nothing: the step has no serialized state of its own.
void serialize(WriteBuffer & out) const override;
/// Factory registered as "Union". Creates the step from the input streams; the buffer and the settings are not used.
static std::unique_ptr<IQueryPlanStep> deserialize(ReadBuffer & in, const DataStreams & input_streams_, QueryPlanSerializationSettings &);
private:
void updateOutputStream() override;