Add serialization for SortingStep.

This commit is contained in:
Nikolai Kochetov 2024-09-02 15:04:17 +00:00
parent 1ed4e1b847
commit 7ee29b6254
4 changed files with 104 additions and 1 deletions

View File

@ -9,9 +9,23 @@ namespace DB
{
/// X-macro listing the settings declared through QueryPlanSerializationSettingsTraits.
/// Each M(...) entry gives: type, setting name, default value, description string and a trailing 0
/// (NOTE(review): the last argument is presumably a flags field - confirm against the settings macros).
/// Entries are grouped as: block size, DISTINCT limits, ORDER BY limits, external/remerge sort tuning.
/// The ALIAS parameter is not used by any entry in this list.
#define PLAN_SERIALIZATION_SETTINGS(M, ALIAS) \
M(UInt64, max_block_size, DEFAULT_BLOCK_SIZE, "Maximum block size in rows for reading", 0) \
\
M(UInt64, max_rows_in_distinct, 0, "Maximum number of elements during execution of DISTINCT.", 0) \
M(UInt64, max_bytes_in_distinct, 0, "Maximum total size of state (in uncompressed bytes) in memory for the execution of DISTINCT.", 0) \
M(OverflowMode, distinct_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.", 0) \
\
M(UInt64, max_rows_to_sort, 0, "If more than the specified amount of records have to be processed for ORDER BY operation, the behavior will be determined by the 'sort_overflow_mode' which by default is - throw an exception", 0) \
M(UInt64, max_bytes_to_sort, 0, "If more than the specified amount of (uncompressed) bytes have to be processed for ORDER BY operation, the behavior will be determined by the 'sort_overflow_mode' which by default is - throw an exception", 0) \
M(OverflowMode, sort_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.", 0) \
\
M(UInt64, prefer_external_sort_block_bytes, DEFAULT_BLOCK_SIZE * 256, "Prefer maximum block bytes for external sort, reduce the memory usage during merging.", 0) \
M(UInt64, max_bytes_before_external_sort, 0, "If memory usage during ORDER BY operation is exceeding this threshold in bytes, activate the 'external sorting' mode (spill data to disk). Recommended value is half of available system memory.", 0) \
M(UInt64, max_bytes_before_remerge_sort, 1000000000, "In case of ORDER BY with LIMIT, when memory usage is higher than specified threshold, perform additional steps of merging blocks before final merge to keep just top LIMIT rows.", 0) \
M(Float, remerge_sort_lowered_memory_bytes_ratio, 2., "If memory usage after remerge does not reduced by this ratio, remerge will be disabled.", 0) \
M(UInt64, min_free_disk_space_for_temporary_data, 0, "The minimum disk space to keep while writing temporary data used in external sorting and aggregation.", 0) \
\
DECLARE_SETTINGS_TRAITS(QueryPlanSerializationSettingsTraits, PLAN_SERIALIZATION_SETTINGS)

View File

@ -42,6 +42,8 @@ QueryPlanStepPtr QueryPlanStepRegistry::createStep(
/// Forward declarations of per-step registration functions; each takes the registry to fill.
/// registerSortingStep is defined next to SortingStep::deserialize; the others are presumably
/// defined alongside their own steps (not visible here).
void registerExpressionStep(QueryPlanStepRegistry & registry);
void registerUnionStep(QueryPlanStepRegistry & registry);
void registerDistinctStep(QueryPlanStepRegistry & registry);
void registerSortingStep(QueryPlanStepRegistry & registry);
void registerReadFromTableStep(QueryPlanStepRegistry & registry);
void QueryPlanStepRegistry::registerPlanSteps()
@ -51,6 +53,7 @@ void QueryPlanStepRegistry::registerPlanSteps()
registerExpressionStep(registry);
registerUnionStep(registry);
registerDistinctStep(registry);
registerSortingStep(registry);
registerReadFromTableStep(registry);
}

View File

@ -2,6 +2,8 @@
#include <Interpreters/Context.h>
#include <Processors/Merges/MergingSortedTransform.h>
#include <Processors/QueryPlan/SortingStep.h>
#include <Processors/QueryPlan/QueryPlanSerializationSettings.h>
#include <Processors/QueryPlan/QueryPlanStepRegistry.h>
#include <Processors/Transforms/FinishSortingTransform.h>
#include <Processors/Transforms/LimitsCheckingTransform.h>
#include <Processors/Transforms/MergeSortingTransform.h>
@ -48,6 +50,33 @@ SortingStep::Settings::Settings(size_t max_block_size_)
max_block_size = max_block_size_;
}
/// Builds sorting settings from the settings attached to a serialized query plan.
/// Two fields are not taken from `settings`: the temporary-data scope is obtained from the
/// global context instance, and read-in-order buffering is always switched off.
SortingStep::Settings::Settings(const QueryPlanSerializationSettings & settings)
{
    /// Block sizing.
    max_block_size = settings.max_block_size;
    max_block_bytes = settings.prefer_external_sort_block_bytes;

    /// Limits on the amount of data to sort, and what to do when they are exceeded.
    size_limits = SizeLimits(settings.max_rows_to_sort, settings.max_bytes_to_sort, settings.sort_overflow_mode);

    /// Remerge thresholds.
    max_bytes_before_remerge = settings.max_bytes_before_remerge_sort;
    remerge_lowered_memory_bytes_ratio = settings.remerge_sort_lowered_memory_bytes_ratio;

    /// External sort: threshold, free disk space to keep, and where temporary data goes.
    max_bytes_before_external_sort = settings.max_bytes_before_external_sort;
    min_free_disk_space = settings.min_free_disk_space_for_temporary_data;
    tmp_data = Context::getGlobalContextInstance()->getTempDataOnDisk();

    /// Not read from the plan settings for now (candidate source: settings.read_in_order_use_buffering).
    read_in_order_use_buffering = false;
}
/// Copies these sorting settings into the plan serialization settings.
/// It is the inverse of the constructor taking QueryPlanSerializationSettings, except that
/// tmp_data and read_in_order_use_buffering are not written anywhere.
void SortingStep::Settings::updatePlanSettings(QueryPlanSerializationSettings & settings) const
{
    /// Block sizing.
    settings.max_block_size = max_block_size;
    settings.prefer_external_sort_block_bytes = max_block_bytes;

    /// SizeLimits is split back into its three individual settings.
    settings.max_rows_to_sort = size_limits.max_rows;
    settings.max_bytes_to_sort = size_limits.max_bytes;
    settings.sort_overflow_mode = size_limits.overflow_mode;

    /// Remerge thresholds.
    settings.max_bytes_before_remerge_sort = max_bytes_before_remerge;
    settings.remerge_sort_lowered_memory_bytes_ratio = remerge_lowered_memory_bytes_ratio;

    /// External sort.
    settings.max_bytes_before_external_sort = max_bytes_before_external_sort;
    settings.min_free_disk_space_for_temporary_data = min_free_disk_space;
}
static ITransformingStep::Traits getTraits(size_t limit)
{
return ITransformingStep::Traits
@ -465,4 +494,53 @@ void SortingStep::describeActions(JSONBuilder::JSONMap & map) const
map.add("Limit", limit);
}
/// Exports this step's sort settings into the plan serialization settings
/// by delegating to Settings::updatePlanSettings.
void SortingStep::serializeSettings(QueryPlanSerializationSettings & settings) const
{
sort_settings.updatePlanSettings(settings);
}
/// Writes the state of this step to `out`: the result sort description followed by the
/// number of partition-by columns. That number is always 0 here, because partitioned
/// sorting is rejected below; SortingStep::deserialize reads it back.
/// The sorting type and the limit are intentionally not written.
/// Throws NOT_IMPLEMENTED when the step is not a Full sorting or has a partition-by description.
void SortingStep::serialize(WriteBuffer & out) const
{
    if (type != Type::Full)
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Serialization of SortingStep is implemented only for Full sorting");

    /// Reject unsupported state before anything is written, so that a failed call
    /// does not leave a partially written step in `out`.
    if (!partition_by_description.empty())
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Serialization of partitioned sorting is not implemented for SortingStep");

    /// Do not serialize type here; later we can use different names if needed.
    /// Do not serialize limit for now; it is expected to be pushed down from plan optimization.

    serializeSortDescription(result_description, out);

    /// Placeholder for the partition-by description: only its size is written for now.
    writeVarUInt(partition_by_description.size(), out);
}
/// Reconstructs a SortingStep from the data written by SortingStep::serialize.
/// Sorting settings are derived from `settings`; the sort description and the number of
/// partition-by columns are read from `in`. The unnamed DataStream pointer is unused.
/// Throws INCORRECT_DATA unless there is exactly one input stream, and NOT_IMPLEMENTED
/// when the serialized partition-by size is non-zero.
std::unique_ptr<IQueryPlanStep> SortingStep::deserialize(
    ReadBuffer & in, const DataStreams & input_streams_, const DataStream *, QueryPlanSerializationSettings & settings)
{
    if (input_streams_.size() != 1)
        throw Exception(ErrorCodes::INCORRECT_DATA, "SortingStep must have one input stream");

    SortingStep::Settings step_settings(settings);

    SortDescription sort_description;
    deserializeSortDescription(sort_description, in);

    /// Only an empty partition-by description can be restored.
    UInt64 partition_columns_count = 0;
    readVarUInt(partition_columns_count, in);
    if (partition_columns_count != 0)
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Deserialization of partitioned sorting is not implemented for SortingStep");

    /// The limit is not serialized, so 0 is passed for it.
    /// NOTE(review): the trailing `true` goes to the constructor's last boolean parameter;
    /// which flag that is cannot be told from here - confirm against the SortingStep constructors.
    return std::make_unique<SortingStep>(
        input_streams_.front(), std::move(sort_description), 0, std::move(step_settings), true);
}
/// Registers SortingStep::deserialize in the given registry under the step name "Sorting".
void registerSortingStep(QueryPlanStepRegistry & registry)
{
registry.registerStep("Sorting", SortingStep::deserialize);
}
}

View File

@ -23,7 +23,7 @@ public:
size_t max_block_size;
SizeLimits size_limits;
size_t max_bytes_before_remerge = 0;
double remerge_lowered_memory_bytes_ratio = 0;
float remerge_lowered_memory_bytes_ratio = 0;
size_t max_bytes_before_external_sort = 0;
TemporaryDataOnDiskScopePtr tmp_data = nullptr;
size_t min_free_disk_space = 0;
@ -32,6 +32,9 @@ public:
explicit Settings(const Context & context);
explicit Settings(size_t max_block_size_);
explicit Settings(const QueryPlanSerializationSettings & settings);
void updatePlanSettings(QueryPlanSerializationSettings & settings) const;
};
/// Full
@ -93,6 +96,11 @@ public:
UInt64 limit_,
bool skip_partial_sort = false);
void serializeSettings(QueryPlanSerializationSettings & settings) const override;
void serialize(WriteBuffer & out) const override;
static std::unique_ptr<IQueryPlanStep> deserialize(ReadBuffer & in, const DataStreams & input_streams_, const DataStream *, QueryPlanSerializationSettings & settings);
private:
void scatterByPartitionIfNeeded(QueryPipelineBuilder& pipeline);
void updateOutputStream() override;