2021-09-14 16:28:41 +00:00
|
|
|
#include <Processors/QueryPipelineBuilder.h>
|
2019-03-26 18:28:37 +00:00
|
|
|
|
|
|
|
#include <Processors/ResizeProcessor.h>
|
2019-04-08 14:55:20 +00:00
|
|
|
#include <Processors/LimitTransform.h>
|
2019-03-26 18:28:37 +00:00
|
|
|
#include <Processors/Transforms/TotalsHavingTransform.h>
|
2019-04-05 11:34:11 +00:00
|
|
|
#include <Processors/Transforms/ExtremesTransform.h>
|
2019-03-26 18:28:37 +00:00
|
|
|
#include <Processors/Transforms/CreatingSetsTransform.h>
|
2020-11-17 17:16:55 +00:00
|
|
|
#include <Processors/Transforms/ExpressionTransform.h>
|
2020-08-04 13:06:59 +00:00
|
|
|
#include <Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h>
|
2021-04-28 17:32:12 +00:00
|
|
|
#include <Processors/Transforms/JoiningTransform.h>
|
2019-03-26 18:28:37 +00:00
|
|
|
#include <Processors/Formats/IOutputFormat.h>
|
|
|
|
#include <Processors/Sources/SourceFromInputStream.h>
|
|
|
|
#include <Processors/Executors/PipelineExecutor.h>
|
2019-04-18 14:55:43 +00:00
|
|
|
#include <Processors/Transforms/PartialSortingTransform.h>
|
|
|
|
#include <Processors/Sources/SourceFromSingleChunk.h>
|
2019-03-26 18:28:37 +00:00
|
|
|
#include <IO/WriteHelpers.h>
|
|
|
|
#include <Interpreters/Context.h>
|
2020-11-17 17:16:55 +00:00
|
|
|
#include <Interpreters/ExpressionActions.h>
|
2019-03-26 18:28:37 +00:00
|
|
|
#include <Common/typeid_cast.h>
|
2019-04-18 14:55:43 +00:00
|
|
|
#include <Common/CurrentThread.h>
|
2020-01-09 11:52:34 +00:00
|
|
|
#include <Processors/DelayedPortsProcessor.h>
|
2020-03-19 11:45:52 +00:00
|
|
|
#include <Processors/RowsBeforeLimitCounter.h>
|
2020-06-03 19:50:11 +00:00
|
|
|
#include <Processors/Sources/RemoteSource.h>
|
2021-09-08 18:29:38 +00:00
|
|
|
#include <Processors/QueryPlan/QueryPlan.h>
|
2019-03-26 18:28:37 +00:00
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
2020-02-25 18:10:48 +00:00
|
|
|
/// Error codes referenced by this translation unit; they are defined elsewhere.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}
|
2019-03-26 18:28:37 +00:00
|
|
|
|
2021-09-14 16:28:41 +00:00
|
|
|
void QueryPipelineBuilder::addQueryPlan(std::unique_ptr<QueryPlan> plan)
|
2021-09-08 18:29:38 +00:00
|
|
|
{
|
|
|
|
pipe.addQueryPlan(std::move(plan));
|
|
|
|
}
|
|
|
|
|
2021-09-14 16:28:41 +00:00
|
|
|
void QueryPipelineBuilder::checkInitialized()
|
2019-03-26 18:28:37 +00:00
|
|
|
{
|
|
|
|
if (!initialized())
|
|
|
|
throw Exception("QueryPipeline wasn't initialized.", ErrorCodes::LOGICAL_ERROR);
|
|
|
|
}
|
|
|
|
|
2021-09-14 16:28:41 +00:00
|
|
|
void QueryPipelineBuilder::checkInitializedAndNotCompleted()
|
2020-05-27 18:20:26 +00:00
|
|
|
{
|
|
|
|
checkInitialized();
|
|
|
|
|
2020-08-04 13:06:59 +00:00
|
|
|
if (pipe.isCompleted())
|
2020-05-27 18:20:26 +00:00
|
|
|
throw Exception("QueryPipeline was already completed.", ErrorCodes::LOGICAL_ERROR);
|
|
|
|
}
|
|
|
|
|
2020-08-04 13:06:59 +00:00
|
|
|
static void checkSource(const ProcessorPtr & source, bool can_have_totals)
|
2019-03-26 18:28:37 +00:00
|
|
|
{
|
|
|
|
if (!source->getInputs().empty())
|
|
|
|
throw Exception("Source for query pipeline shouldn't have any input, but " + source->getName() + " has " +
|
|
|
|
toString(source->getInputs().size()) + " inputs.", ErrorCodes::LOGICAL_ERROR);
|
|
|
|
|
2019-04-17 14:38:16 +00:00
|
|
|
if (source->getOutputs().empty())
|
|
|
|
throw Exception("Source for query pipeline should have single output, but it doesn't have any",
|
|
|
|
ErrorCodes::LOGICAL_ERROR);
|
|
|
|
|
|
|
|
if (!can_have_totals && source->getOutputs().size() != 1)
|
2019-03-26 18:28:37 +00:00
|
|
|
throw Exception("Source for query pipeline should have single output, but " + source->getName() + " has " +
|
|
|
|
toString(source->getOutputs().size()) + " outputs.", ErrorCodes::LOGICAL_ERROR);
|
2019-04-17 14:38:16 +00:00
|
|
|
|
|
|
|
if (source->getOutputs().size() > 2)
|
|
|
|
throw Exception("Source for query pipeline should have 1 or 2 outputs, but " + source->getName() + " has " +
|
|
|
|
toString(source->getOutputs().size()) + " outputs.", ErrorCodes::LOGICAL_ERROR);
|
2019-03-26 18:28:37 +00:00
|
|
|
}
|
|
|
|
|
2021-09-14 16:28:41 +00:00
|
|
|
/// Initializes the builder from a non-empty pipe.
/// Throws LOGICAL_ERROR if the builder is already initialized or if `pipe_` is empty.
void QueryPipelineBuilder::init(Pipe pipe_)
{
    const bool already_initialized = initialized();
    if (already_initialized)
        throw Exception("Pipeline has already been initialized.", ErrorCodes::LOGICAL_ERROR);

    const bool nothing_to_take = pipe_.empty();
    if (nothing_to_take)
        throw Exception("Can't initialize pipeline with empty pipe.", ErrorCodes::LOGICAL_ERROR);

    pipe = std::move(pipe_);
}
|
|
|
|
|
2021-09-16 17:40:42 +00:00
|
|
|
/// Initializes the builder from a pulling QueryPipeline by taking over its resources,
/// processors and ports.
/// Throws LOGICAL_ERROR if the builder is already initialized or if `pipeline` is not pulling.
void QueryPipelineBuilder::init(QueryPipeline pipeline)
{
    if (initialized())
        throw Exception("Pipeline has already been initialized.", ErrorCodes::LOGICAL_ERROR);

    if (!pipeline.pulling())
        throw Exception("Can't initialize not pulling pipeline.", ErrorCodes::LOGICAL_ERROR);

    /// Owned state is moved out of the pipeline.
    pipe.holder = std::move(pipeline.resources);
    pipe.processors = std::move(pipeline.processors);

    /// The single main output becomes the only output port of the pipe and defines its header.
    auto * main_output = pipeline.output;
    pipe.output_ports = {main_output};
    pipe.header = main_output->getHeader();

    /// The remaining fields are copied as they are.
    pipe.totals_port = pipeline.totals;
    pipe.extremes_port = pipeline.extremes;
    pipe.max_parallel_streams = pipeline.num_threads;
}
|
|
|
|
|
2021-09-14 16:28:41 +00:00
|
|
|
void QueryPipelineBuilder::reset()
|
2020-08-06 12:24:05 +00:00
|
|
|
{
|
|
|
|
Pipe pipe_to_destroy(std::move(pipe));
|
2021-09-14 16:28:41 +00:00
|
|
|
*this = QueryPipelineBuilder();
|
2020-08-06 12:24:05 +00:00
|
|
|
}
|
|
|
|
|
2021-09-14 16:28:41 +00:00
|
|
|
/// Forwards `getter` to Pipe::addSimpleTransform.
/// The builder must be initialized and not completed.
void QueryPipelineBuilder::addSimpleTransform(const Pipe::ProcessorGetter & getter)
{
    checkInitializedAndNotCompleted();

    pipe.addSimpleTransform(getter);
}
|
|
|
|
|
2021-09-14 16:28:41 +00:00
|
|
|
/// Forwards a stream-kind-aware `getter` to Pipe::addSimpleTransform.
/// The builder must be initialized and not completed.
void QueryPipelineBuilder::addSimpleTransform(const Pipe::ProcessorGetterWithStreamKind & getter)
{
    checkInitializedAndNotCompleted();

    pipe.addSimpleTransform(getter);
}
|
|
|
|
|
2021-09-14 16:28:41 +00:00
|
|
|
/// Hands `transform` over to the pipe.
/// The builder must be initialized and not completed.
void QueryPipelineBuilder::addTransform(ProcessorPtr transform)
{
    checkInitializedAndNotCompleted();

    pipe.addTransform(std::move(transform));
}
|
|
|
|
|
2021-09-14 16:28:41 +00:00
|
|
|
/// Hands `transform` over to the pipe together with the input ports
/// that are passed on for the totals and extremes streams.
/// The builder must be initialized and not completed.
void QueryPipelineBuilder::addTransform(ProcessorPtr transform, InputPort * totals, InputPort * extremes)
{
    checkInitializedAndNotCompleted();

    pipe.addTransform(std::move(transform), totals, extremes);
}
|
|
|
|
|
2021-09-14 16:28:41 +00:00
|
|
|
void QueryPipelineBuilder::addChains(std::vector<Chain> chains)
|
2021-09-01 18:41:50 +00:00
|
|
|
{
|
|
|
|
checkInitializedAndNotCompleted();
|
|
|
|
pipe.addChains(std::move(chains));
|
|
|
|
}
|
|
|
|
|
2021-09-14 16:28:41 +00:00
|
|
|
/// Adds a single chain: the pipe is resized to one stream and the chain is added to it.
/// The builder must be initialized and not completed.
void QueryPipelineBuilder::addChain(Chain chain)
{
    checkInitializedAndNotCompleted();

    /// Pipe::addChains takes a vector, so the chain is wrapped into a one-element one.
    std::vector<Chain> single_chain;
    single_chain.emplace_back(std::move(chain));

    pipe.resize(1);
    pipe.addChains(std::move(single_chain));
}
|
|
|
|
|
2021-09-14 16:28:41 +00:00
|
|
|
void QueryPipelineBuilder::transform(const Transformer & transformer)
|
2020-10-01 17:34:22 +00:00
|
|
|
{
|
|
|
|
checkInitializedAndNotCompleted();
|
|
|
|
pipe.transform(transformer);
|
|
|
|
}
|
|
|
|
|
2021-09-14 16:28:41 +00:00
|
|
|
/// Forwards `getter` to Pipe::setSinks.
/// The builder must be initialized and not completed.
void QueryPipelineBuilder::setSinks(const Pipe::ProcessorGetterWithStreamKind & getter)
{
    checkInitializedAndNotCompleted();

    pipe.setSinks(getter);
}
|
|
|
|
|
2021-09-14 16:28:41 +00:00
|
|
|
/// Adds `source` as one more stream and marks that stream as delayed in a DelayedPortsProcessor.
/// The source must have no inputs, exactly one output, and the same header structure as the pipeline.
void QueryPipelineBuilder::addDelayedStream(ProcessorPtr source)
{
    checkInitializedAndNotCompleted();

    checkSource(source, false);
    assertBlocksHaveEqualStructure(getHeader(), source->getOutputs().front().getHeader(), "QueryPipeline");

    /// The new source becomes the last output port, so its index is the current port count.
    const auto delayed_port_index = pipe.numOutputPorts();
    IProcessor::PortNumbers delayed_streams = { delayed_port_index };

    pipe.addSource(std::move(source));

    /// The port count is re-read here because it now includes the source added above.
    auto delayed_ports = std::make_shared<DelayedPortsProcessor>(getHeader(), pipe.numOutputPorts(), delayed_streams);
    addTransform(std::move(delayed_ports));
}
|
|
|
|
|
2021-09-14 16:28:41 +00:00
|
|
|
/// Delegates to the free function DB::addMergingAggregatedMemoryEfficientTransform, applied to this builder's pipe.
void QueryPipelineBuilder::addMergingAggregatedMemoryEfficientTransform(AggregatingTransformParamsPtr params, size_t num_merging_processors)
{
    /// Qualified with DB:: so the call resolves to the free function rather than to this member.
    DB::addMergingAggregatedMemoryEfficientTransform(pipe, std::move(params), num_merging_processors);
}
|
|
|
|
|
2021-09-14 16:28:41 +00:00
|
|
|
void QueryPipelineBuilder::resize(size_t num_streams, bool force, bool strict)
|
2019-03-26 18:28:37 +00:00
|
|
|
{
|
2020-05-27 18:20:26 +00:00
|
|
|
checkInitializedAndNotCompleted();
|
2020-10-12 09:30:05 +00:00
|
|
|
pipe.resize(num_streams, force, strict);
|
2019-12-26 16:52:15 +00:00
|
|
|
}
|
|
|
|
|
2021-09-14 16:28:41 +00:00
|
|
|
/// Adds a TotalsHavingTransform on top of a single-stream pipeline.
/// The last output of the transform is passed to the pipe as the totals port.
/// Throws LOGICAL_ERROR if `transform` is not a TotalsHavingTransform or if the pipe already has totals.
void QueryPipelineBuilder::addTotalsHavingTransform(ProcessorPtr transform)
{
    checkInitializedAndNotCompleted();

    const auto * totals_having = typeid_cast<const TotalsHavingTransform *>(transform.get());
    if (!totals_having)
        throw Exception("TotalsHavingTransform expected for QueryPipeline::addTotalsHavingTransform.",
                        ErrorCodes::LOGICAL_ERROR);

    if (pipe.getTotalsPort())
        throw Exception("Totals having transform was already added to pipeline.", ErrorCodes::LOGICAL_ERROR);

    resize(1);

    /// The port address is taken before `transform` is moved into the pipe.
    auto * totals_output = &transform->getOutputs().back();
    pipe.addTransform(std::move(transform), totals_output, nullptr);
}
|
|
|
|
|
2021-09-14 16:28:41 +00:00
|
|
|
void QueryPipelineBuilder::addDefaultTotals()
|
2019-04-09 14:51:38 +00:00
|
|
|
{
|
2020-05-27 18:20:26 +00:00
|
|
|
checkInitializedAndNotCompleted();
|
2019-04-09 14:51:38 +00:00
|
|
|
|
2020-08-04 13:06:59 +00:00
|
|
|
if (pipe.getTotalsPort())
|
2019-04-09 14:51:38 +00:00
|
|
|
throw Exception("Totals having transform was already added to pipeline.", ErrorCodes::LOGICAL_ERROR);
|
|
|
|
|
2020-08-04 13:06:59 +00:00
|
|
|
const auto & current_header = getHeader();
|
2019-04-09 14:51:38 +00:00
|
|
|
Columns columns;
|
|
|
|
columns.reserve(current_header.columns());
|
|
|
|
|
|
|
|
for (size_t i = 0; i < current_header.columns(); ++i)
|
|
|
|
{
|
|
|
|
auto column = current_header.getByPosition(i).type->createColumn();
|
|
|
|
column->insertDefault();
|
|
|
|
columns.emplace_back(std::move(column));
|
|
|
|
}
|
|
|
|
|
|
|
|
auto source = std::make_shared<SourceFromSingleChunk>(current_header, Chunk(std::move(columns), 1));
|
2020-08-04 13:06:59 +00:00
|
|
|
pipe.addTotalsSource(std::move(source));
|
2019-04-10 16:28:37 +00:00
|
|
|
}
|
|
|
|
|
2021-09-14 16:28:41 +00:00
|
|
|
void QueryPipelineBuilder::dropTotalsAndExtremes()
|
2019-04-17 15:35:22 +00:00
|
|
|
{
|
2020-08-04 15:51:56 +00:00
|
|
|
pipe.dropTotals();
|
|
|
|
pipe.dropExtremes();
|
2019-04-17 15:35:22 +00:00
|
|
|
}
|
|
|
|
|
2021-09-14 16:28:41 +00:00
|
|
|
void QueryPipelineBuilder::addExtremesTransform()
|
2019-03-26 18:28:37 +00:00
|
|
|
{
|
2020-05-27 18:20:26 +00:00
|
|
|
checkInitializedAndNotCompleted();
|
2019-03-26 18:28:37 +00:00
|
|
|
|
2021-01-22 10:44:24 +00:00
|
|
|
/// It is possible that pipeline already have extremes.
|
|
|
|
/// For example, it may be added from VIEW subquery.
|
|
|
|
/// In this case, recalculate extremes again - they should be calculated for different rows.
|
2020-08-04 15:51:56 +00:00
|
|
|
if (pipe.getExtremesPort())
|
2021-01-22 10:44:24 +00:00
|
|
|
pipe.dropExtremes();
|
2019-03-26 18:28:37 +00:00
|
|
|
|
2020-08-04 15:51:56 +00:00
|
|
|
resize(1);
|
|
|
|
auto transform = std::make_shared<ExtremesTransform>(getHeader());
|
|
|
|
auto * port = &transform->getExtremesPort();
|
|
|
|
pipe.addTransform(std::move(transform), nullptr, port);
|
2019-03-26 18:28:37 +00:00
|
|
|
}
|
|
|
|
|
2021-09-14 16:28:41 +00:00
|
|
|
/// Sets the output format on a single-stream pipeline and wires up the rows-before-limit counter.
/// Throws LOGICAL_ERROR if an output format has already been set.
void QueryPipelineBuilder::setOutputFormat(ProcessorPtr output)
{
    checkInitializedAndNotCompleted();

    if (output_format != nullptr)
        throw Exception("QueryPipeline already has output.", ErrorCodes::LOGICAL_ERROR);

    resize(1);

    /// The raw pointer is remembered before ownership of `output` goes to the pipe.
    output_format = dynamic_cast<IOutputFormat *>(output.get());
    pipe.setOutputFormat(std::move(output));

    initRowsBeforeLimit();
}
|
|
|
|
|
2021-09-14 16:28:41 +00:00
|
|
|
/// Unites several initialized pipelines into a single one.
///
/// pipelines            - pipelines to unite; must not be empty. The pipe of every pipeline is moved out.
/// max_threads_limit    - upper bound applied to the resulting max_threads (raised if a single
///                        pipeline already uses more threads, see below).
/// collected_processors - stored into every source pipe and passed to Pipe::unitePipes.
///
/// Throws LOGICAL_ERROR if `pipelines` is empty or if one of the pipelines is not initialized.
QueryPipelineBuilder QueryPipelineBuilder::unitePipelines(
    std::vector<std::unique_ptr<QueryPipelineBuilder>> pipelines,
    size_t max_threads_limit,
    Processors * collected_processors)
{
    if (pipelines.empty())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot unite an empty set of pipelines");

    /// Should we limit the number of threads for united pipeline. True if all pipelines have max_threads != 0.
    /// If true, result max_threads will be sum(max_threads).
    /// Note: it may be > than settings.max_threads, so we should apply this limit again.
    bool will_limit_max_threads = true;
    size_t max_threads = 0;
    Pipes pipes;

    for (auto & pipeline_ptr : pipelines)
    {
        auto & pipeline = *pipeline_ptr;
        pipeline.checkInitialized();
        pipeline.pipe.collected_processors = collected_processors;

        pipes.emplace_back(std::move(pipeline.pipe));

        max_threads += pipeline.max_threads;
        will_limit_max_threads = will_limit_max_threads && pipeline.max_threads != 0;

        /// If one of pipelines uses more threads than current limit, will keep it.
        /// It may happen if max_distributed_connections > max_threads
        if (pipeline.max_threads > max_threads_limit)
            max_threads_limit = pipeline.max_threads;
    }

    QueryPipelineBuilder pipeline;
    pipeline.init(Pipe::unitePipes(std::move(pipes), collected_processors, false));

    if (will_limit_max_threads)
    {
        pipeline.setMaxThreads(max_threads);
        pipeline.limitMaxThreads(max_threads_limit);
    }

    return pipeline;
}
|
|
|
|
|
2021-09-14 16:28:41 +00:00
|
|
|
std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelines(
|
|
|
|
std::unique_ptr<QueryPipelineBuilder> left,
|
|
|
|
std::unique_ptr<QueryPipelineBuilder> right,
|
2021-04-28 17:32:12 +00:00
|
|
|
JoinPtr join,
|
|
|
|
size_t max_block_size,
|
|
|
|
Processors * collected_processors)
|
|
|
|
{
|
|
|
|
left->checkInitializedAndNotCompleted();
|
|
|
|
right->checkInitializedAndNotCompleted();
|
|
|
|
|
2021-04-29 17:51:35 +00:00
|
|
|
/// Extremes before join are useless. They will be calculated after if needed.
|
2021-04-28 17:32:12 +00:00
|
|
|
left->pipe.dropExtremes();
|
|
|
|
right->pipe.dropExtremes();
|
|
|
|
|
|
|
|
left->pipe.collected_processors = collected_processors;
|
|
|
|
right->pipe.collected_processors = collected_processors;
|
|
|
|
|
|
|
|
/// In case joined subquery has totals, and we don't, add default chunk to totals.
|
|
|
|
bool default_totals = false;
|
|
|
|
if (!left->hasTotals() && right->hasTotals())
|
|
|
|
{
|
|
|
|
left->addDefaultTotals();
|
|
|
|
default_totals = true;
|
|
|
|
}
|
|
|
|
|
2021-04-30 13:06:17 +00:00
|
|
|
/// (left) ──────┐
|
|
|
|
/// ╞> Joining ─> (joined)
|
|
|
|
/// (left) ─┐┌───┘
|
|
|
|
/// └┼───┐
|
|
|
|
/// (right) ┐ (totals) ──┼─┐ ╞> Joining ─> (joined)
|
|
|
|
/// ╞> Resize ┐ ╓─┘┌┼─┘
|
|
|
|
/// (right) ┘ │ ╟──┘└─┐
|
|
|
|
/// ╞> FillingJoin ─> Resize ╣ ╞> Joining ─> (totals)
|
|
|
|
/// (totals) ─────────┘ ╙─────┘
|
2021-04-29 17:51:35 +00:00
|
|
|
|
2021-04-28 17:32:12 +00:00
|
|
|
size_t num_streams = left->getNumStreams();
|
|
|
|
right->resize(1);
|
|
|
|
|
2021-04-30 08:25:39 +00:00
|
|
|
auto adding_joined = std::make_shared<FillingRightJoinSideTransform>(right->getHeader(), join);
|
2021-04-28 17:32:12 +00:00
|
|
|
InputPort * totals_port = nullptr;
|
|
|
|
if (right->hasTotals())
|
2021-04-29 17:51:35 +00:00
|
|
|
totals_port = adding_joined->addTotalsPort();
|
2021-04-28 17:32:12 +00:00
|
|
|
|
|
|
|
right->addTransform(std::move(adding_joined), totals_port, nullptr);
|
|
|
|
|
|
|
|
size_t num_streams_including_totals = num_streams + (left->hasTotals() ? 1 : 0);
|
|
|
|
right->resize(num_streams_including_totals);
|
|
|
|
|
2021-04-29 17:51:35 +00:00
|
|
|
/// This counter is needed for every Joining except totals, to decide which Joining will generate non joined rows.
|
2021-04-28 17:32:12 +00:00
|
|
|
auto finish_counter = std::make_shared<JoiningTransform::FinishCounter>(num_streams);
|
|
|
|
|
|
|
|
auto lit = left->pipe.output_ports.begin();
|
|
|
|
auto rit = right->pipe.output_ports.begin();
|
|
|
|
|
|
|
|
for (size_t i = 0; i < num_streams; ++i)
|
|
|
|
{
|
|
|
|
auto joining = std::make_shared<JoiningTransform>(left->getHeader(), join, max_block_size, false, default_totals, finish_counter);
|
|
|
|
connect(**lit, joining->getInputs().front());
|
|
|
|
connect(**rit, joining->getInputs().back());
|
|
|
|
*lit = &joining->getOutputs().front();
|
|
|
|
|
|
|
|
++lit;
|
|
|
|
++rit;
|
|
|
|
|
|
|
|
if (collected_processors)
|
|
|
|
collected_processors->emplace_back(joining);
|
|
|
|
|
|
|
|
left->pipe.processors.emplace_back(std::move(joining));
|
|
|
|
}
|
|
|
|
|
|
|
|
if (left->hasTotals())
|
|
|
|
{
|
|
|
|
auto joining = std::make_shared<JoiningTransform>(left->getHeader(), join, max_block_size, true, default_totals);
|
|
|
|
connect(*left->pipe.totals_port, joining->getInputs().front());
|
|
|
|
connect(**rit, joining->getInputs().back());
|
|
|
|
left->pipe.totals_port = &joining->getOutputs().front();
|
|
|
|
|
|
|
|
++rit;
|
|
|
|
|
|
|
|
if (collected_processors)
|
|
|
|
collected_processors->emplace_back(joining);
|
|
|
|
|
|
|
|
left->pipe.processors.emplace_back(std::move(joining));
|
|
|
|
}
|
|
|
|
|
|
|
|
left->pipe.processors.insert(left->pipe.processors.end(), right->pipe.processors.begin(), right->pipe.processors.end());
|
|
|
|
left->pipe.holder = std::move(right->pipe.holder);
|
|
|
|
left->pipe.header = left->pipe.output_ports.front()->getHeader();
|
2021-07-07 17:44:30 +00:00
|
|
|
left->pipe.max_parallel_streams = std::max(left->pipe.max_parallel_streams, right->pipe.max_parallel_streams);
|
2021-04-28 17:32:12 +00:00
|
|
|
return left;
|
|
|
|
}
|
2020-09-16 16:11:16 +00:00
|
|
|
|
2021-09-14 16:28:41 +00:00
|
|
|
void QueryPipelineBuilder::addCreatingSetsTransform(const Block & res_header, SubqueryForSet subquery_for_set, const SizeLimits & limits, ContextPtr context)
|
2020-09-16 16:11:16 +00:00
|
|
|
{
|
2020-09-16 16:30:48 +00:00
|
|
|
resize(1);
|
2020-09-16 16:11:16 +00:00
|
|
|
|
|
|
|
auto transform = std::make_shared<CreatingSetsTransform>(
|
2020-09-16 16:45:56 +00:00
|
|
|
getHeader(),
|
2020-09-16 16:38:33 +00:00
|
|
|
res_header,
|
2020-09-16 16:11:16 +00:00
|
|
|
std::move(subquery_for_set),
|
2020-09-16 16:30:48 +00:00
|
|
|
limits,
|
2020-09-16 16:45:56 +00:00
|
|
|
context);
|
2020-09-16 16:11:16 +00:00
|
|
|
|
|
|
|
InputPort * totals_port = nullptr;
|
|
|
|
|
|
|
|
if (pipe.getTotalsPort())
|
|
|
|
totals_port = transform->addTotalsPort();
|
|
|
|
|
|
|
|
pipe.addTransform(std::move(transform), totals_port, nullptr);
|
|
|
|
}
|
|
|
|
|
2021-09-14 16:28:41 +00:00
|
|
|
/// Unites this pipeline with `pipeline` and marks all ports of the current pipeline as delayed
/// in a DelayedPortsProcessor.
/// Throws LOGICAL_ERROR if the header of `pipeline` is not empty.
void QueryPipelineBuilder::addPipelineBefore(QueryPipelineBuilder pipeline)
{
    checkInitializedAndNotCompleted();

    if (pipeline.getHeader())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Pipeline for CreatingSets should have empty header. Got: {}",
                        pipeline.getHeader().dumpStructure());

    /// Every output port that exists right now is delayed: indexes 0 .. numOutputPorts() - 1.
    const size_t num_current_ports = pipe.numOutputPorts();
    IProcessor::PortNumbers delayed_streams;
    delayed_streams.reserve(num_current_ports);
    for (size_t port = 0; port < num_current_ports; ++port)
        delayed_streams.emplace_back(port);

    /// Saved before `pipe` is moved from, and passed to unitePipes below.
    auto * processors_collector = pipe.collected_processors;

    Pipes to_unite;
    to_unite.emplace_back(std::move(pipe));
    to_unite.emplace_back(QueryPipelineBuilder::getPipe(std::move(pipeline)));
    pipe = Pipe::unitePipes(std::move(to_unite), processors_collector, true);

    auto delayed_ports = std::make_shared<DelayedPortsProcessor>(getHeader(), pipe.numOutputPorts(), delayed_streams, true);
    addTransform(std::move(delayed_ports));
}
|
|
|
|
|
2021-09-14 16:28:41 +00:00
|
|
|
void QueryPipelineBuilder::setProgressCallback(const ProgressCallback & callback)
|
2019-03-26 18:28:37 +00:00
|
|
|
{
|
2020-08-04 15:51:56 +00:00
|
|
|
for (auto & processor : pipe.processors)
|
2019-04-10 12:38:57 +00:00
|
|
|
{
|
2019-10-09 09:33:16 +00:00
|
|
|
if (auto * source = dynamic_cast<ISourceWithProgress *>(processor.get()))
|
2019-10-04 17:46:48 +00:00
|
|
|
source->setProgressCallback(callback);
|
2019-04-10 12:38:57 +00:00
|
|
|
}
|
2019-03-26 18:28:37 +00:00
|
|
|
}
|
|
|
|
|
2021-09-14 16:28:41 +00:00
|
|
|
/// Remembers `elem` in the builder and sets it on every processor of the pipe
/// that is an ISourceWithProgress.
void QueryPipelineBuilder::setProcessListElement(QueryStatus * elem)
{
    process_list_element = elem;

    for (auto & processor : pipe.processors)
    {
        auto * source_with_progress = dynamic_cast<ISourceWithProgress *>(processor.get());
        if (!source_with_progress)
            continue;

        source_with_progress->setProcessListElement(elem);
    }
}
|
|
|
|
|
2021-09-14 16:28:41 +00:00
|
|
|
void QueryPipelineBuilder::initRowsBeforeLimit()
|
2019-04-08 14:55:20 +00:00
|
|
|
{
|
2020-03-19 11:45:52 +00:00
|
|
|
RowsBeforeLimitCounterPtr rows_before_limit_at_least;
|
2019-04-29 16:52:50 +00:00
|
|
|
|
2020-06-03 19:50:11 +00:00
|
|
|
/// TODO: add setRowsBeforeLimitCounter as virtual method to IProcessor.
|
2020-03-19 11:45:52 +00:00
|
|
|
std::vector<LimitTransform *> limits;
|
|
|
|
std::vector<SourceFromInputStream *> sources;
|
2020-06-03 19:50:11 +00:00
|
|
|
std::vector<RemoteSource *> remote_sources;
|
2019-04-08 14:55:20 +00:00
|
|
|
|
2019-04-29 16:52:50 +00:00
|
|
|
std::unordered_set<IProcessor *> visited;
|
|
|
|
|
|
|
|
struct QueuedEntry
|
2019-04-08 14:55:20 +00:00
|
|
|
{
|
2019-04-29 16:52:50 +00:00
|
|
|
IProcessor * processor;
|
|
|
|
bool visited_limit;
|
|
|
|
};
|
2019-04-12 15:20:24 +00:00
|
|
|
|
2019-04-29 16:52:50 +00:00
|
|
|
std::queue<QueuedEntry> queue;
|
|
|
|
|
|
|
|
queue.push({ output_format, false });
|
|
|
|
visited.emplace(output_format);
|
|
|
|
|
|
|
|
while (!queue.empty())
|
|
|
|
{
|
2020-04-22 06:34:20 +00:00
|
|
|
auto * processor = queue.front().processor;
|
2019-04-29 16:52:50 +00:00
|
|
|
auto visited_limit = queue.front().visited_limit;
|
|
|
|
queue.pop();
|
|
|
|
|
|
|
|
if (!visited_limit)
|
2019-04-12 15:20:24 +00:00
|
|
|
{
|
2020-03-19 11:45:52 +00:00
|
|
|
if (auto * limit = typeid_cast<LimitTransform *>(processor))
|
2019-04-12 15:20:24 +00:00
|
|
|
{
|
2020-03-19 11:45:52 +00:00
|
|
|
visited_limit = true;
|
|
|
|
limits.emplace_back(limit);
|
2019-04-12 15:20:24 +00:00
|
|
|
}
|
2019-04-08 14:55:20 +00:00
|
|
|
|
2019-04-29 16:52:50 +00:00
|
|
|
if (auto * source = typeid_cast<SourceFromInputStream *>(processor))
|
2020-03-19 11:45:52 +00:00
|
|
|
sources.emplace_back(source);
|
2020-06-03 19:50:11 +00:00
|
|
|
|
|
|
|
if (auto * source = typeid_cast<RemoteSource *>(processor))
|
|
|
|
remote_sources.emplace_back(source);
|
2019-04-29 16:52:50 +00:00
|
|
|
}
|
2020-03-19 17:18:33 +00:00
|
|
|
else if (auto * sorting = typeid_cast<PartialSortingTransform *>(processor))
|
2019-04-08 14:55:20 +00:00
|
|
|
{
|
2020-03-19 11:45:52 +00:00
|
|
|
if (!rows_before_limit_at_least)
|
|
|
|
rows_before_limit_at_least = std::make_shared<RowsBeforeLimitCounter>();
|
|
|
|
|
|
|
|
sorting->setRowsBeforeLimitCounter(rows_before_limit_at_least);
|
2019-04-29 16:52:50 +00:00
|
|
|
|
|
|
|
/// Don't go to children. Take rows_before_limit from last PartialSortingTransform.
|
2020-03-19 17:18:33 +00:00
|
|
|
continue;
|
2019-04-29 16:52:50 +00:00
|
|
|
}
|
|
|
|
|
2019-05-13 12:08:02 +00:00
|
|
|
/// Skip totals and extremes port for output format.
|
2019-05-13 13:04:52 +00:00
|
|
|
if (auto * format = dynamic_cast<IOutputFormat *>(processor))
|
2019-05-13 12:08:02 +00:00
|
|
|
{
|
|
|
|
auto * child_processor = &format->getPort(IOutputFormat::PortKind::Main).getOutputPort().getProcessor();
|
|
|
|
if (visited.emplace(child_processor).second)
|
|
|
|
queue.push({ child_processor, visited_limit });
|
|
|
|
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
2019-04-29 16:52:50 +00:00
|
|
|
for (auto & child_port : processor->getInputs())
|
|
|
|
{
|
|
|
|
auto * child_processor = &child_port.getOutputPort().getProcessor();
|
|
|
|
if (visited.emplace(child_processor).second)
|
|
|
|
queue.push({ child_processor, visited_limit });
|
2019-04-08 14:55:20 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-06-03 19:50:11 +00:00
|
|
|
if (!rows_before_limit_at_least && (!limits.empty() || !sources.empty() || !remote_sources.empty()))
|
2020-03-19 11:45:52 +00:00
|
|
|
{
|
|
|
|
rows_before_limit_at_least = std::make_shared<RowsBeforeLimitCounter>();
|
|
|
|
|
|
|
|
for (auto & limit : limits)
|
|
|
|
limit->setRowsBeforeLimitCounter(rows_before_limit_at_least);
|
|
|
|
|
|
|
|
for (auto & source : sources)
|
|
|
|
source->setRowsBeforeLimitCounter(rows_before_limit_at_least);
|
2020-06-03 19:50:11 +00:00
|
|
|
|
|
|
|
for (auto & source : remote_sources)
|
|
|
|
source->setRowsBeforeLimitCounter(rows_before_limit_at_least);
|
2020-03-19 11:45:52 +00:00
|
|
|
}
|
|
|
|
|
2020-03-19 17:18:33 +00:00
|
|
|
/// If there is a limit, then enable rows_before_limit_at_least
|
|
|
|
/// It is needed when zero rows is read, but we still want rows_before_limit_at_least in result.
|
|
|
|
if (!limits.empty())
|
|
|
|
rows_before_limit_at_least->add(0);
|
|
|
|
|
2020-03-19 11:45:52 +00:00
|
|
|
if (rows_before_limit_at_least)
|
|
|
|
output_format->setRowsBeforeLimitCounter(rows_before_limit_at_least);
|
2019-04-08 14:55:20 +00:00
|
|
|
}
|
2019-03-26 18:28:37 +00:00
|
|
|
|
2021-09-14 16:28:41 +00:00
|
|
|
/// Creates a PipelineExecutor over the processors of the pipe.
/// Throws LOGICAL_ERROR if the pipeline is not completed.
PipelineExecutorPtr QueryPipelineBuilder::execute()
{
    const bool ready_to_run = isCompleted();
    if (!ready_to_run)
        throw Exception("Cannot execute pipeline because it is not completed.", ErrorCodes::LOGICAL_ERROR);

    return std::make_shared<PipelineExecutor>(pipe.processors, process_list_element);
}
|
|
|
|
|
2021-09-15 19:35:48 +00:00
|
|
|
/// Converts a builder into a QueryPipeline: the pipe is moved into the result and the
/// builder's thread count is applied to it.
QueryPipeline QueryPipelineBuilder::getPipeline(QueryPipelineBuilder builder)
{
    /// Read the thread count while `builder` is still intact, so the value cannot depend
    /// on the moved-from state of `builder.pipe`.
    const auto num_threads = builder.getNumThreads();

    QueryPipeline res(std::move(builder.pipe));
    res.setNumThreads(num_threads);
    return res;
}
|
|
|
|
|
2021-09-14 16:28:41 +00:00
|
|
|
/// Stores `processors` as the pipe's collected_processors pointer; it may be nullptr.
void QueryPipelineBuilder::setCollectedProcessors(Processors * processors)
{
    pipe.collected_processors = processors;
}
|
|
|
|
|
2020-08-07 08:28:12 +00:00
|
|
|
|
2021-09-14 16:28:41 +00:00
|
|
|
/// Registers this collector's processor list as the collected processors of `pipeline_`.
/// The destructor unregisters it again.
QueryPipelineProcessorsCollector::QueryPipelineProcessorsCollector(QueryPipelineBuilder & pipeline_, IQueryPlanStep * step_)
    : pipeline(pipeline_)
    , step(step_)
{
    pipeline.setCollectedProcessors(&processors);
}
|
|
|
|
|
|
|
|
/// Resets the pipeline's collected processors pointer, which was set to this collector's
/// list by the constructor.
QueryPipelineProcessorsCollector::~QueryPipelineProcessorsCollector()
{
    pipeline.setCollectedProcessors(nullptr);
}
|
|
|
|
|
|
|
|
/// Tags every collected processor with this collector's step and the given `group`,
/// then returns the collected processors and leaves the internal list empty.
Processors QueryPipelineProcessorsCollector::detachProcessors(size_t group)
{
    for (auto & collected : processors)
        collected->setQueryPlanStep(step, group);

    /// Swapping with an empty container hands the contents to the caller and clears the member.
    Processors detached;
    detached.swap(processors);
    return detached;
}
|
|
|
|
|
2019-03-26 18:28:37 +00:00
|
|
|
}
|