diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index cf1b4589a3b..e86fb5ea778 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -72,8 +72,6 @@ void InterpreterSelectQuery::init(BlockInputStreamPtr input, const Names & requi initSettings(); const Settings & settings = context.getSettingsRef(); - original_max_threads = settings.max_threads; - if (settings.limits.max_subquery_depth && subquery_depth > settings.limits.max_subquery_depth) throw Exception("Too deep subqueries. Maximum: " + settings.limits.max_subquery_depth.toString(), ErrorCodes::TOO_DEEP_SUBQUERIES); @@ -770,6 +768,22 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns() if (query.prewhere_expression && (!storage || !storage->supportsPrewhere())) throw Exception(storage ? "Storage " + storage->getName() + " doesn't support PREWHERE" : "Illegal PREWHERE", ErrorCodes::ILLEGAL_PREWHERE); + const Settings & settings = context.getSettingsRef(); + + /// Limitation on the number of columns to read. + if (settings.limits.max_columns_to_read && required_columns.size() > settings.limits.max_columns_to_read) + throw Exception("Limit for number of columns to read exceeded. " + "Requested: " + toString(required_columns.size()) + + ", maximum: " + settings.limits.max_columns_to_read.toString(), + ErrorCodes::TOO_MUCH_COLUMNS); + + size_t limit_length = 0; + size_t limit_offset = 0; + getLimitLengthAndOffset(query, limit_length, limit_offset); + + size_t max_block_size = settings.max_block_size; + size_t max_streams = settings.max_threads; + /** With distributed query processing, almost no computations are done in the threads, * but wait and receive data from remote servers. * If we have 20 remote servers, and max_threads = 8, then it would not be very good @@ -782,25 +796,12 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns() * and there must be an original value of max_threads, not an increased value. */ bool is_remote = false; - Settings & settings = context.getSettingsRef(); - Settings settings_for_storage = settings; if (storage && storage->isRemote()) { is_remote = true; - settings.max_threads = settings.max_distributed_connections; + max_streams = settings.max_distributed_connections; } - /// Limitation on the number of columns to read. - if (settings.limits.max_columns_to_read && required_columns.size() > settings.limits.max_columns_to_read) - throw Exception("Limit for number of columns to read exceeded. " - "Requested: " + toString(required_columns.size()) - + ", maximum: " + settings.limits.max_columns_to_read.toString(), - ErrorCodes::TOO_MUCH_COLUMNS); - - size_t limit_length = 0; - size_t limit_offset = 0; - getLimitLengthAndOffset(query, limit_length, limit_offset); - /** Optimization - if not specified DISTINCT, WHERE, GROUP, HAVING, ORDER, LIMIT BY but LIMIT is specified, and limit + offset < max_block_size, * then as the block size we will use limit + offset (not to read more from the table than requested), * and also set the number of threads to 1. @@ -816,8 +817,8 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns() && !query_analyzer->hasAggregation() && limit_length + limit_offset < settings.max_block_size) { - settings.max_block_size = limit_length + limit_offset; - settings.max_threads = 1; + max_block_size = limit_length + limit_offset; + max_streams = 1; } QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns; @@ -827,8 +828,6 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns() /// Initialize the initial data streams to which the query transforms are superimposed. Table or subquery? if (!interpreter_subquery) { - size_t max_streams = settings.max_threads; - if (max_streams == 0) throw Exception("Logical error: zero number of streams requested", ErrorCodes::LOGICAL_ERROR); @@ -846,8 +845,8 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns() actual_query_ptr = query_ptr; streams = storage->read(required_columns, actual_query_ptr, - context, settings_for_storage, from_stage, - settings.max_block_size, max_streams); + context, settings, from_stage, + max_block_size, max_streams); if (alias_actions) /// Wrap each stream returned from the table to calculate and add ALIAS columns @@ -996,7 +995,7 @@ void InterpreterSelectQuery::executeMergeAggregated(bool overflow_row, bool fina executeUnion(); /// Now merge the aggregated blocks - streams[0] = std::make_shared(streams[0], params, final, original_max_threads); + streams[0] = std::make_shared(streams[0], params, final, settings.max_threads); } else { @@ -1004,7 +1003,7 @@ void InterpreterSelectQuery::executeMergeAggregated(bool overflow_row, bool fina settings.max_threads, settings.aggregation_memory_efficient_merge_threads ? size_t(settings.aggregation_memory_efficient_merge_threads) - : original_max_threads); + : size_t(settings.max_threads)); streams.resize(1); } diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.h b/dbms/src/Interpreters/InterpreterSelectQuery.h index 9044c1f3676..5f4a33bf60a 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.h +++ b/dbms/src/Interpreters/InterpreterSelectQuery.h @@ -167,7 +167,6 @@ private: ASTPtr query_ptr; ASTSelectQuery & query; Context context; - size_t original_max_threads; /// В settings настройка max_threads может быть изменена. В original_max_threads сохраняется изначальное значение. QueryProcessingStage::Enum to_stage; size_t subquery_depth; std::unique_ptr query_analyzer;