diff --git a/src/Core/Block.h b/src/Core/Block.h index d998581a50f..7dc202d1851 100644 --- a/src/Core/Block.h +++ b/src/Core/Block.h @@ -177,6 +177,7 @@ private: friend class ActionsDAG; }; +using ConstBlockPtr = std::shared_ptr; using BlockPtr = std::shared_ptr; using Blocks = std::vector; using BlocksList = std::list; diff --git a/src/Processors/ISimpleTransform.cpp b/src/Processors/ISimpleTransform.cpp index ac8f2f8b7ae..c3fd14fcbb7 100644 --- a/src/Processors/ISimpleTransform.cpp +++ b/src/Processors/ISimpleTransform.cpp @@ -12,6 +12,14 @@ ISimpleTransform::ISimpleTransform(Block input_header_, Block output_header_, bo { } +ISimpleTransform::ISimpleTransform(ConstBlockPtr input_header_, ConstBlockPtr output_header_, bool skip_empty_chunks_) + : IProcessor({std::move(input_header_)}, {std::move(output_header_)}) + , input(inputs.front()) + , output(outputs.front()) + , skip_empty_chunks(skip_empty_chunks_) +{ +} + ISimpleTransform::Status ISimpleTransform::prepare() { /// Check can output. diff --git a/src/Processors/ISimpleTransform.h b/src/Processors/ISimpleTransform.h index 629529cdffa..3c0216ed71e 100644 --- a/src/Processors/ISimpleTransform.h +++ b/src/Processors/ISimpleTransform.h @@ -38,6 +38,7 @@ protected: public: ISimpleTransform(Block input_header_, Block output_header_, bool skip_empty_chunks_); + ISimpleTransform(ConstBlockPtr input_header_, ConstBlockPtr output_header_, bool skip_empty_chunks_); virtual void transform(Chunk &) = 0; diff --git a/src/Processors/Port.cpp b/src/Processors/Port.cpp index 79532dd4d6c..f35a2e6e340 100644 --- a/src/Processors/Port.cpp +++ b/src/Processors/Port.cpp @@ -11,10 +11,10 @@ namespace ErrorCodes void connect(OutputPort & output, InputPort & input, bool reconnect) { if (!reconnect && input.state) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Port is already connected, (header: [{}])", input.header.dumpStructure()); + throw Exception(ErrorCodes::LOGICAL_ERROR, "Port is already connected, (header: [{}])", input.header->dumpStructure()); if (!reconnect && output.state) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Port is already connected, (header: [{}])", output.header.dumpStructure()); + throw Exception(ErrorCodes::LOGICAL_ERROR, "Port is already connected, (header: [{}])", output.header->dumpStructure()); auto out_name = output.processor ? output.getProcessor().getName() : "null"; auto in_name = input.processor ? input.getProcessor().getName() : "null"; diff --git a/src/Processors/Port.h b/src/Processors/Port.h index f3c7bbb5fee..fbf5da9076b 100644 --- a/src/Processors/Port.h +++ b/src/Processors/Port.h @@ -20,7 +20,7 @@ class IProcessor; namespace ErrorCodes { - extern const int LOGICAL_ERROR; +extern const int LOGICAL_ERROR; } class Port @@ -200,7 +200,7 @@ protected: std::atomic data; }; - Block header; + const ConstBlockPtr header; std::shared_ptr state; /// This object is only used for data exchange between port and shared state. @@ -214,12 +214,16 @@ protected: public: using Data = State::Data; - Port(Block header_) : header(std::move(header_)) {} /// NOLINT - Port(Block header_, IProcessor * processor_) : header(std::move(header_)), processor(processor_) {} + + Port(ConstBlockPtr header_) : header(header_) { } /// NOLINT + Port(Block && header_) : header(std::make_shared(std::move(header_))) { } /// NOLINT + Port(const Block & header_) : header(std::make_shared(std::move(header_))) { } /// NOLINT + Port(Block header_, IProcessor * processor_) : header(std::make_shared(std::move(header_))), processor(processor_) { } void setUpdateInfo(UpdateInfo * info) { update_info = info; } - const Block & getHeader() const { return header; } + const Block & getHeader() const { return *header; } + const ConstBlockPtr & getHeaderPtr() const { return header; } bool ALWAYS_INLINE isConnected() const { return state != nullptr; } void ALWAYS_INLINE assumeConnected() const @@ -289,7 +293,7 @@ public: is_finished = flags & State::IS_FINISHED; - if (unlikely(!data->exception && data->chunk.getNumColumns() != header.columns())) + if (unlikely(!data->exception && data->chunk.getNumColumns() != header->columns())) { auto & chunk = data->chunk; @@ -298,9 +302,9 @@ public: "Invalid number of columns in chunk pulled from OutputPort. Expected {}, found {}\n" "Header: {}\n" "Chunk: {}\n", - header.columns(), + header->columns(), chunk.getNumColumns(), - header.dumpStructure(), + header->dumpStructure(), chunk.dumpStructure()); } @@ -410,16 +414,16 @@ public: void ALWAYS_INLINE pushData(Data data_) { - if (unlikely(!data_.exception && data_.chunk.getNumColumns() != header.columns())) + if (unlikely(!data_.exception && data_.chunk.getNumColumns() != header->columns())) { throw Exception( ErrorCodes::LOGICAL_ERROR, "Invalid number of columns in chunk pushed to OutputPort. Expected {}, found {}\n" "Header: {}\n" "Chunk: {}\n", - header.columns(), + header->columns(), data_.chunk.getNumColumns(), - header.dumpStructure(), + header->dumpStructure(), data_.chunk.dumpStructure()); } diff --git a/src/Processors/QueryPlan/ExpressionStep.cpp b/src/Processors/QueryPlan/ExpressionStep.cpp index 6f88c4527a4..a45fab6ecd4 100644 --- a/src/Processors/QueryPlan/ExpressionStep.cpp +++ b/src/Processors/QueryPlan/ExpressionStep.cpp @@ -38,7 +38,7 @@ void ExpressionStep::transformPipeline(QueryPipelineBuilder & pipeline, const Bu { auto expression = std::make_shared(std::move(actions_dag), settings.getActionsSettings()); - pipeline.addSimpleTransform([&](const Block & header) + pipeline.addSimpleTransform([&](const ConstBlockPtr & header) { return std::make_shared(header, expression); }); @@ -51,7 +51,7 @@ void ExpressionStep::transformPipeline(QueryPipelineBuilder & pipeline, const Bu ActionsDAG::MatchColumnsMode::Name); auto convert_actions = std::make_shared(std::move(convert_actions_dag), settings.getActionsSettings()); - pipeline.addSimpleTransform([&](const Block & header) + pipeline.addSimpleTransform([&](const ConstBlockPtr & header) { return std::make_shared(header, convert_actions); }); diff --git a/src/Processors/Transforms/ExpressionTransform.cpp b/src/Processors/Transforms/ExpressionTransform.cpp index 04fabc9a3c6..c1cb8260dfb 100644 --- a/src/Processors/Transforms/ExpressionTransform.cpp +++ b/src/Processors/Transforms/ExpressionTransform.cpp @@ -1,5 +1,7 @@ +#include #include #include +#include namespace DB @@ -11,6 +13,12 @@ Block ExpressionTransform::transformHeader(const Block & header, const ActionsDA } +ExpressionTransform::ExpressionTransform(const ConstBlockPtr & header_, ExpressionActionsPtr expression_) + : ISimpleTransform(header_, std::make_shared(transformHeader(*header_, expression_->getActionsDAG())), false) + , expression(std::move(expression_)) +{ +} + ExpressionTransform::ExpressionTransform(const Block & header_, ExpressionActionsPtr expression_) : ISimpleTransform(header_, transformHeader(header_, expression_->getActionsDAG()), false) , expression(std::move(expression_)) diff --git a/src/Processors/Transforms/ExpressionTransform.h b/src/Processors/Transforms/ExpressionTransform.h index cd2aae044d5..fa50c679b49 100644 --- a/src/Processors/Transforms/ExpressionTransform.h +++ b/src/Processors/Transforms/ExpressionTransform.h @@ -1,6 +1,7 @@ #pragma once -#include #include +#include +#include "Core/Block.h" namespace DB { @@ -18,10 +19,8 @@ class ActionsDAG; class ExpressionTransform final : public ISimpleTransform { public: - ExpressionTransform( - const Block & header_, - ExpressionActionsPtr expression_); - + ExpressionTransform(const Block & header_, ExpressionActionsPtr expression_); + ExpressionTransform(const ConstBlockPtr & header_, ExpressionActionsPtr expression_); String getName() const override { return "ExpressionTransform"; } static Block transformHeader(const Block & header, const ActionsDAG & expression); diff --git a/src/QueryPipeline/Pipe.cpp b/src/QueryPipeline/Pipe.cpp index 34602ecccee..30dd7a77f04 100644 --- a/src/QueryPipeline/Pipe.cpp +++ b/src/QueryPipeline/Pipe.cpp @@ -576,7 +576,7 @@ void Pipe::addTransform( max_parallel_streams = std::max(max_parallel_streams, output_ports.size()); } -void Pipe::addSimpleTransform(const ProcessorGetterWithStreamKind & getter) +void Pipe::addSimpleTransform(const ProcessorGetterSharedHeaderWithStreamKind & getter) { if (output_ports.empty()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add simple transform to empty Pipe."); @@ -588,7 +588,7 @@ void Pipe::addSimpleTransform(const ProcessorGetterWithStreamKind & getter) if (!port) return; - auto transform = getter(port->getHeader(), stream_type); + auto transform = getter(port->getHeaderPtr(), stream_type); if (transform) { @@ -636,11 +636,21 @@ void Pipe::addSimpleTransform(const ProcessorGetterWithStreamKind & getter) header = std::move(new_header); } +void Pipe::addSimpleTransform(const ProcessorGetterSharedHeader & getter) +{ + addSimpleTransform([&](const ConstBlockPtr & stream_header_ptr, StreamType) { return getter(stream_header_ptr); }); +} + void Pipe::addSimpleTransform(const ProcessorGetter & getter) { addSimpleTransform([&](const Block & stream_header, StreamType) { return getter(stream_header); }); } +void Pipe::addSimpleTransform(const ProcessorGetterWithStreamKind & getter) +{ + addSimpleTransform([&](const ConstBlockPtr & stream_header_ptr, StreamType stream_type) { return getter(*stream_header_ptr,stream_type); }); +} + void Pipe::addChains(std::vector chains) { if (output_ports.size() != chains.size()) diff --git a/src/QueryPipeline/Pipe.h b/src/QueryPipeline/Pipe.h index 19fdbd77cb2..e357ff65657 100644 --- a/src/QueryPipeline/Pipe.h +++ b/src/QueryPipeline/Pipe.h @@ -1,9 +1,10 @@ #pragma once #include -#include #include +#include #include +#include "Core/Block.h" namespace DB @@ -83,10 +84,14 @@ public: using ProcessorGetter = std::function; using ProcessorGetterWithStreamKind = std::function; + using ProcessorGetterSharedHeader = std::function; + using ProcessorGetterSharedHeaderWithStreamKind = std::function; /// Add transform with single input and single output for each port. void addSimpleTransform(const ProcessorGetter & getter); void addSimpleTransform(const ProcessorGetterWithStreamKind & getter); + void addSimpleTransform(const ProcessorGetterSharedHeader & getter); + void addSimpleTransform(const ProcessorGetterSharedHeaderWithStreamKind & getter); /// Add chain to every output port. void addChains(std::vector chains); diff --git a/src/QueryPipeline/QueryPipelineBuilder.cpp b/src/QueryPipeline/QueryPipelineBuilder.cpp index d276fed60a2..3b8e532c95f 100644 --- a/src/QueryPipeline/QueryPipelineBuilder.cpp +++ b/src/QueryPipeline/QueryPipelineBuilder.cpp @@ -135,6 +135,18 @@ void QueryPipelineBuilder::addSimpleTransform(const Pipe::ProcessorGetterWithStr pipe.addSimpleTransform(getter); } +void QueryPipelineBuilder::addSimpleTransform(const Pipe::ProcessorGetterSharedHeader & getter) +{ + checkInitializedAndNotCompleted(); + pipe.addSimpleTransform(getter); +} + +void QueryPipelineBuilder::addSimpleTransform(const Pipe::ProcessorGetterSharedHeaderWithStreamKind & getter) +{ + checkInitializedAndNotCompleted(); + pipe.addSimpleTransform(getter); +} + void QueryPipelineBuilder::addTransform(ProcessorPtr transform) { checkInitializedAndNotCompleted(); diff --git a/src/QueryPipeline/QueryPipelineBuilder.h b/src/QueryPipeline/QueryPipelineBuilder.h index a9e5b1535c0..e650ac08957 100644 --- a/src/QueryPipeline/QueryPipelineBuilder.h +++ b/src/QueryPipeline/QueryPipelineBuilder.h @@ -64,6 +64,8 @@ public: /// Add transform with simple input and simple output for each port. void addSimpleTransform(const Pipe::ProcessorGetter & getter); void addSimpleTransform(const Pipe::ProcessorGetterWithStreamKind & getter); + void addSimpleTransform(const Pipe::ProcessorGetterSharedHeader & getter); + void addSimpleTransform(const Pipe::ProcessorGetterSharedHeaderWithStreamKind & getter); /// Add transform with getNumStreams() input ports. void addTransform(ProcessorPtr transform); void addTransform(ProcessorPtr transform, InputPort * totals, InputPort * extremes); @@ -107,9 +109,9 @@ public: /// Unite several pipelines together. Result pipeline would have common_header structure. /// If collector is used, it will collect only newly-added processors, but not processors from pipelines. static QueryPipelineBuilder unitePipelines( - std::vector> pipelines, - size_t max_threads_limit = 0, - Processors * collected_processors = nullptr); + std::vector> pipelines, + size_t max_threads_limit = 0, + Processors * collected_processors = nullptr); static QueryPipelineBuilderPtr mergePipelines( QueryPipelineBuilderPtr left,