From 727ce48c2378d14ef083037828f2ddf268999066 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Tue, 4 Jul 2017 00:04:10 +0300 Subject: [PATCH] Fixed error with selecting number of threads for distributed query processing [#CLICKHOUSE-3115]. --- .../Interpreters/InterpreterSelectQuery.cpp | 21 +++++++------------ .../src/Interpreters/InterpreterSelectQuery.h | 3 +++ dbms/src/Storages/StorageDistributed.cpp | 2 +- 3 files changed, 12 insertions(+), 14 deletions(-) diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index ffa68aa16fe..f35b43b985c 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -79,6 +79,8 @@ void InterpreterSelectQuery::init(BlockInputStreamPtr input, const Names & requi throw Exception("Too deep subqueries. Maximum: " + settings.limits.max_subquery_depth.toString(), ErrorCodes::TOO_DEEP_SUBQUERIES); + max_streams = settings.max_threads; + if (is_first_select_inside_union_all) { /// Create a SELECT query chain. @@ -785,7 +787,6 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns() getLimitLengthAndOffset(query, limit_length, limit_offset); size_t max_block_size = settings.max_block_size; - size_t max_streams = settings.max_threads; /** With distributed query processing, almost no computations are done in the threads, * but wait and receive data from remote servers. @@ -793,10 +794,6 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns() * connect and ask only 8 servers at a time. * To simultaneously query more remote servers, * instead of max_threads, max_distributed_connections is used. - * - * Save the initial value of max_threads in settings_for_storage - * - these settings will be passed to remote servers for distributed query processing, - * and there must be an original value of max_threads, not an increased value. */ bool is_remote = false; if (storage && storage->isRemote()) @@ -947,10 +944,10 @@ void InterpreterSelectQuery::executeAggregation(ExpressionActionsPtr expression, { streams[0] = std::make_shared( streams, stream_with_non_joined_data, params, final, - settings.max_threads, + max_streams, settings.aggregation_memory_efficient_merge_threads - ? settings.aggregation_memory_efficient_merge_threads - : settings.max_threads); + ? static_cast(settings.aggregation_memory_efficient_merge_threads) + : max_streams); stream_with_non_joined_data = nullptr; streams.resize(1); @@ -1004,12 +1001,12 @@ void InterpreterSelectQuery::executeMergeAggregated(bool overflow_row, bool fina executeUnion(); /// Now merge the aggregated blocks - streams[0] = std::make_shared(streams[0], params, final, settings.max_threads); + streams[0] = std::make_shared(streams[0], params, final, max_streams); } else { streams[0] = std::make_shared(streams, params, final, - settings.max_threads, + max_streams, settings.aggregation_memory_efficient_merge_threads ? size_t(settings.aggregation_memory_efficient_merge_threads) : size_t(settings.max_threads)); @@ -1185,9 +1182,7 @@ void InterpreterSelectQuery::executeUnion() /// If there are still several streams, then we combine them into one if (hasMoreThanOneStream()) { - const Settings & settings = context.getSettingsRef(); - - streams[0] = std::make_shared>(streams, stream_with_non_joined_data, settings.max_threads); + streams[0] = std::make_shared>(streams, stream_with_non_joined_data, max_streams); stream_with_non_joined_data = nullptr; streams.resize(1); union_within_single_query = false; diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.h b/dbms/src/Interpreters/InterpreterSelectQuery.h index 67f5a208d79..83b1722b2cc 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.h +++ b/dbms/src/Interpreters/InterpreterSelectQuery.h @@ -172,6 +172,9 @@ private: std::unique_ptr query_analyzer; NamesAndTypesList table_column_names; + /// How many streams we ask for storage to produce, and in how many threads we will do further processing. + size_t max_streams = 1; + /** Streams of data. * The source data streams are produced in the executeFetchColumns function. * Then they are converted (wrapped in other streams) using the `execute*` functions, diff --git a/dbms/src/Storages/StorageDistributed.cpp b/dbms/src/Storages/StorageDistributed.cpp index 181ec2ecedb..ac39930e55e 100644 --- a/dbms/src/Storages/StorageDistributed.cpp +++ b/dbms/src/Storages/StorageDistributed.cpp @@ -223,7 +223,7 @@ BlockInputStreams StorageDistributed::read( bool enable_shard_multiplexing = false; ClusterProxy::SelectQueryConstructor select_query_constructor( - processed_stage, QualifiedTableName{remote_database, remote_table}, external_tables); + processed_stage, QualifiedTableName{remote_database, remote_table}, external_tables); return ClusterProxy::Query{select_query_constructor, cluster, modified_query_ast, context, settings, enable_shard_multiplexing}.execute();