diff --git a/src/Core/BaseSettings.h b/src/Core/BaseSettings.h index 6242d78aee7..311a8c55a09 100644 --- a/src/Core/BaseSettings.h +++ b/src/Core/BaseSettings.h @@ -99,6 +99,9 @@ public: void write(WriteBuffer & out, SettingsWriteFormat format = SettingsWriteFormat::DEFAULT) const; void read(ReadBuffer & in, SettingsWriteFormat format = SettingsWriteFormat::DEFAULT); + void writeChangedBinary(WriteBuffer & out) const; + void readBinary(ReadBuffer & in); + // A debugging aid. std::string toString() const; @@ -499,6 +502,46 @@ void BaseSettings::write(WriteBuffer & out, SettingsWriteFormat format) BaseSettingsHelpers::writeString(std::string_view{}, out); } +template +void BaseSettings::writeChangedBinary(WriteBuffer & out) const +{ + const auto & accessor = Traits::Accessor::instance(); + + size_t num_settings = 0; + for (auto it = this->begin(); it != this->end(); ++it) + ++num_settings; + + writeVarUInt(num_settings, out); + + for (const auto & field : *this) + { + BaseSettingsHelpers::writeString(field.getName(), out); + using Flags = BaseSettingsHelpers::Flags; + Flags flags{0}; + BaseSettingsHelpers::writeFlags(flags, out); + accessor.writeBinary(*this, field.index, out); + } +} + +template +void BaseSettings::readBinary(ReadBuffer & in) +{ + const auto & accessor = Traits::Accessor::instance(); + + size_t num_settings = 0; + readVarUInt(num_settings, in); + + for (size_t i = 0; i < num_settings; ++i) + { + String read_name = BaseSettingsHelpers::readString(in); + std::string_view name = TTraits::resolveName(read_name); + size_t index = accessor.find(name); + + std::ignore = BaseSettingsHelpers::readFlags(in); + accessor.readBinary(*this, index, in); + } +} + template void BaseSettings::read(ReadBuffer & in, SettingsWriteFormat format) { diff --git a/src/Interpreters/ActionsDAG.cpp b/src/Interpreters/ActionsDAG.cpp index 2a594839c6a..e9f4ffe3d80 100644 --- a/src/Interpreters/ActionsDAG.cpp +++ b/src/Interpreters/ActionsDAG.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -3217,4 +3218,240 @@ const ActionsDAG::Node * FindAliasForInputName::find(const String & name) return it->second; } +void ActionsDAG::serialize(WriteBuffer & out) const +{ + size_t nodes_size = nodes.size(); + writeVarUInt(nodes_size, out); + + std::unordered_map node_to_id; + for (const auto & node : nodes) + node_to_id.emplace(&node, node_to_id.size()); + + if (nodes_size != node_to_id.size()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Duplicate nodes in ActionsDAG"); + + for (const auto & node : nodes) + { + writeIntBinary(static_cast(node.type), out); + writeStringBinary(node.result_name, out); + writeStringBinary(node.result_type->getName(), out); + + writeVarUInt(node.children.size(), out); + for (const auto * child : node.children) + writeVarUInt(node_to_id.at(child), out); + + /// Serialize column if it is present + const bool has_column = node.column != nullptr; + UInt8 column_flags = 0; + if (has_column) + { + column_flags |= 1; + if (node.is_deterministic_constant) + column_flags |= 2; + } + + writeIntBinary(column_flags, out); + if (has_column) + { + const auto * const_column = typeid_cast(node.column.get()); + if (!const_column) + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Cannot serialize non-constant column {}", node.column->getName()); + + auto value = const_column->getField(); + node.result_type->getDefaultSerialization()->serializeBinary(value, out, FormatSettings{}); + } + + if (node.type == ActionType::INPUT) + { + /// nothing to serialize + } + else if (node.type == ActionType::COLUMN) + { + /// nothing to serialize, column is already serialized + } + else if (node.type == ActionType::ALIAS) + { + /// nothing to serialize + } + else if (node.type == ActionType::FUNCTION) + { + writeStringBinary(node.function_base->getName(), out); + } + else if (node.type == ActionType::ARRAY_JOIN) + { + /// nothing to serialize + } + else + { + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown node type {}", static_cast(node.type)); + } + } + + writeVarUInt(inputs.size(), out); + for (const auto * input : inputs) + writeVarUInt(node_to_id.at(input), out); + + writeVarUInt(outputs.size(), out); + for (const auto * output : outputs) + writeVarUInt(node_to_id.at(output), out); +} + +ActionsDAG ActionsDAG::deserialize(ReadBuffer & in) +{ + size_t nodes_size; + readVarUInt(nodes_size, in); + + std::list nodes; + std::unordered_map id_to_node; + for (size_t i = 0; i < nodes_size; ++i) + id_to_node.emplace(i, &nodes.emplace_back(Node{})); + + for (size_t i = 0; i < nodes_size; ++i) + { + Node & node = *id_to_node.at(i); + + UInt8 action_type{0}; + readIntBinary(action_type, in); + if (action_type > static_cast(ActionType::FUNCTION)) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown action type {}", size_t(action_type)); + node.type = static_cast(action_type); + + readStringBinary(node.result_name, in); + + String result_type_name; + readStringBinary(result_type_name, in); + node.result_type = DataTypeFactory::instance().get(result_type_name); + + size_t children_size; + readVarUInt(children_size, in); + for (size_t j = 0; j < children_size; ++j) + { + size_t child_id; + readVarUInt(child_id, in); + node.children.push_back(id_to_node.at(child_id)); + } + + UInt8 column_flags = 0; + readIntBinary(column_flags, in); + + /// Deserialize column if it is present + if (column_flags & 1) + { + if ((column_flags & 2) == 0) + node.is_deterministic_constant = false; + + Field value; + node.result_type->getDefaultSerialization()->deserializeBinary(value, in, FormatSettings{}); + node.column = node.result_type->createColumnConst(0, value); + } + + if (node.type == ActionType::INPUT) + { + /// nothing to deserialize + if (!node.children.empty()) + throw Exception(ErrorCodes::INCORRECT_DATA, "Deserialized input can't have children"); + } + else if (node.type == ActionType::COLUMN) + { + /// Column is already deserialized + if (!node.children.empty()) + throw Exception(ErrorCodes::INCORRECT_DATA, "Deserialized column can't have children"); + } + else if (node.type == ActionType::ALIAS) + { + /// nothing to deserialize + if (node.children.size() != 1) + throw Exception(ErrorCodes::INCORRECT_DATA, "Deserialized alias must have one children"); + } + else if (node.type == ActionType::FUNCTION) + { + String function_name; + readStringBinary(function_name, in); + + auto function = FunctionFactory::instance().get(function_name, Context::getGlobalContextInstance()); + + ColumnsWithTypeAndName arguments; + arguments.reserve(node.children.size()); + for (const auto * child : node.children) + { + ColumnWithTypeAndName argument; + argument.column = child->column; + argument.type = child->result_type; + argument.name = child->result_name; + + arguments.emplace_back(std::move(argument)); + } + + node.function_base = function->build(arguments); + node.function = node.function_base->prepare(arguments); + node.is_function_compiled = false; + + if (!node.function_base->getResultType()->equals(*node.result_type)) + throw Exception(ErrorCodes::INCORRECT_DATA, + "Deserialized function {} has type. Expected {}, deserialzied {}.", + function_name, + node.function_base->getResultType()->getName(), + node.result_type->getName()); + } + else if (node.type == ActionType::ARRAY_JOIN) + { + /// nothing to deserialize + if (node.children.size() != 1) + throw Exception(ErrorCodes::INCORRECT_DATA, "Deserialized array join must have one children"); + } + else + { + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown node type {}", static_cast(node.type)); + } + } + + size_t inputs_size; + readVarUInt(inputs_size, in); + std::vector inputs; + std::unordered_set inputs_set; + inputs.reserve(inputs_size); + for (size_t i = 0; i < inputs_size; ++i) + { + size_t input_id; + readVarUInt(input_id, in); + const auto * input = id_to_node.at(input_id); + + if (input->type != ActionType::INPUT) + throw Exception(ErrorCodes::INCORRECT_DATA, + "Deserialized input {} has type {}", + input->result_name, magic_enum::enum_name(input->type)); + + if (!inputs_set.emplace(input).second) + throw Exception(ErrorCodes::INCORRECT_DATA, + "Duplicate input {}", input->result_name); + + inputs.push_back(input); + } + + size_t outputs_size; + readVarUInt(outputs_size, in); + std::vector outputs; + outputs.reserve(outputs_size); + for (size_t i = 0; i < outputs_size; ++i) + { + size_t output_id; + readVarUInt(output_id, in); + outputs.push_back(id_to_node.at(output_id)); + } + + for (const auto & node : nodes) + if (node.type == ActionType::INPUT && !inputs_set.contains(&node)) + throw Exception(ErrorCodes::INCORRECT_DATA, + "Deserialized input {} is not in the inputs list", + node.result_name); + + ActionsDAG dag; + dag.nodes = std::move(nodes); + dag.inputs = std::move(inputs); + dag.outputs = std::move(outputs); + + return dag; +} + } diff --git a/src/Interpreters/ActionsDAG.h b/src/Interpreters/ActionsDAG.h index ee2b3fbf4f2..5b74bc2447a 100644 --- a/src/Interpreters/ActionsDAG.h +++ b/src/Interpreters/ActionsDAG.h @@ -130,6 +130,9 @@ public: std::string dumpNames() const; std::string dumpDAG() const; + void serialize(WriteBuffer & out) const; + static ActionsDAG deserialize(ReadBuffer & in); + const Node & addInput(std::string name, DataTypePtr type); const Node & addInput(ColumnWithTypeAndName column); const Node & addColumn(ColumnWithTypeAndName column); diff --git a/src/Processors/QueryPlan/DistinctStep.cpp b/src/Processors/QueryPlan/DistinctStep.cpp index b1c24fc01ce..5de2968a90d 100644 --- a/src/Processors/QueryPlan/DistinctStep.cpp +++ b/src/Processors/QueryPlan/DistinctStep.cpp @@ -1,4 +1,6 @@ #include +#include +#include #include #include #include @@ -62,6 +64,16 @@ DistinctStep::DistinctStep( { } +void DistinctStep::updateLimitHint(UInt64 hint) +{ + if (hint && limit_hint) + /// Both limits are set - take the min + limit_hint = std::min(hint, limit_hint); + else + /// Some limit is not set - take the other one + limit_hint = std::max(hint, limit_hint); +} + void DistinctStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) { if (!pre_distinct) @@ -181,4 +193,61 @@ void DistinctStep::updateOutputStream() getTraits(pre_distinct).data_stream_traits); } +void DistinctStep::serializeSettings(QueryPlanSerializationSettings & settings) const +{ + settings.max_rows_in_distinct = set_size_limits.max_rows; + settings.max_bytes_in_distinct = set_size_limits.max_bytes; + settings.distinct_overflow_mode = set_size_limits.overflow_mode; +} + +void DistinctStep::serialize(WriteBuffer & out) const +{ + /// Let's not serialzie limit_hint. + /// Ideally, we can get if from a query plan optimization on the follower. + + writeVarUInt(columns.size(), out); + for (const auto & column : columns) + writeStringBinary(column, out); +} + +std::unique_ptr DistinctStep::deserialize( + ReadBuffer & in, const DataStreams & input_streams_, QueryPlanSerializationSettings & settings, bool pre_distinct_) +{ + if (input_streams_.size() != 1) + throw Exception(ErrorCodes::INCORRECT_DATA, "DistinctStep must have one input stream"); + + size_t columns_size; + readVarUInt(columns_size, in); + Names column_names(columns_size); + for (size_t i = 0; i < columns_size; ++i) + readStringBinary(column_names[i], in); + + SizeLimits size_limits; + size_limits.max_rows = settings.max_rows_in_distinct; + size_limits.max_bytes = settings.max_bytes_in_distinct; + size_limits.overflow_mode = settings.distinct_overflow_mode; + + return std::make_unique( + input_streams_.front(), size_limits, 0, column_names, pre_distinct_, false); +} + +std::unique_ptr DistinctStep::deserializeNormal( + ReadBuffer & in, const DataStreams & input_streams_, QueryPlanSerializationSettings & settings) +{ + return DistinctStep::deserialize(in, input_streams_, settings, false); +} +std::unique_ptr DistinctStep::deserializePre( + ReadBuffer & in, const DataStreams & input_streams_, QueryPlanSerializationSettings & settings) +{ + return DistinctStep::deserialize(in, input_streams_, settings, true); +} + +void registerDistinctStep(QueryPlanStepRegistry & registry) +{ + /// Preliminary distinct probably can be a query plan optimization. + /// It's easier to serialzie it using different names, so that pre-distinct can be potentially removed later. + registry.registerStep("Distinct", DistinctStep::deserializeNormal); + registry.registerStep("PreDistinct", DistinctStep::deserializePre); +} + } diff --git a/src/Processors/QueryPlan/DistinctStep.h b/src/Processors/QueryPlan/DistinctStep.h index d59adbbf92d..80a37dee535 100644 --- a/src/Processors/QueryPlan/DistinctStep.h +++ b/src/Processors/QueryPlan/DistinctStep.h @@ -20,6 +20,8 @@ public: String getName() const override { return "Distinct"; } const Names & getColumnNames() const { return columns; } + String getSerializationName() const override { return pre_distinct ? "PreDistinct" : "Distinct"; } + void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override; void describeActions(JSONBuilder::JSONMap & map) const override; @@ -28,6 +30,14 @@ public: bool isPreliminary() const { return pre_distinct; } UInt64 getLimitHint() const { return limit_hint; } + void updateLimitHint(UInt64 hint); + + void serializeSettings(QueryPlanSerializationSettings & settings) const override; + void serialize(WriteBuffer & out) const override; + + static std::unique_ptr deserialize(ReadBuffer & in, const DataStreams & input_streams_, QueryPlanSerializationSettings &, bool pre_distinct_); + static std::unique_ptr deserializeNormal(ReadBuffer & in, const DataStreams & input_streams_, QueryPlanSerializationSettings &); + static std::unique_ptr deserializePre(ReadBuffer & in, const DataStreams & input_streams_, QueryPlanSerializationSettings &); private: void updateOutputStream() override; diff --git a/src/Processors/QueryPlan/ExpressionStep.cpp b/src/Processors/QueryPlan/ExpressionStep.cpp index 6f88c4527a4..41366ae6170 100644 --- a/src/Processors/QueryPlan/ExpressionStep.cpp +++ b/src/Processors/QueryPlan/ExpressionStep.cpp @@ -1,4 +1,5 @@ #include +#include #include #include #include @@ -90,4 +91,23 @@ void ExpressionStep::updateOutputStream() } } +void ExpressionStep::serialize(WriteBuffer & out) const +{ + actions_dag.serialize(out); +} + +std::unique_ptr ExpressionStep::deserialize(ReadBuffer & in, const DataStreams & input_streams_, QueryPlanSerializationSettings &) +{ + ActionsDAG actions_dag = ActionsDAG::deserialize(in); + if (input_streams_.size() != 1) + throw Exception(ErrorCodes::INCORRECT_DATA, "ExpressionStep must have one input stream"); + + return std::make_unique(input_streams_.front(), std::move(actions_dag)); +} + +void registerExpressionStep(QueryPlanStepRegistry & registry) +{ + registry.registerStep("Expression", ExpressionStep::deserialize); +} + } diff --git a/src/Processors/QueryPlan/ExpressionStep.h b/src/Processors/QueryPlan/ExpressionStep.h index f2926318cbc..792ee99ba5c 100644 --- a/src/Processors/QueryPlan/ExpressionStep.h +++ b/src/Processors/QueryPlan/ExpressionStep.h @@ -25,6 +25,10 @@ public: void describeActions(JSONBuilder::JSONMap & map) const override; + void serialize(WriteBuffer & out) const override; + + static std::unique_ptr deserialize(ReadBuffer & in, const DataStreams & input_streams_, QueryPlanSerializationSettings &); + private: void updateOutputStream() override; diff --git a/src/Processors/QueryPlan/IQueryPlanStep.h b/src/Processors/QueryPlan/IQueryPlanStep.h index 44eb7ea0c59..3a8f2e0085d 100644 --- a/src/Processors/QueryPlan/IQueryPlanStep.h +++ b/src/Processors/QueryPlan/IQueryPlanStep.h @@ -63,6 +63,8 @@ using DataStreams = std::vector; class QueryPlan; using QueryPlanRawPtrs = std::list; +struct QueryPlanSerializationSettings; + /// Single step of query plan. class IQueryPlanStep { @@ -70,6 +72,7 @@ public: virtual ~IQueryPlanStep() = default; virtual String getName() const = 0; + virtual String getSerializationName() const { return getName(); } /// Add processors from current step to QueryPipeline. /// Calling this method, we assume and don't check that: @@ -88,6 +91,9 @@ public: const std::string & getStepDescription() const { return step_description; } void setStepDescription(std::string description) { step_description = std::move(description); } + virtual void serializeSettings(QueryPlanSerializationSettings & /*settings*/) const {} + virtual void serialize(WriteBuffer & /*buf*/) const { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented for {}", getName()); } + struct FormatSettings { WriteBuffer & out; diff --git a/src/Processors/QueryPlan/Optimizations/limitPushDown.cpp b/src/Processors/QueryPlan/Optimizations/limitPushDown.cpp index cb731376473..b1997712d25 100644 --- a/src/Processors/QueryPlan/Optimizations/limitPushDown.cpp +++ b/src/Processors/QueryPlan/Optimizations/limitPushDown.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include namespace DB::QueryPlanOptimizations @@ -63,6 +64,12 @@ size_t tryPushDownLimit(QueryPlan::Node * parent_node, QueryPlan::Nodes &) if (tryUpdateLimitForSortingSteps(child_node, limit->getLimitForSorting())) return 0; + if (auto * distinct = typeid_cast(child.get())) + { + distinct->updateLimitHint(limit->getLimitForSorting()); + return 0; + } + /// Special case for TotalsHaving. Totals may be incorrect if we push down limit. if (typeid_cast(child.get())) return 0; diff --git a/src/Processors/QueryPlan/QueryPlan.cpp b/src/Processors/QueryPlan/QueryPlan.cpp index b78f7a29cde..066bde5e143 100644 --- a/src/Processors/QueryPlan/QueryPlan.cpp +++ b/src/Processors/QueryPlan/QueryPlan.cpp @@ -1,6 +1,7 @@ #include #include +#include #include #include @@ -16,6 +17,8 @@ #include #include #include +#include +#include #include @@ -565,4 +568,148 @@ std::pair QueryPlan::detachNodesAndRe return {std::move(plan.nodes), std::move(plan.resources)}; } +static void serializeHeader(const Block & header, WriteBuffer & out) +{ + /// Write only names and types. + /// Constants should be filled by step. + + writeVarUInt(header.columns(), out); + for (const auto & column : header) + { + writeStringBinary(column.name, out); + encodeDataType(column.type, out); + } +} + +static Block deserializeHeader(ReadBuffer & in) +{ + UInt64 num_columns; + readVarUInt(num_columns, in); + + ColumnsWithTypeAndName columns(num_columns); + + for (auto & column : columns) + { + readStringBinary(column.name, in); + column.type = decodeDataType(in); + } + + return Block(std::move(columns)); +} + +void QueryPlan::serialize(WriteBuffer & out) const +{ + checkInitialized(); + + struct Frame + { + Node * node = {}; + size_t next_child = 0; + }; + + std::stack stack; + stack.push(Frame{.node = root}); + while (!stack.empty()) + { + auto & frame = stack.top(); + auto * node = frame.node; + if (frame.next_child == 0) + { + writeVarUInt(node->children.size(), out); + } + + if (frame.next_child < node->children.size()) + { + stack.push(Frame{.node = node->children[frame.next_child]}); + ++frame.next_child; + continue; + } + + stack.pop(); + + writeStringBinary(node->step->getSerializationName(), out); + writeStringBinary(node->step->getStepDescription(), out); + + QueryPlanSerializationSettings settings; + node->step->serializeSettings(settings); + + settings.writeChangedBinary(out); + node->step->serialize(out); + + if (node->step->hasOutputStream()) + serializeHeader(node->step->getOutputStream().header, out); + else + serializeHeader({}, out); + } +} + +QueryPlan QueryPlan::deserialize(ReadBuffer & in) +{ + QueryPlanStepRegistry & step_registry = QueryPlanStepRegistry::instance(); + + using NodePtr = Node *; + struct Frame + { + NodePtr & to_fill; + size_t next_child = 0; + std::vector children = {}; + }; + + std::stack stack; + + QueryPlan plan; + stack.push(Frame{.to_fill = plan.root}); + + while (!stack.empty()) + { + auto & frame = stack.top(); + if (frame.next_child == 0) + { + UInt64 num_children; + readVarUInt(num_children, in); + frame.children.resize(num_children); + } + + if (frame.next_child < frame.children.size()) + { + stack.push(Frame{.to_fill = frame.children[frame.next_child]}); + ++frame.next_child; + continue; + } + + std::string step_name; + std::string step_description; + readStringBinary(step_name, in); + readStringBinary(step_description, in); + + QueryPlanSerializationSettings settings; + settings.readBinary(in); + + DataStreams input_streams; + input_streams.reserve(frame.children.size()); + for (const auto & child : frame.children) + input_streams.push_back(child->step->getOutputStream()); + + auto step = step_registry.createStep(in, step_name, input_streams, settings); + + auto header = deserializeHeader(in); + if (step->hasOutputStream()) + { + assertCompatibleHeader(step->getOutputStream().header, header, + fmt::format("deserialization of query plan step {}", step_name)); + } + else if (header.columns()) + throw Exception(ErrorCodes::INCORRECT_DATA, + "Deserialized step {} has no output stream, but deserialized header is not empty : {}", + step_name, header.dumpStructure()); + + auto & node = plan.nodes.emplace_back(std::move(step), std::move(frame.children)); + frame.to_fill = &node; + + stack.pop(); + } + + return plan; +} + } diff --git a/src/Processors/QueryPlan/QueryPlan.h b/src/Processors/QueryPlan/QueryPlan.h index 75c577af24e..bb2a8a5a618 100644 --- a/src/Processors/QueryPlan/QueryPlan.h +++ b/src/Processors/QueryPlan/QueryPlan.h @@ -20,6 +20,7 @@ using QueryPlanStepPtr = std::unique_ptr; class QueryPipelineBuilder; using QueryPipelineBuilderPtr = std::unique_ptr; +class ReadBuffer; class WriteBuffer; class QueryPlan; @@ -54,6 +55,9 @@ public: bool isCompleted() const; /// Tree is not empty and root hasOutputStream() const DataStream & getCurrentDataStream() const; /// Checks that (isInitialized() && !isCompleted()) + void serialize(WriteBuffer & out) const; + static QueryPlan deserialize(ReadBuffer & in); + void optimize(const QueryPlanOptimizationSettings & optimization_settings); QueryPipelineBuilderPtr buildQueryPipeline( diff --git a/src/Processors/QueryPlan/QueryPlanSerializationSettings.cpp b/src/Processors/QueryPlan/QueryPlanSerializationSettings.cpp new file mode 100644 index 00000000000..9b11132e27c --- /dev/null +++ b/src/Processors/QueryPlan/QueryPlanSerializationSettings.cpp @@ -0,0 +1,8 @@ +#include + +namespace DB +{ + +IMPLEMENT_SETTINGS_TRAITS(QueryPlanSerializationSettingsTraits, PLAN_SERIALIZATION_SETTINGS); + +} diff --git a/src/Processors/QueryPlan/QueryPlanSerializationSettings.h b/src/Processors/QueryPlan/QueryPlanSerializationSettings.h new file mode 100644 index 00000000000..98ec3610a45 --- /dev/null +++ b/src/Processors/QueryPlan/QueryPlanSerializationSettings.h @@ -0,0 +1,23 @@ +#pragma once + +#include +#include +#include + + +namespace DB +{ + +#define PLAN_SERIALIZATION_SETTINGS(M, ALIAS) \ + M(UInt64, max_rows_in_distinct, 0, "Maximum number of elements during execution of DISTINCT.", 0) \ + M(UInt64, max_bytes_in_distinct, 0, "Maximum total size of state (in uncompressed bytes) in memory for the execution of DISTINCT.", 0) \ + M(OverflowMode, distinct_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.", 0) \ + +DECLARE_SETTINGS_TRAITS(QueryPlanSerializationSettingsTraits, PLAN_SERIALIZATION_SETTINGS) + +struct QueryPlanSerializationSettings : public BaseSettings +{ + QueryPlanSerializationSettings() = default; +}; + +} diff --git a/src/Processors/QueryPlan/QueryPlanStepRegistry.cpp b/src/Processors/QueryPlan/QueryPlanStepRegistry.cpp new file mode 100644 index 00000000000..68ed2347d13 --- /dev/null +++ b/src/Processors/QueryPlan/QueryPlanStepRegistry.cpp @@ -0,0 +1,54 @@ +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int UNKNOWN_IDENTIFIER; + extern const int LOGICAL_ERROR; +} + +QueryPlanStepRegistry & QueryPlanStepRegistry::instance() +{ + static QueryPlanStepRegistry registry; + return registry; +} + +void QueryPlanStepRegistry::registerStep(const std::string & name, StepCreateFunction && create_function) +{ + if (steps.contains(name)) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Query plan step '{}' is already registered", name); + steps[name] = std::move(create_function); +} + +QueryPlanStepPtr QueryPlanStepRegistry::createStep( + ReadBuffer & buf, + const std::string & name, + const DataStreams & input_streams, + QueryPlanSerializationSettings & settings) const +{ + StepCreateFunction create_function; + { + auto it = steps.find(name); + if (it == steps.end()) + throw Exception(ErrorCodes::UNKNOWN_IDENTIFIER, "Unknown query plan step: {}", name); + create_function = it->second; + } + return create_function(buf, input_streams, settings); +} + +void registerExpressionStep(QueryPlanStepRegistry & registry); +void registerUnionStep(QueryPlanStepRegistry & registry); +void registerDistinctStep(QueryPlanStepRegistry & registry); + +void registerPlanSteps() +{ + QueryPlanStepRegistry & registry = QueryPlanStepRegistry::instance(); + + registerExpressionStep(registry); + registerUnionStep(registry); + registerDistinctStep(registry); +} + +} diff --git a/src/Processors/QueryPlan/QueryPlanStepRegistry.h b/src/Processors/QueryPlan/QueryPlanStepRegistry.h new file mode 100644 index 00000000000..b15e12eb95f --- /dev/null +++ b/src/Processors/QueryPlan/QueryPlanStepRegistry.h @@ -0,0 +1,34 @@ +#pragma once + +#include + +namespace DB +{ + +class QueryPlanStepRegistry +{ +public: + using StepCreateFunction = std::function; + + QueryPlanStepRegistry() = default; + QueryPlanStepRegistry(const QueryPlanStepRegistry &) = delete; + QueryPlanStepRegistry & operator=(const QueryPlanStepRegistry &) = delete; + + static QueryPlanStepRegistry & instance(); + + static void registerPlanSteps(); + + void registerStep(const std::string & name, StepCreateFunction && create_function); + + QueryPlanStepPtr createStep( + ReadBuffer & buf, + const std::string & name, + const DataStreams & input_streams, + QueryPlanSerializationSettings & settings) const; + +private: + std::unordered_map steps; + +}; + +} diff --git a/src/Processors/QueryPlan/UnionStep.cpp b/src/Processors/QueryPlan/UnionStep.cpp index dde12271de1..95330af37ee 100644 --- a/src/Processors/QueryPlan/UnionStep.cpp +++ b/src/Processors/QueryPlan/UnionStep.cpp @@ -1,6 +1,8 @@ #include #include #include +#include +#include #include #include #include @@ -104,4 +106,25 @@ void UnionStep::describePipeline(FormatSettings & settings) const IQueryPlanStep::describePipeline(processors, settings); } +// void UnionStep::serializeSettings(QueryPlanSerializationSettings & settings) const +// { +// // settings.max_threads = max_threads; +// } + +void UnionStep::serialize(WriteBuffer & out) const +{ + (void)out; +} + +std::unique_ptr UnionStep::deserialize(ReadBuffer & in, const DataStreams & input_streams_, QueryPlanSerializationSettings &) +{ + (void)in; + return std::make_unique(input_streams_); +} + +void registerUnionStep(QueryPlanStepRegistry & registry) +{ + registry.registerStep("Union", &UnionStep::deserialize); +} + } diff --git a/src/Processors/QueryPlan/UnionStep.h b/src/Processors/QueryPlan/UnionStep.h index 4ab08785b01..58f3c18d886 100644 --- a/src/Processors/QueryPlan/UnionStep.h +++ b/src/Processors/QueryPlan/UnionStep.h @@ -21,6 +21,10 @@ public: bool canUpdateInputStream() const override { return true; } + // void serializeSettings(QueryPlanSerializationSettings & settings) const override; + void serialize(WriteBuffer & out) const override; + static std::unique_ptr deserialize(ReadBuffer & in, const DataStreams & input_streams_, QueryPlanSerializationSettings &); + private: void updateOutputStream() override;