Add serialization for LimitByStep.

This commit is contained in:
Nikolai Kochetov 2024-09-05 15:00:25 +00:00
parent f257b27998
commit e92f79184f
3 changed files with 44 additions and 3 deletions

View File

@ -1,4 +1,5 @@
#include <Processors/QueryPlan/LimitByStep.h>
#include <Processors/QueryPlan/QueryPlanStepRegistry.h>
#include <Processors/Transforms/LimitByTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <IO/Operators.h>
@ -24,11 +25,11 @@ static ITransformingStep::Traits getTraits()
LimitByStep::LimitByStep(
const DataStream & input_stream_,
size_t group_length_, size_t group_offset_, const Names & columns_)
size_t group_length_, size_t group_offset_, Names columns_)
: ITransformingStep(input_stream_, input_stream_.header, getTraits())
, group_length(group_length_)
, group_offset(group_offset_)
, columns(columns_)
, columns(std::move(columns_))
{
}
@ -83,4 +84,38 @@ void LimitByStep::describeActions(JSONBuilder::JSONMap & map) const
map.add("Offset", group_offset);
}
void LimitByStep::serialize(WriteBuffer & out) const
{
writeVarUInt(group_length, out);
writeVarUInt(group_offset, out);
writeVarUInt(columns.size(), out);
for (const auto & column : columns)
writeStringBinary(column, out);
}
std::unique_ptr<IQueryPlanStep> LimitByStep::deserialize(
ReadBuffer & in, const DataStreams & input_streams_, const DataStream *, QueryPlanSerializationSettings &)
{
UInt64 group_length;
UInt64 group_offset;
readVarUInt(group_length, in);
readVarUInt(group_offset, in);
UInt64 num_columns;
readVarUInt(num_columns, in);
Names columns(num_columns);
for (auto & column : columns)
readStringBinary(column, in);
return std::make_unique<LimitByStep>(input_streams_.front(), group_length, group_offset, std::move(columns));
}
void registerLimitByStep(QueryPlanStepRegistry & registry)
{
registry.registerStep("LimitBy", LimitByStep::deserialize);
}
}

View File

@ -10,7 +10,7 @@ class LimitByStep : public ITransformingStep
public:
explicit LimitByStep(
const DataStream & input_stream_,
size_t group_length_, size_t group_offset_, const Names & columns_);
size_t group_length_, size_t group_offset_, Names columns_);
String getName() const override { return "LimitBy"; }
@ -19,6 +19,10 @@ public:
void describeActions(JSONBuilder::JSONMap & map) const override;
void describeActions(FormatSettings & settings) const override;
void serialize(WriteBuffer & out) const override;
static std::unique_ptr<IQueryPlanStep> deserialize(ReadBuffer & in, const DataStreams & input_streams_, const DataStream *, QueryPlanSerializationSettings &);
private:
void updateOutputStream() override
{

View File

@ -45,6 +45,7 @@ void registerDistinctStep(QueryPlanStepRegistry & registry);
void registerSortingStep(QueryPlanStepRegistry & registry);
void registerAggregatingStep(QueryPlanStepRegistry & registry);
void registerArrayJoinStep(QueryPlanStepRegistry & registry);
void registerLimitByStep(QueryPlanStepRegistry & registry);
void registerReadFromTableStep(QueryPlanStepRegistry & registry);
@ -58,6 +59,7 @@ void QueryPlanStepRegistry::registerPlanSteps()
registerSortingStep(registry);
registerAggregatingStep(registry);
registerArrayJoinStep(registry);
registerLimitByStep(registry);
registerReadFromTableStep(registry);
}