Merge pull request #12103 from ClickHouse/fix-12030

Fix over-limiting the number of threads for union.
This commit is contained in:
alexey-milovidov 2020-07-09 04:28:27 +03:00 committed by GitHub
commit cbb539c682
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 29 additions and 12 deletions

View File

@ -564,8 +564,13 @@ void QueryPipeline::setOutputFormat(ProcessorPtr output)
}
void QueryPipeline::unitePipelines(
std::vector<std::unique_ptr<QueryPipeline>> pipelines, const Block & common_header)
std::vector<std::unique_ptr<QueryPipeline>> pipelines, const Block & common_header, size_t max_threads_limit)
{
/// Should we limit the number of threads for united pipeline. True if all pipelines have max_threads != 0.
/// If true, result max_threads will be sum(max_threads).
/// Note: the sum may be greater than settings.max_threads, so we should apply this limit again.
bool will_limit_max_threads = !initialized() || max_threads != 0;
if (initialized())
{
addSimpleTransform([&](const Block & header)
@ -630,9 +635,20 @@ void QueryPipeline::unitePipelines(
interpreter_context.insert(interpreter_context.end(), pipeline.interpreter_context.begin(), pipeline.interpreter_context.end());
storage_holders.insert(storage_holders.end(), pipeline.storage_holders.begin(), pipeline.storage_holders.end());
max_threads = std::max(max_threads, pipeline.max_threads);
max_threads += pipeline.max_threads;
will_limit_max_threads = will_limit_max_threads && pipeline.max_threads != 0;
/// If one of the pipelines uses more threads than the current limit, we keep its value as the limit.
/// It may happen if max_distributed_connections > max_threads
if (pipeline.max_threads > max_threads_limit)
max_threads_limit = pipeline.max_threads;
}
if (!will_limit_max_threads)
max_threads = 0;
else
limitMaxThreads(max_threads_limit);
if (!extremes.empty())
{
if (extremes.size() == 1)

View File

@ -161,7 +161,7 @@ public:
/// Unite several pipelines together. Result pipeline would have common_header structure.
/// If collector is used, it will collect only newly-added processors, but not processors from pipelines.
void unitePipelines(std::vector<std::unique_ptr<QueryPipeline>> pipelines, const Block & common_header);
void unitePipelines(std::vector<std::unique_ptr<QueryPipeline>> pipelines, const Block & common_header, size_t max_threads_limit = 0);
PipelineExecutorPtr execute();

View File

@ -30,15 +30,7 @@ QueryPipelinePtr UnionStep::updatePipeline(QueryPipelines pipelines)
return pipeline;
}
size_t num_pipelines = pipelines.size();
pipeline->unitePipelines(std::move(pipelines), output_stream->header);
if (num_pipelines > 1)
{
// nested queries can force 1 thread (due to simplicity)
// but in case of union this cannot be done.
pipeline->setMaxThreads(std::min<UInt64>(num_pipelines, max_threads));
}
pipeline->unitePipelines(std::move(pipelines), output_stream->header ,max_threads);
processors = collector.detachProcessors();
return pipeline;

View File

@ -0,0 +1,2 @@
300
1

View File

@ -0,0 +1,7 @@
-- Regression test for the number of threads used by a UNION ALL query.
-- Runs a three-branch UNION ALL with max_threads = 16 and then checks, via
-- system.query_log, that the run recorded at least 16 thread ids.

-- Query logging must be on, since the final check reads system.query_log.
set log_queries = 1;
set max_threads = 16;
-- Each branch is limited to 100 rows, so count() over the union is 300.
SELECT count() FROM (SELECT number FROM numbers_mt(1000000) ORDER BY number DESC LIMIT 100 UNION ALL SELECT number FROM numbers_mt(1000000) ORDER BY number DESC LIMIT 100 UNION ALL SELECT number FROM numbers_mt(1000000) ORDER BY number DESC LIMIT 100);
-- Flush logs before reading system.query_log so the entry for the query above is visible.
system flush logs;
-- Find the latest finished run of the query above by its exact text and
-- output 1 if it recorded at least 16 thread ids, 0 otherwise.
select length(thread_ids) >= 16 from system.query_log where event_date >= today() - 1 and query like '%SELECT count() FROM (SELECT number FROM numbers_mt(1000000) ORDER BY number DESC LIMIT 100 UNION ALL SELECT number FROM numbers_mt(1000000) ORDER BY number DESC LIMIT 100 UNION ALL SELECT number FROM numbers_mt(1000000) ORDER BY number DESC LIMIT 100)%' and type = 'QueryFinish' order by query_start_time desc limit 1;