From 824aa10b82a3e4708194624312dac58d53f8a139 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Thu, 5 Sep 2024 14:33:07 +0000 Subject: [PATCH] Add serialization for SortingStep. --- src/Processors/QueryPlan/ArrayJoinStep.cpp | 49 +++++++++++++++++++ src/Processors/QueryPlan/ArrayJoinStep.h | 11 +++-- .../QueryPlan/QueryPlanStepRegistry.cpp | 2 + 3 files changed, 59 insertions(+), 3 deletions(-) diff --git a/src/Processors/QueryPlan/ArrayJoinStep.cpp b/src/Processors/QueryPlan/ArrayJoinStep.cpp index aa721e138cf..24f2b8ce114 100644 --- a/src/Processors/QueryPlan/ArrayJoinStep.cpp +++ b/src/Processors/QueryPlan/ArrayJoinStep.cpp @@ -1,4 +1,6 @@ #include +#include +#include #include #include #include @@ -81,4 +83,51 @@ void ArrayJoinStep::describeActions(JSONBuilder::JSONMap & map) const map.add("Columns", std::move(columns_array)); } +void ArrayJoinStep::serializeSettings(QueryPlanSerializationSettings & settings) const +{ + settings.max_block_size = max_block_size; +} + +void ArrayJoinStep::serialize(WriteBuffer & out) const +{ + UInt8 flags = 0; + if (is_left) + flags |= 1; + if (is_unaligned) + flags |= 2; + + writeIntBinary(flags, out); + + writeVarUInt(columns.size(), out); + for (const auto & column : columns) + writeStringBinary(column, out); +} + +std::unique_ptr ArrayJoinStep::deserialize( + ReadBuffer & in, const DataStreams & input_streams_, const DataStream *, QueryPlanSerializationSettings & settings) +{ + UInt8 flags; + readIntBinary(flags, in); + + bool is_left = flags & 1; + bool is_unaligned = flags & 2; + + UInt64 num_columns; + readVarUInt(num_columns, in); + NameSet columns; + for (size_t i = 0; i < num_columns; ++i) + { + String column; + readStringBinary(column, in); + columns.insert(std::move(column)); + } + + return std::make_unique(input_streams_.front(), std::move(columns), is_left, is_unaligned, settings.max_block_size); +} + +void registerArrayJoinStep(QueryPlanStepRegistry & registry) +{ + registry.registerStep("ArrayJoin", ArrayJoinStep::deserialize); +} + } diff --git a/src/Processors/QueryPlan/ArrayJoinStep.h b/src/Processors/QueryPlan/ArrayJoinStep.h index 3f2eacc3159..9dd581bc2b2 100644 --- a/src/Processors/QueryPlan/ArrayJoinStep.h +++ b/src/Processors/QueryPlan/ArrayJoinStep.h @@ -21,13 +21,18 @@ public: const NameSet & getColumns() const { return columns; } bool isLeft() const { return is_left; } + void serializeSettings(QueryPlanSerializationSettings & settings) const override; + void serialize(WriteBuffer & out) const override; + + static std::unique_ptr deserialize(ReadBuffer & in, const DataStreams & input_streams_, const DataStream *, QueryPlanSerializationSettings & settings); + private: void updateOutputStream() override; NameSet columns; - bool is_left = false; - bool is_unaligned = false; - size_t max_block_size = DEFAULT_BLOCK_SIZE; + bool is_left; + bool is_unaligned; + size_t max_block_size; }; } diff --git a/src/Processors/QueryPlan/QueryPlanStepRegistry.cpp b/src/Processors/QueryPlan/QueryPlanStepRegistry.cpp index a6dfee1268d..d6de1895fb1 100644 --- a/src/Processors/QueryPlan/QueryPlanStepRegistry.cpp +++ b/src/Processors/QueryPlan/QueryPlanStepRegistry.cpp @@ -44,6 +44,7 @@ void registerUnionStep(QueryPlanStepRegistry & registry); void registerDistinctStep(QueryPlanStepRegistry & registry); void registerSortingStep(QueryPlanStepRegistry & registry); void registerAggregatingStep(QueryPlanStepRegistry & registry); +void registerArrayJoinStep(QueryPlanStepRegistry & registry); void registerReadFromTableStep(QueryPlanStepRegistry & registry); @@ -56,6 +57,7 @@ void QueryPlanStepRegistry::registerPlanSteps() registerDistinctStep(registry); registerSortingStep(registry); registerAggregatingStep(registry); + registerArrayJoinStep(registry); registerReadFromTableStep(registry); }