Merge pull request #35789 from amosbird/better-pipeline1

Refactoring QueryPipeline
This commit is contained in:
Antonio Andelic 2022-04-01 14:38:19 +02:00 committed by GitHub
commit 5bcf772d7c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 0 additions and 151 deletions

View File

@ -759,44 +759,6 @@ void Pipe::setSinks(const Pipe::ProcessorGetterWithStreamKind & getter)
header.clear(); header.clear();
} }
/// Attach an output format processor to this Pipe.
///
/// The Pipe must have exactly one output port. If the totals or extremes port is not set,
/// a NullSource built from the header of the format's corresponding port is added first.
/// The format is then registered in `processors` (and in `collected_processors` when that
/// is set), its Main / Totals / Extremes ports are connected, and finally `output_ports`
/// and `header` are cleared.
///
/// Throws LOGICAL_ERROR when the Pipe has no output ports, when it has more than one,
/// or when `output` is not an IOutputFormat.
void Pipe::setOutputFormat(ProcessorPtr output)
{
    if (output_ports.empty())
        throw Exception("Cannot set output format to empty Pipe.", ErrorCodes::LOGICAL_ERROR);

    const auto ports_count = output_ports.size();
    if (ports_count != 1)
        throw Exception("Cannot set output format to Pipe because single output port is expected, "
                        "but it has " + std::to_string(ports_count) + " ports", ErrorCodes::LOGICAL_ERROR);

    auto * format_processor = dynamic_cast<IOutputFormat *>(output.get());
    if (format_processor == nullptr)
        throw Exception("IOutputFormat processor expected for QueryPipelineBuilder::setOutputFormat.",
                        ErrorCodes::LOGICAL_ERROR);

    /// Take references to all three ports of the format before `output` is moved from.
    auto & main_in = format_processor->getPort(IOutputFormat::PortKind::Main);
    auto & totals_in = format_processor->getPort(IOutputFormat::PortKind::Totals);
    auto & extremes_in = format_processor->getPort(IOutputFormat::PortKind::Extremes);

    /// Every port of the format gets connected below, so substitute absent streams with NullSource.
    if (!totals_port)
        addTotalsSource(std::make_shared<NullSource>(totals_in.getHeader()));

    if (!extremes_port)
        addExtremesSource(std::make_shared<NullSource>(extremes_in.getHeader()));

    if (collected_processors)
        collected_processors->emplace_back(output);

    processors.emplace_back(std::move(output));

    connect(*output_ports.front(), main_in);
    connect(*totals_port, totals_in);
    connect(*extremes_port, extremes_in);

    output_ports.clear();
    header.clear();
}
void Pipe::transform(const Transformer & transformer) void Pipe::transform(const Transformer & transformer)
{ {
if (output_ports.empty()) if (output_ports.empty())

View File

@ -141,7 +141,6 @@ private:
bool isCompleted() const { return !empty() && output_ports.empty(); } bool isCompleted() const { return !empty() && output_ports.empty(); }
static Pipe unitePipes(Pipes pipes, Processors * collected_processors, bool allow_empty_header); static Pipe unitePipes(Pipes pipes, Processors * collected_processors, bool allow_empty_header);
void setSinks(const Pipe::ProcessorGetterWithStreamKind & getter); void setSinks(const Pipe::ProcessorGetterWithStreamKind & getter);
void setOutputFormat(ProcessorPtr output);
friend class QueryPipelineBuilder; friend class QueryPipelineBuilder;
friend class QueryPipeline; friend class QueryPipeline;

View File

@ -8,7 +8,6 @@
#include <Processors/Transforms/ExpressionTransform.h> #include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h> #include <Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h>
#include <Processors/Transforms/JoiningTransform.h> #include <Processors/Transforms/JoiningTransform.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Executors/PipelineExecutor.h> #include <Processors/Executors/PipelineExecutor.h>
#include <Processors/Transforms/PartialSortingTransform.h> #include <Processors/Transforms/PartialSortingTransform.h>
#include <Processors/Sources/SourceFromSingleChunk.h> #include <Processors/Sources/SourceFromSingleChunk.h>
@ -247,21 +246,6 @@ void QueryPipelineBuilder::addExtremesTransform()
pipe.addTransform(std::move(transform), nullptr, port); pipe.addTransform(std::move(transform), nullptr, port);
} }
/// Resize the pipeline to a single stream and attach `output` as its output format.
///
/// Throws LOGICAL_ERROR if an output format has already been set. The type check of `output`
/// itself is performed by Pipe::setOutputFormat. After the format is attached,
/// initRowsBeforeLimit() is called.
void QueryPipelineBuilder::setOutputFormat(ProcessorPtr output)
{
    checkInitializedAndNotCompleted();

    if (output_format != nullptr)
        throw Exception("QueryPipeline already has output.", ErrorCodes::LOGICAL_ERROR);

    resize(1);

    /// Remember the raw pointer before ownership of `output` is handed over to the pipe.
    auto * raw_processor = output.get();
    output_format = dynamic_cast<IOutputFormat *>(raw_processor);

    pipe.setOutputFormat(std::move(output));

    initRowsBeforeLimit();
}
QueryPipelineBuilder QueryPipelineBuilder::unitePipelines( QueryPipelineBuilder QueryPipelineBuilder::unitePipelines(
std::vector<std::unique_ptr<QueryPipelineBuilder>> pipelines, std::vector<std::unique_ptr<QueryPipelineBuilder>> pipelines,
size_t max_threads_limit, size_t max_threads_limit,
@ -461,93 +445,6 @@ void QueryPipelineBuilder::setProcessListElement(QueryStatus * elem)
} }
} }
/// Create a RowsBeforeLimitCounter where needed and hand it to the processors that use it.
///
/// Processors are walked breadth-first starting at `output_format` and moving from each
/// processor to the processors connected to its input ports. Each processor is enqueued at most
/// once (tracked by `visited`), carrying the `visited_limit` flag of the path that reached it first.
///
/// - Before a LimitTransform has been seen on the path: LimitTransform and RemoteSource
///   processors are collected.
/// - After a LimitTransform has been seen: a PartialSortingTransform receives the counter and
///   the walk does not continue past it. Further LimitTransform / RemoteSource processors on
///   such a path are not collected.
///
/// If no PartialSortingTransform received a counter, the collected limits and remote sources
/// share a newly created one. The counter, if any, is finally passed to `output_format`.
void QueryPipelineBuilder::initRowsBeforeLimit()
{
RowsBeforeLimitCounterPtr rows_before_limit_at_least;
/// TODO: add setRowsBeforeLimitCounter as virtual method to IProcessor.
std::vector<LimitTransform *> limits;
std::vector<RemoteSource *> remote_sources;
std::unordered_set<IProcessor *> visited;
/// Traversal state: the processor to inspect and whether a LimitTransform was already
/// encountered on the way to it.
struct QueuedEntry
{
IProcessor * processor;
bool visited_limit;
};
std::queue<QueuedEntry> queue;
queue.push({ output_format, false });
visited.emplace(output_format);
while (!queue.empty())
{
auto * processor = queue.front().processor;
auto visited_limit = queue.front().visited_limit;
queue.pop();
if (!visited_limit)
{
if (auto * limit = typeid_cast<LimitTransform *>(processor))
{
/// The updated flag is propagated to everything enqueued from this processor.
visited_limit = true;
limits.emplace_back(limit);
}
if (auto * source = typeid_cast<RemoteSource *>(processor))
remote_sources.emplace_back(source);
}
else if (auto * sorting = typeid_cast<PartialSortingTransform *>(processor))
{
/// The counter is created lazily, so all sorting transforms found this way share one instance.
if (!rows_before_limit_at_least)
rows_before_limit_at_least = std::make_shared<RowsBeforeLimitCounter>();
sorting->setRowsBeforeLimitCounter(rows_before_limit_at_least);
/// Don't go to children. Take rows_before_limit from last PartialSortingTransform.
continue;
}
/// Skip totals and extremes port for output format.
if (auto * format = dynamic_cast<IOutputFormat *>(processor))
{
auto * child_processor = &format->getPort(IOutputFormat::PortKind::Main).getOutputPort().getProcessor();
/// emplace().second is true only on first insertion, so a processor is never enqueued twice.
if (visited.emplace(child_processor).second)
queue.push({ child_processor, visited_limit });
continue;
}
for (auto & child_port : processor->getInputs())
{
auto * child_processor = &child_port.getOutputPort().getProcessor();
if (visited.emplace(child_processor).second)
queue.push({ child_processor, visited_limit });
}
}
/// No PartialSortingTransform received a counter: share a new one between the collected
/// limits and remote sources.
if (!rows_before_limit_at_least && (!limits.empty() || !remote_sources.empty()))
{
rows_before_limit_at_least = std::make_shared<RowsBeforeLimitCounter>();
for (auto & limit : limits)
limit->setRowsBeforeLimitCounter(rows_before_limit_at_least);
for (auto & source : remote_sources)
source->setRowsBeforeLimitCounter(rows_before_limit_at_least);
}
/// If there is a limit, then enable rows_before_limit_at_least.
/// It is needed when zero rows are read, but we still want rows_before_limit_at_least in the result.
/// The counter is non-null here: with non-empty `limits` it was either created in the sorting
/// branch above or by the block just before this one.
if (!limits.empty())
rows_before_limit_at_least->add(0);
if (rows_before_limit_at_least)
output_format->setRowsBeforeLimitCounter(rows_before_limit_at_least);
}
PipelineExecutorPtr QueryPipelineBuilder::execute() PipelineExecutorPtr QueryPipelineBuilder::execute()
{ {
if (!isCompleted()) if (!isCompleted())

View File

@ -10,8 +10,6 @@
namespace DB namespace DB
{ {
class IOutputFormat;
class QueryPipelineProcessorsCollector; class QueryPipelineProcessorsCollector;
struct AggregatingTransformParams; struct AggregatingTransformParams;
@ -71,10 +69,6 @@ public:
void addTotalsHavingTransform(ProcessorPtr transform); void addTotalsHavingTransform(ProcessorPtr transform);
/// Add transform which calculates extremes. This transform adds extremes port and doesn't change inputs number. /// Add transform which calculates extremes. This transform adds extremes port and doesn't change inputs number.
void addExtremesTransform(); void addExtremesTransform();
/// Resize pipeline to single output and add IOutputFormat. Pipeline will be completed after this transformation.
void setOutputFormat(ProcessorPtr output);
/// Get current OutputFormat.
/// Returns the raw `output_format` pointer as stored; it is nullptr if it has never been assigned.
IOutputFormat * getOutputFormat() const { return output_format; }
/// Sink is a processor with single input port and no output ports. Creates sink for each output port. /// Sink is a processor with single input port and no output ports. Creates sink for each output port.
/// Pipeline will be completed after this transformation. /// Pipeline will be completed after this transformation.
void setSinks(const Pipe::ProcessorGetterWithStreamKind & getter); void setSinks(const Pipe::ProcessorGetterWithStreamKind & getter);
@ -163,7 +157,6 @@ public:
private: private:
Pipe pipe; Pipe pipe;
IOutputFormat * output_format = nullptr;
/// Limit on the number of threads. Zero means no limit. /// Limit on the number of threads. Zero means no limit.
/// Sometimes, more streams are created than the number of threads for more optimal execution. /// Sometimes, more streams are created than the number of threads for more optimal execution.
@ -174,8 +167,6 @@ private:
void checkInitialized(); void checkInitialized();
void checkInitializedAndNotCompleted(); void checkInitializedAndNotCompleted();
void initRowsBeforeLimit();
void setCollectedProcessors(Processors * processors); void setCollectedProcessors(Processors * processors);
friend class QueryPipelineProcessorsCollector; friend class QueryPipelineProcessorsCollector;