mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-30 03:22:14 +00:00
Refactor Pipe [part 5].
This commit is contained in:
parent
9385f3de0e
commit
a153f05e10
@ -87,8 +87,8 @@ BlockIO InterpreterAlterQuery::execute()
|
|||||||
if (!partition_commands.empty())
|
if (!partition_commands.empty())
|
||||||
{
|
{
|
||||||
table->checkAlterPartitionIsPossible(partition_commands, metadata_snapshot, context.getSettingsRef());
|
table->checkAlterPartitionIsPossible(partition_commands, metadata_snapshot, context.getSettingsRef());
|
||||||
auto partition_commands_pipes = table->alterPartition(query_ptr, metadata_snapshot, partition_commands, context);
|
auto partition_commands_pipe = table->alterPartition(query_ptr, metadata_snapshot, partition_commands, context);
|
||||||
if (!partition_commands_pipes.empty())
|
if (!partition_commands_pipe.empty())
|
||||||
res.pipeline.init(std::move(partition_commands_pipes));
|
res.pipeline.init(std::move(partition_commands_pipes));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -6,6 +6,8 @@
|
|||||||
#include <Processors/LimitTransform.h>
|
#include <Processors/LimitTransform.h>
|
||||||
#include <Processors/NullSink.h>
|
#include <Processors/NullSink.h>
|
||||||
#include <Processors/Transforms/ExtremesTransform.h>
|
#include <Processors/Transforms/ExtremesTransform.h>
|
||||||
|
#include <Processors/Formats/IOutputFormat.h>
|
||||||
|
#include <Processors/Sources/NullSource.h>
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
@ -374,7 +376,37 @@ void Pipe::addExtremesSource(ProcessorPtr source)
|
|||||||
processors.emplace_back(std::move(source));
|
processors.emplace_back(std::move(source));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void dropPort(OutputPort *& port, Processors & processors, Processors * collected_processors)
|
||||||
|
{
|
||||||
|
if (port == nullptr)
|
||||||
|
return;
|
||||||
|
|
||||||
|
auto null_sink = std::make_shared<NullSink>(port->getHeader());
|
||||||
|
connect(*port, null_sink->getPort());
|
||||||
|
|
||||||
|
if (collected_processors)
|
||||||
|
collected_processors->emplace_back(null_sink.get());
|
||||||
|
|
||||||
|
processors.emplace_back(std::move(null_sink));
|
||||||
|
port = nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
void Pipe::dropTotals()
|
||||||
|
{
|
||||||
|
dropPort(totals_port, processors, collected_processors);
|
||||||
|
}
|
||||||
|
|
||||||
|
void Pipe::dropExtremes()
|
||||||
|
{
|
||||||
|
dropPort(extremes_port, processors, collected_processors);
|
||||||
|
}
|
||||||
|
|
||||||
void Pipe::addTransform(ProcessorPtr transform)
|
void Pipe::addTransform(ProcessorPtr transform)
|
||||||
|
{
|
||||||
|
addTransform(std::move(transform), nullptr, nullptr);
|
||||||
|
}
|
||||||
|
|
||||||
|
void Pipe::addTransform(ProcessorPtr transform, OutputPort * totals, OutputPort * extremes)
|
||||||
{
|
{
|
||||||
if (output_ports.empty())
|
if (output_ports.empty())
|
||||||
throw Exception("Cannot add transform to empty Pipe.", ErrorCodes::LOGICAL_ERROR);
|
throw Exception("Cannot add transform to empty Pipe.", ErrorCodes::LOGICAL_ERROR);
|
||||||
@ -385,6 +417,19 @@ void Pipe::addTransform(ProcessorPtr transform)
|
|||||||
"Processor has " + std::to_string(inputs.size()) + " input ports, "
|
"Processor has " + std::to_string(inputs.size()) + " input ports, "
|
||||||
"but " + std::to_string(output_ports.size()) + " expected", ErrorCodes::LOGICAL_ERROR);
|
"but " + std::to_string(output_ports.size()) + " expected", ErrorCodes::LOGICAL_ERROR);
|
||||||
|
|
||||||
|
if (totals && totals_port)
|
||||||
|
throw Exception("Cannot add transform with totals to Pipe because it already has totals.",
|
||||||
|
ErrorCodes::LOGICAL_ERROR);
|
||||||
|
|
||||||
|
if (extremes && extremes_port)
|
||||||
|
throw Exception("Cannot add transform with totals to Pipe because it already has totals.",
|
||||||
|
ErrorCodes::LOGICAL_ERROR);
|
||||||
|
|
||||||
|
if (totals)
|
||||||
|
totals_port = totals;
|
||||||
|
if (extremes)
|
||||||
|
extremes_port = extremes;
|
||||||
|
|
||||||
size_t next_output = 0;
|
size_t next_output = 0;
|
||||||
for (auto & input : inputs)
|
for (auto & input : inputs)
|
||||||
{
|
{
|
||||||
@ -393,15 +438,34 @@ void Pipe::addTransform(ProcessorPtr transform)
|
|||||||
}
|
}
|
||||||
|
|
||||||
auto & outputs = transform->getOutputs();
|
auto & outputs = transform->getOutputs();
|
||||||
if (outputs.empty())
|
|
||||||
throw Exception("Cannot add transform " + transform->getName() + " to Pipes because it has no outputs",
|
|
||||||
ErrorCodes::LOGICAL_ERROR);
|
|
||||||
|
|
||||||
output_ports.clear();
|
output_ports.clear();
|
||||||
output_ports.reserve(outputs.size());
|
output_ports.reserve(outputs.size());
|
||||||
|
|
||||||
|
bool found_totals = false;
|
||||||
|
bool found_extremes = false;
|
||||||
|
|
||||||
for (auto & output : outputs)
|
for (auto & output : outputs)
|
||||||
output_ports.emplace_back(&output);
|
{
|
||||||
|
if (&output == totals)
|
||||||
|
found_totals = true;
|
||||||
|
else if (&output == extremes)
|
||||||
|
found_extremes = true;
|
||||||
|
else
|
||||||
|
output_ports.emplace_back(&output);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (totals && !found_totals)
|
||||||
|
throw Exception("Cannot add transform " + transform->getName() + " to Pipes because "
|
||||||
|
"specified totals port does not belong to it", ErrorCodes::LOGICAL_ERROR);
|
||||||
|
|
||||||
|
if (extremes && !found_extremes)
|
||||||
|
throw Exception("Cannot add transform " + transform->getName() + " to Pipes because "
|
||||||
|
"specified extremes port does not belong to it", ErrorCodes::LOGICAL_ERROR);
|
||||||
|
|
||||||
|
if (output_ports.empty())
|
||||||
|
throw Exception("Cannot add transform " + transform->getName() + " to Pipes because it has no outputs",
|
||||||
|
ErrorCodes::LOGICAL_ERROR);
|
||||||
|
|
||||||
header = output_ports.front()->getHeader();
|
header = output_ports.front()->getHeader();
|
||||||
for (size_t i = 1; i < output_ports.size(); ++i)
|
for (size_t i = 1; i < output_ports.size(); ++i)
|
||||||
@ -524,6 +588,44 @@ void Pipe::setSinks(const Pipe::ProcessorGetterWithStreamKind & getter)
|
|||||||
header.clear();
|
header.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void Pipe::setOutputFormat(ProcessorPtr output)
|
||||||
|
{
|
||||||
|
if (output_ports.empty())
|
||||||
|
throw Exception("Cannot set output format to empty Pipe.", ErrorCodes::LOGICAL_ERROR);
|
||||||
|
|
||||||
|
if (output_ports.size() != 1)
|
||||||
|
throw Exception("Cannot set output format to Pipe because single output port is expected, "
|
||||||
|
"but it has " + std::to_string(output_ports.size()) + " ports", ErrorCodes::LOGICAL_ERROR);
|
||||||
|
|
||||||
|
auto * format = dynamic_cast<IOutputFormat * >(output.get());
|
||||||
|
|
||||||
|
if (!format)
|
||||||
|
throw Exception("IOutputFormat processor expected for QueryPipeline::setOutputFormat.",
|
||||||
|
ErrorCodes::LOGICAL_ERROR);
|
||||||
|
|
||||||
|
auto & main = format->getPort(IOutputFormat::PortKind::Main);
|
||||||
|
auto & totals = format->getPort(IOutputFormat::PortKind::Totals);
|
||||||
|
auto & extremes = format->getPort(IOutputFormat::PortKind::Extremes);
|
||||||
|
|
||||||
|
if (!totals_port)
|
||||||
|
addTotalsSource(std::make_shared<NullSource>(totals.getHeader()));
|
||||||
|
|
||||||
|
if (!extremes_port)
|
||||||
|
addExtremesSource(std::make_shared<NullSource>(extremes.getHeader()));
|
||||||
|
|
||||||
|
if (collected_processors)
|
||||||
|
collected_processors->emplace_back(output.get());
|
||||||
|
|
||||||
|
processors.emplace_back(std::move(output));
|
||||||
|
|
||||||
|
connect(*output_ports.front(), main);
|
||||||
|
connect(*totals_port, totals);
|
||||||
|
connect(*extremes_port, extremes);
|
||||||
|
|
||||||
|
output_ports.clear();
|
||||||
|
header.clear();
|
||||||
|
}
|
||||||
|
|
||||||
void Pipe::transform(const Transformer & transformer)
|
void Pipe::transform(const Transformer & transformer)
|
||||||
{
|
{
|
||||||
if (output_ports.empty())
|
if (output_ports.empty())
|
||||||
@ -620,13 +722,4 @@ void Pipe::setQuota(const std::shared_ptr<const EnabledQuota> & quota)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void Pipe::enableQuota()
|
|
||||||
{
|
|
||||||
for (auto & processor : processors)
|
|
||||||
{
|
|
||||||
if (auto * source = dynamic_cast<ISource *>(processor.get()))
|
|
||||||
source->enableQuota();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -52,10 +52,15 @@ public:
|
|||||||
void addTotalsSource(ProcessorPtr source);
|
void addTotalsSource(ProcessorPtr source);
|
||||||
void addExtremesSource(ProcessorPtr source);
|
void addExtremesSource(ProcessorPtr source);
|
||||||
|
|
||||||
|
/// Drop totals and extremes (create NullSink for them).
|
||||||
|
void dropTotals();
|
||||||
|
void dropExtremes();
|
||||||
|
|
||||||
/// Add processor to list. It should have size() input ports with compatible header.
|
/// Add processor to list. It should have size() input ports with compatible header.
|
||||||
/// Output ports should have same headers.
|
/// Output ports should have same headers.
|
||||||
/// If totals or extremes are not empty, transform shouldn't change header.
|
/// If totals or extremes are not empty, transform shouldn't change header.
|
||||||
void addTransform(ProcessorPtr transform);
|
void addTransform(ProcessorPtr transform);
|
||||||
|
void addTransform(ProcessorPtr transform, OutputPort * totals, OutputPort * extremes);
|
||||||
|
|
||||||
enum class StreamType
|
enum class StreamType
|
||||||
{
|
{
|
||||||
@ -85,7 +90,6 @@ public:
|
|||||||
/// Specify quotas and limits for every ISourceWithProgress.
|
/// Specify quotas and limits for every ISourceWithProgress.
|
||||||
void setLimits(const SourceWithProgress::LocalLimits & limits);
|
void setLimits(const SourceWithProgress::LocalLimits & limits);
|
||||||
void setQuota(const std::shared_ptr<const EnabledQuota> & quota);
|
void setQuota(const std::shared_ptr<const EnabledQuota> & quota);
|
||||||
void enableQuota();
|
|
||||||
|
|
||||||
/// Do not allow to change the table while the processors of pipe are alive.
|
/// Do not allow to change the table while the processors of pipe are alive.
|
||||||
void addTableLock(const TableLockHolder & lock) { table_locks.push_back(lock); }
|
void addTableLock(const TableLockHolder & lock) { table_locks.push_back(lock); }
|
||||||
@ -125,6 +129,7 @@ private:
|
|||||||
bool isCompleted() const { return !empty() && output_ports.empty(); }
|
bool isCompleted() const { return !empty() && output_ports.empty(); }
|
||||||
static Pipe unitePipes(Pipes pipes, Processors * collected_processors);
|
static Pipe unitePipes(Pipes pipes, Processors * collected_processors);
|
||||||
void setSinks(const Pipe::ProcessorGetterWithStreamKind & getter);
|
void setSinks(const Pipe::ProcessorGetterWithStreamKind & getter);
|
||||||
|
void setOutputFormat(ProcessorPtr output);
|
||||||
|
|
||||||
friend class QueryPipeline;
|
friend class QueryPipeline;
|
||||||
};
|
};
|
||||||
|
@ -2,9 +2,7 @@
|
|||||||
|
|
||||||
#include <Processors/ResizeProcessor.h>
|
#include <Processors/ResizeProcessor.h>
|
||||||
#include <Processors/ConcatProcessor.h>
|
#include <Processors/ConcatProcessor.h>
|
||||||
#include <Processors/NullSink.h>
|
|
||||||
#include <Processors/LimitTransform.h>
|
#include <Processors/LimitTransform.h>
|
||||||
#include <Processors/Sources/NullSource.h>
|
|
||||||
#include <Processors/Transforms/TotalsHavingTransform.h>
|
#include <Processors/Transforms/TotalsHavingTransform.h>
|
||||||
#include <Processors/Transforms/ExtremesTransform.h>
|
#include <Processors/Transforms/ExtremesTransform.h>
|
||||||
#include <Processors/Transforms/CreatingSetsTransform.h>
|
#include <Processors/Transforms/CreatingSetsTransform.h>
|
||||||
@ -148,8 +146,7 @@ void QueryPipeline::addTotalsHavingTransform(ProcessorPtr transform)
|
|||||||
resize(1);
|
resize(1);
|
||||||
|
|
||||||
auto * totals_port = &transform->getOutputs().back();
|
auto * totals_port = &transform->getOutputs().back();
|
||||||
pipe.addTransform(std::move(transform));
|
pipe.addTransform(std::move(transform), totals_port, nullptr);
|
||||||
pipe.totals_port = totals_port;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void QueryPipeline::addDefaultTotals()
|
void QueryPipeline::addDefaultTotals()
|
||||||
@ -176,46 +173,21 @@ void QueryPipeline::addDefaultTotals()
|
|||||||
|
|
||||||
void QueryPipeline::dropTotalsAndExtremes()
|
void QueryPipeline::dropTotalsAndExtremes()
|
||||||
{
|
{
|
||||||
auto drop_port = [&](OutputPort *& port)
|
pipe.dropTotals();
|
||||||
{
|
pipe.dropExtremes();
|
||||||
auto null_sink = std::make_shared<NullSink>(port->getHeader());
|
|
||||||
connect(*port, null_sink->getPort());
|
|
||||||
processors.emplace(std::move(null_sink));
|
|
||||||
port = nullptr;
|
|
||||||
};
|
|
||||||
|
|
||||||
if (totals_having_port)
|
|
||||||
drop_port(totals_having_port);
|
|
||||||
|
|
||||||
if (extremes_port)
|
|
||||||
drop_port(extremes_port);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void QueryPipeline::addExtremesTransform()
|
void QueryPipeline::addExtremesTransform()
|
||||||
{
|
{
|
||||||
checkInitializedAndNotCompleted();
|
checkInitializedAndNotCompleted();
|
||||||
|
|
||||||
if (extremes_port)
|
if (pipe.getExtremesPort())
|
||||||
throw Exception("Extremes transform was already added to pipeline.", ErrorCodes::LOGICAL_ERROR);
|
throw Exception("Extremes transform was already added to pipeline.", ErrorCodes::LOGICAL_ERROR);
|
||||||
|
|
||||||
std::vector<OutputPort *> extremes;
|
resize(1);
|
||||||
extremes.reserve(streams.size());
|
auto transform = std::make_shared<ExtremesTransform>(getHeader());
|
||||||
|
auto * port = &transform->getExtremesPort();
|
||||||
for (auto & stream : streams)
|
pipe.addTransform(std::move(transform), nullptr, port);
|
||||||
{
|
|
||||||
auto transform = std::make_shared<ExtremesTransform>(current_header);
|
|
||||||
connect(*stream, transform->getInputPort());
|
|
||||||
|
|
||||||
stream = &transform->getOutputPort();
|
|
||||||
extremes.push_back(&transform->getExtremesPort());
|
|
||||||
|
|
||||||
processors.emplace(std::move(transform));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (extremes.size() == 1)
|
|
||||||
extremes_port = extremes.front();
|
|
||||||
else
|
|
||||||
extremes_port = uniteExtremes(extremes, current_header, processors);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void QueryPipeline::addCreatingSetsTransform(ProcessorPtr transform)
|
void QueryPipeline::addCreatingSetsTransform(ProcessorPtr transform)
|
||||||
@ -228,94 +200,49 @@ void QueryPipeline::addCreatingSetsTransform(ProcessorPtr transform)
|
|||||||
|
|
||||||
resize(1);
|
resize(1);
|
||||||
|
|
||||||
auto concat = std::make_shared<ConcatProcessor>(current_header, 2);
|
/// Order is important for concat. Connect manually.
|
||||||
connect(transform->getOutputs().front(), concat->getInputs().front());
|
pipe.transform([&](OutputPortRawPtrs ports) -> Processors
|
||||||
connect(*streams.back(), concat->getInputs().back());
|
{
|
||||||
|
auto concat = std::make_shared<ConcatProcessor>(getHeader(), 2);
|
||||||
streams.assign({ &concat->getOutputs().front() });
|
connect(transform->getOutputs().front(), concat->getInputs().front());
|
||||||
processors.emplace(std::move(transform));
|
connect(*ports.back(), concat->getInputs().back());
|
||||||
processors.emplace(std::move(concat));
|
return { std::move(concat), std::move(transform) };
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
void QueryPipeline::setOutputFormat(ProcessorPtr output)
|
void QueryPipeline::setOutputFormat(ProcessorPtr output)
|
||||||
{
|
{
|
||||||
checkInitializedAndNotCompleted();
|
checkInitializedAndNotCompleted();
|
||||||
|
|
||||||
auto * format = dynamic_cast<IOutputFormat * >(output.get());
|
|
||||||
|
|
||||||
if (!format)
|
|
||||||
throw Exception("IOutputFormat processor expected for QueryPipeline::setOutputFormat.", ErrorCodes::LOGICAL_ERROR);
|
|
||||||
|
|
||||||
if (output_format)
|
if (output_format)
|
||||||
throw Exception("QueryPipeline already has output.", ErrorCodes::LOGICAL_ERROR);
|
throw Exception("QueryPipeline already has output.", ErrorCodes::LOGICAL_ERROR);
|
||||||
|
|
||||||
output_format = format;
|
|
||||||
|
|
||||||
resize(1);
|
resize(1);
|
||||||
|
|
||||||
auto & main = format->getPort(IOutputFormat::PortKind::Main);
|
output_format = dynamic_cast<IOutputFormat * >(output.get());
|
||||||
auto & totals = format->getPort(IOutputFormat::PortKind::Totals);
|
pipe.setOutputFormat(std::move(output));
|
||||||
auto & extremes = format->getPort(IOutputFormat::PortKind::Extremes);
|
|
||||||
|
|
||||||
if (!totals_having_port)
|
|
||||||
{
|
|
||||||
auto null_source = std::make_shared<NullSource>(totals.getHeader());
|
|
||||||
totals_having_port = &null_source->getPort();
|
|
||||||
processors.emplace(std::move(null_source));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!extremes_port)
|
|
||||||
{
|
|
||||||
auto null_source = std::make_shared<NullSource>(extremes.getHeader());
|
|
||||||
extremes_port = &null_source->getPort();
|
|
||||||
processors.emplace(std::move(null_source));
|
|
||||||
}
|
|
||||||
|
|
||||||
processors.emplace(std::move(output));
|
|
||||||
|
|
||||||
connect(*streams.front(), main);
|
|
||||||
connect(*totals_having_port, totals);
|
|
||||||
connect(*extremes_port, extremes);
|
|
||||||
|
|
||||||
streams.clear();
|
|
||||||
current_header.clear();
|
|
||||||
extremes_port = nullptr;
|
|
||||||
totals_having_port = nullptr;
|
|
||||||
|
|
||||||
initRowsBeforeLimit();
|
initRowsBeforeLimit();
|
||||||
}
|
}
|
||||||
|
|
||||||
void QueryPipeline::unitePipelines(
|
QueryPipeline QueryPipeline::unitePipelines(
|
||||||
std::vector<std::unique_ptr<QueryPipeline>> pipelines, const Block & common_header, size_t max_threads_limit)
|
std::vector<std::unique_ptr<QueryPipeline>> pipelines,
|
||||||
|
const Block & common_header,
|
||||||
|
size_t max_threads_limit,
|
||||||
|
Processors * collected_processors)
|
||||||
{
|
{
|
||||||
/// Should we limit the number of threads for united pipeline. True if all pipelines have max_threads != 0.
|
/// Should we limit the number of threads for united pipeline. True if all pipelines have max_threads != 0.
|
||||||
/// If true, result max_threads will be sum(max_threads).
|
/// If true, result max_threads will be sum(max_threads).
|
||||||
/// Note: it may be > than settings.max_threads, so we should apply this limit again.
|
/// Note: it may be > than settings.max_threads, so we should apply this limit again.
|
||||||
bool will_limit_max_threads = !initialized() || max_threads != 0;
|
bool will_limit_max_threads = true;
|
||||||
|
size_t max_threads = 0;
|
||||||
if (initialized())
|
Pipes pipes;
|
||||||
{
|
|
||||||
addSimpleTransform([&](const Block & header)
|
|
||||||
{
|
|
||||||
return std::make_shared<ConvertingTransform>(
|
|
||||||
header, common_header, ConvertingTransform::MatchColumnsMode::Position);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
std::vector<OutputPort *> extremes;
|
|
||||||
std::vector<OutputPort *> totals;
|
|
||||||
|
|
||||||
if (extremes_port)
|
|
||||||
extremes.push_back(extremes_port);
|
|
||||||
|
|
||||||
if (totals_having_port)
|
|
||||||
totals.push_back(totals_having_port);
|
|
||||||
|
|
||||||
for (auto & pipeline_ptr : pipelines)
|
for (auto & pipeline_ptr : pipelines)
|
||||||
{
|
{
|
||||||
auto & pipeline = *pipeline_ptr;
|
auto & pipeline = *pipeline_ptr;
|
||||||
pipeline.checkInitialized();
|
pipeline.checkInitialized();
|
||||||
pipeline.processors.setCollectedProcessors(processors.getCollectedProcessors());
|
pipeline.pipe.collected_processors = collected_processors;
|
||||||
|
|
||||||
if (!pipeline.isCompleted())
|
if (!pipeline.isCompleted())
|
||||||
{
|
{
|
||||||
@ -326,36 +253,7 @@ void QueryPipeline::unitePipelines(
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pipeline.extremes_port)
|
pipes.emplace_back(std::move(pipeline.pipe));
|
||||||
{
|
|
||||||
auto converting = std::make_shared<ConvertingTransform>(
|
|
||||||
pipeline.current_header, common_header, ConvertingTransform::MatchColumnsMode::Position);
|
|
||||||
|
|
||||||
connect(*pipeline.extremes_port, converting->getInputPort());
|
|
||||||
extremes.push_back(&converting->getOutputPort());
|
|
||||||
processors.emplace(std::move(converting));
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Take totals only from first port.
|
|
||||||
if (pipeline.totals_having_port)
|
|
||||||
{
|
|
||||||
auto converting = std::make_shared<ConvertingTransform>(
|
|
||||||
pipeline.current_header, common_header, ConvertingTransform::MatchColumnsMode::Position);
|
|
||||||
|
|
||||||
connect(*pipeline.totals_having_port, converting->getInputPort());
|
|
||||||
totals.push_back(&converting->getOutputPort());
|
|
||||||
processors.emplace(std::move(converting));
|
|
||||||
}
|
|
||||||
|
|
||||||
auto * collector = processors.setCollectedProcessors(nullptr);
|
|
||||||
processors.emplace(pipeline.processors.detach());
|
|
||||||
processors.setCollectedProcessors(collector);
|
|
||||||
|
|
||||||
streams.addStreams(pipeline.streams);
|
|
||||||
|
|
||||||
table_locks.insert(table_locks.end(), std::make_move_iterator(pipeline.table_locks.begin()), std::make_move_iterator(pipeline.table_locks.end()));
|
|
||||||
interpreter_context.insert(interpreter_context.end(), pipeline.interpreter_context.begin(), pipeline.interpreter_context.end());
|
|
||||||
storage_holders.insert(storage_holders.end(), pipeline.storage_holders.begin(), pipeline.storage_holders.end());
|
|
||||||
|
|
||||||
max_threads += pipeline.max_threads;
|
max_threads += pipeline.max_threads;
|
||||||
will_limit_max_threads = will_limit_max_threads && pipeline.max_threads != 0;
|
will_limit_max_threads = will_limit_max_threads && pipeline.max_threads != 0;
|
||||||
@ -366,33 +264,21 @@ void QueryPipeline::unitePipelines(
|
|||||||
max_threads_limit = pipeline.max_threads;
|
max_threads_limit = pipeline.max_threads;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!will_limit_max_threads)
|
QueryPipeline pipeline;
|
||||||
max_threads = 0;
|
pipeline.init(Pipe::unitePipes(std::move(pipes), collected_processors));
|
||||||
else
|
|
||||||
limitMaxThreads(max_threads_limit);
|
|
||||||
|
|
||||||
if (!extremes.empty())
|
if (will_limit_max_threads)
|
||||||
{
|
{
|
||||||
if (extremes.size() == 1)
|
pipeline.setMaxThreads(max_threads);
|
||||||
extremes_port = extremes.back();
|
pipeline.limitMaxThreads(max_threads_limit);
|
||||||
else
|
|
||||||
extremes_port = uniteExtremes(extremes, common_header, processors);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!totals.empty())
|
return pipeline;
|
||||||
{
|
|
||||||
if (totals.size() == 1)
|
|
||||||
totals_having_port = totals.back();
|
|
||||||
else
|
|
||||||
totals_having_port = uniteTotals(totals, common_header, processors);
|
|
||||||
}
|
|
||||||
|
|
||||||
current_header = common_header;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void QueryPipeline::setProgressCallback(const ProgressCallback & callback)
|
void QueryPipeline::setProgressCallback(const ProgressCallback & callback)
|
||||||
{
|
{
|
||||||
for (auto & processor : processors.get())
|
for (auto & processor : pipe.processors)
|
||||||
{
|
{
|
||||||
if (auto * source = dynamic_cast<ISourceWithProgress *>(processor.get()))
|
if (auto * source = dynamic_cast<ISourceWithProgress *>(processor.get()))
|
||||||
source->setProgressCallback(callback);
|
source->setProgressCallback(callback);
|
||||||
@ -406,7 +292,7 @@ void QueryPipeline::setProcessListElement(QueryStatus * elem)
|
|||||||
{
|
{
|
||||||
process_list_element = elem;
|
process_list_element = elem;
|
||||||
|
|
||||||
for (auto & processor : processors.get())
|
for (auto & processor : pipe.processors)
|
||||||
{
|
{
|
||||||
if (auto * source = dynamic_cast<ISourceWithProgress *>(processor.get()))
|
if (auto * source = dynamic_cast<ISourceWithProgress *>(processor.get()))
|
||||||
source->setProcessListElement(elem);
|
source->setProcessListElement(elem);
|
||||||
@ -510,101 +396,28 @@ void QueryPipeline::initRowsBeforeLimit()
|
|||||||
output_format->setRowsBeforeLimitCounter(rows_before_limit_at_least);
|
output_format->setRowsBeforeLimitCounter(rows_before_limit_at_least);
|
||||||
}
|
}
|
||||||
|
|
||||||
Pipe QueryPipeline::getPipe() &&
|
|
||||||
{
|
|
||||||
Pipes pipes(processors.detach(), streams.at(0), totals_having_port, extremes_port);
|
|
||||||
pipe.max_parallel_streams = streams.maxParallelStreams();
|
|
||||||
|
|
||||||
for (auto & lock : table_locks)
|
|
||||||
pipe.addTableLock(lock);
|
|
||||||
|
|
||||||
for (auto & context : interpreter_context)
|
|
||||||
pipe.addInterpreterContext(context);
|
|
||||||
|
|
||||||
for (auto & storage : storage_holders)
|
|
||||||
pipe.addStorageHolder(storage);
|
|
||||||
|
|
||||||
if (totals_having_port)
|
|
||||||
pipe.setTotalsPort(totals_having_port);
|
|
||||||
|
|
||||||
if (extremes_port)
|
|
||||||
pipe.setExtremesPort(extremes_port);
|
|
||||||
|
|
||||||
Pipes pipes;
|
|
||||||
pipes.emplace_back(std::move(pipe));
|
|
||||||
|
|
||||||
for (size_t i = 1; i < streams.size(); ++i)
|
|
||||||
pipes.emplace_back(Pipe(streams[i]));
|
|
||||||
|
|
||||||
return pipes;
|
|
||||||
}
|
|
||||||
|
|
||||||
PipelineExecutorPtr QueryPipeline::execute()
|
PipelineExecutorPtr QueryPipeline::execute()
|
||||||
{
|
{
|
||||||
if (!isCompleted())
|
if (!isCompleted())
|
||||||
throw Exception("Cannot execute pipeline because it is not completed.", ErrorCodes::LOGICAL_ERROR);
|
throw Exception("Cannot execute pipeline because it is not completed.", ErrorCodes::LOGICAL_ERROR);
|
||||||
|
|
||||||
return std::make_shared<PipelineExecutor>(processors.get(), process_list_element);
|
return std::make_shared<PipelineExecutor>(pipe.processors, process_list_element);
|
||||||
}
|
}
|
||||||
|
|
||||||
QueryPipeline & QueryPipeline::operator= (QueryPipeline && rhs)
|
void QueryPipeline::setCollectedProcessors(Processors * processors)
|
||||||
{
|
{
|
||||||
/// Reset primitive fields
|
pipe.collected_processors = processors;
|
||||||
process_list_element = rhs.process_list_element;
|
|
||||||
rhs.process_list_element = nullptr;
|
|
||||||
max_threads = rhs.max_threads;
|
|
||||||
rhs.max_threads = 0;
|
|
||||||
output_format = rhs.output_format;
|
|
||||||
rhs.output_format = nullptr;
|
|
||||||
extremes_port = rhs.extremes_port;
|
|
||||||
rhs.extremes_port = nullptr;
|
|
||||||
totals_having_port = rhs.totals_having_port;
|
|
||||||
rhs.totals_having_port = nullptr;
|
|
||||||
|
|
||||||
/// Move these fields in destruction order (it's important)
|
|
||||||
streams = std::move(rhs.streams);
|
|
||||||
processors = std::move(rhs.processors);
|
|
||||||
current_header = std::move(rhs.current_header);
|
|
||||||
table_locks = std::move(rhs.table_locks);
|
|
||||||
storage_holders = std::move(rhs.storage_holders);
|
|
||||||
interpreter_context = std::move(rhs.interpreter_context);
|
|
||||||
|
|
||||||
return *this;
|
|
||||||
}
|
|
||||||
|
|
||||||
void QueryPipeline::ProcessorsContainer::emplace(ProcessorPtr processor)
|
|
||||||
{
|
|
||||||
if (collected_processors)
|
|
||||||
collected_processors->emplace_back(processor);
|
|
||||||
|
|
||||||
processors.emplace_back(std::move(processor));
|
|
||||||
}
|
|
||||||
|
|
||||||
void QueryPipeline::ProcessorsContainer::emplace(Processors processors_)
|
|
||||||
{
|
|
||||||
for (auto & processor : processors_)
|
|
||||||
emplace(std::move(processor));
|
|
||||||
}
|
|
||||||
|
|
||||||
Processors * QueryPipeline::ProcessorsContainer::setCollectedProcessors(Processors * collected_processors_)
|
|
||||||
{
|
|
||||||
if (collected_processors && collected_processors_)
|
|
||||||
throw Exception("Cannot set collected processors to QueryPipeline because "
|
|
||||||
"another one object was already created for current pipeline." , ErrorCodes::LOGICAL_ERROR);
|
|
||||||
|
|
||||||
std::swap(collected_processors, collected_processors_);
|
|
||||||
return collected_processors_;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
QueryPipelineProcessorsCollector::QueryPipelineProcessorsCollector(QueryPipeline & pipeline_, IQueryPlanStep * step_)
|
QueryPipelineProcessorsCollector::QueryPipelineProcessorsCollector(QueryPipeline & pipeline_, IQueryPlanStep * step_)
|
||||||
: pipeline(pipeline_), step(step_)
|
: pipeline(pipeline_), step(step_)
|
||||||
{
|
{
|
||||||
pipeline.processors.setCollectedProcessors(&processors);
|
pipeline.setCollectedProcessors(&processors);
|
||||||
}
|
}
|
||||||
|
|
||||||
QueryPipelineProcessorsCollector::~QueryPipelineProcessorsCollector()
|
QueryPipelineProcessorsCollector::~QueryPipelineProcessorsCollector()
|
||||||
{
|
{
|
||||||
pipeline.processors.setCollectedProcessors(nullptr);
|
pipeline.setCollectedProcessors(nullptr);
|
||||||
}
|
}
|
||||||
|
|
||||||
Processors QueryPipelineProcessorsCollector::detachProcessors(size_t group)
|
Processors QueryPipelineProcessorsCollector::detachProcessors(size_t group)
|
||||||
|
@ -28,7 +28,7 @@ public:
|
|||||||
~QueryPipeline() = default;
|
~QueryPipeline() = default;
|
||||||
QueryPipeline(QueryPipeline &&) = default;
|
QueryPipeline(QueryPipeline &&) = default;
|
||||||
QueryPipeline(const QueryPipeline &) = delete;
|
QueryPipeline(const QueryPipeline &) = delete;
|
||||||
QueryPipeline & operator= (QueryPipeline && rhs);
|
QueryPipeline & operator= (QueryPipeline && rhs) = default;
|
||||||
QueryPipeline & operator= (const QueryPipeline & rhs) = delete;
|
QueryPipeline & operator= (const QueryPipeline & rhs) = delete;
|
||||||
|
|
||||||
/// All pipes must have same header.
|
/// All pipes must have same header.
|
||||||
@ -73,7 +73,11 @@ public:
|
|||||||
|
|
||||||
/// Unite several pipelines together. Result pipeline would have common_header structure.
|
/// Unite several pipelines together. Result pipeline would have common_header structure.
|
||||||
/// If collector is used, it will collect only newly-added processors, but not processors from pipelines.
|
/// If collector is used, it will collect only newly-added processors, but not processors from pipelines.
|
||||||
void unitePipelines(std::vector<std::unique_ptr<QueryPipeline>> pipelines, const Block & common_header, size_t max_threads_limit = 0);
|
static QueryPipeline unitePipelines(
|
||||||
|
std::vector<std::unique_ptr<QueryPipeline>> pipelines,
|
||||||
|
const Block & common_header,
|
||||||
|
size_t max_threads_limit = 0,
|
||||||
|
Processors * collected_processors = nullptr);
|
||||||
|
|
||||||
PipelineExecutorPtr execute();
|
PipelineExecutorPtr execute();
|
||||||
|
|
||||||
@ -113,10 +117,9 @@ public:
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Convert query pipeline to pipe.
|
/// Convert query pipeline to pipe.
|
||||||
Pipe getPipe() &&;
|
static Pipe getPipe(QueryPipeline pipeline) { return std::move(pipeline.pipe); }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
/// Destruction order: processors, header, locks, temporary storages, local contexts
|
|
||||||
|
|
||||||
Pipe pipe;
|
Pipe pipe;
|
||||||
IOutputFormat * output_format = nullptr;
|
IOutputFormat * output_format = nullptr;
|
||||||
@ -132,6 +135,8 @@ private:
|
|||||||
|
|
||||||
void initRowsBeforeLimit();
|
void initRowsBeforeLimit();
|
||||||
|
|
||||||
|
void setCollectedProcessors(Processors * processors);
|
||||||
|
|
||||||
friend class QueryPipelineProcessorsCollector;
|
friend class QueryPipelineProcessorsCollector;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user