Minimize memory copy in port headers during pipeline construction

This commit is contained in:
heymind 2024-09-29 15:18:20 +08:00
parent 64b278275c
commit bc7bd12a9c
12 changed files with 76 additions and 26 deletions

View File

@ -177,6 +177,7 @@ private:
friend class ActionsDAG;
};
using ConstBlockPtr = std::shared_ptr<const Block>;
using BlockPtr = std::shared_ptr<Block>;
using Blocks = std::vector<Block>;
using BlocksList = std::list<Block>;

View File

@ -12,6 +12,14 @@ ISimpleTransform::ISimpleTransform(Block input_header_, Block output_header_, bo
{
}
/// Shared-header overload: builds the transform from header pointers instead of Block values.
/// Each pointer is moved into a one-element port list handed to IProcessor, so this
/// constructor itself does not copy any Block.
ISimpleTransform::ISimpleTransform(ConstBlockPtr input_header_, ConstBlockPtr output_header_, bool skip_empty_chunks_)
: IProcessor({std::move(input_header_)}, {std::move(output_header_)})
/// `input` / `output` are bound to the first element of `inputs` / `outputs`.
, input(inputs.front())
, output(outputs.front())
, skip_empty_chunks(skip_empty_chunks_)
{
}
ISimpleTransform::Status ISimpleTransform::prepare()
{
/// Check can output.

View File

@ -38,6 +38,7 @@ protected:
public:
ISimpleTransform(Block input_header_, Block output_header_, bool skip_empty_chunks_);
ISimpleTransform(ConstBlockPtr input_header_, ConstBlockPtr output_header_, bool skip_empty_chunks_);
virtual void transform(Chunk &) = 0;

View File

@ -11,10 +11,10 @@ namespace ErrorCodes
void connect(OutputPort & output, InputPort & input, bool reconnect)
{
if (!reconnect && input.state)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Port is already connected, (header: [{}])", input.header.dumpStructure());
throw Exception(ErrorCodes::LOGICAL_ERROR, "Port is already connected, (header: [{}])", input.header->dumpStructure());
if (!reconnect && output.state)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Port is already connected, (header: [{}])", output.header.dumpStructure());
throw Exception(ErrorCodes::LOGICAL_ERROR, "Port is already connected, (header: [{}])", output.header->dumpStructure());
auto out_name = output.processor ? output.getProcessor().getName() : "null";
auto in_name = input.processor ? input.getProcessor().getName() : "null";

View File

@ -20,7 +20,7 @@ class IProcessor;
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int LOGICAL_ERROR;
}
class Port
@ -200,7 +200,7 @@ protected:
std::atomic<Data *> data;
};
Block header;
const ConstBlockPtr header;
std::shared_ptr<State> state;
/// This object is only used for data exchange between port and shared state.
@ -214,12 +214,16 @@ protected:
public:
using Data = State::Data;
Port(Block header_) : header(std::move(header_)) {} /// NOLINT
Port(Block header_, IProcessor * processor_) : header(std::move(header_)), processor(processor_) {}
Port(ConstBlockPtr header_) : header(header_) { } /// NOLINT
Port(Block && header_) : header(std::make_shared<const Block>(std::move(header_))) { } /// NOLINT
Port(const Block & header_) : header(std::make_shared<const Block>(std::move(header_))) { } /// NOLINT
Port(Block header_, IProcessor * processor_) : header(std::make_shared<const Block>(std::move(header_))), processor(processor_) { }
void setUpdateInfo(UpdateInfo * info) { update_info = info; }
const Block & getHeader() const { return header; }
/// Returns a reference to the Block that `header` points to (dereferences the pointer unconditionally).
const Block & getHeader() const { return *header; }
/// Returns the shared header pointer itself, by const reference.
const ConstBlockPtr & getHeaderPtr() const { return header; }
bool ALWAYS_INLINE isConnected() const { return state != nullptr; }
void ALWAYS_INLINE assumeConnected() const
@ -289,7 +293,7 @@ public:
is_finished = flags & State::IS_FINISHED;
if (unlikely(!data->exception && data->chunk.getNumColumns() != header.columns()))
if (unlikely(!data->exception && data->chunk.getNumColumns() != header->columns()))
{
auto & chunk = data->chunk;
@ -298,9 +302,9 @@ public:
"Invalid number of columns in chunk pulled from OutputPort. Expected {}, found {}\n"
"Header: {}\n"
"Chunk: {}\n",
header.columns(),
header->columns(),
chunk.getNumColumns(),
header.dumpStructure(),
header->dumpStructure(),
chunk.dumpStructure());
}
@ -410,16 +414,16 @@ public:
void ALWAYS_INLINE pushData(Data data_)
{
if (unlikely(!data_.exception && data_.chunk.getNumColumns() != header.columns()))
if (unlikely(!data_.exception && data_.chunk.getNumColumns() != header->columns()))
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Invalid number of columns in chunk pushed to OutputPort. Expected {}, found {}\n"
"Header: {}\n"
"Chunk: {}\n",
header.columns(),
header->columns(),
data_.chunk.getNumColumns(),
header.dumpStructure(),
header->dumpStructure(),
data_.chunk.dumpStructure());
}

View File

@ -38,7 +38,7 @@ void ExpressionStep::transformPipeline(QueryPipelineBuilder & pipeline, const Bu
{
auto expression = std::make_shared<ExpressionActions>(std::move(actions_dag), settings.getActionsSettings());
pipeline.addSimpleTransform([&](const Block & header)
pipeline.addSimpleTransform([&](const ConstBlockPtr & header)
{
return std::make_shared<ExpressionTransform>(header, expression);
});
@ -51,7 +51,7 @@ void ExpressionStep::transformPipeline(QueryPipelineBuilder & pipeline, const Bu
ActionsDAG::MatchColumnsMode::Name);
auto convert_actions = std::make_shared<ExpressionActions>(std::move(convert_actions_dag), settings.getActionsSettings());
pipeline.addSimpleTransform([&](const Block & header)
pipeline.addSimpleTransform([&](const ConstBlockPtr & header)
{
return std::make_shared<ExpressionTransform>(header, convert_actions);
});

View File

@ -1,5 +1,7 @@
#include <memory>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Interpreters/ExpressionActions.h>
#include <Core/Block.h>
namespace DB
@ -11,6 +13,12 @@ Block ExpressionTransform::transformHeader(const Block & header, const ActionsDA
}
/// Shared-header overload.
/// The input header pointer is forwarded to ISimpleTransform as is; the output header is the result of
/// transformHeader(*header_, <actions DAG of expression_>) wrapped into a new shared const Block.
/// `header_` is dereferenced unconditionally. NOTE(review): presumably never null — confirm against callers.
/// `expression_` is read in the base-class initializer and only afterwards moved into the member
/// (base classes are initialized before members).
ExpressionTransform::ExpressionTransform(const ConstBlockPtr & header_, ExpressionActionsPtr expression_)
: ISimpleTransform(header_, std::make_shared<const Block>(transformHeader(*header_, expression_->getActionsDAG())), false)
, expression(std::move(expression_))
{
}
ExpressionTransform::ExpressionTransform(const Block & header_, ExpressionActionsPtr expression_)
: ISimpleTransform(header_, transformHeader(header_, expression_->getActionsDAG()), false)
, expression(std::move(expression_))

View File

@ -1,6 +1,7 @@
#pragma once
#include <Processors/Transforms/ExceptionKeepingTransform.h>
#include <Processors/ISimpleTransform.h>
#include <Processors/Transforms/ExceptionKeepingTransform.h>
#include "Core/Block.h"
namespace DB
{
@ -18,10 +19,8 @@ class ActionsDAG;
class ExpressionTransform final : public ISimpleTransform
{
public:
ExpressionTransform(
const Block & header_,
ExpressionActionsPtr expression_);
ExpressionTransform(const Block & header_, ExpressionActionsPtr expression_);
ExpressionTransform(const ConstBlockPtr & header_, ExpressionActionsPtr expression_);
String getName() const override { return "ExpressionTransform"; }
static Block transformHeader(const Block & header, const ActionsDAG & expression);

View File

@ -576,7 +576,7 @@ void Pipe::addTransform(
max_parallel_streams = std::max<size_t>(max_parallel_streams, output_ports.size());
}
void Pipe::addSimpleTransform(const ProcessorGetterWithStreamKind & getter)
void Pipe::addSimpleTransform(const ProcessorGetterSharedHeaderWithStreamKind & getter)
{
if (output_ports.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add simple transform to empty Pipe.");
@ -588,7 +588,7 @@ void Pipe::addSimpleTransform(const ProcessorGetterWithStreamKind & getter)
if (!port)
return;
auto transform = getter(port->getHeader(), stream_type);
auto transform = getter(port->getHeaderPtr(), stream_type);
if (transform)
{
@ -636,11 +636,21 @@ void Pipe::addSimpleTransform(const ProcessorGetterWithStreamKind & getter)
header = std::move(new_header);
}
void Pipe::addSimpleTransform(const ProcessorGetterSharedHeader & getter)
{
addSimpleTransform([&](const ConstBlockPtr & stream_header_ptr, StreamType) { return getter(stream_header_ptr); });
}
void Pipe::addSimpleTransform(const ProcessorGetter & getter)
{
addSimpleTransform([&](const Block & stream_header, StreamType) { return getter(stream_header); });
}
void Pipe::addSimpleTransform(const ProcessorGetterWithStreamKind & getter)
{
addSimpleTransform([&](const ConstBlockPtr & stream_header_ptr, StreamType stream_type) { return getter(*stream_header_ptr,stream_type); });
}
void Pipe::addChains(std::vector<Chain> chains)
{
if (output_ports.size() != chains.size())

View File

@ -1,9 +1,10 @@
#pragma once
#include <Processors/IProcessor.h>
#include <QueryPipeline/QueryPlanResourceHolder.h>
#include <QueryPipeline/Chain.h>
#include <QueryPipeline/QueryPlanResourceHolder.h>
#include <QueryPipeline/SizeLimits.h>
#include "Core/Block.h"
namespace DB
@ -83,10 +84,14 @@ public:
using ProcessorGetter = std::function<ProcessorPtr(const Block & header)>;
using ProcessorGetterWithStreamKind = std::function<ProcessorPtr(const Block & header, StreamType stream_type)>;
using ProcessorGetterSharedHeader = std::function<ProcessorPtr(const ConstBlockPtr & header)>;
using ProcessorGetterSharedHeaderWithStreamKind = std::function<ProcessorPtr(const ConstBlockPtr & header, StreamType stream_type)>;
/// Add transform with single input and single output for each port.
void addSimpleTransform(const ProcessorGetter & getter);
void addSimpleTransform(const ProcessorGetterWithStreamKind & getter);
void addSimpleTransform(const ProcessorGetterSharedHeader & getter);
void addSimpleTransform(const ProcessorGetterSharedHeaderWithStreamKind & getter);
/// Add chain to every output port.
void addChains(std::vector<Chain> chains);

View File

@ -135,6 +135,18 @@ void QueryPipelineBuilder::addSimpleTransform(const Pipe::ProcessorGetterWithStr
pipe.addSimpleTransform(getter);
}
/// Shared-header variant: the getter receives the header as ConstBlockPtr.
/// Calls checkInitializedAndNotCompleted() first, then forwards the getter to `pipe`.
void QueryPipelineBuilder::addSimpleTransform(const Pipe::ProcessorGetterSharedHeader & getter)
{
checkInitializedAndNotCompleted();
pipe.addSimpleTransform(getter);
}
/// Shared-header variant whose getter additionally receives the stream type.
/// Calls checkInitializedAndNotCompleted() first, then forwards the getter to `pipe`.
void QueryPipelineBuilder::addSimpleTransform(const Pipe::ProcessorGetterSharedHeaderWithStreamKind & getter)
{
checkInitializedAndNotCompleted();
pipe.addSimpleTransform(getter);
}
void QueryPipelineBuilder::addTransform(ProcessorPtr transform)
{
checkInitializedAndNotCompleted();

View File

@ -64,6 +64,8 @@ public:
/// Add transform with simple input and simple output for each port.
void addSimpleTransform(const Pipe::ProcessorGetter & getter);
void addSimpleTransform(const Pipe::ProcessorGetterWithStreamKind & getter);
void addSimpleTransform(const Pipe::ProcessorGetterSharedHeader & getter);
void addSimpleTransform(const Pipe::ProcessorGetterSharedHeaderWithStreamKind & getter);
/// Add transform with getNumStreams() input ports.
void addTransform(ProcessorPtr transform);
void addTransform(ProcessorPtr transform, InputPort * totals, InputPort * extremes);
@ -107,9 +109,9 @@ public:
/// Unite several pipelines together. Result pipeline would have common_header structure.
/// If collector is used, it will collect only newly-added processors, but not processors from pipelines.
static QueryPipelineBuilder unitePipelines(
std::vector<std::unique_ptr<QueryPipelineBuilder>> pipelines,
size_t max_threads_limit = 0,
Processors * collected_processors = nullptr);
std::vector<std::unique_ptr<QueryPipelineBuilder>> pipelines,
size_t max_threads_limit = 0,
Processors * collected_processors = nullptr);
static QueryPipelineBuilderPtr mergePipelines(
QueryPipelineBuilderPtr left,