diff --git a/src/Processors/QueryPlan/LimitByStep.cpp b/src/Processors/QueryPlan/LimitByStep.cpp index 8b4abecc12c..9f071931c03 100644 --- a/src/Processors/QueryPlan/LimitByStep.cpp +++ b/src/Processors/QueryPlan/LimitByStep.cpp @@ -1,4 +1,5 @@ #include +#include #include #include #include @@ -24,11 +25,11 @@ static ITransformingStep::Traits getTraits() LimitByStep::LimitByStep( const DataStream & input_stream_, - size_t group_length_, size_t group_offset_, const Names & columns_) + size_t group_length_, size_t group_offset_, Names columns_) : ITransformingStep(input_stream_, input_stream_.header, getTraits()) , group_length(group_length_) , group_offset(group_offset_) - , columns(columns_) + , columns(std::move(columns_)) { } @@ -83,4 +84,38 @@ void LimitByStep::describeActions(JSONBuilder::JSONMap & map) const map.add("Offset", group_offset); } +void LimitByStep::serialize(WriteBuffer & out) const +{ + writeVarUInt(group_length, out); + writeVarUInt(group_offset, out); + + + writeVarUInt(columns.size(), out); + for (const auto & column : columns) + writeStringBinary(column, out); +} + +std::unique_ptr LimitByStep::deserialize( + ReadBuffer & in, const DataStreams & input_streams_, const DataStream *, QueryPlanSerializationSettings &) +{ + UInt64 group_length; + UInt64 group_offset; + + readVarUInt(group_length, in); + readVarUInt(group_offset, in); + + UInt64 num_columns; + readVarUInt(num_columns, in); + Names columns(num_columns); + for (auto & column : columns) + readStringBinary(column, in); + + return std::make_unique(input_streams_.front(), group_length, group_offset, std::move(columns)); +} + +void registerLimitByStep(QueryPlanStepRegistry & registry) +{ + registry.registerStep("LimitBy", LimitByStep::deserialize); +} + } diff --git a/src/Processors/QueryPlan/LimitByStep.h b/src/Processors/QueryPlan/LimitByStep.h index 0edda3247d6..5cba37ae348 100644 --- a/src/Processors/QueryPlan/LimitByStep.h +++ b/src/Processors/QueryPlan/LimitByStep.h @@ -10,7 +10,7 @@ class LimitByStep : public ITransformingStep public: explicit LimitByStep( const DataStream & input_stream_, - size_t group_length_, size_t group_offset_, const Names & columns_); + size_t group_length_, size_t group_offset_, Names columns_); String getName() const override { return "LimitBy"; } @@ -19,6 +19,10 @@ public: void describeActions(JSONBuilder::JSONMap & map) const override; void describeActions(FormatSettings & settings) const override; + void serialize(WriteBuffer & out) const override; + + static std::unique_ptr deserialize(ReadBuffer & in, const DataStreams & input_streams_, const DataStream *, QueryPlanSerializationSettings &); + private: void updateOutputStream() override { diff --git a/src/Processors/QueryPlan/QueryPlanStepRegistry.cpp b/src/Processors/QueryPlan/QueryPlanStepRegistry.cpp index d6de1895fb1..24ac6711e96 100644 --- a/src/Processors/QueryPlan/QueryPlanStepRegistry.cpp +++ b/src/Processors/QueryPlan/QueryPlanStepRegistry.cpp @@ -45,6 +45,7 @@ void registerDistinctStep(QueryPlanStepRegistry & registry); void registerSortingStep(QueryPlanStepRegistry & registry); void registerAggregatingStep(QueryPlanStepRegistry & registry); void registerArrayJoinStep(QueryPlanStepRegistry & registry); +void registerLimitByStep(QueryPlanStepRegistry & registry); void registerReadFromTableStep(QueryPlanStepRegistry & registry); @@ -58,6 +59,7 @@ void QueryPlanStepRegistry::registerPlanSteps() registerSortingStep(registry); registerAggregatingStep(registry); registerArrayJoinStep(registry); + registerLimitByStep(registry); registerReadFromTableStep(registry); }