Add serialization for SortingStep.

This commit is contained in:
Nikolai Kochetov 2024-09-05 14:33:07 +00:00
parent 14dc81ff63
commit 824aa10b82
3 changed files with 59 additions and 3 deletions

View File

@ -1,4 +1,6 @@
#include <Processors/QueryPlan/ArrayJoinStep.h>
#include <Processors/QueryPlan/QueryPlanSerializationSettings.h>
#include <Processors/QueryPlan/QueryPlanStepRegistry.h>
#include <Processors/Transforms/ArrayJoinTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
@ -81,4 +83,51 @@ void ArrayJoinStep::describeActions(JSONBuilder::JSONMap & map) const
map.add("Columns", std::move(columns_array));
}
void ArrayJoinStep::serializeSettings(QueryPlanSerializationSettings & settings) const
{
settings.max_block_size = max_block_size;
}
void ArrayJoinStep::serialize(WriteBuffer & out) const
{
UInt8 flags = 0;
if (is_left)
flags |= 1;
if (is_unaligned)
flags |= 2;
writeIntBinary(flags, out);
writeVarUInt(columns.size(), out);
for (const auto & column : columns)
writeStringBinary(column, out);
}
std::unique_ptr<IQueryPlanStep> ArrayJoinStep::deserialize(
ReadBuffer & in, const DataStreams & input_streams_, const DataStream *, QueryPlanSerializationSettings & settings)
{
UInt8 flags;
readIntBinary(flags, in);
bool is_left = flags & 1;
bool is_unaligned = flags & 2;
UInt64 num_columns;
readVarUInt(num_columns, in);
NameSet columns;
for (size_t i = 0; i < num_columns; ++i)
{
String column;
readStringBinary(column, in);
columns.insert(std::move(column));
}
return std::make_unique<ArrayJoinStep>(input_streams_.front(), std::move(columns), is_left, is_unaligned, settings.max_block_size);
}
void registerArrayJoinStep(QueryPlanStepRegistry & registry)
{
registry.registerStep("ArrayJoin", ArrayJoinStep::deserialize);
}
}

View File

@ -21,13 +21,18 @@ public:
const NameSet & getColumns() const { return columns; }
bool isLeft() const { return is_left; }
void serializeSettings(QueryPlanSerializationSettings & settings) const override;
void serialize(WriteBuffer & out) const override;
static std::unique_ptr<IQueryPlanStep> deserialize(ReadBuffer & in, const DataStreams & input_streams_, const DataStream *, QueryPlanSerializationSettings & settings);
private:
void updateOutputStream() override;
NameSet columns;
bool is_left = false;
bool is_unaligned = false;
size_t max_block_size = DEFAULT_BLOCK_SIZE;
bool is_left;
bool is_unaligned;
size_t max_block_size;
};
}

View File

@ -44,6 +44,7 @@ void registerUnionStep(QueryPlanStepRegistry & registry);
void registerDistinctStep(QueryPlanStepRegistry & registry);
void registerSortingStep(QueryPlanStepRegistry & registry);
void registerAggregatingStep(QueryPlanStepRegistry & registry);
void registerArrayJoinStep(QueryPlanStepRegistry & registry);
void registerReadFromTableStep(QueryPlanStepRegistry & registry);
@ -56,6 +57,7 @@ void QueryPlanStepRegistry::registerPlanSteps()
registerDistinctStep(registry);
registerSortingStep(registry);
registerAggregatingStep(registry);
registerArrayJoinStep(registry);
registerReadFromTableStep(registry);
}