diff --git a/src/Processors/QueryPlan/QueryPlanSerializationSettings.h b/src/Processors/QueryPlan/QueryPlanSerializationSettings.h index 98ec3610a45..fe4586f55b2 100644 --- a/src/Processors/QueryPlan/QueryPlanSerializationSettings.h +++ b/src/Processors/QueryPlan/QueryPlanSerializationSettings.h @@ -9,9 +9,23 @@ namespace DB { #define PLAN_SERIALIZATION_SETTINGS(M, ALIAS) \ + M(UInt64, max_block_size, DEFAULT_BLOCK_SIZE, "Maximum block size in rows for reading", 0) \ + \ M(UInt64, max_rows_in_distinct, 0, "Maximum number of elements during execution of DISTINCT.", 0) \ M(UInt64, max_bytes_in_distinct, 0, "Maximum total size of state (in uncompressed bytes) in memory for the execution of DISTINCT.", 0) \ M(OverflowMode, distinct_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.", 0) \ + \ + M(UInt64, max_rows_to_sort, 0, "If more than the specified amount of records have to be processed for ORDER BY operation, the behavior will be determined by the 'sort_overflow_mode' which by default is - throw an exception", 0) \ + M(UInt64, max_bytes_to_sort, 0, "If more than the specified amount of (uncompressed) bytes have to be processed for ORDER BY operation, the behavior will be determined by the 'sort_overflow_mode' which by default is - throw an exception", 0) \ + M(OverflowMode, sort_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.", 0) \ + \ + M(UInt64, prefer_external_sort_block_bytes, DEFAULT_BLOCK_SIZE * 256, "Prefer maximum block bytes for external sort, reduce the memory usage during merging.", 0) \ + M(UInt64, max_bytes_before_external_sort, 0, "If memory usage during ORDER BY operation is exceeding this threshold in bytes, activate the 'external sorting' mode (spill data to disk). 
Recommended value is half of available system memory.", 0) \ + M(UInt64, max_bytes_before_remerge_sort, 1000000000, "In case of ORDER BY with LIMIT, when memory usage is higher than specified threshold, perform additional steps of merging blocks before final merge to keep just top LIMIT rows.", 0) \ + M(Float, remerge_sort_lowered_memory_bytes_ratio, 2., "If memory usage after remerge does not reduced by this ratio, remerge will be disabled.", 0) \ + M(UInt64, min_free_disk_space_for_temporary_data, 0, "The minimum disk space to keep while writing temporary data used in external sorting and aggregation.", 0) \ + \ + DECLARE_SETTINGS_TRAITS(QueryPlanSerializationSettingsTraits, PLAN_SERIALIZATION_SETTINGS) diff --git a/src/Processors/QueryPlan/QueryPlanStepRegistry.cpp b/src/Processors/QueryPlan/QueryPlanStepRegistry.cpp index 5e221379b39..91050ef4bbe 100644 --- a/src/Processors/QueryPlan/QueryPlanStepRegistry.cpp +++ b/src/Processors/QueryPlan/QueryPlanStepRegistry.cpp @@ -42,6 +42,8 @@ QueryPlanStepPtr QueryPlanStepRegistry::createStep( void registerExpressionStep(QueryPlanStepRegistry & registry); void registerUnionStep(QueryPlanStepRegistry & registry); void registerDistinctStep(QueryPlanStepRegistry & registry); +void registerSortingStep(QueryPlanStepRegistry & registry); + void registerReadFromTableStep(QueryPlanStepRegistry & registry); void QueryPlanStepRegistry::registerPlanSteps() @@ -51,6 +53,7 @@ void QueryPlanStepRegistry::registerPlanSteps() registerExpressionStep(registry); registerUnionStep(registry); registerDistinctStep(registry); + registerSortingStep(registry); registerReadFromTableStep(registry); } diff --git a/src/Processors/QueryPlan/SortingStep.cpp b/src/Processors/QueryPlan/SortingStep.cpp index 48fad9f5fdb..04895a38ffe 100644 --- a/src/Processors/QueryPlan/SortingStep.cpp +++ b/src/Processors/QueryPlan/SortingStep.cpp @@ -2,6 +2,8 @@ #include #include #include +#include +#include #include #include #include @@ -48,6 +50,33 @@ 
SortingStep::Settings::Settings(size_t max_block_size_) max_block_size = max_block_size_; } +SortingStep::Settings::Settings(const QueryPlanSerializationSettings & settings) +{ + max_block_size = settings.max_block_size; + size_limits = SizeLimits(settings.max_rows_to_sort, settings.max_bytes_to_sort, settings.sort_overflow_mode); + max_bytes_before_remerge = settings.max_bytes_before_remerge_sort; + remerge_lowered_memory_bytes_ratio = settings.remerge_sort_lowered_memory_bytes_ratio; + max_bytes_before_external_sort = settings.max_bytes_before_external_sort; + tmp_data = Context::getGlobalContextInstance()->getTempDataOnDisk(); + min_free_disk_space = settings.min_free_disk_space_for_temporary_data; + max_block_bytes = settings.prefer_external_sort_block_bytes; + read_in_order_use_buffering = false; //settings.read_in_order_use_buffering; +} + +void SortingStep::Settings::updatePlanSettings(QueryPlanSerializationSettings & settings) const +{ + settings.max_block_size = max_block_size; + settings.max_rows_to_sort = size_limits.max_rows; + settings.max_bytes_to_sort = size_limits.max_bytes; + settings.sort_overflow_mode = size_limits.overflow_mode; + + settings.max_bytes_before_remerge_sort = max_bytes_before_remerge; + settings.remerge_sort_lowered_memory_bytes_ratio = remerge_lowered_memory_bytes_ratio; + settings.max_bytes_before_external_sort = max_bytes_before_external_sort; + settings.min_free_disk_space_for_temporary_data = min_free_disk_space; + settings.prefer_external_sort_block_bytes = max_block_bytes; +} + static ITransformingStep::Traits getTraits(size_t limit) { return ITransformingStep::Traits @@ -465,4 +494,53 @@ void SortingStep::describeActions(JSONBuilder::JSONMap & map) const map.add("Limit", limit); } +void SortingStep::serializeSettings(QueryPlanSerializationSettings & settings) const +{ + sort_settings.updatePlanSettings(settings); +} + +void SortingStep::serialize(WriteBuffer & out) const +{ + if (type != Type::Full) + throw 
Exception(ErrorCodes::NOT_IMPLEMENTED, "Serialization of SortingStep is implemented only for Full sorting"); + + /// Do not serialize type here; Later we can use different names if needed. + + /// Do not serialize limit for now; it is expected to be pushed down from plan optimization. + + serializeSortDescription(result_description, out); + + /// Later + if (!partition_by_description.empty()) + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Serialization of partitioned sorting is not implemented for SortingStep"); + + writeVarUInt(partition_by_description.size(), out); +} + +std::unique_ptr SortingStep::deserialize( + ReadBuffer & in, const DataStreams & input_streams_, const DataStream *, QueryPlanSerializationSettings & settings) +{ + if (input_streams_.size() != 1) + throw Exception(ErrorCodes::INCORRECT_DATA, "SortingStep must have one input stream"); + + SortingStep::Settings sort_settings(settings); + + SortDescription result_description; + deserializeSortDescription(result_description, in); + + UInt64 partition_desc_size; + readVarUInt(partition_desc_size, in); + + if (partition_desc_size) + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Deserialization of partitioned sorting is not implemented for SortingStep"); + + return std::make_unique( + input_streams_.front(), std::move(result_description), 0, std::move(sort_settings), true); +} + +void registerSortingStep(QueryPlanStepRegistry & registry) +{ + registry.registerStep("Sorting", SortingStep::deserialize); +} + } diff --git a/src/Processors/QueryPlan/SortingStep.h b/src/Processors/QueryPlan/SortingStep.h index b4a49394a13..c1557cc07e4 100644 --- a/src/Processors/QueryPlan/SortingStep.h +++ b/src/Processors/QueryPlan/SortingStep.h @@ -23,7 +23,7 @@ public: size_t max_block_size; SizeLimits size_limits; size_t max_bytes_before_remerge = 0; - double remerge_lowered_memory_bytes_ratio = 0; + float remerge_lowered_memory_bytes_ratio = 0; size_t max_bytes_before_external_sort = 0; TemporaryDataOnDiskScopePtr 
tmp_data = nullptr; size_t min_free_disk_space = 0; @@ -32,6 +32,9 @@ public: explicit Settings(const Context & context); explicit Settings(size_t max_block_size_); + explicit Settings(const QueryPlanSerializationSettings & settings); + + void updatePlanSettings(QueryPlanSerializationSettings & settings) const; }; /// Full @@ -93,6 +96,11 @@ public: UInt64 limit_, bool skip_partial_sort = false); + void serializeSettings(QueryPlanSerializationSettings & settings) const override; + void serialize(WriteBuffer & out) const override; + + static std::unique_ptr deserialize(ReadBuffer & in, const DataStreams & input_streams_, const DataStream *, QueryPlanSerializationSettings & settings); + private: void scatterByPartitionIfNeeded(QueryPipelineBuilder& pipeline); void updateOutputStream() override;