ClickHouse/src/Processors/QueryPipeline.cpp

621 lines
20 KiB
C++
Raw Normal View History

2019-03-26 18:28:37 +00:00
#include <Processors/QueryPipeline.h>
#include <Processors/ResizeProcessor.h>
#include <Processors/ConcatProcessor.h>
#include <Processors/NullSink.h>
2019-04-08 14:55:20 +00:00
#include <Processors/LimitTransform.h>
2019-03-26 18:28:37 +00:00
#include <Processors/Sources/NullSource.h>
#include <Processors/Transforms/TotalsHavingTransform.h>
2019-04-05 11:34:11 +00:00
#include <Processors/Transforms/ExtremesTransform.h>
2019-03-26 18:28:37 +00:00
#include <Processors/Transforms/CreatingSetsTransform.h>
#include <Processors/Transforms/ConvertingTransform.h>
2020-08-04 13:06:59 +00:00
#include <Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h>
2019-03-26 18:28:37 +00:00
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/Transforms/PartialSortingTransform.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
2019-03-26 18:28:37 +00:00
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <Common/typeid_cast.h>
#include <Common/CurrentThread.h>
#include <Processors/DelayedPortsProcessor.h>
#include <Processors/RowsBeforeLimitCounter.h>
2020-06-03 19:50:11 +00:00
#include <Processors/Sources/RemoteSource.h>
2019-03-26 18:28:37 +00:00
namespace DB
{
2020-02-25 18:10:48 +00:00
/// Error codes referenced in this file. They are only declared here (`extern`); the definitions live elsewhere.
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
2019-03-26 18:28:37 +00:00
/// Throws LOGICAL_ERROR unless initialized() reports true.
void QueryPipeline::checkInitialized()
{
    if (initialized())
        return;

    throw Exception("QueryPipeline wasn't initialized.", ErrorCodes::LOGICAL_ERROR);
}
2020-05-27 18:20:26 +00:00
void QueryPipeline::checkInitializedAndNotCompleted()
{
checkInitialized();
2020-08-04 13:06:59 +00:00
if (pipe.isCompleted())
2020-05-27 18:20:26 +00:00
throw Exception("QueryPipeline was already completed.", ErrorCodes::LOGICAL_ERROR);
}
2020-08-04 13:06:59 +00:00
static void checkSource(const ProcessorPtr & source, bool can_have_totals)
2019-03-26 18:28:37 +00:00
{
if (!source->getInputs().empty())
throw Exception("Source for query pipeline shouldn't have any input, but " + source->getName() + " has " +
toString(source->getInputs().size()) + " inputs.", ErrorCodes::LOGICAL_ERROR);
2019-04-17 14:38:16 +00:00
if (source->getOutputs().empty())
throw Exception("Source for query pipeline should have single output, but it doesn't have any",
ErrorCodes::LOGICAL_ERROR);
if (!can_have_totals && source->getOutputs().size() != 1)
2019-03-26 18:28:37 +00:00
throw Exception("Source for query pipeline should have single output, but " + source->getName() + " has " +
toString(source->getOutputs().size()) + " outputs.", ErrorCodes::LOGICAL_ERROR);
2019-04-17 14:38:16 +00:00
if (source->getOutputs().size() > 2)
throw Exception("Source for query pipeline should have 1 or 2 outputs, but " + source->getName() + " has " +
toString(source->getOutputs().size()) + " outputs.", ErrorCodes::LOGICAL_ERROR);
2019-03-26 18:28:37 +00:00
}
2020-08-04 13:06:59 +00:00
/// Initializes the pipeline from `pipe_`.
/// Throws LOGICAL_ERROR if the pipeline was already initialized or if `pipe_` is empty.
void QueryPipeline::init(Pipe pipe_)
{
    if (initialized())
        throw Exception("Pipeline has already been initialized.", ErrorCodes::LOGICAL_ERROR);

    /// The emptiness check must inspect the argument, not the member:
    /// the member has not been assigned yet at this point.
    if (pipe_.empty())
        throw Exception("Can't initialize pipeline with empty pipe.", ErrorCodes::LOGICAL_ERROR);

    pipe = std::move(pipe_);
}
2020-08-04 13:06:59 +00:00
/// Forwards `getter` to the underlying pipe's addSimpleTransform.
/// Throws LOGICAL_ERROR if the pipeline is not initialized or is already completed.
void QueryPipeline::addSimpleTransform(const Pipe::ProcessorGetter & getter)
{
    checkInitializedAndNotCompleted();

    pipe.addSimpleTransform(getter);
}
2020-08-04 13:06:59 +00:00
/// Same as the overload above, but `getter` also receives the stream kind.
/// Throws LOGICAL_ERROR if the pipeline is not initialized or is already completed.
void QueryPipeline::addSimpleTransform(const Pipe::ProcessorGetterWithStreamKind & getter)
{
    checkInitializedAndNotCompleted();

    pipe.addSimpleTransform(getter);
}
2020-08-04 13:06:59 +00:00
/// Hands ownership of `transform` to the underlying pipe.
/// Throws LOGICAL_ERROR if the pipeline is not initialized or is already completed.
void QueryPipeline::addTransform(ProcessorPtr transform)
{
    checkInitializedAndNotCompleted();

    pipe.addTransform(std::move(transform));
}
2020-08-04 13:06:59 +00:00
/// Forwards `getter` to the underlying pipe's setSinks.
/// Throws LOGICAL_ERROR if the pipeline is not initialized or is already completed.
void QueryPipeline::setSinks(const Pipe::ProcessorGetterWithStreamKind & getter)
{
    checkInitializedAndNotCompleted();

    pipe.setSinks(getter);
}
/// Adds `source` as an extra stream and puts a DelayedPortsProcessor over all streams,
/// with the newly added port registered as the delayed one.
/// `source` must have no inputs and exactly one output whose header matches the pipeline header.
void QueryPipeline::addDelayedStream(ProcessorPtr source)
{
    checkInitializedAndNotCompleted();

    checkSource(source, false);
    assertBlocksHaveEqualStructure(getHeader(), source->getOutputs().front().getHeader(), "QueryPipeline");

    /// Remember the port count before adding the source:
    /// presumably the new source's port gets exactly this index.
    IProcessor::PortNumbers delayed_port_numbers = { pipe.numOutputPorts() };
    pipe.addSource(std::move(source));

    addTransform(std::make_shared<DelayedPortsProcessor>(getHeader(), pipe.numOutputPorts(), delayed_port_numbers));
}
/// Delegates to the free function DB::addMergingAggregatedMemoryEfficientTransform, applied to this pipeline's pipe.
void QueryPipeline::addMergingAggregatedMemoryEfficientTransform(AggregatingTransformParamsPtr params, size_t num_merging_processors)
{
    /// Explicit DB:: qualification selects the free function rather than this member.
    DB::addMergingAggregatedMemoryEfficientTransform(
        pipe,
        std::move(params),
        num_merging_processors);
}
2020-01-13 12:04:02 +00:00
void QueryPipeline::resize(size_t num_streams, bool force, bool strict)
2019-03-26 18:28:37 +00:00
{
2020-05-27 18:20:26 +00:00
checkInitializedAndNotCompleted();
2019-03-26 18:28:37 +00:00
if (!force && num_streams == getNumStreams())
2019-03-26 18:28:37 +00:00
return;
2020-01-13 12:04:02 +00:00
ProcessorPtr resize;
if (strict)
2020-08-04 13:06:59 +00:00
resize = std::make_shared<StrictResizeProcessor>(getHeader(), getNumStreams(), num_streams);
2020-01-13 12:04:02 +00:00
else
2020-08-04 13:06:59 +00:00
resize = std::make_shared<ResizeProcessor>(getHeader(), getNumStreams(), num_streams);
2019-03-26 18:28:37 +00:00
2020-08-04 13:06:59 +00:00
pipe.addTransform(std::move(resize));
2019-12-26 16:52:15 +00:00
}
2019-03-26 18:28:37 +00:00
/// Adds a TotalsHavingTransform over a single stream and records its last output as the totals port.
/// Throws LOGICAL_ERROR if `transform` is not a TotalsHavingTransform or if totals were already added.
void QueryPipeline::addTotalsHavingTransform(ProcessorPtr transform)
{
    checkInitializedAndNotCompleted();

    const bool is_totals_having = typeid_cast<const TotalsHavingTransform *>(transform.get()) != nullptr;
    if (!is_totals_having)
        throw Exception("TotalsHavingTransform expected for QueryPipeline::addTotalsHavingTransform.",
                        ErrorCodes::LOGICAL_ERROR);

    if (pipe.getTotalsPort())
        throw Exception("Totals having transform was already added to pipeline.", ErrorCodes::LOGICAL_ERROR);

    resize(1);

    /// Take the address of the port before the transform is moved into the pipe.
    auto * new_totals_port = &transform->getOutputs().back();
    pipe.addTransform(std::move(transform));
    pipe.totals_port = new_totals_port;
}
2019-04-09 14:51:38 +00:00
void QueryPipeline::addDefaultTotals()
{
2020-05-27 18:20:26 +00:00
checkInitializedAndNotCompleted();
2019-04-09 14:51:38 +00:00
2020-08-04 13:06:59 +00:00
if (pipe.getTotalsPort())
2019-04-09 14:51:38 +00:00
throw Exception("Totals having transform was already added to pipeline.", ErrorCodes::LOGICAL_ERROR);
2020-08-04 13:06:59 +00:00
const auto & current_header = getHeader();
2019-04-09 14:51:38 +00:00
Columns columns;
columns.reserve(current_header.columns());
for (size_t i = 0; i < current_header.columns(); ++i)
{
auto column = current_header.getByPosition(i).type->createColumn();
column->insertDefault();
columns.emplace_back(std::move(column));
}
auto source = std::make_shared<SourceFromSingleChunk>(current_header, Chunk(std::move(columns), 1));
2020-08-04 13:06:59 +00:00
pipe.addTotalsSource(std::move(source));
2019-04-10 16:28:37 +00:00
}
void QueryPipeline::dropTotalsAndExtremes()
2019-04-17 15:35:22 +00:00
{
auto drop_port = [&](OutputPort *& port)
2019-04-17 15:35:22 +00:00
{
auto null_sink = std::make_shared<NullSink>(port->getHeader());
connect(*port, null_sink->getPort());
2020-06-25 09:39:17 +00:00
processors.emplace(std::move(null_sink));
port = nullptr;
};
if (totals_having_port)
drop_port(totals_having_port);
if (extremes_port)
drop_port(extremes_port);
2019-04-17 15:35:22 +00:00
}
2020-04-08 12:40:04 +00:00
void QueryPipeline::addExtremesTransform()
2019-03-26 18:28:37 +00:00
{
2020-05-27 18:20:26 +00:00
checkInitializedAndNotCompleted();
2019-03-26 18:28:37 +00:00
2019-04-08 09:31:49 +00:00
if (extremes_port)
2019-03-26 18:28:37 +00:00
throw Exception("Extremes transform was already added to pipeline.", ErrorCodes::LOGICAL_ERROR);
2020-04-08 12:40:04 +00:00
std::vector<OutputPort *> extremes;
extremes.reserve(streams.size());
2019-03-26 18:28:37 +00:00
2020-04-08 12:40:04 +00:00
for (auto & stream : streams)
{
auto transform = std::make_shared<ExtremesTransform>(current_header);
connect(*stream, transform->getInputPort());
2019-03-26 18:28:37 +00:00
2020-04-08 12:40:04 +00:00
stream = &transform->getOutputPort();
extremes.push_back(&transform->getExtremesPort());
2019-03-26 18:28:37 +00:00
2020-06-25 09:39:17 +00:00
processors.emplace(std::move(transform));
2020-04-08 12:40:04 +00:00
}
if (extremes.size() == 1)
extremes_port = extremes.front();
else
extremes_port = uniteExtremes(extremes, current_header, processors);
2019-03-26 18:28:37 +00:00
}
/// Adds a CreatingSetsTransform in front of the pipeline.
/// The pipeline is resized to one stream, then a two-input ConcatProcessor is created whose first input is
/// the transform's output and whose second input is the pipeline stream; the concat output becomes the only stream.
/// Throws LOGICAL_ERROR if `transform` is not a CreatingSetsTransform.
void QueryPipeline::addCreatingSetsTransform(ProcessorPtr transform)
{
    checkInitializedAndNotCompleted();

    /// The message names this method (it previously named addExtremesTransform by mistake).
    if (!typeid_cast<const CreatingSetsTransform *>(transform.get()))
        throw Exception("CreatingSetsTransform expected for QueryPipeline::addCreatingSetsTransform.",
                        ErrorCodes::LOGICAL_ERROR);

    resize(1);

    auto concat = std::make_shared<ConcatProcessor>(current_header, 2);
    connect(transform->getOutputs().front(), concat->getInputs().front());
    connect(*streams.back(), concat->getInputs().back());

    streams.assign({ &concat->getOutputs().front() });
    processors.emplace(std::move(transform));
    processors.emplace(std::move(concat));
}
2020-05-27 18:20:26 +00:00
/// Attaches an output format to the pipeline and completes it.
/// The pipeline is resized to one stream and connected to the format's Main port; the totals and extremes
/// ports are connected to the format's Totals and Extremes ports, with a NullSource substituted for any
/// that is missing. Afterwards streams, header and both port pointers are reset and
/// initRowsBeforeLimit() is called.
/// Throws LOGICAL_ERROR if `output` is not an IOutputFormat or if an output format was already set.
void QueryPipeline::setOutputFormat(ProcessorPtr output)
{
    checkInitializedAndNotCompleted();

    auto * format = dynamic_cast<IOutputFormat *>(output.get());
    if (!format)
        throw Exception("IOutputFormat processor expected for QueryPipeline::setOutputFormat.", ErrorCodes::LOGICAL_ERROR);

    if (output_format)
        throw Exception("QueryPipeline already has output.", ErrorCodes::LOGICAL_ERROR);

    output_format = format;

    resize(1);

    auto & main_input = format->getPort(IOutputFormat::PortKind::Main);
    auto & totals_input = format->getPort(IOutputFormat::PortKind::Totals);
    auto & extremes_input = format->getPort(IOutputFormat::PortKind::Extremes);

    /// If `port` is not set, point it to a fresh NullSource with the header expected by `format_input`.
    auto ensure_port = [this](OutputPort *& port, auto & format_input)
    {
        if (port)
            return;

        auto null_source = std::make_shared<NullSource>(format_input.getHeader());
        port = &null_source->getPort();
        processors.emplace(std::move(null_source));
    };

    ensure_port(totals_having_port, totals_input);
    ensure_port(extremes_port, extremes_input);

    processors.emplace(std::move(output));

    connect(*streams.front(), main_input);
    connect(*totals_having_port, totals_input);
    connect(*extremes_port, extremes_input);

    /// Everything is connected to the format now, so no open streams or ports remain.
    streams.clear();
    current_header.clear();
    extremes_port = nullptr;
    totals_having_port = nullptr;

    initRowsBeforeLimit();
}
2019-04-09 13:07:07 +00:00
void QueryPipeline::unitePipelines(
2020-07-03 13:38:35 +00:00
std::vector<std::unique_ptr<QueryPipeline>> pipelines, const Block & common_header, size_t max_threads_limit)
2019-03-26 18:28:37 +00:00
{
/// Should we limit the number of threads for united pipeline. True if all pipelines have max_threads != 0.
/// If true, result max_threads will be sum(max_threads).
/// Note: it may be > than settings.max_threads, so we should apply this limit again.
2020-07-03 13:38:35 +00:00
bool will_limit_max_threads = !initialized() || max_threads != 0;
2020-05-27 18:20:26 +00:00
if (initialized())
2019-04-09 13:07:07 +00:00
{
2020-05-27 18:20:26 +00:00
addSimpleTransform([&](const Block & header)
{
return std::make_shared<ConvertingTransform>(
header, common_header, ConvertingTransform::MatchColumnsMode::Position);
});
}
2019-04-09 13:07:07 +00:00
2019-03-26 18:28:37 +00:00
std::vector<OutputPort *> extremes;
2020-04-08 12:40:04 +00:00
std::vector<OutputPort *> totals;
if (extremes_port)
extremes.push_back(extremes_port);
if (totals_having_port)
totals.push_back(totals_having_port);
2019-03-26 18:28:37 +00:00
for (auto & pipeline_ptr : pipelines)
2019-03-26 18:28:37 +00:00
{
auto & pipeline = *pipeline_ptr;
2019-03-26 18:28:37 +00:00
pipeline.checkInitialized();
2020-06-25 09:39:17 +00:00
pipeline.processors.setCollectedProcessors(processors.getCollectedProcessors());
2019-03-26 18:28:37 +00:00
2020-05-27 18:20:26 +00:00
if (!pipeline.isCompleted())
2019-04-05 11:34:11 +00:00
{
2020-05-27 18:20:26 +00:00
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ConvertingTransform>(
header, common_header, ConvertingTransform::MatchColumnsMode::Position);
});
}
2019-03-26 18:28:37 +00:00
if (pipeline.extremes_port)
{
auto converting = std::make_shared<ConvertingTransform>(
2020-04-14 21:05:45 +00:00
pipeline.current_header, common_header, ConvertingTransform::MatchColumnsMode::Position);
2019-03-26 18:28:37 +00:00
connect(*pipeline.extremes_port, converting->getInputPort());
extremes.push_back(&converting->getOutputPort());
2020-06-25 09:39:17 +00:00
processors.emplace(std::move(converting));
2019-03-26 18:28:37 +00:00
}
/// Take totals only from first port.
if (pipeline.totals_having_port)
{
2020-04-08 12:40:04 +00:00
auto converting = std::make_shared<ConvertingTransform>(
2020-04-14 21:05:45 +00:00
pipeline.current_header, common_header, ConvertingTransform::MatchColumnsMode::Position);
2019-03-26 18:28:37 +00:00
2020-04-08 12:40:04 +00:00
connect(*pipeline.totals_having_port, converting->getInputPort());
totals.push_back(&converting->getOutputPort());
2020-06-25 09:39:17 +00:00
processors.emplace(std::move(converting));
2019-03-26 18:28:37 +00:00
}
2020-06-25 09:39:17 +00:00
auto * collector = processors.setCollectedProcessors(nullptr);
processors.emplace(pipeline.processors.detach());
processors.setCollectedProcessors(collector);
streams.addStreams(pipeline.streams);
2019-11-21 15:37:59 +00:00
table_locks.insert(table_locks.end(), std::make_move_iterator(pipeline.table_locks.begin()), std::make_move_iterator(pipeline.table_locks.end()));
interpreter_context.insert(interpreter_context.end(), pipeline.interpreter_context.begin(), pipeline.interpreter_context.end());
storage_holders.insert(storage_holders.end(), pipeline.storage_holders.begin(), pipeline.storage_holders.end());
2020-01-01 10:42:46 +00:00
max_threads += pipeline.max_threads;
2020-07-03 13:38:35 +00:00
will_limit_max_threads = will_limit_max_threads && pipeline.max_threads != 0;
/// If one of pipelines uses more threads then current limit, will keep it.
/// It may happen if max_distributed_connections > max_threads
if (pipeline.max_threads > max_threads_limit)
max_threads_limit = pipeline.max_threads;
2019-03-26 18:28:37 +00:00
}
2020-07-03 13:38:35 +00:00
if (!will_limit_max_threads)
max_threads = 0;
2020-07-03 13:38:35 +00:00
else
2020-07-07 06:19:03 +00:00
limitMaxThreads(max_threads_limit);
2019-03-26 18:28:37 +00:00
if (!extremes.empty())
{
2020-04-08 12:40:04 +00:00
if (extremes.size() == 1)
extremes_port = extremes.back();
2019-03-26 18:28:37 +00:00
else
extremes_port = uniteExtremes(extremes, common_header, processors);
2020-04-08 12:40:04 +00:00
}
2019-03-26 18:28:37 +00:00
2020-04-08 12:40:04 +00:00
if (!totals.empty())
{
if (totals.size() == 1)
totals_having_port = totals.back();
else
totals_having_port = uniteTotals(totals, common_header, processors);
2019-03-26 18:28:37 +00:00
}
2020-06-18 19:42:28 +00:00
current_header = common_header;
2019-03-26 18:28:37 +00:00
}
void QueryPipeline::setProgressCallback(const ProgressCallback & callback)
{
2020-06-25 09:39:17 +00:00
for (auto & processor : processors.get())
2019-04-10 12:38:57 +00:00
{
if (auto * source = dynamic_cast<ISourceWithProgress *>(processor.get()))
2019-10-04 17:46:48 +00:00
source->setProgressCallback(callback);
2019-04-10 12:38:57 +00:00
if (auto * source = typeid_cast<CreatingSetsTransform *>(processor.get()))
source->setProgressCallback(callback);
}
2019-03-26 18:28:37 +00:00
}
/// Remembers `elem` and passes it to every processor that is an ISourceWithProgress or a CreatingSetsTransform.
void QueryPipeline::setProcessListElement(QueryStatus * elem)
{
    process_list_element = elem;

    for (auto & processor : processors.get())
    {
        auto * raw = processor.get();

        if (auto * with_progress = dynamic_cast<ISourceWithProgress *>(raw))
            with_progress->setProcessListElement(elem);

        if (auto * creating_sets = typeid_cast<CreatingSetsTransform *>(raw))
            creating_sets->setProcessListElement(elem);
    }
}
void QueryPipeline::initRowsBeforeLimit()
2019-04-08 14:55:20 +00:00
{
RowsBeforeLimitCounterPtr rows_before_limit_at_least;
2019-04-29 16:52:50 +00:00
2020-06-03 19:50:11 +00:00
/// TODO: add setRowsBeforeLimitCounter as virtual method to IProcessor.
std::vector<LimitTransform *> limits;
std::vector<SourceFromInputStream *> sources;
2020-06-03 19:50:11 +00:00
std::vector<RemoteSource *> remote_sources;
2019-04-08 14:55:20 +00:00
2019-04-29 16:52:50 +00:00
std::unordered_set<IProcessor *> visited;
struct QueuedEntry
2019-04-08 14:55:20 +00:00
{
2019-04-29 16:52:50 +00:00
IProcessor * processor;
bool visited_limit;
};
2019-04-12 15:20:24 +00:00
2019-04-29 16:52:50 +00:00
std::queue<QueuedEntry> queue;
queue.push({ output_format, false });
visited.emplace(output_format);
while (!queue.empty())
{
2020-04-22 06:34:20 +00:00
auto * processor = queue.front().processor;
2019-04-29 16:52:50 +00:00
auto visited_limit = queue.front().visited_limit;
queue.pop();
if (!visited_limit)
2019-04-12 15:20:24 +00:00
{
if (auto * limit = typeid_cast<LimitTransform *>(processor))
2019-04-12 15:20:24 +00:00
{
visited_limit = true;
limits.emplace_back(limit);
2019-04-12 15:20:24 +00:00
}
2019-04-08 14:55:20 +00:00
2019-04-29 16:52:50 +00:00
if (auto * source = typeid_cast<SourceFromInputStream *>(processor))
sources.emplace_back(source);
2020-06-03 19:50:11 +00:00
if (auto * source = typeid_cast<RemoteSource *>(processor))
remote_sources.emplace_back(source);
2019-04-29 16:52:50 +00:00
}
2020-03-19 17:18:33 +00:00
else if (auto * sorting = typeid_cast<PartialSortingTransform *>(processor))
2019-04-08 14:55:20 +00:00
{
if (!rows_before_limit_at_least)
rows_before_limit_at_least = std::make_shared<RowsBeforeLimitCounter>();
sorting->setRowsBeforeLimitCounter(rows_before_limit_at_least);
2019-04-29 16:52:50 +00:00
/// Don't go to children. Take rows_before_limit from last PartialSortingTransform.
2020-03-19 17:18:33 +00:00
continue;
2019-04-29 16:52:50 +00:00
}
2019-05-13 12:08:02 +00:00
/// Skip totals and extremes port for output format.
2019-05-13 13:04:52 +00:00
if (auto * format = dynamic_cast<IOutputFormat *>(processor))
2019-05-13 12:08:02 +00:00
{
auto * child_processor = &format->getPort(IOutputFormat::PortKind::Main).getOutputPort().getProcessor();
if (visited.emplace(child_processor).second)
queue.push({ child_processor, visited_limit });
continue;
}
2019-04-29 16:52:50 +00:00
for (auto & child_port : processor->getInputs())
{
auto * child_processor = &child_port.getOutputPort().getProcessor();
if (visited.emplace(child_processor).second)
queue.push({ child_processor, visited_limit });
2019-04-08 14:55:20 +00:00
}
}
2020-06-03 19:50:11 +00:00
if (!rows_before_limit_at_least && (!limits.empty() || !sources.empty() || !remote_sources.empty()))
{
rows_before_limit_at_least = std::make_shared<RowsBeforeLimitCounter>();
for (auto & limit : limits)
limit->setRowsBeforeLimitCounter(rows_before_limit_at_least);
for (auto & source : sources)
source->setRowsBeforeLimitCounter(rows_before_limit_at_least);
2020-06-03 19:50:11 +00:00
for (auto & source : remote_sources)
source->setRowsBeforeLimitCounter(rows_before_limit_at_least);
}
2020-03-19 17:18:33 +00:00
/// If there is a limit, then enable rows_before_limit_at_least
/// It is needed when zero rows is read, but we still want rows_before_limit_at_least in result.
if (!limits.empty())
rows_before_limit_at_least->add(0);
if (rows_before_limit_at_least)
output_format->setRowsBeforeLimitCounter(rows_before_limit_at_least);
2019-04-08 14:55:20 +00:00
}
2019-03-26 18:28:37 +00:00
/// Rvalue-qualified accessor: detaches this pipeline's processors and hands over table locks,
/// interpreter contexts, storage holders and the totals/extremes ports.
/// NOTE(review): this body looks like an unfinished refactoring and should be checked against the header:
///  - `pipes` is declared twice in the same scope (first statement, and again before emplace_back);
///  - the declared return type is `Pipe`, but the function returns `pipes`;
///  - `pipe` is not declared locally, so it presumably refers to the class member used elsewhere in this file.
/// NOTE(review): the bare timestamp lines below appear to be blame-view residue, not code.
Pipe QueryPipeline::getPipe() &&
{
2020-08-03 11:33:11 +00:00
Pipes pipes(processors.detach(), streams.at(0), totals_having_port, extremes_port);
pipe.max_parallel_streams = streams.maxParallelStreams();
/// Hand over the resources which this pipeline has been holding.
for (auto & lock : table_locks)
pipe.addTableLock(lock);
for (auto & context : interpreter_context)
pipe.addInterpreterContext(context);
for (auto & storage : storage_holders)
pipe.addStorageHolder(storage);
2020-02-03 14:28:30 +00:00
if (totals_having_port)
pipe.setTotalsPort(totals_having_port);
2020-04-08 12:40:04 +00:00
if (extremes_port)
pipe.setExtremesPort(extremes_port);
Pipes pipes;
pipes.emplace_back(std::move(pipe));
/// Every stream after the first one gets its own Pipe.
for (size_t i = 1; i < streams.size(); ++i)
pipes.emplace_back(Pipe(streams[i]));
return pipes;
}
/// Creates a PipelineExecutor over the pipeline's processors.
/// Throws LOGICAL_ERROR if the pipeline is not completed.
PipelineExecutorPtr QueryPipeline::execute()
{
    if (isCompleted())
        return std::make_shared<PipelineExecutor>(processors.get(), process_list_element);

    throw Exception("Cannot execute pipeline because it is not completed.", ErrorCodes::LOGICAL_ERROR);
}
2020-02-27 15:40:11 +00:00
/// Move assignment: takes over the state of `rhs` and resets the primitive fields of `rhs`.
/// NOTE(review): the `pipe` member used elsewhere in this file is not transferred here — confirm whether that is intended.
QueryPipeline & QueryPipeline::operator= (QueryPipeline && rhs)
{
    /// Take primitive fields from rhs ...
    process_list_element = rhs.process_list_element;
    max_threads = rhs.max_threads;
    output_format = rhs.output_format;
    extremes_port = rhs.extremes_port;
    totals_having_port = rhs.totals_having_port;

    /// ... and reset them in rhs.
    rhs.process_list_element = nullptr;
    rhs.max_threads = 0;
    rhs.output_format = nullptr;
    rhs.extremes_port = nullptr;
    rhs.totals_having_port = nullptr;

    /// Move these fields in destruction order (it's important)
    streams = std::move(rhs.streams);
    processors = std::move(rhs.processors);
    current_header = std::move(rhs.current_header);
    table_locks = std::move(rhs.table_locks);
    storage_holders = std::move(rhs.storage_holders);
    interpreter_context = std::move(rhs.interpreter_context);

    return *this;
}
2020-06-25 09:39:17 +00:00
/// Stores `processor` in the container.
/// If a collector list is set, a copy of the pointer is appended to it as well.
void QueryPipeline::ProcessorsContainer::emplace(ProcessorPtr processor)
{
    const bool has_collector = (collected_processors != nullptr);
    if (has_collector)
        collected_processors->emplace_back(processor);

    processors.emplace_back(std::move(processor));
}
/// Stores every processor from `processors_`, one by one, via the single-processor overload.
void QueryPipeline::ProcessorsContainer::emplace(Processors processors_)
{
    for (auto & item : processors_)
    {
        emplace(std::move(item));
    }
}
/// Replaces the collector list with `collected_processors_` and returns the previous one.
/// Throws LOGICAL_ERROR when a collector is already set and a non-null replacement is passed.
Processors * QueryPipeline::ProcessorsContainer::setCollectedProcessors(Processors * collected_processors_)
{
    const bool would_overwrite = collected_processors && collected_processors_;
    if (would_overwrite)
        throw Exception("Cannot set collected processors to QueryPipeline because "
                        "another one object was already created for current pipeline." , ErrorCodes::LOGICAL_ERROR);

    auto * previous = collected_processors;
    collected_processors = collected_processors_;
    return previous;
}
/// Registers this collector's processor list in `pipeline_`,
/// so that processors added to the pipeline afterwards are also recorded here.
QueryPipelineProcessorsCollector::QueryPipelineProcessorsCollector(QueryPipeline & pipeline_, IQueryPlanStep * step_)
    : pipeline(pipeline_)
    , step(step_)
{
    pipeline.processors.setCollectedProcessors(&processors);
}
/// Unregisters the collector list from the pipeline.
QueryPipelineProcessorsCollector::~QueryPipelineProcessorsCollector()
{
    /// Passing nullptr never triggers the "already set" check in setCollectedProcessors.
    pipeline.processors.setCollectedProcessors(nullptr);
}
/// Tags every collected processor with the query plan step and `group`,
/// then returns the collected processors, leaving the internal list empty.
Processors QueryPipelineProcessorsCollector::detachProcessors(size_t group)
{
    for (auto & collected : processors)
        collected->setQueryPlanStep(step, group);

    Processors detached;
    detached.swap(processors);
    return detached;
}
2019-03-26 18:28:37 +00:00
}