diff --git a/src/Interpreters/InterpreterInsertQuery.cpp b/src/Interpreters/InterpreterInsertQuery.cpp index 85a9ccb376e..b2bc525ac6e 100644 --- a/src/Interpreters/InterpreterInsertQuery.cpp +++ b/src/Interpreters/InterpreterInsertQuery.cpp @@ -398,19 +398,23 @@ BlockIO InterpreterInsertQuery::execute() return std::make_shared(in_header, actions); }); - auto num_select_threads = pipeline.getNumThreads(); - + size_t num_select_threads = pipeline.getNumThreads(); + size_t num_insert_threads = std::max_element(out_chains.begin(), out_chains.end(), [&](const auto &a, const auto &b) + { + return a.getNumThreads() < b.getNumThreads(); + })->getNumThreads(); pipeline.addChains(std::move(out_chains)); + pipeline.setMaxThreads(num_insert_threads); + /// Don't use more threads for insert then for select to reduce memory consumption. + if (!settings.parallel_view_processing && pipeline.getNumThreads() > num_select_threads) + pipeline.setMaxThreads(num_select_threads); + pipeline.setSinks([&](const Block & cur_header, QueryPipelineBuilder::StreamType) -> ProcessorPtr { return std::make_shared(cur_header); }); - /// Don't use more threads for insert then for select to reduce memory consumption. - if (!settings.parallel_view_processing && pipeline.getNumThreads() > num_select_threads) - pipeline.setMaxThreads(num_select_threads); - if (!allow_materialized) { for (const auto & column : metadata_snapshot->getColumns()) diff --git a/src/Processors/QueryPipelineBuilder.cpp b/src/Processors/QueryPipelineBuilder.cpp index d1324b66614..8ed413166da 100644 --- a/src/Processors/QueryPipelineBuilder.cpp +++ b/src/Processors/QueryPipelineBuilder.cpp @@ -139,7 +139,6 @@ void QueryPipelineBuilder::addChains(std::vector chains) { checkInitializedAndNotCompleted(); pipe.addChains(std::move(chains)); - setMaxThreads(pipe.maxParallelStreams()); } void QueryPipelineBuilder::addChain(Chain chain) @@ -149,7 +148,6 @@ void QueryPipelineBuilder::addChain(Chain chain) chains.emplace_back(std::move(chain)); pipe.resize(1); pipe.addChains(std::move(chains)); - setMaxThreads(pipe.maxParallelStreams()); } void QueryPipelineBuilder::transform(const Transformer & transformer)