diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp
index 7d98178e096..6b5aa1ac0b2 100644
--- a/programs/server/Server.cpp
+++ b/programs/server/Server.cpp
@@ -15,6 +15,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -924,6 +925,8 @@ try
     registerRemoteFileMetadatas();
     registerSchedulerNodes();
 
+    QueryPlanStepRegistry::registerPlanSteps();
+
     CurrentMetrics::set(CurrentMetrics::Revision, ClickHouseRevision::getVersionRevision());
     CurrentMetrics::set(CurrentMetrics::VersionInteger, ClickHouseRevision::getVersionInteger());
diff --git a/src/Analyzer/Resolve/IdentifierResolver.cpp b/src/Analyzer/Resolve/IdentifierResolver.cpp
index 317a02a60f2..a59867091bd 100644
--- a/src/Analyzer/Resolve/IdentifierResolver.cpp
+++ b/src/Analyzer/Resolve/IdentifierResolver.cpp
@@ -393,7 +393,7 @@ QueryTreeNodePtr IdentifierResolver::wrapExpressionNodeInTupleElement(QueryTreeN
 /// Resolve identifier functions implementation
 
 /// Try resolve table identifier from database catalog
-QueryTreeNodePtr IdentifierResolver::tryResolveTableIdentifierFromDatabaseCatalog(const Identifier & table_identifier, ContextPtr context)
+std::shared_ptr<TableNode> IdentifierResolver::tryResolveTableIdentifierFromDatabaseCatalog(const Identifier & table_identifier, ContextPtr context)
 {
     size_t parts_size = table_identifier.getPartsSize();
     if (parts_size < 1 || parts_size > 2)
diff --git a/src/Analyzer/Resolve/IdentifierResolver.h b/src/Analyzer/Resolve/IdentifierResolver.h
index cdbd7610b5e..7402c69db72 100644
--- a/src/Analyzer/Resolve/IdentifierResolver.h
+++ b/src/Analyzer/Resolve/IdentifierResolver.h
@@ -21,6 +21,7 @@ class QueryExpressionsAliasVisitor;
 class QueryNode;
 class JoinNode;
 class ColumnNode;
+class TableNode;
 
 using ProjectionName = String;
 using ProjectionNames = std::vector<ProjectionName>;
@@ -86,7 +87,7 @@ public:
 
     /// Resolve identifier functions
 
-    static QueryTreeNodePtr tryResolveTableIdentifierFromDatabaseCatalog(const Identifier & table_identifier, ContextPtr context);
+    static std::shared_ptr<TableNode> tryResolveTableIdentifierFromDatabaseCatalog(const Identifier & table_identifier, ContextPtr context);
 
     QueryTreeNodePtr tryResolveIdentifierFromCompoundExpression(const Identifier & expression_identifier,
         size_t identifier_bind_size,
diff --git a/src/Analyzer/Resolve/QueryAnalyzer.cpp b/src/Analyzer/Resolve/QueryAnalyzer.cpp
index d118cb281ae..f9b62618e9a 100644
--- a/src/Analyzer/Resolve/QueryAnalyzer.cpp
+++ b/src/Analyzer/Resolve/QueryAnalyzer.cpp
@@ -3470,11 +3470,8 @@ ProjectionNames QueryAnalyzer::resolveFunction(QueryTreeNodePtr & node, Identifi
 
         auto set = std::make_shared<Set>(size_limits_for_set, 0, settings[Setting::transform_null_in]);
 
-        set->setHeader(result_block.cloneEmpty().getColumnsWithTypeAndName());
-        set->insertFromBlock(result_block.getColumnsWithTypeAndName());
-        set->finishInsert();
-
-        auto future_set = std::make_shared<FutureSetFromStorage>(std::move(set));
+        auto hash = function_arguments[1]->getTreeHash();
+        auto future_set = std::make_shared<FutureSetFromTuple>(hash, std::move(result_block), settings[Setting::transform_null_in], size_limits_for_set);
 
         /// Create constant set column for constant folding
diff --git a/src/Analyzer/SetUtils.cpp b/src/Analyzer/SetUtils.cpp
index 59a243b27f3..5ffa4f58307 100644
--- a/src/Analyzer/SetUtils.cpp
+++ b/src/Analyzer/SetUtils.cpp
@@ -62,7 +62,7 @@ size_t getCompoundTypeDepth(const IDataType & type)
 }
 
 template <typename Collection>
-Block createBlockFromCollection(const Collection & collection, const DataTypes & value_types, const DataTypes & block_types, bool transform_null_in)
+ColumnsWithTypeAndName createBlockFromCollection(const Collection & collection, const DataTypes & value_types, const DataTypes & block_types, bool transform_null_in)
 {
     assert(collection.size() == value_types.size());
     size_t columns_size = block_types.size();
@@ -132,16 +132,19 @@ Block createBlockFromCollection(const Collection & collection, const DataTypes &
         columns[i]->insert(tuple_values[i]);
     }
 
-    Block res;
+    ColumnsWithTypeAndName res(columns_size);
     for (size_t i = 0; i < columns_size; ++i)
-        res.insert(ColumnWithTypeAndName{std::move(columns[i]), block_types[i], "argument_" + toString(i)});
+    {
+        res[i].type = block_types[i];
+        res[i].column = std::move(columns[i]);
+    }
 
     return res;
 }
 
 }
 
-Block getSetElementsForConstantValue(const DataTypePtr & expression_type, const Field & value, const DataTypePtr & value_type, bool transform_null_in)
+ColumnsWithTypeAndName getSetElementsForConstantValue(const DataTypePtr & expression_type, const Field & value, const DataTypePtr & value_type, bool transform_null_in)
 {
     DataTypes set_element_types = {expression_type};
     const auto * lhs_tuple_type = typeid_cast<const DataTypeTuple *>(expression_type.get());
@@ -158,7 +161,7 @@ Block getSetElementsForConstantValue(const DataTypePtr & expression_type, const
     size_t lhs_type_depth = getCompoundTypeDepth(*expression_type);
     size_t rhs_type_depth = getCompoundTypeDepth(*value_type);
 
-    Block result_block;
+    ColumnsWithTypeAndName result_block;
 
     if (lhs_type_depth == rhs_type_depth)
     {
diff --git a/src/Analyzer/SetUtils.h b/src/Analyzer/SetUtils.h
index aef906a6576..17f9721b7d9 100644
--- a/src/Analyzer/SetUtils.h
+++ b/src/Analyzer/SetUtils.h
@@ -19,6 +19,6 @@ using SetPtr = std::shared_ptr<Set>;
  * Example: SELECT id FROM test_table WHERE id IN (1, 2, 3, 4);
  * Example: SELECT id FROM test_table WHERE id IN ((1, 2), (3, 4));
  */
-Block getSetElementsForConstantValue(const DataTypePtr & expression_type, const Field & value, const DataTypePtr & value_type, bool transform_null_in);
+ColumnsWithTypeAndName getSetElementsForConstantValue(const DataTypePtr & expression_type, const Field & value, const DataTypePtr & value_type, bool transform_null_in);
 
 }
diff --git a/src/Columns/ColumnFunction.h b/src/Columns/ColumnFunction.h
index 8df9e23c0e8..a0551a9847a 100644
--- a/src/Columns/ColumnFunction.h
+++ b/src/Columns/ColumnFunction.h
@@ -198,6 +198,9 @@ public:
     /// Create copy of this column, but with recursively_convert_result_to_full_column_if_low_cardinality = true
     ColumnPtr recursivelyConvertResultToFullColumnIfLowCardinality() const;
 
+    const FunctionBasePtr & getFunction() const { return function; }
+    const ColumnsWithTypeAndName & getCapturedColumns() const { return captured_columns; }
+
 private:
     size_t elements_size;
     FunctionBasePtr function;
diff --git a/src/Core/BaseSettings.h b/src/Core/BaseSettings.h
index 201b586f067..04f80210443 100644
--- a/src/Core/BaseSettings.h
+++ b/src/Core/BaseSettings.h
@@ -145,6 +145,9 @@ public:
     void write(WriteBuffer & out, SettingsWriteFormat format = SettingsWriteFormat::DEFAULT) const;
     void read(ReadBuffer & in, SettingsWriteFormat format = SettingsWriteFormat::DEFAULT);
 
+    void writeChangedBinary(WriteBuffer & out) const;
+    void readBinary(ReadBuffer & in);
+
     // A debugging aid.
     std::string toString() const;
 
@@ -479,6 +482,46 @@ void BaseSettings<TTraits>::write(WriteBuffer & out, SettingsWriteFormat format)
     BaseSettingsHelpers::writeString(std::string_view{}, out);
 }
 
+template <typename TTraits>
+void BaseSettings<TTraits>::writeChangedBinary(WriteBuffer & out) const
+{
+    const auto & accessor = Traits::Accessor::instance();
+
+    size_t num_settings = 0;
+    for (auto it = this->begin(); it != this->end(); ++it)
+        ++num_settings;
+
+    writeVarUInt(num_settings, out);
+
+    for (const auto & field : *this)
+    {
+        BaseSettingsHelpers::writeString(field.getName(), out);
+        using Flags = BaseSettingsHelpers::Flags;
+        Flags flags{0};
+        BaseSettingsHelpers::writeFlags(flags, out);
+        accessor.writeBinary(*this, field.index, out);
+    }
+}
+
+template <typename TTraits>
+void BaseSettings<TTraits>::readBinary(ReadBuffer & in)
+{
+    const auto & accessor = Traits::Accessor::instance();
+
+    size_t num_settings = 0;
+    readVarUInt(num_settings, in);
+
+    for (size_t i = 0; i < num_settings; ++i)
+    {
+        String read_name = BaseSettingsHelpers::readString(in);
+        std::string_view name = TTraits::resolveName(read_name);
+        size_t index = accessor.find(name);
+
+        std::ignore = BaseSettingsHelpers::readFlags(in);
+        accessor.readBinary(*this, index, in);
+    }
+}
+
 template <typename TTraits>
 void BaseSettings<TTraits>::read(ReadBuffer & in, SettingsWriteFormat format)
 {
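The writeChangedBinary/readBinary pair above fixes a simple wire layout for changed settings: a varint count, then, per setting, its name, one flags byte (currently always zero, read and ignored on the receiving side), and the value as written by the per-setting accessor. A minimal self-contained sketch of that layout, using std::string as the byte sink instead of ClickHouse's WriteBuffer (all helper names below are illustrative stand-ins, not the real API):

    #include <cstdint>
    #include <string>
    #include <utility>
    #include <vector>

    // Stand-in for writeVarUInt over a WriteBuffer: 7 bits per byte, high bit = "more".
    static void writeVarUInt(uint64_t x, std::string & out)
    {
        while (x >= 0x80) { out.push_back(char((x & 0x7F) | 0x80)); x >>= 7; }
        out.push_back(char(x));
    }

    static void writeString(const std::string & s, std::string & out)
    {
        writeVarUInt(s.size(), out);
        out.append(s);
    }

    // Layout produced by writeChangedBinary: count, then (name, flags, value) triples.
    std::string serializeChangedSettings(const std::vector<std::pair<std::string, std::string>> & changed)
    {
        std::string out;
        writeVarUInt(changed.size(), out);
        for (const auto & [name, value] : changed)
        {
            writeString(name, out);
            out.push_back(0);        // flags byte, reserved for future use
            writeString(value, out); // real code delegates to the setting's accessor
        }
        return out;
    }

Writing only the changed settings keeps the stream short and lets the reader apply them on top of defaults; the reserved flags byte leaves room to extend the format without breaking old readers.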
diff --git a/src/Core/Field.h b/src/Core/Field.h
index 5a6ee9cdf29..b7f7225dae1 100644
--- a/src/Core/Field.h
+++ b/src/Core/Field.h
@@ -1004,6 +1004,10 @@ void readQuoted(DecimalField & x, ReadBuffer & buf);
 
 void writeFieldText(const Field & x, WriteBuffer & buf);
 
+
+void writeFieldBinary(const Field & x, WriteBuffer & buf);
+Field readFieldBinary(ReadBuffer & buf);
+
 String toString(const Field & x);
 
 }
diff --git a/src/Core/SortDescription.cpp b/src/Core/SortDescription.cpp
index 1b3f81f8547..01f22b9b9e5 100644
--- a/src/Core/SortDescription.cpp
+++ b/src/Core/SortDescription.cpp
@@ -15,6 +15,11 @@
 namespace DB
 {
 
+namespace ErrorCodes
+{
+    extern const int NOT_IMPLEMENTED;
+}
+
 void dumpSortDescription(const SortDescription & description, WriteBuffer & out)
 {
     bool first = true;
@@ -209,4 +214,58 @@ JSONBuilder::ItemPtr explainSortDescription(const SortDescription & description)
     return json_array;
 }
 
+void serializeSortDescription(const SortDescription & sort_description, WriteBuffer & out)
+{
+    writeVarUInt(sort_description.size(), out);
+    for (const auto & desc : sort_description)
+    {
+        writeStringBinary(desc.column_name, out);
+
+        UInt8 flags = 0;
+        if (desc.direction > 0)
+            flags |= 1;
+        if (desc.nulls_direction > 0)
+            flags |= 2;
+        if (desc.collator)
+            flags |= 4;
+        if (desc.with_fill)
+            flags |= 8;
+
+        writeIntBinary(flags, out);
+
+        if (desc.collator)
+            writeStringBinary(desc.collator->getLocale(), out);
+
+        if (desc.with_fill)
+            throw Exception(ErrorCodes::NOT_IMPLEMENTED, "WITH FILL is not supported in serialized sort description");
+    }
+}
+
+void deserializeSortDescription(SortDescription & sort_description, ReadBuffer & in)
+{
+    size_t size = 0;
+    readVarUInt(size, in);
+    sort_description.resize(size);
+    for (auto & desc : sort_description)
+    {
+        readStringBinary(desc.column_name, in);
+        UInt8 flags = 0;
+        readIntBinary(flags, in);
+
+        desc.direction = (flags & 1) ? 1 : -1;
+        desc.nulls_direction = (flags & 2) ? 1 : -1;
+
+        if (flags & 4)
+        {
+            String collator_locale;
+            readStringBinary(collator_locale, in);
+            if (!collator_locale.empty())
+                desc.collator = std::make_shared<Collator>(collator_locale);
+        }
+
+        if (flags & 8)
+            throw Exception(ErrorCodes::NOT_IMPLEMENTED, "WITH FILL is not supported in deserialized sort description");
+    }
+}
+
 }
diff --git a/src/Core/SortDescription.h b/src/Core/SortDescription.h
index 522732dad90..48a87e9e2e9 100644
--- a/src/Core/SortDescription.h
+++ b/src/Core/SortDescription.h
@@ -143,4 +143,11 @@ void dumpSortDescription(const SortDescription & description, WriteBuffer & out)
 std::string dumpSortDescription(const SortDescription & description);
 
 JSONBuilder::ItemPtr explainSortDescription(const SortDescription & description);
+
+class WriteBuffer;
+class ReadBuffer;
+
+void serializeSortDescription(const SortDescription & sort_description, WriteBuffer & out);
+void deserializeSortDescription(SortDescription & sort_description, ReadBuffer & in);
+
 }
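serializeSortDescription packs each column's options into one flags byte: bit 0x1 is set for a positive (ascending) direction, 0x2 for a positive nulls_direction, 0x4 when a collation locale string follows, and 0x8 for WITH FILL (which both sides currently reject). A small round-trip sketch of just that byte, with stand-in types:

    #include <cassert>
    #include <cstdint>

    struct SortColumnFlags
    {
        int direction;        // 1 ascending, -1 descending
        int nulls_direction;  // 1 or -1, where NULLs/NaNs sort relative to the rest
        bool has_collator;
        bool with_fill;
    };

    // Mirrors the bit layout used by serializeSortDescription.
    uint8_t packFlags(const SortColumnFlags & f)
    {
        uint8_t flags = 0;
        if (f.direction > 0) flags |= 1;
        if (f.nulls_direction > 0) flags |= 2;
        if (f.has_collator) flags |= 4;
        if (f.with_fill) flags |= 8;
        return flags;
    }

    SortColumnFlags unpackFlags(uint8_t flags)
    {
        return SortColumnFlags{(flags & 1) ? 1 : -1, (flags & 2) ? 1 : -1, bool(flags & 4), bool(flags & 8)};
    }

    int main()
    {
        SortColumnFlags f{-1, -1, false, false}; // descending, NULLs first
        SortColumnFlags g = unpackFlags(packFlags(f));
        assert(g.direction == -1 && g.nulls_direction == -1 && !g.has_collator && !g.with_fill);
    }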
diff --git a/src/Functions/FunctionsMiscellaneous.h b/src/Functions/FunctionsMiscellaneous.h
index cea11cfe677..93ce7660f45 100644
--- a/src/Functions/FunctionsMiscellaneous.h
+++ b/src/Functions/FunctionsMiscellaneous.h
@@ -1,13 +1,13 @@
 #pragma once
 
-#include
-#include
-#include
-#include
-#include
-#include
 #include
+#include
+#include
 #include
+#include
+#include
+#include
+#include
 
 namespace DB
@@ -18,6 +18,18 @@ namespace ErrorCodes
 {
+    extern const int LOGICAL_ERROR;
     extern const int BAD_ARGUMENTS;
 }
 
+struct LambdaCapture
+{
+    Names captured_names;
+    DataTypes captured_types;
+    NamesAndTypesList lambda_arguments;
+    String return_name;
+    DataTypePtr return_type;
+    bool allow_constant_folding;
+};
+
+using LambdaCapturePtr = std::shared_ptr<LambdaCapture>;
+
 class ExecutableFunctionExpression : public IExecutableFunction
 {
 public:
@@ -30,14 +42,20 @@ public:
     using SignaturePtr = std::shared_ptr<Signature>;
 
     ExecutableFunctionExpression(ExpressionActionsPtr expression_actions_, SignaturePtr signature_)
-        : expression_actions(std::move(expression_actions_))
-        , signature(std::move(signature_))
-    {}
+        : expression_actions(std::move(expression_actions_)), signature(std::move(signature_))
+    {
+    }
 
     String getName() const override { return "FunctionExpression"; }
 
-    ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t /*input_rows_count*/) const override
+    ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const override
     {
+        if (input_rows_count == 0)
+            return result_type->createColumn();
+
+        if (!expression_actions)
+            throw Exception(ErrorCodes::LOGICAL_ERROR, "No actions were passed to FunctionExpression");
+
         DB::Block expr_columns;
         for (size_t i = 0; i < arguments.size(); ++i)
         {
@@ -48,7 +66,7 @@ public:
 
         expression_actions->execute(expr_columns);
 
-        return expr_columns.getByName(signature->return_name).column;
+        return expr_columns.getByName(signature->return_name).column;
     }
 
     bool useDefaultImplementationForNulls() const override { return false; }
@@ -71,13 +89,27 @@ public:
     using Signature = ExecutableFunctionExpression::Signature;
     using SignaturePtr = ExecutableFunctionExpression::SignaturePtr;
 
-    FunctionExpression(ExpressionActionsPtr expression_actions_,
-        DataTypes argument_types_, const Names & argument_names_,
-        DataTypePtr return_type_, const std::string & return_name_)
-        : expression_actions(std::move(expression_actions_))
-        , signature(std::make_shared<Signature>(Signature{argument_names_, return_name_}))
-        , argument_types(std::move(argument_types_)), return_type(std::move(return_type_))
+    FunctionExpression(LambdaCapturePtr capture_, ExpressionActionsPtr expression_actions_)
+        : expression_actions(std::move(expression_actions_))
+        , capture(std::move(capture_))
     {
+        Names names;
+        DataTypes types;
+
+        names.reserve(capture->captured_names.size() + capture->lambda_arguments.size());
+        names.insert(names.end(), capture->captured_names.begin(), capture->captured_names.end());
+
+        types.reserve(capture->captured_types.size() + capture->lambda_arguments.size());
+        types.insert(types.end(), capture->captured_types.begin(), capture->captured_types.end());
+
+        for (const auto & lambda_argument : capture->lambda_arguments)
+        {
+            names.push_back(lambda_argument.name);
+            types.push_back(lambda_argument.type);
+        }
+
+        argument_types = std::move(types);
+        signature = std::make_shared<Signature>(Signature{names, capture->return_name});
     }
 
     String getName() const override { return "FunctionExpression"; }
@@ -85,7 +117,10 @@ public:
     bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }
 
     const DataTypes & getArgumentTypes() const override { return argument_types; }
-    const DataTypePtr & getResultType() const override { return return_type; }
+    const DataTypePtr & getResultType() const override { return capture->return_type; }
+
+    const LambdaCapture & getCapture() const { return *capture; }
+    const ActionsDAG & getActionsDAG() const { return expression_actions->getActionsDAG(); }
 
     ExecutableFunctionPtr prepare(const ColumnsWithTypeAndName &) const override
     {
@@ -94,9 +129,11 @@ public:
 
 private:
     ExpressionActionsPtr expression_actions;
+    LambdaCapturePtr capture;
+
+    /// This is redundant and is built from capture.
     SignaturePtr signature;
     DataTypes argument_types;
-    DataTypePtr return_type;
 };
 
 /// Captures columns which are used by lambda function but not in argument list.
@@ -106,20 +143,10 @@ private:
 class ExecutableFunctionCapture : public IExecutableFunction
 {
 public:
-    struct Capture
+    ExecutableFunctionCapture(ExpressionActionsPtr expression_actions_, LambdaCapturePtr capture_)
+        : expression_actions(std::move(expression_actions_)), capture(std::move(capture_))
     {
-        Names captured_names;
-        DataTypes captured_types;
-        NamesAndTypesList lambda_arguments;
-        String return_name;
-        DataTypePtr return_type;
-        bool allow_constant_folding;
-    };
-
-    using CapturePtr = std::shared_ptr<Capture>;
-
-    ExecutableFunctionCapture(ExpressionActionsPtr expression_actions_, CapturePtr capture_)
-        : expression_actions(std::move(expression_actions_)), capture(std::move(capture_)) {}
+    }
 
     String getName() const override { return "FunctionCapture"; }
 
@@ -148,8 +175,7 @@ public:
             types.push_back(lambda_argument.type);
         }
 
-        auto function = std::make_unique<FunctionExpression>(expression_actions, types, names,
-            capture->return_type, capture->return_name);
+        auto function = std::make_unique<FunctionExpression>(capture, expression_actions);
 
         /// If all the captured arguments are constant, let's also return ColumnConst (with ColumnFunction inside it).
        /// Consequently, it allows to treat higher order functions with constant arrays and constant captured columns
@@ -175,17 +201,15 @@ public:
 
 private:
     ExpressionActionsPtr expression_actions;
-    CapturePtr capture;
+    LambdaCapturePtr capture;
 };
 
 class FunctionCapture : public IFunctionBase
 {
 public:
-    using CapturePtr = ExecutableFunctionCapture::CapturePtr;
-
     FunctionCapture(
         ExpressionActionsPtr expression_actions_,
-        CapturePtr capture_,
+        LambdaCapturePtr capture_,
         DataTypePtr return_type_,
         String name_)
         : expression_actions(std::move(expression_actions_))
@@ -207,9 +231,12 @@ public:
         return std::make_unique<ExecutableFunctionCapture>(expression_actions, capture);
     }
 
+    const LambdaCapture & getCapture() const { return *capture; }
+    const ActionsDAG & getActionsDAG() const { return expression_actions->getActionsDAG(); }
+
 private:
     ExpressionActionsPtr expression_actions;
-    CapturePtr capture;
+    LambdaCapturePtr capture;
     DataTypePtr return_type;
     String name;
 };
@@ -217,28 +244,23 @@ private:
 class FunctionCaptureOverloadResolver : public IFunctionOverloadResolver
 {
 public:
-    using Capture = ExecutableFunctionCapture::Capture;
-    using CapturePtr = ExecutableFunctionCapture::CapturePtr;
-
     FunctionCaptureOverloadResolver(
-        ExpressionActionsPtr expression_actions_,
-        const Names & captured_names,
-        const NamesAndTypesList & lambda_arguments,
-        const DataTypePtr & function_return_type,
-        const String & expression_return_name,
-        bool allow_constant_folding)
-        : expression_actions(std::move(expression_actions_))
+        ActionsDAG actions_dag,
+        const ExpressionActionsSettings & actions_settings,
+        const Names & captured_names,
+        const NamesAndTypesList & lambda_arguments,
+        const DataTypePtr & function_return_type,
+        const String & expression_return_name,
+        bool allow_constant_folding)
     {
         /// Check that expression does not contain unusual actions that will break columns structure.
-        for (const auto & action : expression_actions->getActions())
-            if (action.node->type == ActionsDAG::ActionType::ARRAY_JOIN)
-                throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expression with arrayJoin or other unusual action cannot be captured");
+        if (actions_dag.hasArrayJoin())
+            throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expression with arrayJoin or other unusual action cannot be captured");
 
         std::unordered_map<std::string, DataTypePtr> arguments_map;
 
-        const auto & all_arguments = expression_actions->getRequiredColumnsWithTypes();
-        for (const auto & arg : all_arguments)
-            arguments_map[arg.name] = arg.type;
+        for (const auto * input : actions_dag.getInputs())
+            arguments_map[input->result_name] = input->result_type;
 
         DataTypes captured_types;
         captured_types.reserve(captured_names.size());
@@ -263,14 +285,16 @@ public:
 
         name = "Capture[" + toString(captured_types) + "](" + toString(argument_types) + ") -> " + function_return_type->getName();
 
-        capture = std::make_shared<Capture>(Capture{
-            .captured_names = captured_names,
-            .captured_types = std::move(captured_types),
-            .lambda_arguments = lambda_arguments,
-            .return_name = expression_return_name,
-            .return_type = function_return_type,
-            .allow_constant_folding = allow_constant_folding,
+        capture = std::make_shared<LambdaCapture>(LambdaCapture{
+            .captured_names = captured_names,
+            .captured_types = std::move(captured_types),
+            .lambda_arguments = lambda_arguments,
+            .return_name = expression_return_name,
+            .return_type = function_return_type,
+            .allow_constant_folding = allow_constant_folding,
         });
+
+        expression_actions = std::make_shared<ExpressionActions>(std::move(actions_dag), actions_settings);
     }
 
     String getName() const override { return name; }
@@ -288,7 +312,7 @@ public:
 
 private:
     ExpressionActionsPtr expression_actions;
-    CapturePtr capture;
+    LambdaCapturePtr capture;
     DataTypePtr return_type;
     String name;
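The LambdaCapture struct introduced above is what makes a captured lambda serializable: the captured column names and types, the lambda's formal arguments, and the return name/type fully describe the signature, which the FunctionExpression constructor rebuilds as captured columns first, then lambda arguments. A stripped-down sketch of that ordering rule, with std::string standing in for ClickHouse's name/type pairs (illustrative only):

    #include <string>
    #include <vector>

    // Simplified shape of LambdaCapture, names only.
    struct CaptureSketch
    {
        std::vector<std::string> captured_names;   // columns captured from the outer scope
        std::vector<std::string> lambda_arguments; // formal parameters of the lambda
        std::string return_name;
    };

    // FunctionExpression's argument list: captured columns first, then lambda
    // arguments, matching the constructor in FunctionsMiscellaneous.h above.
    std::vector<std::string> signatureOrder(const CaptureSketch & c)
    {
        std::vector<std::string> names = c.captured_names;
        names.insert(names.end(), c.lambda_arguments.begin(), c.lambda_arguments.end());
        return names;
    }

Keeping this order stable matters because ExecutableFunctionCapture binds the incoming columns positionally before executing the expression actions.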
diff --git a/src/Interpreters/ActionsDAG.cpp b/src/Interpreters/ActionsDAG.cpp
index 8d95b6520c1..2ef874fb84a 100644
--- a/src/Interpreters/ActionsDAG.cpp
+++ b/src/Interpreters/ActionsDAG.cpp
@@ -3,15 +3,21 @@
 #include
 #include
 #include
+#include
+#include
+#include
 #include
+#include
 #include
 #include
 #include
+#include
 #include
 #include
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -20,6 +26,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 
@@ -38,6 +45,7 @@ namespace ErrorCodes
     extern const int ILLEGAL_COLUMN;
     extern const int NOT_FOUND_COLUMN_IN_BLOCK;
     extern const int BAD_ARGUMENTS;
+    extern const int INCORRECT_DATA;
 }
 
 namespace
@@ -3205,4 +3213,430 @@ const ActionsDAG::Node * FindOriginalNodeForOutputName::find(const String & outp
     return it->second;
 }
 
+
+static void serializeCapture(const LambdaCapture & capture, WriteBuffer & out)
+{
+    writeStringBinary(capture.return_name, out);
+    encodeDataType(capture.return_type, out);
+
+    writeVarUInt(capture.captured_names.size(), out);
+    for (const auto & name : capture.captured_names)
+        writeStringBinary(name, out);
+
+    writeVarUInt(capture.captured_types.size(), out);
+    for (const auto & type : capture.captured_types)
+        encodeDataType(type, out);
+
+    writeVarUInt(capture.lambda_arguments.size(), out);
+    for (const auto & item : capture.lambda_arguments)
+    {
+        writeStringBinary(item.name, out);
+        encodeDataType(item.type, out);
+    }
+}
+
+static void deserializeCapture(LambdaCapture & capture, ReadBuffer & in)
+{
+    readStringBinary(capture.return_name, in);
+    capture.return_type = decodeDataType(in);
+
+    UInt64 num_names;
+    readVarUInt(num_names, in);
+    capture.captured_names.resize(num_names);
+    for (auto & name : capture.captured_names)
+        readStringBinary(name, in);
+
+    UInt64 num_types;
+    readVarUInt(num_types, in);
+    capture.captured_types.resize(num_types);
+    for (auto & type : capture.captured_types)
+        type = decodeDataType(in);
+
+    UInt64 num_args;
+    readVarUInt(num_args, in);
+    capture.lambda_arguments.clear();
+    for (size_t i = 0; i < num_args; ++i)
+    {
+        NameAndTypePair name_and_type;
+        readStringBinary(name_and_type.name, in);
+        name_and_type.type = decodeDataType(in);
+        capture.lambda_arguments.push_back(std::move(name_and_type));
+    }
+}
+
+static void serializeConstant(
+    const IDataType & type,
+    const IColumn & value,
+    WriteBuffer & out,
+    SerializedSetsRegistry & registry)
+{
+    if (WhichDataType(type).isSet())
+    {
+        const IColumn * maybe_set = &value;
+        if (const auto * column_const = typeid_cast<const ColumnConst *>(maybe_set))
+            maybe_set = &column_const->getDataColumn();
+
+        const auto * column_set = typeid_cast<const ColumnSet *>(maybe_set);
+        if (!column_set)
+            throw Exception(
+                ErrorCodes::LOGICAL_ERROR,
+                "ColumnSet is expected for DataTypeSet. Got {}", value.getName());
+
+        auto hash = column_set->getData()->getHash();
+        writeBinary(hash, out);
+        registry.sets.emplace(hash, column_set->getData());
+        return;
+    }
+
+    if (WhichDataType(type).isFunction())
+    {
+        const IColumn * maybe_function = &value;
+        if (const auto * column_const = typeid_cast<const ColumnConst *>(maybe_function))
+            maybe_function = &column_const->getDataColumn();
+
+        const auto * column_function = typeid_cast<const ColumnFunction *>(maybe_function);
+        if (!column_function)
+            throw Exception(
+                ErrorCodes::LOGICAL_ERROR,
+                "ColumnFunction is expected for DataTypeFunction. Got {}", value.getName());
+
+        auto function = column_function->getFunction();
+        const auto * function_expression = typeid_cast<const FunctionExpression *>(function.get());
+        if (!function_expression)
+            throw Exception(
+                ErrorCodes::LOGICAL_ERROR,
+                "Expected FunctionExpression for ColumnFunction. Got {}", function->getName());
+
+        serializeCapture(function_expression->getCapture(), out);
+        function_expression->getActionsDAG().serialize(out, registry);
+
+        const auto & captured_columns = column_function->getCapturedColumns();
+        writeVarUInt(captured_columns.size(), out);
+        for (const auto & captured_column : captured_columns)
+        {
+            encodeDataType(captured_column.type, out);
+            serializeConstant(*captured_column.type, *captured_column.column, out, registry);
+        }
+
+        return;
+    }
+
+    const auto * const_column = typeid_cast<const ColumnConst *>(&value);
+    if (!const_column)
+        throw Exception(
+            ErrorCodes::LOGICAL_ERROR,
+            "Cannot serialize non-constant column {}", value.getName());
+
+    const auto & data = const_column->getDataColumn();
+    type.getDefaultSerialization()->serializeBinary(data, 0, out, FormatSettings{});
+}
+
+static MutableColumnPtr deserializeConstant(
+    const IDataType & type,
+    ReadBuffer & in,
+    DeserializedSetsRegistry & registry,
+    const ContextPtr & context)
+{
+    if (WhichDataType(type).isSet())
+    {
+        FutureSet::Hash hash;
+        readBinary(hash, in);
+
+        auto column_set = ColumnSet::create(0, nullptr);
+        registry.sets[hash].push_back(column_set.get());
+
+        return column_set;
+    }
+
+    if (WhichDataType(type).isFunction())
+    {
+        LambdaCapture capture;
+        deserializeCapture(capture, in);
+        auto capture_dag = ActionsDAG::deserialize(in, registry, context);
+
+        UInt64 num_captured_columns;
+        readVarUInt(num_captured_columns, in);
+        ColumnsWithTypeAndName captured_columns(num_captured_columns);
+
+        for (auto & captured_column : captured_columns)
+        {
+            captured_column.type = decodeDataType(in);
+            captured_column.column = deserializeConstant(*captured_column.type, in, registry, context);
+        }
+
+        auto function_expression = std::make_shared<FunctionExpression>(
+            std::make_shared<LambdaCapture>(std::move(capture)),
+            std::make_shared<ExpressionActions>(
+                std::move(capture_dag),
+                ExpressionActionsSettings::fromContext(context, CompileExpressions::yes)));
+
+        return ColumnFunction::create(1, std::move(function_expression), std::move(captured_columns));
+    }
+
+    auto column = type.createColumn();
+    type.getDefaultSerialization()->deserializeBinary(*column, in, FormatSettings{});
+    return ColumnConst::create(std::move(column), 0);
+}
+
+void ActionsDAG::serialize(WriteBuffer & out, SerializedSetsRegistry & registry) const
+{
+    size_t nodes_size = nodes.size();
+    writeVarUInt(nodes_size, out);
+
+    std::unordered_map<const Node *, size_t> node_to_id;
+    for (const auto & node : nodes)
+        node_to_id.emplace(&node, node_to_id.size());
+
+    if (nodes_size != node_to_id.size())
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Duplicate nodes in ActionsDAG");
+
+    for (const auto & node : nodes)
+    {
+        writeIntBinary(static_cast<UInt8>(node.type), out);
+        writeStringBinary(node.result_name, out);
+        encodeDataType(node.result_type, out);
+
+        writeVarUInt(node.children.size(), out);
+        for (const auto * child : node.children)
+            writeVarUInt(node_to_id.at(child), out);
+
+        /// Serialize column if it is present
+        const bool has_column = node.column != nullptr;
+        UInt8 column_flags = 0;
+        if (has_column)
+        {
+            column_flags |= 1;
+            if (node.is_deterministic_constant)
+                column_flags |= 2;
+        }
+
+        const auto * function_capture = typeid_cast<const FunctionCapture *>(node.function_base.get());
+        if (function_capture)
+            column_flags |= 4;
+
+        writeIntBinary(column_flags, out);
+
+        if (has_column)
+            serializeConstant(*node.result_type, *node.column, out, registry);
+
+        if (node.type == ActionType::INPUT)
+        {
+            /// nothing to serialize
+        }
+        else if (node.type == ActionType::COLUMN)
+        {
+            /// nothing to serialize, column is already serialized
+        }
+        else if (node.type == ActionType::ALIAS)
+        {
+            /// nothing to serialize
+        }
+        else if (node.type == ActionType::FUNCTION)
+        {
+            writeStringBinary(node.function_base->getName(), out);
+            if (function_capture)
+            {
+                serializeCapture(function_capture->getCapture(), out);
+                function_capture->getActionsDAG().serialize(out, registry);
+            }
+        }
+        else if (node.type == ActionType::ARRAY_JOIN)
+        {
+            /// nothing to serialize
+        }
+        else
+        {
+            throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown node type {}", static_cast<size_t>(node.type));
+        }
+    }
+
+    writeVarUInt(inputs.size(), out);
+    for (const auto * input : inputs)
+        writeVarUInt(node_to_id.at(input), out);
+
+    writeVarUInt(outputs.size(), out);
+    for (const auto * output : outputs)
+        writeVarUInt(node_to_id.at(output), out);
+}
+
+ActionsDAG ActionsDAG::deserialize(ReadBuffer & in, DeserializedSetsRegistry & registry, const ContextPtr & context)
+{
+    size_t nodes_size;
+    readVarUInt(nodes_size, in);
+
+    std::list<Node> nodes;
+    std::unordered_map<size_t, Node *> id_to_node;
+    for (size_t i = 0; i < nodes_size; ++i)
+        id_to_node.emplace(i, &nodes.emplace_back(Node{}));
+
+    for (size_t i = 0; i < nodes_size; ++i)
+    {
+        Node & node = *id_to_node.at(i);
+
+        UInt8 action_type{0};
+        readIntBinary(action_type, in);
+        if (action_type > static_cast<UInt8>(ActionType::FUNCTION))
+            throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown action type {}", size_t(action_type));
+        node.type = static_cast<ActionType>(action_type);
+
+        readStringBinary(node.result_name, in);
+        node.result_type = decodeDataType(in);
+
+        size_t children_size;
+        readVarUInt(children_size, in);
+        for (size_t j = 0; j < children_size; ++j)
+        {
+            size_t child_id;
+            readVarUInt(child_id, in);
+            node.children.push_back(id_to_node.at(child_id));
+        }
+
+        UInt8 column_flags = 0;
+        readIntBinary(column_flags, in);
+
+        /// Deserialize column if it is present
+        if (column_flags & 1)
+        {
+            if ((column_flags & 2) == 0)
+                node.is_deterministic_constant = false;
+
+            node.column = deserializeConstant(*node.result_type, in, registry, context);
+        }
+
+        if (node.type == ActionType::INPUT)
+        {
+            /// nothing to deserialize
+            if (!node.children.empty())
+                throw Exception(ErrorCodes::INCORRECT_DATA, "Deserialized input can't have children");
+        }
+        else if (node.type == ActionType::COLUMN)
+        {
+            /// Column is already deserialized
+            if (!node.children.empty())
+                throw Exception(ErrorCodes::INCORRECT_DATA, "Deserialized column can't have children");
+        }
+        else if (node.type == ActionType::ALIAS)
+        {
+            /// nothing to deserialize
+            if (node.children.size() != 1)
+                throw Exception(ErrorCodes::INCORRECT_DATA, "Deserialized alias must have one child");
+        }
+        else if (node.type == ActionType::FUNCTION)
+        {
+            String function_name;
+            readStringBinary(function_name, in);
+
+            ColumnsWithTypeAndName arguments;
+            arguments.reserve(node.children.size());
+            for (const auto * child : node.children)
+            {
+                ColumnWithTypeAndName argument;
+                argument.column = child->column;
+                argument.type = child->result_type;
+                argument.name = child->result_name;
+
+                arguments.emplace_back(std::move(argument));
+            }
+
+            if (column_flags & 4)
+            {
+                LambdaCapture capture;
+                deserializeCapture(capture, in);
+                auto capture_dag = ActionsDAG::deserialize(in, registry, context);
+
+                node.function_base = std::make_shared<FunctionCapture>(
+                    std::make_shared<ExpressionActions>(
+                        std::move(capture_dag),
+                        ExpressionActionsSettings::fromContext(context, CompileExpressions::yes)),
+                    std::make_shared<LambdaCapture>(std::move(capture)),
+                    node.result_type,
+                    function_name);
+
+                node.function = node.function_base->prepare(arguments);
+                node.is_function_compiled = false;
+            }
+            else
+            {
+                auto function = FunctionFactory::instance().get(function_name, context);
+
+                node.function_base = function->build(arguments);
+                node.function = node.function_base->prepare(arguments);
+                node.is_function_compiled = false;
+
+                auto lhs_type = node.result_type;
+                if (const auto * lhs_tuple = typeid_cast<const DataTypeTuple *>(lhs_type.get()))
+                    lhs_type = std::make_shared<DataTypeTuple>(lhs_tuple->getElements());
+
+                auto rhs_type = node.function_base->getResultType();
+                if (const auto * rhs_tuple = typeid_cast<const DataTypeTuple *>(rhs_type.get()))
+                    rhs_type = std::make_shared<DataTypeTuple>(rhs_tuple->getElements());
+
+                if (!lhs_type->equals(*rhs_type))
+                    throw Exception(ErrorCodes::INCORRECT_DATA,
+                        "Deserialized function {} has invalid type. Expected {}, deserialized {}.",
+                        function_name,
+                        rhs_type->getName(),
+                        lhs_type->getName());
+            }
+        }
+        else if (node.type == ActionType::ARRAY_JOIN)
+        {
+            /// nothing to deserialize
+            if (node.children.size() != 1)
+                throw Exception(ErrorCodes::INCORRECT_DATA, "Deserialized array join must have one child");
+        }
+        else
+        {
+            throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown node type {}", static_cast<size_t>(node.type));
+        }
+    }
+
+    size_t inputs_size;
+    readVarUInt(inputs_size, in);
+    std::vector<const Node *> inputs;
+    std::unordered_set<const Node *> inputs_set;
+    inputs.reserve(inputs_size);
+    for (size_t i = 0; i < inputs_size; ++i)
+    {
+        size_t input_id;
+        readVarUInt(input_id, in);
+        const auto * input = id_to_node.at(input_id);
+
+        if (input->type != ActionType::INPUT)
+            throw Exception(ErrorCodes::INCORRECT_DATA,
+                "Deserialized input {} has type {}",
+                input->result_name, magic_enum::enum_name(input->type));
+
+        if (!inputs_set.emplace(input).second)
+            throw Exception(ErrorCodes::INCORRECT_DATA,
+                "Duplicate input {}", input->result_name);
+
+        inputs.push_back(input);
+    }
+
+    size_t outputs_size;
+    readVarUInt(outputs_size, in);
+    std::vector<const Node *> outputs;
+    outputs.reserve(outputs_size);
+    for (size_t i = 0; i < outputs_size; ++i)
+    {
+        size_t output_id;
+        readVarUInt(output_id, in);
+        outputs.push_back(id_to_node.at(output_id));
+    }
+
+    for (const auto & node : nodes)
+        if (node.type == ActionType::INPUT && !inputs_set.contains(&node))
+            throw Exception(ErrorCodes::INCORRECT_DATA,
+                "Deserialized input {} is not in the inputs list",
+                node.result_name);
+
+    ActionsDAG dag;
+    dag.nodes = std::move(nodes);
+    dag.inputs = std::move(inputs);
+    dag.outputs = std::move(outputs);
+
+    return dag;
+}
+
 }
diff --git a/src/Interpreters/ActionsDAG.h b/src/Interpreters/ActionsDAG.h
index 7e88f77a835..bc9b1c50db0 100644
--- a/src/Interpreters/ActionsDAG.h
+++ b/src/Interpreters/ActionsDAG.h
@@ -35,6 +35,9 @@ namespace JSONBuilder
 
 class SortDescription;
 
+struct SerializedSetsRegistry;
+struct DeserializedSetsRegistry;
+
 /// Directed acyclic graph of expressions.
 /// This is an intermediate representation of actions which is usually built from expression list AST.
 /// Node of DAG describe calculation of a single column with known type, name, and constant value (if applicable).
@@ -130,6 +133,9 @@ public:
     std::string dumpNames() const;
     std::string dumpDAG() const;
 
+    void serialize(WriteBuffer & out, SerializedSetsRegistry & registry) const;
+    static ActionsDAG deserialize(ReadBuffer & in, DeserializedSetsRegistry & registry, const ContextPtr & context);
+
     const Node & addInput(std::string name, DataTypePtr type);
     const Node & addInput(ColumnWithTypeAndName column);
     const Node & addColumn(ColumnWithTypeAndName column);
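ActionsDAG::serialize flattens the graph into a node table: every node gets a dense id from a pointer-to-id map, and child edges are written as varint ids that deserialize() resolves back into pointers. A minimal sketch of that id-mapping technique in isolation (NodeSketch is a stand-in for ActionsDAG::Node; this is illustrative, not the real type):

    #include <cstddef>
    #include <unordered_map>
    #include <vector>

    struct NodeSketch
    {
        std::vector<const NodeSketch *> children;
    };

    // Assign dense ids in node order, then rewrite every child pointer as an id,
    // which is what makes the graph writable as a flat stream.
    std::vector<std::vector<size_t>> flattenEdges(const std::vector<NodeSketch> & nodes)
    {
        std::unordered_map<const NodeSketch *, size_t> node_to_id;
        for (const auto & node : nodes)
            node_to_id.emplace(&node, node_to_id.size());

        std::vector<std::vector<size_t>> edges(nodes.size());
        for (size_t i = 0; i < nodes.size(); ++i)
            for (const auto * child : nodes[i].children)
                edges[i].push_back(node_to_id.at(child));
        return edges;
    }

Because deserialize() pre-allocates all nodes before wiring children, forward references are harmless; node types, inputs, and outputs are validated afterwards, as the INCORRECT_DATA checks above show.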
diff --git a/src/Interpreters/ActionsVisitor.cpp b/src/Interpreters/ActionsVisitor.cpp
index 696021b418c..f1cbcef83c1 100644
--- a/src/Interpreters/ActionsVisitor.cpp
+++ b/src/Interpreters/ActionsVisitor.cpp
@@ -111,7 +111,7 @@ static size_t getTypeDepth(const DataTypePtr & type)
 /// 33.33 in the set is converted to 33.3, but it is not equal to 33.3 in the column, so the result should still be empty.
 /// We can not include values that don't represent any possible value from the type of filtered column to the set.
 template <typename Collection>
-static Block createBlockFromCollection(const Collection & collection, const DataTypes & value_types, const DataTypes & types, bool transform_null_in)
+static ColumnsWithTypeAndName createBlockFromCollection(const Collection & collection, const DataTypes & value_types, const DataTypes & types, bool transform_null_in)
 {
     size_t columns_num = types.size();
     MutableColumns columns(columns_num);
@@ -169,9 +169,13 @@ static Block createBlockFromCollection(const Collection & collection, const Data
         }
     }
 
-    Block res;
+    ColumnsWithTypeAndName res(columns_num);
     for (size_t i = 0; i < columns_num; ++i)
-        res.insert(ColumnWithTypeAndName{std::move(columns[i]), types[i], "_" + toString(i)});
+    {
+        res[i].type = types[i];
+        res[i].column = std::move(columns[i]);
+    }
+
     return res;
 }
 
@@ -189,16 +193,14 @@ static Field extractValueFromNode(const ASTPtr & node, const IDataType & type, C
     throw Exception(ErrorCodes::INCORRECT_ELEMENT_OF_SET, "Incorrect element of set. Must be literal or constant expression.");
 }
 
-static Block createBlockFromAST(const ASTPtr & node, const DataTypes & types, ContextPtr context)
+static ColumnsWithTypeAndName createBlockFromAST(const ASTPtr & node, const DataTypes & types, ContextPtr context)
 {
     /// Will form a block with values from the set.
 
-    Block header;
     size_t num_columns = types.size();
+    MutableColumns columns(num_columns);
     for (size_t i = 0; i < num_columns; ++i)
-        header.insert(ColumnWithTypeAndName(types[i]->createColumn(), types[i], "_" + toString(i)));
-
-    MutableColumns columns = header.cloneEmptyColumns();
+        columns[i] = types[i]->createColumn();
 
     DataTypePtr tuple_type;
     Row tuple_values;
@@ -290,7 +292,14 @@ static Block createBlockFromAST(const ASTPtr & node, const DataTypes & types, Co
             throw Exception(ErrorCodes::INCORRECT_ELEMENT_OF_SET, "Incorrect element of set");
     }
 
-    return header.cloneWithColumns(std::move(columns));
+    ColumnsWithTypeAndName res(num_columns);
+    for (size_t i = 0; i < num_columns; ++i)
+    {
+        res[i].type = types[i];
+        res[i].column = std::move(columns[i]);
+    }
+
+    return res;
 }
 
 
@@ -304,7 +313,7 @@ namespace
  * We need special implementation for ASTFunction, because in case, when we interpret
  * large tuple or array as function, `evaluateConstantExpression` works extremely slow.
  */
-Block createBlockForSet(
+ColumnsWithTypeAndName createBlockForSet(
     const DataTypePtr & left_arg_type,
     const ASTPtr & right_arg,
     const DataTypes & set_element_types,
@@ -321,7 +330,7 @@ Block createBlockForSet(
             type->getName());
     };
 
-    Block block;
+    ColumnsWithTypeAndName block;
     bool tranform_null_in = context->getSettingsRef()[Setting::transform_null_in];
 
     /// 1 in 1; (1, 2) in (1, 2); identity(tuple(tuple(tuple(1)))) in tuple(tuple(tuple(1))); etc.
@@ -360,7 +369,7 @@ Block createBlockForSet(
  * 'set_element_types' - types of what are on the left hand side of IN.
  * 'right_arg' - Literal - Tuple or Array.
 */
-Block createBlockForSet(
+ColumnsWithTypeAndName createBlockForSet(
     const DataTypePtr & left_arg_type,
     const std::shared_ptr<ASTFunction> & right_arg,
     const DataTypes & set_element_types,
@@ -442,14 +451,14 @@ FutureSetPtr makeExplicitSet(
     if (const auto * low_cardinality_type = typeid_cast<const DataTypeLowCardinality *>(element_type.get()))
         element_type = low_cardinality_type->getDictionaryType();
 
-    Block block;
+    ColumnsWithTypeAndName block;
     const auto & right_arg_func = std::dynamic_pointer_cast<ASTFunction>(right_arg);
     if (right_arg_func && (right_arg_func->name == "tuple" || right_arg_func->name == "array"))
         block = createBlockForSet(left_arg_type, right_arg_func, set_element_types, context);
     else
         block = createBlockForSet(left_arg_type, right_arg, set_element_types, context);
 
-    return prepared_sets.addFromTuple(set_key, block, context->getSettingsRef());
+    return prepared_sets.addFromTuple(set_key, std::move(block), context->getSettingsRef());
 }
 
 class ScopeStack::Index
@@ -1291,14 +1300,10 @@ void ActionsMatcher::visit(const ASTFunction & node, const ASTPtr & ast, Data &
                 String result_name = lambda->arguments->children.at(1)->getColumnName();
                 lambda_dag.removeUnusedActions(Names(1, result_name));
 
-                auto lambda_actions = std::make_shared<ExpressionActions>(
-                    std::move(lambda_dag),
-                    ExpressionActionsSettings::fromContext(data.getContext(), CompileExpressions::yes));
-
-                DataTypePtr result_type = lambda_actions->getSampleBlock().getByName(result_name).type;
+                DataTypePtr result_type = lambda_dag.findInOutputs(result_name).result_type;
 
                 Names captured;
-                Names required = lambda_actions->getRequiredColumns();
+                Names required = lambda_dag.getRequiredColumnsNames();
                 for (const auto & required_arg : required)
                     if (findColumn(required_arg, lambda_arguments) == lambda_arguments.end())
                         captured.push_back(required_arg);
@@ -1307,8 +1312,9 @@ void ActionsMatcher::visit(const ASTFunction & node, const ASTPtr & ast, Data &
                 /// because it does not uniquely define the expression (the types of arguments can be different).
                 String lambda_name = data.getUniqueName("__lambda");
 
+                auto actions_settings = ExpressionActionsSettings::fromContext(data.getContext(), CompileExpressions::yes);
                 auto function_capture = std::make_shared<FunctionCaptureOverloadResolver>(
-                    lambda_actions, captured, lambda_arguments, result_type, result_name, false);
+                    std::move(lambda_dag), actions_settings, captured, lambda_arguments, result_type, result_name, false);
                 data.addFunction(function_capture, captured, lambda_name);
 
                 argument_types[i] = std::make_shared<DataTypeFunction>(lambda_type->getArgumentTypes(), result_type);
diff --git a/src/Interpreters/AggregateDescription.cpp b/src/Interpreters/AggregateDescription.cpp
index d4c09995b56..504ed4b8361 100644
--- a/src/Interpreters/AggregateDescription.cpp
+++ b/src/Interpreters/AggregateDescription.cpp
@@ -1,13 +1,21 @@
 #include
+#include
 #include
 #include
 #include
 #include
+#include
+#include
 
 namespace DB
 {
 
+namespace ErrorCodes
+{
+    extern const int LOGICAL_ERROR;
+}
+
 void AggregateDescription::explain(WriteBuffer & out, size_t indent) const
 {
     String prefix(indent, ' ');
@@ -121,4 +129,77 @@ void AggregateDescription::explain(JSONBuilder::JSONMap & map) const
         map.add("Arguments", std::move(args_array));
 }
 
+void serializeAggregateDescriptions(const AggregateDescriptions & aggregates, WriteBuffer & out)
+{
+    writeVarUInt(aggregates.size(), out);
+    for (const auto & aggregate : aggregates)
+    {
+        writeStringBinary(aggregate.column_name, out);
+
+        UInt64 num_args = aggregate.argument_names.size();
+        const auto & argument_types = aggregate.function->getArgumentTypes();
+
+        if (argument_types.size() != num_args)
+        {
+            WriteBufferFromOwnString buf;
+            aggregate.explain(buf, 0);
+            throw Exception(ErrorCodes::LOGICAL_ERROR,
+                "Invalid number of arguments for aggregate function. Expected {}, got {}. Description:\n{}",
+                argument_types.size(), num_args, buf.str());
+        }
+
+        writeVarUInt(num_args, out);
+        for (size_t i = 0; i < num_args; ++i)
+        {
+            writeStringBinary(aggregate.argument_names[i], out);
+            encodeDataType(argument_types[i], out);
+        }
+
+        writeStringBinary(aggregate.function->getName(), out);
+
+        writeVarUInt(aggregate.parameters.size(), out);
+        for (const auto & param : aggregate.parameters)
+            writeFieldBinary(param, out);
+    }
+}
+
+void deserializeAggregateDescriptions(AggregateDescriptions & aggregates, ReadBuffer & in)
+{
+    UInt64 num_aggregates;
+    readVarUInt(num_aggregates, in);
+    aggregates.resize(num_aggregates);
+    for (auto & aggregate : aggregates)
+    {
+        readStringBinary(aggregate.column_name, in);
+
+        UInt64 num_args;
+        readVarUInt(num_args, in);
+        aggregate.argument_names.resize(num_args);
+
+        DataTypes argument_types;
+        argument_types.reserve(num_args);
+
+        for (auto & arg_name : aggregate.argument_names)
+        {
+            readStringBinary(arg_name, in);
+            argument_types.emplace_back(decodeDataType(in));
+        }
+
+        String function_name;
+        readStringBinary(function_name, in);
+
+        UInt64 num_params;
+        readVarUInt(num_params, in);
+        aggregate.parameters.resize(num_params);
+        for (auto & param : aggregate.parameters)
+            param = readFieldBinary(in);
+
+        auto action = NullsAction::EMPTY; /// As I understand, it should be resolved to function name.
+        AggregateFunctionProperties properties;
+        aggregate.function = AggregateFunctionFactory::instance().get(
+            function_name, action, argument_types, aggregate.parameters, properties);
+    }
+}
+
 }
diff --git a/src/Interpreters/AggregateDescription.h b/src/Interpreters/AggregateDescription.h
index 0f1c0ce67ae..19c999650a8 100644
--- a/src/Interpreters/AggregateDescription.h
+++ b/src/Interpreters/AggregateDescription.h
@@ -25,4 +25,8 @@ struct AggregateDescription
 };
 
 using AggregateDescriptions = std::vector<AggregateDescription>;
+
+void serializeAggregateDescriptions(const AggregateDescriptions & aggregates, WriteBuffer & out);
+void deserializeAggregateDescriptions(AggregateDescriptions & aggregates, ReadBuffer & in);
+
 }
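serializeAggregateDescriptions ships only what is needed to re-resolve the aggregate on the other side: the result column name, argument names with their encoded types, the function name, and its parameters; the executable IAggregateFunction itself never travels over the wire. A standalone sketch of that record shape, with plain std::string fields standing in for ClickHouse's DataTypePtr and Field (purely illustrative):

    #include <string>
    #include <utility>
    #include <vector>

    // One serialized aggregate, as written by serializeAggregateDescriptions.
    // deserializeAggregateDescriptions rebuilds the executable function from
    // AggregateFunctionFactory using (function_name, argument types, parameters).
    struct AggregateRecord
    {
        std::string column_name;                               // result column
        std::vector<std::pair<std::string, std::string>> args; // (name, encoded type)
        std::string function_name;                             // e.g. "sum", "uniqExact"
        std::vector<std::string> parameters;                   // binary-encoded Fields
    };

Shipping names and re-resolving locally keeps the format stable across versions as long as the factory can still resolve the same function name and types.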
diff --git a/src/Interpreters/Aggregator.cpp b/src/Interpreters/Aggregator.cpp
index 453309ca37d..58aaf1cc92c 100644
--- a/src/Interpreters/Aggregator.cpp
+++ b/src/Interpreters/Aggregator.cpp
@@ -76,22 +76,6 @@ namespace ErrorCodes
     extern const int BAD_ARGUMENTS;
 }
 
-namespace Setting
-{
-    extern const SettingsFloat min_hit_rate_to_use_consecutive_keys_optimization;
-    extern const SettingsUInt64 max_rows_to_group_by;
-    extern const SettingsOverflowModeGroupBy group_by_overflow_mode;
-    extern const SettingsUInt64 max_bytes_before_external_group_by;
-    extern const SettingsDouble max_bytes_ratio_before_external_group_by;
-    extern const SettingsMaxThreads max_threads;
-    extern const SettingsUInt64 min_free_disk_space_for_temporary_data;
-    extern const SettingsBool compile_aggregate_expressions;
-    extern const SettingsUInt64 min_count_to_compile_aggregate_expression;
-    extern const SettingsUInt64 max_block_size;
-    extern const SettingsBool enable_software_prefetch_in_aggregation;
-    extern const SettingsBool optimize_group_by_constant_keys;
-};
-
 }
 
 namespace
@@ -195,45 +179,59 @@ Block Aggregator::getHeader(bool final) const
 }
 
 Aggregator::Params::Params(
-    const Settings & settings,
     const Names & keys_,
     const AggregateDescriptions & aggregates_,
     bool overflow_row_,
+    size_t max_rows_to_group_by_,
+    OverflowMode group_by_overflow_mode_,
     size_t group_by_two_level_threshold_,
     size_t group_by_two_level_threshold_bytes_,
+    size_t max_bytes_before_external_group_by_,
     bool empty_result_for_aggregation_by_empty_set_,
     TemporaryDataOnDiskScopePtr tmp_data_scope_,
+    size_t max_threads_,
+    size_t min_free_disk_space_,
+    bool compile_aggregate_expressions_,
+    size_t min_count_to_compile_aggregate_expression_,
+    size_t max_block_size_,
+    bool enable_prefetch_,
     bool only_merge_, // true for projections
+    bool optimize_group_by_constant_keys_,
+    float min_hit_rate_to_use_consecutive_keys_optimization_,
    const StatsCollectingParams & stats_collecting_params_)
     : keys(keys_)
     , keys_size(keys.size())
     , aggregates(aggregates_)
     , aggregates_size(aggregates.size())
     , overflow_row(overflow_row_)
-    , max_rows_to_group_by(settings[Setting::max_rows_to_group_by])
-    , group_by_overflow_mode(settings[Setting::group_by_overflow_mode])
+    , max_rows_to_group_by(max_rows_to_group_by_)
+    , group_by_overflow_mode(group_by_overflow_mode_)
     , group_by_two_level_threshold(group_by_two_level_threshold_)
     , group_by_two_level_threshold_bytes(group_by_two_level_threshold_bytes_)
-    , max_bytes_before_external_group_by(settings[Setting::max_bytes_before_external_group_by])
+    , max_bytes_before_external_group_by(max_bytes_before_external_group_by_)
     , empty_result_for_aggregation_by_empty_set(empty_result_for_aggregation_by_empty_set_)
     , tmp_data_scope(std::move(tmp_data_scope_))
-    , max_threads(settings[Setting::max_threads])
-    , min_free_disk_space(settings[Setting::min_free_disk_space_for_temporary_data])
-    , compile_aggregate_expressions(settings[Setting::compile_aggregate_expressions])
-    , min_count_to_compile_aggregate_expression(settings[Setting::min_count_to_compile_aggregate_expression])
-    , max_block_size(settings[Setting::max_block_size])
+    , max_threads(max_threads_)
+    , min_free_disk_space(min_free_disk_space_)
+    , compile_aggregate_expressions(compile_aggregate_expressions_)
+    , min_count_to_compile_aggregate_expression(min_count_to_compile_aggregate_expression_)
+    , max_block_size(max_block_size_)
     , only_merge(only_merge_)
-    , enable_prefetch(settings[Setting::enable_software_prefetch_in_aggregation])
-    , optimize_group_by_constant_keys(settings[Setting::optimize_group_by_constant_keys])
-    , min_hit_rate_to_use_consecutive_keys_optimization(settings[Setting::min_hit_rate_to_use_consecutive_keys_optimization])
+    , enable_prefetch(enable_prefetch_)
+    , optimize_group_by_constant_keys(optimize_group_by_constant_keys_)
+    , min_hit_rate_to_use_consecutive_keys_optimization(min_hit_rate_to_use_consecutive_keys_optimization_)
     , stats_collecting_params(stats_collecting_params_)
 {
-    if (settings[Setting::max_bytes_ratio_before_external_group_by] != 0.)
+}
+
+size_t Aggregator::Params::getMaxBytesBeforeExternalGroupBy(size_t max_bytes_before_external_group_by, double max_bytes_ratio_before_external_group_by)
+{
+    if (max_bytes_ratio_before_external_group_by != 0.)
     {
-        if (settings[Setting::max_bytes_before_external_group_by] > 0)
+        if (max_bytes_before_external_group_by > 0)
             throw Exception(ErrorCodes::BAD_ARGUMENTS,
                             "Settings max_bytes_ratio_before_external_group_by and max_bytes_before_external_group_by cannot be set simultaneously");
 
-        double ratio = settings[Setting::max_bytes_ratio_before_external_group_by];
+        double ratio = max_bytes_ratio_before_external_group_by;
         if (ratio < 0 || ratio >= 1.)
             throw Exception(ErrorCodes::BAD_ARGUMENTS, "Setting max_bytes_ratio_before_external_group_by should be >= 0 and < 1 ({})", ratio);
 
@@ -251,6 +249,8 @@ Aggregator::Params::Params(
             LOG_WARNING(getLogger("Aggregator"), "No system memory limits configured. Ignoring max_bytes_ratio_before_external_group_by");
         }
     }
+
+    return max_bytes_before_external_group_by;
 }
 
 Aggregator::Params::Params(
@@ -259,7 +259,7 @@ Aggregator::Params::Params(
     bool overflow_row_,
     size_t max_threads_,
     size_t max_block_size_,
-    double min_hit_rate_to_use_consecutive_keys_optimization_)
+    float min_hit_rate_to_use_consecutive_keys_optimization_)
     : keys(keys_)
     , keys_size(keys.size())
     , aggregates(aggregates_)
diff --git a/src/Interpreters/Aggregator.h b/src/Interpreters/Aggregator.h
index 6691f6d2645..2088e4f7aad 100644
--- a/src/Interpreters/Aggregator.h
+++ b/src/Interpreters/Aggregator.h
@@ -131,19 +131,31 @@ public:
         bool only_merge = false;
         bool enable_prefetch = false;
         bool optimize_group_by_constant_keys = false;
-        const double min_hit_rate_to_use_consecutive_keys_optimization = 0.;
+        const float min_hit_rate_to_use_consecutive_keys_optimization = 0.;
 
         StatsCollectingParams stats_collecting_params;
 
+        static size_t getMaxBytesBeforeExternalGroupBy(size_t max_bytes_before_external_group_by, double max_bytes_ratio_before_external_group_by);
+
         Params(
-            const Settings & settings,
             const Names & keys_,
             const AggregateDescriptions & aggregates_,
             bool overflow_row_,
+            size_t max_rows_to_group_by_,
+            OverflowMode group_by_overflow_mode_,
             size_t group_by_two_level_threshold_,
             size_t group_by_two_level_threshold_bytes_,
+            size_t max_bytes_before_external_group_by_,
             bool empty_result_for_aggregation_by_empty_set_,
             TemporaryDataOnDiskScopePtr tmp_data_scope_,
+            size_t max_threads_,
+            size_t min_free_disk_space_,
+            bool compile_aggregate_expressions_,
+            size_t min_count_to_compile_aggregate_expression_,
+            size_t max_block_size_,
+            bool enable_prefetch_,
             bool only_merge_, // true for projections
+            bool optimize_group_by_constant_keys_,
+            float min_hit_rate_to_use_consecutive_keys_optimization_,
             const StatsCollectingParams & stats_collecting_params_);
 
         /// Only parameters that matter during merge.
@@ -153,7 +165,7 @@ public:
             bool overflow_row_,
             size_t max_threads_,
             size_t max_block_size_,
-            double min_hit_rate_to_use_consecutive_keys_optimization_);
+            float min_hit_rate_to_use_consecutive_keys_optimization_);
 
         Params cloneWithKeys(const Names & keys_, bool only_merge_ = false)
         {
diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp
index 8e85f81b6c6..e4e44e9cbad 100644
--- a/src/Interpreters/InterpreterSelectQuery.cpp
+++ b/src/Interpreters/InterpreterSelectQuery.cpp
@@ -131,7 +131,6 @@ namespace Setting
     extern const SettingsBool extremes;
     extern const SettingsBool final;
     extern const SettingsBool force_aggregation_in_order;
-    extern const SettingsOverflowModeGroupBy group_by_overflow_mode;
     extern const SettingsUInt64 group_by_two_level_threshold;
     extern const SettingsUInt64 group_by_two_level_threshold_bytes;
     extern const SettingsBool group_by_use_nulls;
@@ -149,14 +148,12 @@ namespace Setting
     extern const SettingsUInt64 max_result_rows;
     extern const SettingsUInt64 max_rows_in_distinct;
     extern const SettingsUInt64 max_rows_in_set_to_optimize_join;
-    extern const SettingsUInt64 max_rows_to_group_by;
     extern const SettingsUInt64 max_rows_to_read;
     extern const SettingsUInt64 max_size_to_preallocate_for_aggregation;
     extern const SettingsFloat max_streams_to_max_threads_ratio;
     extern const SettingsUInt64 max_subquery_depth;
     extern const SettingsMaxThreads max_threads;
     extern const SettingsUInt64 min_count_to_compile_sort_description;
-    extern const SettingsFloat min_hit_rate_to_use_consecutive_keys_optimization;
     extern const SettingsBool multiple_joins_try_to_keep_original_names;
     extern const SettingsBool optimize_aggregation_in_order;
     extern const SettingsBool optimize_move_to_prewhere;
@@ -178,6 +175,16 @@ namespace Setting
     extern const SettingsTotalsMode totals_mode;
     extern const SettingsBool use_concurrency_control;
     extern const SettingsBool use_with_fill_by_sorting_prefix;
+    extern const SettingsFloat min_hit_rate_to_use_consecutive_keys_optimization;
+    extern const SettingsUInt64 max_rows_to_group_by;
+    extern const SettingsOverflowModeGroupBy group_by_overflow_mode;
+    extern const SettingsUInt64 max_bytes_before_external_group_by;
+    extern const SettingsDouble max_bytes_ratio_before_external_group_by;
+    extern const SettingsUInt64 min_free_disk_space_for_temporary_data;
+    extern const SettingsBool compile_aggregate_expressions;
+    extern const SettingsUInt64 min_count_to_compile_aggregate_expression;
+    extern const SettingsBool enable_software_prefetch_in_aggregation;
+    extern const SettingsBool optimize_group_by_constant_keys;
 }
 
 namespace ServerSetting
@@ -2751,16 +2758,29 @@ static Aggregator::Params getAggregatorParams(
 
     return Aggregator::Params
     {
-        settings,
         keys,
         aggregates,
         overflow_row,
+        settings[Setting::max_rows_to_group_by],
+        settings[Setting::group_by_overflow_mode],
         group_by_two_level_threshold,
         group_by_two_level_threshold_bytes,
-        settings[Setting::empty_result_for_aggregation_by_empty_set] || (settings[Setting::empty_result_for_aggregation_by_constant_keys_on_empty_set] && keys.empty() && query_analyzer.hasConstAggregationKeys()),
+        Aggregator::Params::getMaxBytesBeforeExternalGroupBy(settings[Setting::max_bytes_before_external_group_by], settings[Setting::max_bytes_ratio_before_external_group_by]),
+        settings[Setting::empty_result_for_aggregation_by_empty_set]
+            || (settings[Setting::empty_result_for_aggregation_by_constant_keys_on_empty_set] && keys.empty()
+                && query_analyzer.hasConstAggregationKeys()),
         context.getTempDataOnDisk(),
-        /* only_merge_= */ false,
-        stats_collecting_params};
+        settings[Setting::max_threads],
+        settings[Setting::min_free_disk_space_for_temporary_data],
+        settings[Setting::compile_aggregate_expressions],
+        settings[Setting::min_count_to_compile_aggregate_expression],
+        settings[Setting::max_block_size],
+        settings[Setting::enable_software_prefetch_in_aggregation],
+        /* only_merge */ false,
+        settings[Setting::optimize_group_by_constant_keys],
+        settings[Setting::min_hit_rate_to_use_consecutive_keys_optimization],
+        stats_collecting_params
+    };
 }
 
 void InterpreterSelectQuery::executeAggregation(
diff --git a/src/Interpreters/PreparedSets.cpp b/src/Interpreters/PreparedSets.cpp
index c69e2f84d42..8de6edc5d21 100644
--- a/src/Interpreters/PreparedSets.cpp
+++ b/src/Interpreters/PreparedSets.cpp
@@ -38,7 +38,7 @@ namespace ErrorCodes
     extern const int LOGICAL_ERROR;
 }
 
-static SizeLimits getSizeLimitsForSet(const Settings & settings)
+SizeLimits PreparedSets::getSizeLimitsForSet(const Settings & settings)
 {
     return SizeLimits(settings[Setting::max_rows_in_set], settings[Setting::max_bytes_in_set], settings[Setting::set_overflow_mode]);
 }
@@ -59,8 +59,9 @@ static bool equals(const DataTypes & lhs, const DataTypes & rhs)
 }
 
 
-FutureSetFromStorage::FutureSetFromStorage(SetPtr set_) : set(std::move(set_)) {}
+FutureSetFromStorage::FutureSetFromStorage(Hash hash_, SetPtr set_) : hash(hash_), set(std::move(set_)) {}
 SetPtr FutureSetFromStorage::get() const { return set; }
+FutureSet::Hash FutureSetFromStorage::getHash() const { return hash; }
 DataTypes FutureSetFromStorage::getTypes() const { return set->getElementsTypes(); }
 
 SetPtr FutureSetFromStorage::buildOrderedSetInplace(const ContextPtr &)
@@ -69,24 +70,41 @@ SetPtr FutureSetFromStorage::buildOrderedSetInplace(const ContextPtr &)
 }
 
 
-FutureSetFromTuple::FutureSetFromTuple(Block block, const Settings & settings)
+FutureSetFromTuple::FutureSetFromTuple(
+    Hash hash_, ColumnsWithTypeAndName block,
+    bool transform_null_in, SizeLimits size_limits)
+    : hash(hash_)
 {
-    auto size_limits = getSizeLimitsForSet(settings);
-    set = std::make_shared<Set>(size_limits, settings[Setting::use_index_for_in_with_subqueries_max_values], settings[Setting::transform_null_in]);
-    set->setHeader(block.cloneEmpty().getColumnsWithTypeAndName());
+    ColumnsWithTypeAndName header = block;
+    for (auto & elem : header)
+        elem.column = elem.column->cloneEmpty();
 
+    set = std::make_shared<Set>(size_limits, 0, transform_null_in);
+
+    set->setHeader(header);
 
     Columns columns;
-    columns.reserve(block.columns());
+    columns.reserve(block.size());
+    size_t num_rows = 0;
     for (const auto & column : block)
+    {
         columns.emplace_back(column.column);
+        num_rows = column.column->size();
+    }
 
-    set_key_columns.filter = ColumnUInt8::create(block.rows());
+    set_key_columns.filter = ColumnUInt8::create(num_rows);
 
     set->insertFromColumns(columns, set_key_columns);
     set->finishInsert();
+
+    for (const auto & type : set->getElementsTypes())
+    {
+        auto name = type->getName();
+        hash = CityHash_v1_0_2::CityHash128WithSeed(name.data(), name.size(), hash);
+    }
 }
 
 DataTypes FutureSetFromTuple::getTypes() const { return set->getElementsTypes(); }
+FutureSet::Hash FutureSetFromTuple::getHash() const { return hash; }
 
 SetPtr FutureSetFromTuple::buildOrderedSetInplace(const ContextPtr & context)
 {
@@ -107,34 +125,33 @@ SetPtr FutureSetFromTuple::buildOrderedSetInplace(const ContextPtr & context)
 
 
 FutureSetFromSubquery::FutureSetFromSubquery(
-    String key,
+    Hash hash_,
     std::unique_ptr<QueryPlan> source_,
     StoragePtr external_table_,
     std::shared_ptr<FutureSetFromSubquery> external_table_set_,
-    const Settings & settings)
-    : external_table(std::move(external_table_)), external_table_set(std::move(external_table_set_)), source(std::move(source_))
+    bool transform_null_in,
+    SizeLimits size_limits,
+    size_t max_size_for_index)
+    : hash(hash_), external_table(std::move(external_table_)), external_table_set(std::move(external_table_set_)), source(std::move(source_))
 {
     set_and_key = std::make_shared<SetAndKey>();
-    set_and_key->key = std::move(key);
+    set_and_key->key = PreparedSets::toString(hash_, {});
 
-    auto size_limits = getSizeLimitsForSet(settings);
-    set_and_key->set
-        = std::make_shared<Set>(size_limits, settings[Setting::use_index_for_in_with_subqueries_max_values], settings[Setting::transform_null_in]);
+    set_and_key->set = std::make_shared<Set>(size_limits, max_size_for_index, transform_null_in);
     set_and_key->set->setHeader(source->getCurrentHeader().getColumnsWithTypeAndName());
 }
 
 FutureSetFromSubquery::FutureSetFromSubquery(
-    String key,
+    Hash hash_,
     QueryTreeNodePtr query_tree_,
-    const Settings & settings)
-    : query_tree(std::move(query_tree_))
+    bool transform_null_in,
+    SizeLimits size_limits,
+    size_t max_size_for_index)
+    : hash(hash_), query_tree(std::move(query_tree_))
 {
     set_and_key = std::make_shared<SetAndKey>();
-    set_and_key->key = std::move(key);
-
-    auto size_limits = getSizeLimitsForSet(settings);
-    set_and_key->set
-        = std::make_shared<Set>(size_limits, settings[Setting::use_index_for_in_with_subqueries_max_values], settings[Setting::transform_null_in]);
+    set_and_key->key = PreparedSets::toString(hash_, {});
+    set_and_key->set = std::make_shared<Set>(size_limits, max_size_for_index, transform_null_in);
 }
 
 FutureSetFromSubquery::~FutureSetFromSubquery() = default;
@@ -158,6 +175,8 @@ DataTypes FutureSetFromSubquery::getTypes() const
     return set_and_key->set->getElementsTypes();
 }
 
+FutureSet::Hash FutureSetFromSubquery::getHash() const { return hash; }
+
 std::unique_ptr<QueryPlan> FutureSetFromSubquery::build(const ContextPtr & context)
 {
     if (set_and_key->set->isCreated())
@@ -266,9 +285,12 @@ String PreparedSets::toString(const PreparedSets::Hash & key, const DataTypes &
     return buf.str();
 }
 
-FutureSetFromTuplePtr PreparedSets::addFromTuple(const Hash & key, Block block, const Settings & settings)
+FutureSetFromTuplePtr PreparedSets::addFromTuple(const Hash & key, ColumnsWithTypeAndName block, const Settings & settings)
 {
-    auto from_tuple = std::make_shared<FutureSetFromTuple>(std::move(block), settings);
+    auto size_limits = getSizeLimitsForSet(settings);
+    auto from_tuple = std::make_shared<FutureSetFromTuple>(
+        key, std::move(block),
+        settings[Setting::transform_null_in], size_limits);
     const auto & set_types = from_tuple->getTypes();
     auto & sets_by_hash = sets_from_tuple[key];
 
@@ -282,7 +304,7 @@ FutureSetFromTuplePtr PreparedSets::addFromTuple(const Hash & key, Block block,
 
 FutureSetFromStoragePtr PreparedSets::addFromStorage(const Hash & key, SetPtr set_)
 {
-    auto from_storage = std::make_shared<FutureSetFromStorage>(std::move(set_));
+    auto from_storage = std::make_shared<FutureSetFromStorage>(key, std::move(set_));
     auto [it, inserted] = sets_from_storage.emplace(key, from_storage);
 
     if (!inserted)
@@ -298,8 +320,10 @@ FutureSetFromSubqueryPtr PreparedSets::addFromSubquery(
     FutureSetFromSubqueryPtr external_table_set,
     const Settings & settings)
 {
+    auto size_limits = getSizeLimitsForSet(settings);
     auto from_subquery = std::make_shared<FutureSetFromSubquery>(
-        toString(key, {}), std::move(source), std::move(external_table), std::move(external_table_set), settings);
+        key, std::move(source), std::move(external_table), std::move(external_table_set),
+        settings[Setting::transform_null_in], size_limits, settings[Setting::use_index_for_in_with_subqueries_max_values]);
 
     auto [it, inserted] = sets_from_subqueries.emplace(key, from_subquery);
 
@@ -314,10 +338,10 @@ FutureSetFromSubqueryPtr PreparedSets::addFromSubquery(
     QueryTreeNodePtr query_tree,
     const Settings & settings)
 {
+    auto size_limits = getSizeLimitsForSet(settings);
     auto from_subquery = std::make_shared<FutureSetFromSubquery>(
-        toString(key, {}),
-        std::move(query_tree),
-        settings);
+        key, std::move(query_tree),
+        settings[Setting::transform_null_in], size_limits, settings[Setting::use_index_for_in_with_subqueries_max_values]);
 
     auto [it, inserted] = sets_from_subqueries.emplace(key, from_subquery);
diff --git a/src/Interpreters/PreparedSets.h b/src/Interpreters/PreparedSets.h
index a6aee974d0e..c3004e94fa8 100644
--- a/src/Interpreters/PreparedSets.h
+++ b/src/Interpreters/PreparedSets.h
@@ -9,6 +9,8 @@
 #include
 #include
 #include
+#include
+#include
 
 namespace DB
 {
@@ -50,6 +52,9 @@ public:
     virtual DataTypes getTypes() const = 0;
     /// If possible, return set with stored elements useful for PK analysis.
     virtual SetPtr buildOrderedSetInplace(const ContextPtr & context) = 0;
+
+    using Hash = CityHash_v1_0_2::uint128;
+    virtual Hash getHash() const = 0;
 };
 
 using FutureSetPtr = std::shared_ptr<FutureSet>;
@@ -59,13 +64,15 @@ using FutureSetPtr = std::shared_ptr<FutureSet>;
 class FutureSetFromStorage final : public FutureSet
 {
 public:
-    explicit FutureSetFromStorage(SetPtr set_);
+    explicit FutureSetFromStorage(Hash hash_, SetPtr set_);
 
     SetPtr get() const override;
     DataTypes getTypes() const override;
     SetPtr buildOrderedSetInplace(const ContextPtr &) override;
+    Hash getHash() const override;
 
 private:
+    Hash hash;
     SetPtr set;
 };
 
@@ -76,14 +83,16 @@ using FutureSetFromStoragePtr = std::shared_ptr<FutureSetFromStorage>;
 class FutureSetFromTuple final : public FutureSet
 {
 public:
-    FutureSetFromTuple(Block block, const Settings & settings);
+    FutureSetFromTuple(Hash hash_, ColumnsWithTypeAndName block, bool transform_null_in, SizeLimits size_limits);
 
     SetPtr get() const override { return set; }
     SetPtr buildOrderedSetInplace(const ContextPtr & context) override;
 
     DataTypes getTypes() const override;
+    Hash getHash() const override;
 
 private:
+    Hash hash;
     SetPtr set;
     SetKeyColumns set_key_columns;
 };
@@ -106,21 +115,26 @@ class FutureSetFromSubquery final : public FutureSet
 {
 public:
     FutureSetFromSubquery(
-        String key,
+        Hash hash_,
         std::unique_ptr<QueryPlan> source_,
         StoragePtr external_table_,
         std::shared_ptr<FutureSetFromSubquery> external_table_set_,
-        const Settings & settings);
+        bool transform_null_in,
+        SizeLimits size_limits,
+        size_t max_size_for_index);
 
     FutureSetFromSubquery(
-        String key,
+        Hash hash_,
         QueryTreeNodePtr query_tree_,
-        const Settings & settings);
+        bool transform_null_in,
+        SizeLimits size_limits,
+        size_t max_size_for_index);
 
     ~FutureSetFromSubquery() override;
 
     SetPtr get() const override;
     DataTypes getTypes() const override;
+    Hash getHash() const override;
     SetPtr buildOrderedSetInplace(const ContextPtr & context) override;
 
     std::unique_ptr<QueryPlan> build(const ContextPtr & context);
@@ -130,6 +144,7 @@ public:
     void setQueryPlan(std::unique_ptr<QueryPlan> source_);
 
 private:
+    Hash hash;
     SetAndKeyPtr set_and_key;
     StoragePtr external_table;
     std::shared_ptr<FutureSetFromSubquery> external_table_set;
@@ -156,7 +171,7 @@ public:
     using SetsFromSubqueries = std::unordered_map<Hash, FutureSetFromSubqueryPtr, Hashing>;
 
     FutureSetFromStoragePtr addFromStorage(const Hash & key, SetPtr set_);
-    FutureSetFromTuplePtr addFromTuple(const Hash & key, Block block, const Settings & settings);
+    FutureSetFromTuplePtr addFromTuple(const
Hash & key, ColumnsWithTypeAndName block, const Settings & settings); FutureSetFromSubqueryPtr addFromSubquery( const Hash & key, @@ -183,6 +198,7 @@ public: // const SetsFromSubqueries & getSetsFromSubquery() const { return sets_from_subqueries; } static String toString(const Hash & key, const DataTypes & types); + static SizeLimits getSizeLimitsForSet(const Settings & settings); private: SetsFromTuple sets_from_tuple; diff --git a/src/Interpreters/SetSerialization.h b/src/Interpreters/SetSerialization.h new file mode 100644 index 00000000000..1413aa556da --- /dev/null +++ b/src/Interpreters/SetSerialization.h @@ -0,0 +1,32 @@ +#pragma once +#include + +namespace DB +{ + +class FutureSet; +using FutureSetPtr = std::shared_ptr; + +struct SerializedSetsRegistry +{ + struct Hashing + { + UInt64 operator()(const FutureSet::Hash & key) const { return key.low64 ^ key.high64; } + }; + + std::unordered_map sets; +}; + +class ColumnSet; + +struct DeserializedSetsRegistry +{ + struct Hashing + { + UInt64 operator()(const FutureSet::Hash & key) const { return key.low64 ^ key.high64; } + }; + + std::unordered_map, Hashing> sets; +}; + +} diff --git a/src/Planner/Planner.cpp b/src/Planner/Planner.cpp index 4e40c31dd78..19ba725523f 100644 --- a/src/Planner/Planner.cpp +++ b/src/Planner/Planner.cpp @@ -106,7 +106,6 @@ namespace Setting extern const SettingsBool exact_rows_before_limit; extern const SettingsBool extremes; extern const SettingsBool force_aggregation_in_order; - extern const SettingsOverflowModeGroupBy group_by_overflow_mode; extern const SettingsUInt64 group_by_two_level_threshold; extern const SettingsUInt64 group_by_two_level_threshold_bytes; extern const SettingsBool group_by_use_nulls; @@ -115,9 +114,7 @@ namespace Setting extern const SettingsUInt64 max_size_to_preallocate_for_aggregation; extern const SettingsUInt64 max_subquery_depth; extern const SettingsUInt64 max_rows_in_distinct; - extern const SettingsUInt64 max_rows_to_group_by; extern const SettingsMaxThreads max_threads; - extern const SettingsFloat min_hit_rate_to_use_consecutive_keys_optimization; extern const SettingsBool parallel_replicas_allow_in_with_subquery; extern const SettingsString parallel_replicas_custom_key; extern const SettingsUInt64 parallel_replicas_min_number_of_rows_per_replica; @@ -126,6 +123,16 @@ namespace Setting extern const SettingsFloat totals_auto_threshold; extern const SettingsTotalsMode totals_mode; extern const SettingsBool use_with_fill_by_sorting_prefix; + extern const SettingsFloat min_hit_rate_to_use_consecutive_keys_optimization; + extern const SettingsUInt64 max_rows_to_group_by; + extern const SettingsOverflowModeGroupBy group_by_overflow_mode; + extern const SettingsUInt64 max_bytes_before_external_group_by; + extern const SettingsDouble max_bytes_ratio_before_external_group_by; + extern const SettingsUInt64 min_free_disk_space_for_temporary_data; + extern const SettingsBool compile_aggregate_expressions; + extern const SettingsUInt64 min_count_to_compile_aggregate_expression; + extern const SettingsBool enable_software_prefetch_in_aggregation; + extern const SettingsBool optimize_group_by_constant_keys; } namespace ServerSetting @@ -429,15 +436,27 @@ Aggregator::Params getAggregatorParams(const PlannerContextPtr & planner_context } Aggregator::Params aggregator_params = Aggregator::Params( - settings, aggregation_analysis_result.aggregation_keys, aggregate_descriptions, query_analysis_result.aggregate_overflow_row, + settings[Setting::max_rows_to_group_by], + 
settings[Setting::group_by_overflow_mode], settings[Setting::group_by_two_level_threshold], settings[Setting::group_by_two_level_threshold_bytes], - settings[Setting::empty_result_for_aggregation_by_empty_set] || (settings[Setting::empty_result_for_aggregation_by_constant_keys_on_empty_set] && aggregation_analysis_result.aggregation_keys.empty() && aggregation_analysis_result.group_by_with_constant_keys), + Aggregator::Params::getMaxBytesBeforeExternalGroupBy(settings[Setting::max_bytes_before_external_group_by], settings[Setting::max_bytes_ratio_before_external_group_by]), + settings[Setting::empty_result_for_aggregation_by_empty_set] + || (settings[Setting::empty_result_for_aggregation_by_constant_keys_on_empty_set] && aggregation_analysis_result.aggregation_keys.empty() + && aggregation_analysis_result.group_by_with_constant_keys), query_context->getTempDataOnDisk(), + settings[Setting::max_threads], + settings[Setting::min_free_disk_space_for_temporary_data], + settings[Setting::compile_aggregate_expressions], + settings[Setting::min_count_to_compile_aggregate_expression], + settings[Setting::max_block_size], + settings[Setting::enable_software_prefetch_in_aggregation], /* only_merge */ false, + settings[Setting::optimize_group_by_constant_keys], + settings[Setting::min_hit_rate_to_use_consecutive_keys_optimization], stats_collecting_params); return aggregator_params; diff --git a/src/Planner/PlannerActionsVisitor.cpp b/src/Planner/PlannerActionsVisitor.cpp index 46b1db3f660..8f8584418ea 100644 --- a/src/Planner/PlannerActionsVisitor.cpp +++ b/src/Planner/PlannerActionsVisitor.cpp @@ -778,12 +778,9 @@ PlannerActionsVisitorImpl::NodeNameAndNodeMinLevel PlannerActionsVisitorImpl::vi lambda_actions_dag.getOutputs().push_back(actions_stack.back().getNodeOrThrow(lambda_expression_node_name)); lambda_actions_dag.removeUnusedActions(Names(1, lambda_expression_node_name)); - auto expression_actions_settings = ExpressionActionsSettings::fromContext(planner_context->getQueryContext(), CompileExpressions::yes); - auto lambda_actions = std::make_shared(std::move(lambda_actions_dag), expression_actions_settings); - Names captured_column_names; ActionsDAG::NodeRawConstPtrs lambda_children; - Names required_column_names = lambda_actions->getRequiredColumns(); + Names required_column_names = lambda_actions_dag.getRequiredColumnsNames(); actions_stack.pop_back(); levels.reset(actions_stack.size()); @@ -802,9 +799,10 @@ PlannerActionsVisitorImpl::NodeNameAndNodeMinLevel PlannerActionsVisitorImpl::vi } } + auto expression_actions_settings = ExpressionActionsSettings::fromContext(planner_context->getQueryContext(), CompileExpressions::yes); auto lambda_node_name = calculateActionNodeName(node, *planner_context); auto function_capture = std::make_shared( - lambda_actions, captured_column_names, lambda_arguments_names_and_types, lambda_node.getExpression()->getResultType(), lambda_expression_node_name, true); + std::move(lambda_actions_dag), expression_actions_settings, captured_column_names, lambda_arguments_names_and_types, lambda_node.getExpression()->getResultType(), lambda_expression_node_name, true); // TODO: Pass IFunctionBase here not FunctionCaptureOverloadResolver. 
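+ /// Note: the lambda's ActionsDAG and the ExpressionActionsSettings are now passed to
+ /// FunctionCaptureOverloadResolver directly instead of a pre-built ExpressionActions;
+ /// presumably the executable actions are constructed inside the capture when needed
+ /// (an assumption based on the constructor change above; note that elsewhere in this diff
+ /// it is the ActionsDAG, not ExpressionActions, that gets serialized).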
const auto * actions_node = actions_stack[level].addFunctionIfNecessary(lambda_node_name, std::move(lambda_children), function_capture); diff --git a/src/Processors/QueryPlan/AggregatingStep.cpp b/src/Processors/QueryPlan/AggregatingStep.cpp index 9fc88be6398..ae1cd83aa4e 100644 --- a/src/Processors/QueryPlan/AggregatingStep.cpp +++ b/src/Processors/QueryPlan/AggregatingStep.cpp @@ -13,7 +13,9 @@ #include #include #include -#include +#include +#include +#include #include #include #include @@ -30,6 +32,8 @@ namespace DB namespace ErrorCodes { extern const int LOGICAL_ERROR; + extern const int NOT_IMPLEMENTED; + extern const int INCORRECT_DATA; } static bool memoryBoundMergingWillBeUsed( @@ -147,6 +151,25 @@ const SortDescription & AggregatingStep::getSortDescription() const return IQueryPlanStep::getSortDescription(); } +static void updateThreadsValues( + size_t & new_merge_threads, + size_t & new_temporary_data_merge_threads, + Aggregator::Params & params, + const BuildQueryPipelineSettings & settings) +{ + /// Update values from settings if plan was deserialized. + if (new_merge_threads == 0) + new_merge_threads = settings.max_threads; + + if (new_temporary_data_merge_threads == 0) + new_temporary_data_merge_threads = settings.aggregation_memory_efficient_merge_threads; + if (new_temporary_data_merge_threads == 0) + new_temporary_data_merge_threads = new_merge_threads; + + if (params.max_threads == 0) + params.max_threads = settings.max_threads; +} + ActionsDAG AggregatingStep::makeCreatingMissingKeysForGroupingSetDAG( const Block & in_header, const Block & out_header, @@ -204,6 +227,10 @@ ActionsDAG AggregatingStep::makeCreatingMissingKeysForGroupingSetDAG( void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & settings) { + size_t new_merge_threads = merge_threads; + size_t new_temporary_data_merge_threads = temporary_data_merge_threads; + updateThreadsValues(new_merge_threads, new_temporary_data_merge_threads, params, settings); + QueryPipelineProcessorsCollector collector(pipeline, this); /// Forget about current totals and extremes. They will be calculated again after aggregation if needed. @@ -279,8 +306,8 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B transform_params_for_set, many_data, j, - merge_threads, - temporary_data_merge_threads, + new_merge_threads, + new_temporary_data_merge_threads, should_produce_results_in_order_of_bucket_number, skip_merging); // For each input stream we have `grouping_sets_size` copies, so port index @@ -373,7 +400,7 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B return std::make_shared( header, transform_params, sort_description_for_merging, group_by_sort_description, - max_block_size, aggregation_in_order_max_block_bytes / merge_threads, + max_block_size, aggregation_in_order_max_block_bytes / new_merge_threads, many_data, counter++); }); @@ -399,7 +426,7 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B pipeline.addTransform(std::move(transform)); /// Do merge of aggregated data in parallel. - pipeline.resize(merge_threads); + pipeline.resize(new_merge_threads); const auto & required_sort_description = memoryBoundMergingWillBeUsed() ? 
group_by_sort_description : SortDescription{}; pipeline.addSimpleTransform( @@ -455,8 +482,8 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B transform_params, many_data, counter++, - merge_threads, - temporary_data_merge_threads, + new_merge_threads, + new_temporary_data_merge_threads, should_produce_results_in_order_of_bucket_number, skip_merging); }); @@ -484,6 +511,7 @@ void AggregatingStep::describeActions(FormatSettings & settings) const settings.out << prefix << "Order: " << dumpSortDescription(sort_description_for_merging) << '\n'; } settings.out << prefix << "Skip merging: " << skip_merging << '\n'; + // settings.out << prefix << "Memory bound merging: " << memory_bound_merging_of_aggregation_results_enabled << '\n'; } void AggregatingStep::describeActions(JSONBuilder::JSONMap & map) const @@ -588,8 +616,12 @@ void AggregatingProjectionStep::updateOutputHeader() QueryPipelineBuilderPtr AggregatingProjectionStep::updatePipeline( QueryPipelineBuilders pipelines, - const BuildQueryPipelineSettings &) + const BuildQueryPipelineSettings & settings) { + size_t new_merge_threads = merge_threads; + size_t new_temporary_data_merge_threads = temporary_data_merge_threads; + updateThreadsValues(new_merge_threads, new_temporary_data_merge_threads, params, settings); + auto & normal_parts_pipeline = pipelines.front(); auto & projection_parts_pipeline = pipelines.back(); @@ -624,7 +656,7 @@ QueryPipelineBuilderPtr AggregatingProjectionStep::updatePipeline( pipeline.addSimpleTransform([&](const Block & header) { return std::make_shared( - header, transform_params, many_data, counter++, merge_threads, temporary_data_merge_threads); + header, transform_params, many_data, counter++, new_merge_threads, new_temporary_data_merge_threads); }); }; @@ -641,4 +673,173 @@ QueryPipelineBuilderPtr AggregatingProjectionStep::updatePipeline( return pipeline; } + +void AggregatingStep::serializeSettings(QueryPlanSerializationSettings & settings) const +{ + settings.max_block_size = max_block_size; + settings.aggregation_in_order_max_block_bytes = aggregation_in_order_max_block_bytes; + + settings.aggregation_in_order_memory_bound_merging = should_produce_results_in_order_of_bucket_number; + settings.aggregation_sort_result_by_bucket_number = memory_bound_merging_of_aggregation_results_enabled; + + settings.max_rows_to_group_by = params.max_rows_to_group_by; + settings.group_by_overflow_mode = params.group_by_overflow_mode; + + settings.group_by_two_level_threshold = params.group_by_two_level_threshold; + settings.group_by_two_level_threshold_bytes = params.group_by_two_level_threshold_bytes; + + settings.max_bytes_before_external_group_by = params.max_bytes_before_external_group_by; + settings.empty_result_for_aggregation_by_empty_set = params.empty_result_for_aggregation_by_empty_set; + + settings.min_free_disk_space_for_temporary_data = params.min_free_disk_space; + + settings.compile_aggregate_expressions = params.compile_aggregate_expressions; + settings.min_count_to_compile_aggregate_expression = params.min_count_to_compile_aggregate_expression; + + settings.enable_software_prefetch_in_aggregation = params.enable_prefetch; + settings.optimize_group_by_constant_keys = params.optimize_group_by_constant_keys; + settings.min_hit_rate_to_use_consecutive_keys_optimization = params.min_hit_rate_to_use_consecutive_keys_optimization; + + settings.collect_hash_table_stats_during_aggregation = params.stats_collecting_params.isCollectionAndUseEnabled(); + 
settings.max_entries_for_hash_table_stats = params.stats_collecting_params.max_entries_for_hash_table_stats; + settings.max_size_to_preallocate_for_aggregation = params.stats_collecting_params.max_size_to_preallocate; +} + +void AggregatingStep::serialize(Serialization & ctx) const +{ + if (!sort_description_for_merging.empty()) + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Serialization of AggregatingStep optimized for in-order is not supported."); + + if (!grouping_sets_params.empty()) + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Serialization of AggregatingStep with grouping sets is not supported."); + + if (explicit_sorting_required_for_aggregation_in_order) + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Serialization of AggregatingStep with explicit_sorting_required_for_aggregation_in_order is not supported."); + + /// If you wonder why something is serialized using settings and something else using flags, the considerations are as follows: + /// * flags are things that may change the data format returned from the step + /// * settings are things that already existed in Settings.h and, usually, are passed to Aggregator unchanged + /// Flags `final` and `group_by_use_nulls` change types, and `overflow_row` appends an additional block to the results. + /// Settings like `max_rows_to_group_by` or `empty_result_for_aggregation_by_empty_set` affect the result, + /// but do not change the data format. + /// Overall, the rule is not strict. + + UInt8 flags = 0; + if (final) + flags |= 1; + if (params.overflow_row) + flags |= 2; + if (group_by_use_nulls) + flags |= 4; + if (!grouping_sets_params.empty()) + flags |= 8; + /// Ideally, the key should be calculated from the QueryPlan on the follower. + /// So, let's have a flag to disable sending/reading the pre-calculated value.
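+ /// Bit layout of `flags`, matching the checks above and in deserialize():
+ /// 1 = final, 2 = overflow_row, 4 = group_by_use_nulls, 8 = grouping sets, 16 = pre-calculated stats key present.
+ /// For illustration (a hypothetical example, not a case taken from this diff): a final aggregation with an
+ /// overflow row and hash-table-stats collection enabled writes flags = 1 | 2 | 16 = 19, then the key names,
+ /// then the aggregate descriptions, and finally the stats key itself.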
+ if (params.stats_collecting_params.isCollectionAndUseEnabled()) + flags |= 16; + + writeIntBinary(flags, ctx.out); + + if (explicit_sorting_required_for_aggregation_in_order) + serializeSortDescription(group_by_sort_description, ctx.out); + + writeVarUInt(params.keys.size(), ctx.out); + for (const auto & key : params.keys) + writeStringBinary(key, ctx.out); + + serializeAggregateDescriptions(params.aggregates, ctx.out); + + if (params.stats_collecting_params.isCollectionAndUseEnabled()) + writeIntBinary(params.stats_collecting_params.key, ctx.out); +} + +std::unique_ptr AggregatingStep::deserialize(Deserialization & ctx) +{ + if (ctx.input_headers.size() != 1) + throw Exception(ErrorCodes::INCORRECT_DATA, "AggregatingStep must have one input stream"); + + UInt8 flags; + readIntBinary(flags, ctx.in); + + bool final = bool(flags & 1); + bool overflow_row = bool(flags & 2); + bool group_by_use_nulls = bool(flags & 4); + bool has_grouping_sets = bool(flags & 8); + bool has_stats_key = bool(flags & 16); + + if (has_grouping_sets) + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Serialization of AggregatingStep with grouping sets is not supported."); + + UInt64 num_keys; + readVarUInt(num_keys, ctx.in); + Names keys(num_keys); + for (auto & key : keys) + readStringBinary(key, ctx.in); + + AggregateDescriptions aggregates; + deserializeAggregateDescriptions(aggregates, ctx.in); + + UInt64 stats_key = 0; + if (has_stats_key) + readIntBinary(stats_key, ctx.in); + + StatsCollectingParams stats_collecting_params( + stats_key, + ctx.settings.collect_hash_table_stats_during_aggregation, + ctx.settings.max_entries_for_hash_table_stats, + ctx.settings.max_size_to_preallocate_for_aggregation); + + Aggregator::Params params + { + keys, + aggregates, + overflow_row, + ctx.settings.max_rows_to_group_by, + ctx.settings.group_by_overflow_mode, + ctx.settings.group_by_two_level_threshold, + ctx.settings.group_by_two_level_threshold_bytes, + ctx.settings.max_bytes_before_external_group_by, + ctx.settings.empty_result_for_aggregation_by_empty_set, + Context::getGlobalContextInstance()->getTempDataOnDisk(), + 0, //settings.max_threads, + ctx.settings.min_free_disk_space_for_temporary_data, + ctx.settings.compile_aggregate_expressions, + ctx.settings.min_count_to_compile_aggregate_expression, + ctx.settings.max_block_size, + ctx.settings.enable_software_prefetch_in_aggregation, + /* only_merge */ false, + ctx.settings.optimize_group_by_constant_keys, + ctx.settings.min_hit_rate_to_use_consecutive_keys_optimization, + stats_collecting_params + }; + + SortDescription sort_description_for_merging; + GroupingSetsParamsList grouping_sets_params; + + auto aggregating_step = std::make_unique( + ctx.input_headers.front(), + std::move(params), + std::move(grouping_sets_params), + final, + ctx.settings.max_block_size, + ctx.settings.aggregation_in_order_max_block_bytes, + 0, //merge_threads, + 0, //temporary_data_merge_threads, + false, // storage_has_evenly_distributed_read, TODO: later + group_by_use_nulls, + std::move(sort_description_for_merging), + SortDescription{}, + ctx.settings.aggregation_in_order_memory_bound_merging, + ctx.settings.aggregation_sort_result_by_bucket_number, + false); + + return aggregating_step; +} + +void registerAggregatingStep(QueryPlanStepRegistry & registry) +{ + registry.registerStep("Aggregating", AggregatingStep::deserialize); +} + + } diff --git a/src/Processors/QueryPlan/AggregatingStep.h b/src/Processors/QueryPlan/AggregatingStep.h index d76764f05ba..ad06e1feffc 100644 --- 
a/src/Processors/QueryPlan/AggregatingStep.h +++ b/src/Processors/QueryPlan/AggregatingStep.h @@ -74,6 +74,13 @@ public: UInt64 group, bool group_by_use_nulls); + void serializeSettings(QueryPlanSerializationSettings & settings) const override; + void serialize(Serialization & ctx) const override; + + static std::unique_ptr deserialize(Deserialization & ctx); + + void enableMemoryBoundMerging() { memory_bound_merging_of_aggregation_results_enabled = true; } + private: void updateOutputHeader() override; @@ -98,7 +105,7 @@ private: /// These settings are used to determine if we should resize pipeline to 1 at the end. const bool should_produce_results_in_order_of_bucket_number; - const bool memory_bound_merging_of_aggregation_results_enabled; + bool memory_bound_merging_of_aggregation_results_enabled; bool explicit_sorting_required_for_aggregation_in_order; Processors aggregating_in_order; @@ -120,7 +127,7 @@ public: ); String getName() const override { return "AggregatingProjection"; } - QueryPipelineBuilderPtr updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings &) override; + QueryPipelineBuilderPtr updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings & settings) override; private: void updateOutputHeader() override; diff --git a/src/Processors/QueryPlan/ArrayJoinStep.cpp b/src/Processors/QueryPlan/ArrayJoinStep.cpp index 4ba53480b67..d969a3bb23a 100644 --- a/src/Processors/QueryPlan/ArrayJoinStep.cpp +++ b/src/Processors/QueryPlan/ArrayJoinStep.cpp @@ -1,4 +1,7 @@ #include +#include +#include +#include #include #include #include @@ -79,4 +82,50 @@ void ArrayJoinStep::describeActions(JSONBuilder::JSONMap & map) const map.add("Columns", std::move(columns_array)); } +void ArrayJoinStep::serializeSettings(QueryPlanSerializationSettings & settings) const +{ + settings.max_block_size = max_block_size; +} + +void ArrayJoinStep::serialize(Serialization & ctx) const +{ + UInt8 flags = 0; + if (array_join.is_left) + flags |= 1; + if (is_unaligned) + flags |= 2; + + writeIntBinary(flags, ctx.out); + + writeVarUInt(array_join.columns.size(), ctx.out); + for (const auto & column : array_join.columns) + writeStringBinary(column, ctx.out); +} + +std::unique_ptr ArrayJoinStep::deserialize(Deserialization & ctx) +{ + UInt8 flags; + readIntBinary(flags, ctx.in); + + bool is_left = bool(flags & 1); + bool is_unaligned = bool(flags & 2); + + UInt64 num_columns; + readVarUInt(num_columns, ctx.in); + + ArrayJoin array_join; + array_join.is_left = is_left; + array_join.columns.resize(num_columns); + + for (auto & column : array_join.columns) + readStringBinary(column, ctx.in); + + return std::make_unique(ctx.input_headers.front(), std::move(array_join), is_unaligned, ctx.settings.max_block_size); +} + +void registerArrayJoinStep(QueryPlanStepRegistry & registry) +{ + registry.registerStep("ArrayJoin", ArrayJoinStep::deserialize); +} + } diff --git a/src/Processors/QueryPlan/ArrayJoinStep.h b/src/Processors/QueryPlan/ArrayJoinStep.h index 34eb34b5b25..add6212253b 100644 --- a/src/Processors/QueryPlan/ArrayJoinStep.h +++ b/src/Processors/QueryPlan/ArrayJoinStep.h @@ -22,6 +22,11 @@ public: const Names & getColumns() const { return array_join.columns; } bool isLeft() const { return array_join.is_left; } + void serializeSettings(QueryPlanSerializationSettings & settings) const override; + void serialize(Serialization & ctx) const override; + + static std::unique_ptr deserialize(Deserialization & ctx); + private: void updateOutputHeader() override; diff --git 
a/src/Processors/QueryPlan/BuildQueryPipelineSettings.cpp b/src/Processors/QueryPlan/BuildQueryPipelineSettings.cpp index 1832cc2ad42..c1ba13c77c8 100644 --- a/src/Processors/QueryPlan/BuildQueryPipelineSettings.cpp +++ b/src/Processors/QueryPlan/BuildQueryPipelineSettings.cpp @@ -9,6 +9,8 @@ namespace DB namespace Setting { extern const SettingsBool query_plan_merge_filters; + extern const SettingsMaxThreads max_threads; + extern const SettingsUInt64 aggregation_memory_efficient_merge_threads; } BuildQueryPipelineSettings BuildQueryPipelineSettings::fromContext(ContextPtr from) @@ -19,6 +21,9 @@ BuildQueryPipelineSettings BuildQueryPipelineSettings::fromContext(ContextPtr from) settings.process_list_element = from->getProcessListElement(); settings.progress_callback = from->getProgressCallback(); + settings.max_threads = from->getSettingsRef()[Setting::max_threads]; + settings.aggregation_memory_efficient_merge_threads = from->getSettingsRef()[Setting::aggregation_memory_efficient_merge_threads]; + /// Setting query_plan_merge_filters is enabled by default. /// But it can break short-circuit evaluation without splitting the filter step into smaller steps. /// So, enable and disable these optimizations together. diff --git a/src/Processors/QueryPlan/BuildQueryPipelineSettings.h b/src/Processors/QueryPlan/BuildQueryPipelineSettings.h index 6219e37db58..74ddd789ab1 100644 --- a/src/Processors/QueryPlan/BuildQueryPipelineSettings.h +++ b/src/Processors/QueryPlan/BuildQueryPipelineSettings.h @@ -24,6 +24,9 @@ struct BuildQueryPipelineSettings ProgressCallback progress_callback = nullptr; TemporaryFileLookupPtr temporary_file_lookup; + size_t max_threads; + size_t aggregation_memory_efficient_merge_threads; + const ExpressionActionsSettings & getActionsSettings() const { return actions_settings; } static BuildQueryPipelineSettings fromContext(ContextPtr from); }; diff --git a/src/Processors/QueryPlan/CreatingSetsStep.h b/src/Processors/QueryPlan/CreatingSetsStep.h index 0495ca2e638..5501cbd9bde 100644 --- a/src/Processors/QueryPlan/CreatingSetsStep.h +++ b/src/Processors/QueryPlan/CreatingSetsStep.h @@ -66,6 +66,8 @@ public: ContextPtr getContext() const { return context; } PreparedSets::Subqueries detachSets() { return std::move(subqueries); } + void serialize(Serialization &) const override {} + private: void updateOutputHeader() override { output_header = getInputHeaders().front(); } diff --git a/src/Processors/QueryPlan/DistinctStep.cpp b/src/Processors/QueryPlan/DistinctStep.cpp index 42f7f8d6e66..b7ce2ab612c 100644 --- a/src/Processors/QueryPlan/DistinctStep.cpp +++ b/src/Processors/QueryPlan/DistinctStep.cpp @@ -1,4 +1,7 @@ #include +#include +#include +#include #include #include #include @@ -13,6 +16,7 @@ namespace DB namespace ErrorCodes { extern const int LOGICAL_ERROR; + extern const int INCORRECT_DATA; } static ITransformingStep::Traits getTraits(bool pre_distinct) @@ -48,6 +52,16 @@ DistinctStep::DistinctStep( { } +void DistinctStep::updateLimitHint(UInt64 hint) +{ + if (hint && limit_hint) + /// Both limits are set - take the min + limit_hint = std::min(hint, limit_hint); + else + /// Some limit is not set - take the other one + limit_hint = std::max(hint, limit_hint); +} + void DistinctStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) { if (!pre_distinct) @@ -158,4 +172,58 @@ void DistinctStep::updateOutputHeader() output_header = input_headers.front(); } +void DistinctStep::serializeSettings(QueryPlanSerializationSettings & settings) const +{ +
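/// Map the step's SizeLimits for DISTINCT onto the serialized plan settings;
+ /// DistinctStep::deserialize below reads them back from ctx.settings.
+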
settings.max_rows_in_distinct = set_size_limits.max_rows; + settings.max_bytes_in_distinct = set_size_limits.max_bytes; + settings.distinct_overflow_mode = set_size_limits.overflow_mode; +} + +void DistinctStep::serialize(Serialization & ctx) const +{ + /// Let's not serialize limit_hint. + /// Ideally, we can get it from a query plan optimization on the follower. + + writeVarUInt(columns.size(), ctx.out); + for (const auto & column : columns) + writeStringBinary(column, ctx.out); +} + +std::unique_ptr DistinctStep::deserialize(Deserialization & ctx, bool pre_distinct_) +{ + if (ctx.input_headers.size() != 1) + throw Exception(ErrorCodes::INCORRECT_DATA, "DistinctStep must have one input stream"); + + size_t columns_size; + readVarUInt(columns_size, ctx.in); + Names column_names(columns_size); + for (size_t i = 0; i < columns_size; ++i) + readStringBinary(column_names[i], ctx.in); + + SizeLimits size_limits; + size_limits.max_rows = ctx.settings.max_rows_in_distinct; + size_limits.max_bytes = ctx.settings.max_bytes_in_distinct; + size_limits.overflow_mode = ctx.settings.distinct_overflow_mode; + + return std::make_unique( + ctx.input_headers.front(), size_limits, 0, column_names, pre_distinct_); +} + +std::unique_ptr DistinctStep::deserializeNormal(Deserialization & ctx) +{ + return DistinctStep::deserialize(ctx, false); +} +std::unique_ptr DistinctStep::deserializePre(Deserialization & ctx) +{ + return DistinctStep::deserialize(ctx, true); +} + +void registerDistinctStep(QueryPlanStepRegistry & registry) +{ + /// Preliminary distinct can probably become a query plan optimization. + /// It's easier to serialize it under a different name, so that pre-distinct can potentially be removed later. + registry.registerStep("Distinct", DistinctStep::deserializeNormal); + registry.registerStep("PreDistinct", DistinctStep::deserializePre); +} + } diff --git a/src/Processors/QueryPlan/DistinctStep.h b/src/Processors/QueryPlan/DistinctStep.h index d6caf92d072..4b70b67b5e0 100644 --- a/src/Processors/QueryPlan/DistinctStep.h +++ b/src/Processors/QueryPlan/DistinctStep.h @@ -20,6 +20,8 @@ public: String getName() const override { return "Distinct"; } const Names & getColumnNames() const { return columns; } + String getSerializationName() const override { return pre_distinct ?
"PreDistinct" : "Distinct"; } + void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override; void describeActions(JSONBuilder::JSONMap & map) const override; @@ -28,6 +30,14 @@ public: bool isPreliminary() const { return pre_distinct; } UInt64 getLimitHint() const { return limit_hint; } + void updateLimitHint(UInt64 hint); + + void serializeSettings(QueryPlanSerializationSettings & settings) const override; + void serialize(Serialization & ctx) const override; + + static std::unique_ptr deserialize(Deserialization & ctx, bool pre_distinct_); + static std::unique_ptr deserializeNormal(Deserialization & ctx); + static std::unique_ptr deserializePre(Deserialization & ctx); const SizeLimits & getSetSizeLimits() const { return set_size_limits; } diff --git a/src/Processors/QueryPlan/ExpressionStep.cpp b/src/Processors/QueryPlan/ExpressionStep.cpp index 963932072da..32f80769b04 100644 --- a/src/Processors/QueryPlan/ExpressionStep.cpp +++ b/src/Processors/QueryPlan/ExpressionStep.cpp @@ -1,4 +1,6 @@ #include +#include +#include #include #include #include @@ -10,6 +12,11 @@ namespace DB { +namespace ErrorCodes +{ + extern const int INCORRECT_DATA; +} + static ITransformingStep::Traits getTraits(const ActionsDAG & actions) { return ITransformingStep::Traits @@ -109,4 +116,23 @@ void ExpressionStep::updateOutputHeader() output_header = ExpressionTransform::transformHeader(input_headers.front(), actions_dag); } +void ExpressionStep::serialize(Serialization & ctx) const +{ + actions_dag.serialize(ctx.out, ctx.registry); +} + +std::unique_ptr ExpressionStep::deserialize(Deserialization & ctx) +{ + ActionsDAG actions_dag = ActionsDAG::deserialize(ctx.in, ctx.registry, ctx.context); + if (ctx.input_headers.size() != 1) + throw Exception(ErrorCodes::INCORRECT_DATA, "ExpressionStep must have one input stream"); + + return std::make_unique(ctx.input_headers.front(), std::move(actions_dag)); +} + +void registerExpressionStep(QueryPlanStepRegistry & registry) +{ + registry.registerStep("Expression", ExpressionStep::deserialize); +} + } diff --git a/src/Processors/QueryPlan/ExpressionStep.h b/src/Processors/QueryPlan/ExpressionStep.h index 234552e5445..15fbbdad807 100644 --- a/src/Processors/QueryPlan/ExpressionStep.h +++ b/src/Processors/QueryPlan/ExpressionStep.h @@ -25,6 +25,9 @@ public: void describeActions(JSONBuilder::JSONMap & map) const override; + void serialize(Serialization & ctx) const override; + static std::unique_ptr deserialize(Deserialization & ctx); + private: void updateOutputHeader() override; diff --git a/src/Processors/QueryPlan/ExtremesStep.cpp b/src/Processors/QueryPlan/ExtremesStep.cpp index 1eb593df2ab..06f6fd80bc4 100644 --- a/src/Processors/QueryPlan/ExtremesStep.cpp +++ b/src/Processors/QueryPlan/ExtremesStep.cpp @@ -1,4 +1,6 @@ #include +#include +#include #include namespace DB @@ -29,4 +31,19 @@ void ExtremesStep::transformPipeline(QueryPipelineBuilder & pipeline, const Buil pipeline.addExtremesTransform(); } +void ExtremesStep::serialize(Serialization & ctx) const +{ + (void)ctx; +} + +std::unique_ptr ExtremesStep::deserialize(Deserialization & ctx) +{ + return std::make_unique(ctx.input_headers.front()); +} + +void registerExtremesStep(QueryPlanStepRegistry & registry) +{ + registry.registerStep("Extremes", ExtremesStep::deserialize); +} + } diff --git a/src/Processors/QueryPlan/ExtremesStep.h b/src/Processors/QueryPlan/ExtremesStep.h index 363f0a2f6d4..a41e33f38cb 100644 --- a/src/Processors/QueryPlan/ExtremesStep.h +++ 
b/src/Processors/QueryPlan/ExtremesStep.h @@ -13,6 +13,9 @@ public: void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override; + void serialize(Serialization & ctx) const override; + static std::unique_ptr deserialize(Deserialization & ctx); + private: void updateOutputHeader() override { diff --git a/src/Processors/QueryPlan/FilterStep.cpp b/src/Processors/QueryPlan/FilterStep.cpp index 49190701553..6fb1e2cb73c 100644 --- a/src/Processors/QueryPlan/FilterStep.cpp +++ b/src/Processors/QueryPlan/FilterStep.cpp @@ -1,4 +1,6 @@ #include +#include +#include #include #include #include @@ -16,6 +18,12 @@ namespace DB { + +namespace ErrorCodes +{ + extern const int INCORRECT_DATA; +} + static ITransformingStep::Traits getTraits() { return ITransformingStep::Traits @@ -240,4 +248,39 @@ void FilterStep::updateOutputHeader() return; } +void FilterStep::serialize(Serialization & ctx) const +{ + UInt8 flags = 0; + if (remove_filter_column) + flags |= 1; + writeIntBinary(flags, ctx.out); + + writeStringBinary(filter_column_name, ctx.out); + + actions_dag.serialize(ctx.out, ctx.registry); +} + +std::unique_ptr FilterStep::deserialize(Deserialization & ctx) +{ + if (ctx.input_headers.size() != 1) + throw Exception(ErrorCodes::INCORRECT_DATA, "FilterStep must have one input stream"); + + UInt8 flags; + readIntBinary(flags, ctx.in); + + bool remove_filter_column = bool(flags & 1); + + String filter_column_name; + readStringBinary(filter_column_name, ctx.in); + + ActionsDAG actions_dag = ActionsDAG::deserialize(ctx.in, ctx.registry, ctx.context); + + return std::make_unique(ctx.input_headers.front(), std::move(actions_dag), std::move(filter_column_name), remove_filter_column); +} + +void registerFilterStep(QueryPlanStepRegistry & registry) +{ + registry.registerStep("Filter", FilterStep::deserialize); +} + } diff --git a/src/Processors/QueryPlan/FilterStep.h b/src/Processors/QueryPlan/FilterStep.h index cb90459f0ab..ceefd67f8a6 100644 --- a/src/Processors/QueryPlan/FilterStep.h +++ b/src/Processors/QueryPlan/FilterStep.h @@ -26,6 +26,10 @@ public: const String & getFilterColumnName() const { return filter_column_name; } bool removesFilterColumn() const { return remove_filter_column; } + void serialize(Serialization & ctx) const override; + + static std::unique_ptr deserialize(Deserialization & ctx); + private: void updateOutputHeader() override; diff --git a/src/Processors/QueryPlan/IQueryPlanStep.cpp b/src/Processors/QueryPlan/IQueryPlanStep.cpp index fdb1690bc6d..a6770c4347e 100644 --- a/src/Processors/QueryPlan/IQueryPlanStep.cpp +++ b/src/Processors/QueryPlan/IQueryPlanStep.cpp @@ -8,6 +8,7 @@ namespace DB namespace ErrorCodes { extern const int LOGICAL_ERROR; + extern const int NOT_IMPLEMENTED; } IQueryPlanStep::IQueryPlanStep() @@ -146,4 +147,11 @@ void IQueryPlanStep::appendExtraProcessors(const Processors & extra_processors) processors.insert(processors.end(), extra_processors.begin(), extra_processors.end()); } +void IQueryPlanStep::serialize(Serialization & /*ctx*/) const +{ + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method serialize is not implemented for {}", getName()); +} + +void IQueryPlanStep::updateOutputHeader() { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented"); } + } diff --git a/src/Processors/QueryPlan/IQueryPlanStep.h b/src/Processors/QueryPlan/IQueryPlanStep.h index d38b36a3a78..2f5c63ab36f 100644 --- a/src/Processors/QueryPlan/IQueryPlanStep.h +++ b/src/Processors/QueryPlan/IQueryPlanStep.h @@ -24,6 +24,8 @@ namespace 
JSONBuilder { class JSONMap; } class QueryPlan; using QueryPlanRawPtrs = std::list; +struct QueryPlanSerializationSettings; + using Header = Block; using Headers = std::vector
; @@ -36,6 +38,7 @@ public: virtual ~IQueryPlanStep() = default; virtual String getName() const = 0; + virtual String getSerializationName() const { return getName(); } /// Add processors from current step to QueryPipeline. /// Calling this method, we assume and don't check that: @@ -54,6 +57,11 @@ public: const std::string & getStepDescription() const { return step_description; } void setStepDescription(std::string description) { step_description = std::move(description); } + struct Serialization; + struct Deserialization; + + virtual void serializeSettings(QueryPlanSerializationSettings & /*settings*/) const {} + virtual void serialize(Serialization & /*ctx*/) const; virtual const SortDescription & getSortDescription() const; struct FormatSettings diff --git a/src/Processors/QueryPlan/LimitByStep.cpp b/src/Processors/QueryPlan/LimitByStep.cpp index 92900b8153f..cb14f6b2ef0 100644 --- a/src/Processors/QueryPlan/LimitByStep.cpp +++ b/src/Processors/QueryPlan/LimitByStep.cpp @@ -1,4 +1,6 @@ #include +#include +#include #include #include #include @@ -24,11 +26,11 @@ static ITransformingStep::Traits getTraits() LimitByStep::LimitByStep( const Header & input_header_, - size_t group_length_, size_t group_offset_, const Names & columns_) + size_t group_length_, size_t group_offset_, Names columns_) : ITransformingStep(input_header_, input_header_, getTraits()) , group_length(group_length_) , group_offset(group_offset_) - , columns(columns_) + , columns(std::move(columns_)) { } @@ -83,4 +85,37 @@ void LimitByStep::describeActions(JSONBuilder::JSONMap & map) const map.add("Offset", group_offset); } +void LimitByStep::serialize(Serialization & ctx) const +{ + writeVarUInt(group_length, ctx.out); + writeVarUInt(group_offset, ctx.out); + + + writeVarUInt(columns.size(), ctx.out); + for (const auto & column : columns) + writeStringBinary(column, ctx.out); +} + +std::unique_ptr LimitByStep::deserialize(Deserialization & ctx) +{ + UInt64 group_length; + UInt64 group_offset; + + readVarUInt(group_length, ctx.in); + readVarUInt(group_offset, ctx.in); + + UInt64 num_columns; + readVarUInt(num_columns, ctx.in); + Names columns(num_columns); + for (auto & column : columns) + readStringBinary(column, ctx.in); + + return std::make_unique(ctx.input_headers.front(), group_length, group_offset, std::move(columns)); +} + +void registerLimitByStep(QueryPlanStepRegistry & registry) +{ + registry.registerStep("LimitBy", LimitByStep::deserialize); +} + } diff --git a/src/Processors/QueryPlan/LimitByStep.h b/src/Processors/QueryPlan/LimitByStep.h index e34d1d5327d..f382ebdbd44 100644 --- a/src/Processors/QueryPlan/LimitByStep.h +++ b/src/Processors/QueryPlan/LimitByStep.h @@ -10,7 +10,7 @@ class LimitByStep : public ITransformingStep public: explicit LimitByStep( const Header & input_header_, - size_t group_length_, size_t group_offset_, const Names & columns_); + size_t group_length_, size_t group_offset_, Names columns_); String getName() const override { return "LimitBy"; } @@ -19,6 +19,10 @@ public: void describeActions(JSONBuilder::JSONMap & map) const override; void describeActions(FormatSettings & settings) const override; + void serialize(Serialization & ctx) const override; + + static std::unique_ptr deserialize(Deserialization & ctx); + private: void updateOutputHeader() override { diff --git a/src/Processors/QueryPlan/LimitStep.cpp b/src/Processors/QueryPlan/LimitStep.cpp index a186e1f7965..18c401e68c1 100644 --- a/src/Processors/QueryPlan/LimitStep.cpp +++ b/src/Processors/QueryPlan/LimitStep.cpp @@ -1,4 
+1,6 @@ #include +#include +#include #include #include #include @@ -76,4 +78,47 @@ void LimitStep::describeActions(JSONBuilder::JSONMap & map) const map.add("Reads All Data", always_read_till_end); } +void LimitStep::serialize(Serialization & ctx) const +{ + UInt8 flags = 0; + if (always_read_till_end) + flags |= 1; + if (with_ties) + flags |= 2; + + writeIntBinary(flags, ctx.out); + + writeVarUInt(limit, ctx.out); + writeVarUInt(offset, ctx.out); + + if (with_ties) + serializeSortDescription(description, ctx.out); +} + +std::unique_ptr LimitStep::deserialize(Deserialization & ctx) +{ + UInt8 flags; + readIntBinary(flags, ctx.in); + + bool always_read_till_end = bool(flags & 1); + bool with_ties = bool(flags & 2); + + UInt64 limit; + UInt64 offset; + + readVarUInt(limit, ctx.in); + readVarUInt(offset, ctx.in); + + SortDescription description; + if (with_ties) + deserializeSortDescription(description, ctx.in); + + return std::make_unique(ctx.input_headers.front(), limit, offset, always_read_till_end, with_ties, std::move(description)); +} + +void registerLimitStep(QueryPlanStepRegistry & registry) +{ + registry.registerStep("Limit", LimitStep::deserialize); +} + } diff --git a/src/Processors/QueryPlan/LimitStep.h b/src/Processors/QueryPlan/LimitStep.h index 4a779259681..e1696c6165e 100644 --- a/src/Processors/QueryPlan/LimitStep.h +++ b/src/Processors/QueryPlan/LimitStep.h @@ -33,6 +33,10 @@ public: bool withTies() const { return with_ties; } + void serialize(Serialization & ctx) const override; + + static std::unique_ptr deserialize(Deserialization & ctx); + private: void updateOutputHeader() override { diff --git a/src/Processors/QueryPlan/OffsetStep.cpp b/src/Processors/QueryPlan/OffsetStep.cpp index 1d25414a44c..05b516a5971 100644 --- a/src/Processors/QueryPlan/OffsetStep.cpp +++ b/src/Processors/QueryPlan/OffsetStep.cpp @@ -1,4 +1,6 @@ #include +#include +#include #include #include #include @@ -46,4 +48,22 @@ void OffsetStep::describeActions(JSONBuilder::JSONMap & map) const map.add("Offset", offset); } +void OffsetStep::serialize(Serialization & ctx) const +{ + writeVarUInt(offset, ctx.out); +} + +std::unique_ptr OffsetStep::deserialize(Deserialization & ctx) +{ + UInt64 offset; + readVarUInt(offset, ctx.in); + + return std::make_unique(ctx.input_headers.front(), offset); +} + +void registerOffsetStep(QueryPlanStepRegistry & registry) +{ + registry.registerStep("Offset", OffsetStep::deserialize); +} + } diff --git a/src/Processors/QueryPlan/OffsetStep.h b/src/Processors/QueryPlan/OffsetStep.h index ae6bc1c66c0..c8b755d15fb 100644 --- a/src/Processors/QueryPlan/OffsetStep.h +++ b/src/Processors/QueryPlan/OffsetStep.h @@ -18,6 +18,10 @@ public: void describeActions(JSONBuilder::JSONMap & map) const override; void describeActions(FormatSettings & settings) const override; + void serialize(Serialization & ctx) const override; + + static std::unique_ptr deserialize(Deserialization & ctx); + private: void updateOutputHeader() override { diff --git a/src/Processors/QueryPlan/Optimizations/limitPushDown.cpp b/src/Processors/QueryPlan/Optimizations/limitPushDown.cpp index 10cc78da33a..aee7ab23683 100644 --- a/src/Processors/QueryPlan/Optimizations/limitPushDown.cpp +++ b/src/Processors/QueryPlan/Optimizations/limitPushDown.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include namespace DB::QueryPlanOptimizations @@ -63,6 +64,12 @@ size_t tryPushDownLimit(QueryPlan::Node * parent_node, QueryPlan::Nodes &) if (tryUpdateLimitForSortingSteps(child_node, limit->getLimitForSorting())) 
return 0; + if (auto * distinct = typeid_cast(child.get())) + { + distinct->updateLimitHint(limit->getLimitForSorting()); + return 0; + } + if (typeid_cast(child.get())) return 0; diff --git a/src/Processors/QueryPlan/QueryPlanSerializationSettings.cpp b/src/Processors/QueryPlan/QueryPlanSerializationSettings.cpp new file mode 100644 index 00000000000..9b11132e27c --- /dev/null +++ b/src/Processors/QueryPlan/QueryPlanSerializationSettings.cpp @@ -0,0 +1,8 @@ +#include + +namespace DB +{ + +IMPLEMENT_SETTINGS_TRAITS(QueryPlanSerializationSettingsTraits, PLAN_SERIALIZATION_SETTINGS); + +} diff --git a/src/Processors/QueryPlan/QueryPlanSerializationSettings.h b/src/Processors/QueryPlan/QueryPlanSerializationSettings.h new file mode 100644 index 00000000000..4e119051a51 --- /dev/null +++ b/src/Processors/QueryPlan/QueryPlanSerializationSettings.h @@ -0,0 +1,59 @@ +#pragma once + +#include +#include +#include + + +namespace DB +{ + +#define PLAN_SERIALIZATION_SETTINGS(M, ALIAS) \ + M(UInt64, max_block_size, DEFAULT_BLOCK_SIZE, "Maximum block size in rows for reading", 0) \ + \ + M(UInt64, max_rows_in_distinct, 0, "Maximum number of elements during execution of DISTINCT.", 0) \ + M(UInt64, max_bytes_in_distinct, 0, "Maximum total size of state (in uncompressed bytes) in memory for the execution of DISTINCT.", 0) \ + M(OverflowMode, distinct_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.", 0) \ + \ + M(UInt64, max_rows_to_sort, 0, "If more than the specified amount of records have to be processed for ORDER BY operation, the behavior will be determined by the 'sort_overflow_mode' which by default is - throw an exception", 0) \ + M(UInt64, max_bytes_to_sort, 0, "If more than the specified amount of (uncompressed) bytes have to be processed for ORDER BY operation, the behavior will be determined by the 'sort_overflow_mode' which by default is - throw an exception", 0) \ + M(OverflowMode, sort_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.", 0) \ + \ + M(UInt64, prefer_external_sort_block_bytes, DEFAULT_BLOCK_SIZE * 256, "Prefer maximum block bytes for external sort, reduce the memory usage during merging.", 0) \ + M(UInt64, max_bytes_before_external_sort, 0, "If memory usage during ORDER BY operation is exceeding this threshold in bytes, activate the 'external sorting' mode (spill data to disk). Recommended value is half of available system memory.", 0) \ + M(UInt64, max_bytes_before_remerge_sort, 1000000000, "In case of ORDER BY with LIMIT, when memory usage is higher than specified threshold, perform additional steps of merging blocks before final merge to keep just top LIMIT rows.", 0) \ + M(Float, remerge_sort_lowered_memory_bytes_ratio, 2., "If memory usage after remerge does not reduced by this ratio, remerge will be disabled.", 0) \ + M(UInt64, min_free_disk_space_for_temporary_data, 0, "The minimum disk space to keep while writing temporary data used in external sorting and aggregation.", 0) \ + \ + M(UInt64, aggregation_in_order_max_block_bytes, 50000000, "Maximal size of block in bytes accumulated during aggregation in order of primary key. 
Lower block size allows to parallelize more final merge stage of aggregation.", 0) \ + M(Bool, aggregation_in_order_memory_bound_merging, true, "Enable memory bound merging strategy when in-order is applied.", 0) \ + M(Bool, aggregation_sort_result_by_bucket_number, true, "Send intermediate aggregation result in order of bucket number.", 0) \ + \ + M(UInt64, max_rows_to_group_by, 0, "If aggregation during GROUP BY is generating more than the specified number of rows (unique GROUP BY keys), the behavior will be determined by the 'group_by_overflow_mode' which by default is - throw an exception, but can be also switched to an approximate GROUP BY mode.", 0) \ + M(OverflowModeGroupBy, group_by_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.", 0) \ + M(UInt64, group_by_two_level_threshold, 100000, "From what number of keys, a two-level aggregation starts. 0 - the threshold is not set.", 0) \ + M(UInt64, group_by_two_level_threshold_bytes, 50000000, "From what size of the aggregation state in bytes, a two-level aggregation begins to be used. 0 - the threshold is not set. Two-level aggregation is used when at least one of the thresholds is triggered.", 0) \ + M(UInt64, max_bytes_before_external_group_by, 0, "If memory usage during GROUP BY operation is exceeding this threshold in bytes, activate the 'external aggregation' mode (spill data to disk). Recommended value is half of available system memory.", 0) \ + M(Bool, empty_result_for_aggregation_by_empty_set, false, "Return empty result when aggregating without keys on empty set.", 0) \ + M(Bool, compile_aggregate_expressions, true, "Compile aggregate functions to native code.", 0) \ + M(UInt64, min_count_to_compile_aggregate_expression, 3, "The number of identical aggregate expressions before they are JIT-compiled", 0) \ + M(Bool, enable_software_prefetch_in_aggregation, true, "Enable use of software prefetch in aggregation", 0) \ + M(Bool, optimize_group_by_constant_keys, true, "Optimize GROUP BY when all keys in block are constant", 0) \ + M(Float, min_hit_rate_to_use_consecutive_keys_optimization, 0.5, "Minimal hit rate of a cache which is used for consecutive keys optimization in aggregation to keep it enabled", 0) \ + M(Bool, collect_hash_table_stats_during_aggregation, true, "Enable collecting hash table statistics to optimize memory allocation", 0) \ + M(UInt64, max_entries_for_hash_table_stats, 10'000, "How many entries hash table statistics collected during aggregation is allowed to have", 0) \ + M(UInt64, max_size_to_preallocate_for_aggregation, 100'000'000, "For how many elements it is allowed to preallocate space in all hash tables in total before aggregation", 0) \ + \ + M(TotalsMode, totals_mode, TotalsMode::AFTER_HAVING_EXCLUSIVE, "How to calculate TOTALS when HAVING is present, as well as when max_rows_to_group_by and group_by_overflow_mode = ‘any’ are present.", IMPORTANT) \ + M(Float, totals_auto_threshold, 0.5, "The threshold for totals_mode = 'auto'.", 0) \ + \ + + +DECLARE_SETTINGS_TRAITS(QueryPlanSerializationSettingsTraits, PLAN_SERIALIZATION_SETTINGS) + +struct QueryPlanSerializationSettings : public BaseSettings +{ + QueryPlanSerializationSettings() = default; +}; + +} diff --git a/src/Processors/QueryPlan/QueryPlanStepRegistry.cpp b/src/Processors/QueryPlan/QueryPlanStepRegistry.cpp new file mode 100644 index 00000000000..0df21ff9d05 --- /dev/null +++ b/src/Processors/QueryPlan/QueryPlanStepRegistry.cpp @@ -0,0 +1,70 @@ +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const 
int UNKNOWN_IDENTIFIER; + extern const int LOGICAL_ERROR; +} + +QueryPlanStepRegistry & QueryPlanStepRegistry::instance() +{ + static QueryPlanStepRegistry registry; + return registry; +} + +void QueryPlanStepRegistry::registerStep(const std::string & name, StepCreateFunction && create_function) +{ + if (steps.contains(name)) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Query plan step '{}' is already registered", name); + steps[name] = std::move(create_function); +} + +QueryPlanStepPtr QueryPlanStepRegistry::createStep( + const std::string & name, + IQueryPlanStep::Deserialization & ctx) const +{ + StepCreateFunction create_function; + { + auto it = steps.find(name); + if (it == steps.end()) + throw Exception(ErrorCodes::UNKNOWN_IDENTIFIER, "Unknown query plan step: {}", name); + create_function = it->second; + } + return create_function(ctx); +} + +void registerExpressionStep(QueryPlanStepRegistry & registry); +void registerUnionStep(QueryPlanStepRegistry & registry); +void registerDistinctStep(QueryPlanStepRegistry & registry); +void registerSortingStep(QueryPlanStepRegistry & registry); +void registerAggregatingStep(QueryPlanStepRegistry & registry); +void registerArrayJoinStep(QueryPlanStepRegistry & registry); +void registerLimitByStep(QueryPlanStepRegistry & registry); +void registerLimitStep(QueryPlanStepRegistry & registry); +void registerOffsetStep(QueryPlanStepRegistry & registry); +void registerFilterStep(QueryPlanStepRegistry & registry); +void registerTotalsHavingStep(QueryPlanStepRegistry & registry); +void registerExtremesStep(QueryPlanStepRegistry & registry); + +void QueryPlanStepRegistry::registerPlanSteps() +{ + QueryPlanStepRegistry & registry = QueryPlanStepRegistry::instance(); + + registerExpressionStep(registry); + registerUnionStep(registry); + registerDistinctStep(registry); + registerSortingStep(registry); + registerAggregatingStep(registry); + registerArrayJoinStep(registry); + registerLimitByStep(registry); + registerLimitStep(registry); + registerOffsetStep(registry); + registerFilterStep(registry); + registerTotalsHavingStep(registry); + registerExtremesStep(registry); +} + +} diff --git a/src/Processors/QueryPlan/QueryPlanStepRegistry.h b/src/Processors/QueryPlan/QueryPlanStepRegistry.h new file mode 100644 index 00000000000..fdd1232e102 --- /dev/null +++ b/src/Processors/QueryPlan/QueryPlanStepRegistry.h @@ -0,0 +1,32 @@ +#pragma once + +#include + +namespace DB +{ + +class QueryPlanStepRegistry +{ +public: + using StepCreateFunction = std::function; + + QueryPlanStepRegistry() = default; + QueryPlanStepRegistry(const QueryPlanStepRegistry &) = delete; + QueryPlanStepRegistry & operator=(const QueryPlanStepRegistry &) = delete; + + static QueryPlanStepRegistry & instance(); + + static void registerPlanSteps(); + + void registerStep(const std::string & name, StepCreateFunction && create_function); + + QueryPlanStepPtr createStep( + const std::string & name, + IQueryPlanStep::Deserialization & ctx) const; + +private: + std::unordered_map steps; + +}; + +} diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index 6899dc7f5d6..6317cce83bd 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -1959,9 +1959,10 @@ bool ReadFromMergeTree::isQueryWithSampling() const if (context->getSettingsRef()[Setting::parallel_replicas_count] > 1 && data.supportsSampling()) return true; - const auto & select = query_info.query->as(); if 
(query_info.table_expression_modifiers) return query_info.table_expression_modifiers->getSampleSizeRatio() != std::nullopt; + +    const auto & select = query_info.query->as(); return select.sampleSize() != nullptr; } diff --git a/src/Processors/QueryPlan/Serialization.h b/src/Processors/QueryPlan/Serialization.h new file mode 100644 index 00000000000..80c7dee0252 --- /dev/null +++ b/src/Processors/QueryPlan/Serialization.h @@ -0,0 +1,28 @@ +#pragma once +#include +#include + +namespace DB +{ + +struct SerializedSetsRegistry; +struct DeserializedSetsRegistry; + +struct IQueryPlanStep::Serialization +{ +    WriteBuffer & out; +    SerializedSetsRegistry & registry; +}; + +struct IQueryPlanStep::Deserialization +{ +    ReadBuffer & in; +    DeserializedSetsRegistry & registry; +    const ContextPtr & context; + +    const Headers & input_headers; +    const Header * output_header; +    const QueryPlanSerializationSettings & settings; +}; + +} diff --git a/src/Processors/QueryPlan/SortingStep.cpp b/src/Processors/QueryPlan/SortingStep.cpp index 4d671e325f6..c488610a4c3 100644 --- a/src/Processors/QueryPlan/SortingStep.cpp +++ b/src/Processors/QueryPlan/SortingStep.cpp @@ -2,6 +2,9 @@ #include #include #include +#include +#include +#include #include #include #include @@ -43,6 +46,8 @@ namespace ErrorCodes { extern const int LOGICAL_ERROR; extern const int BAD_ARGUMENTS; +    extern const int NOT_IMPLEMENTED; +    extern const int INCORRECT_DATA; } SortingStep::Settings::Settings(const Context & context) @@ -88,6 +93,33 @@ SortingStep::Settings::Settings(size_t max_block_size_) max_block_size = max_block_size_; } +SortingStep::Settings::Settings(const QueryPlanSerializationSettings & settings) +{ +    max_block_size = settings.max_block_size; +    size_limits = SizeLimits(settings.max_rows_to_sort, settings.max_bytes_to_sort, settings.sort_overflow_mode); +    max_bytes_before_remerge = settings.max_bytes_before_remerge_sort; +    remerge_lowered_memory_bytes_ratio = settings.remerge_sort_lowered_memory_bytes_ratio; +    max_bytes_before_external_sort = settings.max_bytes_before_external_sort; +    tmp_data = Context::getGlobalContextInstance()->getTempDataOnDisk(); +    min_free_disk_space = settings.min_free_disk_space_for_temporary_data; +    max_block_bytes = settings.prefer_external_sort_block_bytes; +    read_in_order_use_buffering = false; /// TODO: serialize and restore settings.read_in_order_use_buffering. +} + +void SortingStep::Settings::updatePlanSettings(QueryPlanSerializationSettings & settings) const +{ +    settings.max_block_size = max_block_size; +    settings.max_rows_to_sort = size_limits.max_rows; +    settings.max_bytes_to_sort = size_limits.max_bytes; +    settings.sort_overflow_mode = size_limits.overflow_mode; + +    settings.max_bytes_before_remerge_sort = max_bytes_before_remerge; +    settings.remerge_sort_lowered_memory_bytes_ratio = remerge_lowered_memory_bytes_ratio; +    settings.max_bytes_before_external_sort = max_bytes_before_external_sort; +    settings.min_free_disk_space_for_temporary_data = min_free_disk_space; +    settings.prefer_external_sort_block_bytes = max_block_bytes; +} + static ITransformingStep::Traits getTraits(size_t limit) { return ITransformingStep::Traits { @@ -452,4 +484,52 @@ void SortingStep::describeActions(JSONBuilder::JSONMap & map) const map.add("Limit", limit); } +void SortingStep::serializeSettings(QueryPlanSerializationSettings & settings) const +{ +    sort_settings.updatePlanSettings(settings); +} + +void SortingStep::serialize(Serialization & ctx) const +{ +    if (type != Type::Full) +        throw
Exception(ErrorCodes::NOT_IMPLEMENTED, "Serialization of SortingStep is implemented only for Full sorting"); + +    /// Do not serialize type here; later we can use different names if needed. + +    /// Do not serialize limit for now; it is expected to be pushed down from plan optimization. + +    serializeSortDescription(result_description, ctx.out); + +    /// Serialization of partition_by_description is left for later. +    if (!partition_by_description.empty()) +        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Serialization of partitioned sorting is not implemented for SortingStep"); + +    writeVarUInt(partition_by_description.size(), ctx.out); +} + +std::unique_ptr SortingStep::deserialize(Deserialization & ctx) +{ +    if (ctx.input_headers.size() != 1) +        throw Exception(ErrorCodes::INCORRECT_DATA, "SortingStep must have one input stream"); + +    SortingStep::Settings sort_settings(ctx.settings); + +    SortDescription result_description; +    deserializeSortDescription(result_description, ctx.in); + +    UInt64 partition_desc_size; +    readVarUInt(partition_desc_size, ctx.in); + +    if (partition_desc_size) +        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Deserialization of partitioned sorting is not implemented for SortingStep"); + +    return std::make_unique( +        ctx.input_headers.front(), std::move(result_description), 0, std::move(sort_settings)); +} + +void registerSortingStep(QueryPlanStepRegistry & registry) +{ +    registry.registerStep("Sorting", SortingStep::deserialize); +} + } diff --git a/src/Processors/QueryPlan/SortingStep.h b/src/Processors/QueryPlan/SortingStep.h index d831fe0234f..5eb9fbf1db3 100644 --- a/src/Processors/QueryPlan/SortingStep.h +++ b/src/Processors/QueryPlan/SortingStep.h @@ -23,7 +23,7 @@ public: size_t max_block_size; SizeLimits size_limits; size_t max_bytes_before_remerge = 0; -        double remerge_lowered_memory_bytes_ratio = 0; +        float remerge_lowered_memory_bytes_ratio = 0; size_t max_bytes_before_external_sort = 0; TemporaryDataOnDiskScopePtr tmp_data = nullptr; size_t min_free_disk_space = 0; @@ -32,6 +32,9 @@ public: explicit Settings(const Context & context); explicit Settings(size_t max_block_size_); +        explicit Settings(const QueryPlanSerializationSettings & settings); + +        void updatePlanSettings(QueryPlanSerializationSettings & settings) const; }; /// Full @@ -96,6 +99,11 @@ public: UInt64 limit_, bool skip_partial_sort = false); +    void serializeSettings(QueryPlanSerializationSettings & settings) const override; +    void serialize(Serialization & ctx) const override; + +    static std::unique_ptr deserialize(Deserialization & ctx); + private: void scatterByPartitionIfNeeded(QueryPipelineBuilder& pipeline); void updateOutputHeader() override; diff --git a/src/Processors/QueryPlan/SourceStepWithFilter.h b/src/Processors/QueryPlan/SourceStepWithFilter.h index be6585b8755..a650cdc3211 100644 --- a/src/Processors/QueryPlan/SourceStepWithFilter.h +++ b/src/Processors/QueryPlan/SourceStepWithFilter.h @@ -53,6 +53,9 @@ public: void setLimit(size_t limit_value) { +        if (limit) +            limit_value = std::min(limit_value, *limit); + limit = limit_value; } diff --git a/src/Processors/QueryPlan/TotalsHavingStep.cpp b/src/Processors/QueryPlan/TotalsHavingStep.cpp index 5cd483862ff..df741e01f9b 100644 --- a/src/Processors/QueryPlan/TotalsHavingStep.cpp +++ b/src/Processors/QueryPlan/TotalsHavingStep.cpp @@ -1,4 +1,7 @@ #include +#include +#include +#include #include #include #include @@ -10,6 +13,11 @@ namespace DB { +namespace ErrorCodes +{ +    extern const int INCORRECT_DATA; +} + static ITransformingStep::Traits getTraits(bool has_filter) { return
ITransformingStep::Traits @@ -33,7 +41,7 @@ TotalsHavingStep::TotalsHavingStep( const std::string & filter_column_, bool remove_filter_, TotalsMode totals_mode_, - double auto_include_threshold_, + float auto_include_threshold_, bool final_) : ITransformingStep( input_header_, @@ -141,5 +149,75 @@ void TotalsHavingStep::updateOutputHeader() getAggregatesMask(input_headers.front(), aggregates)); } +void TotalsHavingStep::serializeSettings(QueryPlanSerializationSettings & settings) const +{ + settings.totals_mode = totals_mode; + settings.totals_auto_threshold = auto_include_threshold; +} + +void TotalsHavingStep::serialize(Serialization & ctx) const +{ + UInt8 flags = 0; + if (final) + flags |= 1; + if (overflow_row) + flags |= 2; + if (actions_dag) + flags |= 4; + if (actions_dag && remove_filter) + flags |= 8; + + writeIntBinary(flags, ctx.out); + + serializeAggregateDescriptions(aggregates, ctx.out); + + if (actions_dag) + { + writeStringBinary(filter_column_name, ctx.out); + actions_dag->serialize(ctx.out, ctx.registry); + } +} + +std::unique_ptr TotalsHavingStep::deserialize(Deserialization & ctx) +{ + if (ctx.input_headers.size() != 1) + throw Exception(ErrorCodes::INCORRECT_DATA, "TotalsHaving must have one input stream"); + + UInt8 flags; + readIntBinary(flags, ctx.in); + + bool final = bool(flags & 1); + bool overflow_row = bool(flags & 2); + bool has_actions_dag = bool(flags & 4); + bool remove_filter_column = bool(flags & 8); + + AggregateDescriptions aggregates; + deserializeAggregateDescriptions(aggregates, ctx.in); + + std::optional actions_dag; + String filter_column_name; + if (has_actions_dag) + { + readStringBinary(filter_column_name, ctx.in); + + actions_dag = ActionsDAG::deserialize(ctx.in, ctx.registry, ctx.context); + } + + return std::make_unique( + ctx.input_headers.front(), + std::move(aggregates), + overflow_row, + std::move(actions_dag), + std::move(filter_column_name), + remove_filter_column, + ctx.settings.totals_mode, + ctx.settings.totals_auto_threshold, + final); +} + +void registerTotalsHavingStep(QueryPlanStepRegistry & registry) +{ + registry.registerStep("TotalsHaving", TotalsHavingStep::deserialize); +} } diff --git a/src/Processors/QueryPlan/TotalsHavingStep.h b/src/Processors/QueryPlan/TotalsHavingStep.h index 3bcd6360c80..a68eb4c75ca 100644 --- a/src/Processors/QueryPlan/TotalsHavingStep.h +++ b/src/Processors/QueryPlan/TotalsHavingStep.h @@ -20,7 +20,7 @@ public: const std::string & filter_column_, bool remove_filter_, TotalsMode totals_mode_, - double auto_include_threshold_, + float auto_include_threshold_, bool final_); String getName() const override { return "TotalsHaving"; } @@ -32,6 +32,11 @@ public: const ActionsDAG * getActions() const { return actions_dag ? 
&*actions_dag : nullptr; } + void serializeSettings(QueryPlanSerializationSettings & settings) const override; + void serialize(Serialization & ctx) const override; + + static std::unique_ptr deserialize(Deserialization & ctx); + private: void updateOutputHeader() override; @@ -42,7 +47,7 @@ private: String filter_column_name; bool remove_filter; TotalsMode totals_mode; - double auto_include_threshold; + float auto_include_threshold; bool final; }; diff --git a/src/Processors/QueryPlan/UnionStep.cpp b/src/Processors/QueryPlan/UnionStep.cpp index d5c2469629b..0808b833d01 100644 --- a/src/Processors/QueryPlan/UnionStep.cpp +++ b/src/Processors/QueryPlan/UnionStep.cpp @@ -1,6 +1,8 @@ #include #include #include +#include +#include #include #include #include @@ -37,7 +39,7 @@ void UnionStep::updateOutputHeader() output_header = checkHeaders(input_headers); } -QueryPipelineBuilderPtr UnionStep::updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings &) +QueryPipelineBuilderPtr UnionStep::updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings & settings) { auto pipeline = std::make_unique(); @@ -49,6 +51,8 @@ QueryPipelineBuilderPtr UnionStep::updatePipeline(QueryPipelineBuilders pipeline return pipeline; } + size_t new_max_threads = max_threads ? max_threads : settings.max_threads; + for (auto & cur_pipeline : pipelines) { #if !defined(NDEBUG) @@ -75,7 +79,7 @@ QueryPipelineBuilderPtr UnionStep::updatePipeline(QueryPipelineBuilders pipeline } } - *pipeline = QueryPipelineBuilder::unitePipelines(std::move(pipelines), max_threads, &processors); + *pipeline = QueryPipelineBuilder::unitePipelines(std::move(pipelines), new_max_threads, &processors); return pipeline; } @@ -84,4 +88,19 @@ void UnionStep::describePipeline(FormatSettings & settings) const IQueryPlanStep::describePipeline(processors, settings); } +void UnionStep::serialize(Serialization & ctx) const +{ + (void)ctx; +} + +std::unique_ptr UnionStep::deserialize(Deserialization & ctx) +{ + return std::make_unique(ctx.input_headers); +} + +void registerUnionStep(QueryPlanStepRegistry & registry) +{ + registry.registerStep("Union", &UnionStep::deserialize); +} + } diff --git a/src/Processors/QueryPlan/UnionStep.h b/src/Processors/QueryPlan/UnionStep.h index efb8f51c7a4..87a3a064eaf 100644 --- a/src/Processors/QueryPlan/UnionStep.h +++ b/src/Processors/QueryPlan/UnionStep.h @@ -13,12 +13,15 @@ public: String getName() const override { return "Union"; } - QueryPipelineBuilderPtr updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings &) override; + QueryPipelineBuilderPtr updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings & settings) override; void describePipeline(FormatSettings & settings) const override; size_t getMaxThreads() const { return max_threads; } + void serialize(Serialization & ctx) const override; + static std::unique_ptr deserialize(Deserialization & ctx); + private: void updateOutputHeader() override; diff --git a/src/Processors/TTL/TTLAggregationAlgorithm.cpp b/src/Processors/TTL/TTLAggregationAlgorithm.cpp index 5fc76db2eec..a86f24b06c4 100644 --- a/src/Processors/TTL/TTLAggregationAlgorithm.cpp +++ b/src/Processors/TTL/TTLAggregationAlgorithm.cpp @@ -43,15 +43,25 @@ TTLAggregationAlgorithm::TTLAggregationAlgorithm( const Settings & settings = storage_.getContext()->getSettingsRef(); Aggregator::Params params( - settings, keys, aggregates, /*overflow_row_=*/ false, - /*group_by_two_level_threshold_=*/ 0, - 
/*group_by_two_level_threshold_bytes_=*/ 0, +        settings[Setting::max_rows_to_group_by], +        settings[Setting::group_by_overflow_mode], +        /*group_by_two_level_threshold*/ 0, +        /*group_by_two_level_threshold_bytes*/ 0, +        Aggregator::Params::getMaxBytesBeforeExternalGroupBy(settings[Setting::max_bytes_before_external_group_by], settings[Setting::max_bytes_ratio_before_external_group_by]), settings[Setting::empty_result_for_aggregation_by_empty_set], storage_.getContext()->getTempDataOnDisk(), -        /*only_merge_=*/false, +        settings[Setting::max_threads], +        settings[Setting::min_free_disk_space_for_temporary_data], +        settings[Setting::compile_aggregate_expressions], +        settings[Setting::min_count_to_compile_aggregate_expression], +        settings[Setting::max_block_size], +        settings[Setting::enable_software_prefetch_in_aggregation], +        /*only_merge=*/false, +        settings[Setting::optimize_group_by_constant_keys], +        settings[Setting::min_chunk_bytes_for_parallel_parsing], /*stats_collecting_params_=*/{}); aggregator = std::make_unique(header, params); diff --git a/src/QueryPipeline/SizeLimits.cpp b/src/QueryPipeline/SizeLimits.cpp index 4161f3f365f..3e86ae7e9f9 100644 --- a/src/QueryPipeline/SizeLimits.cpp +++ b/src/QueryPipeline/SizeLimits.cpp @@ -2,6 +2,8 @@ #include #include #include +#include +#include namespace ProfileEvents @@ -14,6 +16,12 @@ namespace ProfileEvents namespace DB { +namespace ErrorCodes +{ +    extern const int LOGICAL_ERROR; +    extern const int INCORRECT_DATA; +} + bool SizeLimits::check(UInt64 rows, UInt64 bytes, const char * what, int too_many_rows_exception_code, int too_many_bytes_exception_code) const { if (overflow_mode == OverflowMode::THROW) @@ -64,4 +72,25 @@ bool SizeLimits::check(UInt64 rows, UInt64 bytes, const char * what, int exception_code) return check(rows, bytes, what, exception_code, exception_code); } +static void checkAllowedOverflowMode(OverflowMode mode, int code) +{ +    if (!(mode == OverflowMode::BREAK || mode == OverflowMode::THROW)) +        throw Exception(code, "Unexpected overflow mode {}", mode); +} + +void SizeLimits::serialize(WriteBuffer & out) const +{ +    checkAllowedOverflowMode(overflow_mode, ErrorCodes::LOGICAL_ERROR); +    writeVarUInt(max_rows, out); +    writeVarUInt(max_bytes, out); +    writeIntBinary(overflow_mode, out); +} +void SizeLimits::deserialize(ReadBuffer & in) +{ +    readVarUInt(max_rows, in); +    readVarUInt(max_bytes, in); +    readIntBinary(overflow_mode, in); +    checkAllowedOverflowMode(overflow_mode, ErrorCodes::INCORRECT_DATA); +} + } diff --git a/src/QueryPipeline/SizeLimits.h b/src/QueryPipeline/SizeLimits.h index 1c84f81a127..04293ee8966 100644 --- a/src/QueryPipeline/SizeLimits.h +++ b/src/QueryPipeline/SizeLimits.h @@ -18,6 +18,9 @@ enum class OverflowMode : uint8_t ANY = 2, }; +class WriteBuffer; +class ReadBuffer; + struct SizeLimits { @@ -38,6 +41,9 @@ struct SizeLimits bool softCheck(UInt64 rows, UInt64 bytes) const; bool hasLimits() const { return max_rows || max_bytes; } + +    void serialize(WriteBuffer & out) const; +    void deserialize(ReadBuffer & in); }; }
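
A note on how the pieces above compose: QueryPlanStepRegistry maps a step name to a deserialization function, while each step's serialize() writes only its own payload. The following is a minimal sketch of the intended round-trip, assuming the caller writes the step name immediately before the payload; writeStep/readStep are hypothetical helper names, not part of the patch.

/// Sketch only. registerStep()/createStep() and the Serialization/Deserialization
/// contexts come from the patch; the two helpers below are hypothetical.
void writeStep(const IQueryPlanStep & step, const String & name, IQueryPlanStep::Serialization & ctx)
{
    writeStringBinary(name, ctx.out); /// e.g. "Sorting", "TotalsHaving", "Union"
    step.serialize(ctx);              /// step-specific payload, as implemented above
}

QueryPlanStepPtr readStep(IQueryPlanStep::Deserialization & ctx)
{
    String name;
    readStringBinary(name, ctx.in);
    /// Throws UNKNOWN_IDENTIFIER for names that were never registered.
    return QueryPlanStepRegistry::instance().createStep(name, ctx);
}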
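
The QueryPlanSerializationSettings declared at the top travel separately from each step's payload: before writing, serializeSettings() lets a step publish its runtime limits, and on the reading side the Settings(const QueryPlanSerializationSettings &) constructor restores them. A sketch of that writer/reader symmetry for SortingStep; `step` stands for any fully constructed SortingStep, and the snippet assumes an in-tree build.

/// Sketch only: the settings round-trip SortingStep relies on.
QueryPlanSerializationSettings plan_settings;
step.serializeSettings(plan_settings);         /// writer: updatePlanSettings() copies the limits out

SortingStep::Settings restored(plan_settings); /// reader: the constructor copies them back
/// tmp_data is deliberately not serialized: it is re-created from the global
/// context, and read_in_order_use_buffering is forced to false for now.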
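
TotalsHavingStep packs four booleans into the single UInt8 written first; the filter column name and the ActionsDAG follow only when the third bit is set, and the remove-filter bit is meaningful only together with it. Spelled out with illustrative constants (the patch itself uses literal masks):

/// Illustrative names for the flag bits used by TotalsHavingStep::serialize().
enum TotalsHavingSerializationFlags : UInt8
{
    FLAG_FINAL         = 1 << 0, /// final
    FLAG_OVERFLOW_ROW  = 1 << 1, /// overflow_row
    FLAG_HAS_ACTIONS   = 1 << 2, /// filter_column_name and ActionsDAG follow
    FLAG_REMOVE_FILTER = 1 << 3, /// remove_filter; set only when FLAG_HAS_ACTIONS is set
};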
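
SizeLimits uses the simplest possible layout: varint max_rows, varint max_bytes, then the overflow mode as a fixed-width integer, validated after reading (only THROW and BREAK are legal on the wire). A self-contained round-trip using the string-buffer classes from src/IO; the function itself is just an illustration.

#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromString.h>
#include <QueryPipeline/SizeLimits.h>

/// Illustration only: round-trips a SizeLimits value through the new methods.
void sizeLimitsRoundTrip()
{
    DB::SizeLimits limits(/*max_rows=*/1000, /*max_bytes=*/1 << 20, DB::OverflowMode::BREAK);

    DB::WriteBufferFromOwnString out;
    limits.serialize(out);

    DB::ReadBufferFromString in(out.str());
    DB::SizeLimits restored;
    restored.deserialize(in);
    /// restored now holds max_rows = 1000, max_bytes = 1048576, overflow_mode = BREAK.
}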