diff --git a/src/Processors/QueryPipeline.cpp b/src/Processors/QueryPipeline.cpp index fa04082c82f..f3635ac5408 100644 --- a/src/Processors/QueryPipeline.cpp +++ b/src/Processors/QueryPipeline.cpp @@ -564,8 +564,13 @@ void QueryPipeline::setOutputFormat(ProcessorPtr output) } void QueryPipeline::unitePipelines( - std::vector> pipelines, const Block & common_header) + std::vector> pipelines, const Block & common_header, size_t max_threads_limit) { + /// Whether to limit the number of threads for the united pipeline. True if all pipelines have max_threads != 0. + /// If true, the resulting max_threads will be sum(max_threads). + /// Note: the sum may be greater than settings.max_threads, so we should apply this limit again. + bool will_limit_max_threads = !initialized() || max_threads != 0; + if (initialized()) { addSimpleTransform([&](const Block & header) @@ -630,9 +635,20 @@ void QueryPipeline::unitePipelines( interpreter_context.insert(interpreter_context.end(), pipeline.interpreter_context.begin(), pipeline.interpreter_context.end()); storage_holders.insert(storage_holders.end(), pipeline.storage_holders.begin(), pipeline.storage_holders.end()); - max_threads = std::max(max_threads, pipeline.max_threads); + max_threads += pipeline.max_threads; + will_limit_max_threads = will_limit_max_threads && pipeline.max_threads != 0; + + /// If one of the pipelines uses more threads than the current limit, keep that larger value as the limit. + /// It may happen if max_distributed_connections > max_threads. + if (pipeline.max_threads > max_threads_limit) + max_threads_limit = pipeline.max_threads; } + if (!will_limit_max_threads) + max_threads = 0; + else + limitMaxThreads(max_threads_limit); + if (!extremes.empty()) { if (extremes.size() == 1) diff --git a/src/Processors/QueryPipeline.h b/src/Processors/QueryPipeline.h index 7990b0b79f5..7bd16ff62fd 100644 --- a/src/Processors/QueryPipeline.h +++ b/src/Processors/QueryPipeline.h @@ -161,7 +161,7 @@ public: /// Unite several pipelines together. 
Result pipeline would have common_header structure. /// If collector is used, it will collect only newly-added processors, but not processors from pipelines. - void unitePipelines(std::vector> pipelines, const Block & common_header); + void unitePipelines(std::vector> pipelines, const Block & common_header, size_t max_threads_limit = 0); PipelineExecutorPtr execute(); diff --git a/src/Processors/QueryPlan/UnionStep.cpp b/src/Processors/QueryPlan/UnionStep.cpp index a8897e778a1..b645eb3f3d7 100644 --- a/src/Processors/QueryPlan/UnionStep.cpp +++ b/src/Processors/QueryPlan/UnionStep.cpp @@ -30,15 +30,7 @@ QueryPipelinePtr UnionStep::updatePipeline(QueryPipelines pipelines) return pipeline; } - size_t num_pipelines = pipelines.size(); - pipeline->unitePipelines(std::move(pipelines), output_stream->header); - - if (num_pipelines > 1) - { - // nested queries can force 1 thread (due to simplicity) - // but in case of union this cannot be done. - pipeline->setMaxThreads(std::min(num_pipelines, max_threads)); - } + pipeline->unitePipelines(std::move(pipelines), output_stream->header ,max_threads); processors = collector.detachProcessors(); return pipeline; diff --git a/tests/queries/0_stateless/01358_union_threads_bug.reference b/tests/queries/0_stateless/01358_union_threads_bug.reference new file mode 100644 index 00000000000..4397f4e2fdd --- /dev/null +++ b/tests/queries/0_stateless/01358_union_threads_bug.reference @@ -0,0 +1,2 @@ +300 +1 diff --git a/tests/queries/0_stateless/01358_union_threads_bug.sql b/tests/queries/0_stateless/01358_union_threads_bug.sql new file mode 100644 index 00000000000..0c07364f41c --- /dev/null +++ b/tests/queries/0_stateless/01358_union_threads_bug.sql @@ -0,0 +1,7 @@ +set log_queries = 1; +set max_threads = 16; + +SELECT count() FROM (SELECT number FROM numbers_mt(1000000) ORDER BY number DESC LIMIT 100 UNION ALL SELECT number FROM numbers_mt(1000000) ORDER BY number DESC LIMIT 100 UNION ALL SELECT number FROM numbers_mt(1000000) ORDER BY 
number DESC LIMIT 100); + +system flush logs; +select length(thread_ids) >= 16 from system.query_log where event_date >= today() - 1 and query like '%SELECT count() FROM (SELECT number FROM numbers_mt(1000000) ORDER BY number DESC LIMIT 100 UNION ALL SELECT number FROM numbers_mt(1000000) ORDER BY number DESC LIMIT 100 UNION ALL SELECT number FROM numbers_mt(1000000) ORDER BY number DESC LIMIT 100)%' and type = 'QueryFinish' order by query_start_time desc limit 1;