Move QueryPipelineBuilder::setMaxThreads() into the InterpreterInsertQuery

This commit is contained in:
Azat Khuzhin 2021-10-06 21:17:22 +03:00
parent ae5570e6d6
commit 83c271f446
2 changed files with 10 additions and 8 deletions

View File

@ -398,19 +398,23 @@ BlockIO InterpreterInsertQuery::execute()
return std::make_shared<ExpressionTransform>(in_header, actions); return std::make_shared<ExpressionTransform>(in_header, actions);
}); });
auto num_select_threads = pipeline.getNumThreads(); size_t num_select_threads = pipeline.getNumThreads();
size_t num_insert_threads = std::max_element(out_chains.begin(), out_chains.end(), [&](const auto &a, const auto &b)
{
return a.getNumThreads() < b.getNumThreads();
})->getNumThreads();
pipeline.addChains(std::move(out_chains)); pipeline.addChains(std::move(out_chains));
pipeline.setMaxThreads(num_insert_threads);
/// Don't use more threads for insert then for select to reduce memory consumption.
if (!settings.parallel_view_processing && pipeline.getNumThreads() > num_select_threads)
pipeline.setMaxThreads(num_select_threads);
pipeline.setSinks([&](const Block & cur_header, QueryPipelineBuilder::StreamType) -> ProcessorPtr pipeline.setSinks([&](const Block & cur_header, QueryPipelineBuilder::StreamType) -> ProcessorPtr
{ {
return std::make_shared<EmptySink>(cur_header); return std::make_shared<EmptySink>(cur_header);
}); });
/// Don't use more threads for insert then for select to reduce memory consumption.
if (!settings.parallel_view_processing && pipeline.getNumThreads() > num_select_threads)
pipeline.setMaxThreads(num_select_threads);
if (!allow_materialized) if (!allow_materialized)
{ {
for (const auto & column : metadata_snapshot->getColumns()) for (const auto & column : metadata_snapshot->getColumns())

View File

@ -139,7 +139,6 @@ void QueryPipelineBuilder::addChains(std::vector<Chain> chains)
{ {
checkInitializedAndNotCompleted(); checkInitializedAndNotCompleted();
pipe.addChains(std::move(chains)); pipe.addChains(std::move(chains));
setMaxThreads(pipe.maxParallelStreams());
} }
void QueryPipelineBuilder::addChain(Chain chain) void QueryPipelineBuilder::addChain(Chain chain)
@ -149,7 +148,6 @@ void QueryPipelineBuilder::addChain(Chain chain)
chains.emplace_back(std::move(chain)); chains.emplace_back(std::move(chain));
pipe.resize(1); pipe.resize(1);
pipe.addChains(std::move(chains)); pipe.addChains(std::move(chains));
setMaxThreads(pipe.maxParallelStreams());
} }
void QueryPipelineBuilder::transform(const Transformer & transformer) void QueryPipelineBuilder::transform(const Transformer & transformer)