Preparation: untangle settings manipulation in InterpreterSelectQuery [#CLICKHOUSE-31].

Alexey Milovidov 2017-05-24 23:25:01 +03:00
parent cb83b200cb
commit 59ac7f8063
2 changed files with 23 additions and 25 deletions
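This change replaces in-place mutation of the shared Settings object (and the original_max_threads backup it required) with a const reference plus local max_block_size / max_streams variables. A minimal self-contained sketch of the pattern, using a hypothetical Settings stand-in rather than the real ClickHouse class:

    #include <cstddef>
    #include <iostream>

    // Illustrative stand-in for ClickHouse's Settings; only the fields used below.
    struct Settings
    {
        size_t max_threads = 8;
        size_t max_block_size = 65536;
    };

    // Before this commit the interpreter did, in effect:
    //     Settings & settings = context.getSettingsRef();
    //     settings.max_threads = 1;    // mutation visible to all later readers,
    //                                  // hence the original_max_threads backup.
    // After it, per-query decisions live in locals:
    size_t chooseStreams(const Settings & settings, size_t limit_length, size_t limit_offset)
    {
        size_t max_streams = settings.max_threads;    // default taken from settings

        // Small-LIMIT case: one stream suffices; the shared settings stay untouched.
        if (limit_length && limit_length + limit_offset < settings.max_block_size)
            max_streams = 1;

        return max_streams;
    }

    int main()
    {
        Settings settings;
        std::cout << chooseStreams(settings, 10, 0) << '\n';    // 1
        std::cout << settings.max_threads << '\n';              // still 8: no side effect
    }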

InterpreterSelectQuery.cpp

@@ -72,8 +72,6 @@ void InterpreterSelectQuery::init(BlockInputStreamPtr input, const Names & requi
     initSettings();
     const Settings & settings = context.getSettingsRef();
 
-    original_max_threads = settings.max_threads;
-
     if (settings.limits.max_subquery_depth && subquery_depth > settings.limits.max_subquery_depth)
         throw Exception("Too deep subqueries. Maximum: " + settings.limits.max_subquery_depth.toString(),
             ErrorCodes::TOO_DEEP_SUBQUERIES);
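With the const reference in place, settings.max_threads is never modified during query interpretation, so there is no longer anything to snapshot here; the merge stages further down read settings.max_threads directly.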
@@ -770,6 +768,22 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns()
     if (query.prewhere_expression && (!storage || !storage->supportsPrewhere()))
         throw Exception(storage ? "Storage " + storage->getName() + " doesn't support PREWHERE" : "Illegal PREWHERE", ErrorCodes::ILLEGAL_PREWHERE);
 
+    const Settings & settings = context.getSettingsRef();
+
+    /// Limitation on the number of columns to read.
+    if (settings.limits.max_columns_to_read && required_columns.size() > settings.limits.max_columns_to_read)
+        throw Exception("Limit for number of columns to read exceeded. "
+            "Requested: " + toString(required_columns.size())
+            + ", maximum: " + settings.limits.max_columns_to_read.toString(),
+            ErrorCodes::TOO_MUCH_COLUMNS);
+
+    size_t limit_length = 0;
+    size_t limit_offset = 0;
+    getLimitLengthAndOffset(query, limit_length, limit_offset);
+
+    size_t max_block_size = settings.max_block_size;
+    size_t max_streams = settings.max_threads;
+
     /** With distributed query processing, almost no computations are done in the threads,
      *  but wait and receive data from remote servers.
      *  If we have 20 remote servers, and max_threads = 8, then it would not be very good
@@ -782,25 +796,12 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns()
      *  and there must be an original value of max_threads, not an increased value.
      */
     bool is_remote = false;
-    Settings & settings = context.getSettingsRef();
-    Settings settings_for_storage = settings;
-
     if (storage && storage->isRemote())
     {
         is_remote = true;
-        settings.max_threads = settings.max_distributed_connections;
+        max_streams = settings.max_distributed_connections;
     }
 
-    /// Limitation on the number of columns to read.
-    if (settings.limits.max_columns_to_read && required_columns.size() > settings.limits.max_columns_to_read)
-        throw Exception("Limit for number of columns to read exceeded. "
-            "Requested: " + toString(required_columns.size())
-            + ", maximum: " + settings.limits.max_columns_to_read.toString(),
-            ErrorCodes::TOO_MUCH_COLUMNS);
-
-    size_t limit_length = 0;
-    size_t limit_offset = 0;
-    getLimitLengthAndOffset(query, limit_length, limit_offset);
     /** Optimization - if not specified DISTINCT, WHERE, GROUP, HAVING, ORDER, LIMIT BY but LIMIT is specified, and limit + offset < max_block_size,
      *  then as the block size we will use limit + offset (not to read more from the table than requested),
      *  and also set the number of threads to 1.
@@ -816,8 +817,8 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns()
         && !query_analyzer->hasAggregation()
         && limit_length + limit_offset < settings.max_block_size)
     {
-        settings.max_block_size = limit_length + limit_offset;
-        settings.max_threads = 1;
+        max_block_size = limit_length + limit_offset;
+        max_streams = 1;
     }
 
     QueryProcessingStage::Enum from_stage = QueryProcessingStage::FetchColumns;
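A worked example of the optimization above, assuming the default max_block_size of 65536: for SELECT * FROM t LIMIT 100 OFFSET 20 with no DISTINCT, WHERE, GROUP BY, HAVING, ORDER BY or LIMIT BY, limit_length + limit_offset = 120 < 65536, so the table is read with max_block_size = 120 and max_streams = 1; after this commit both are plain locals rather than mutated settings.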
@@ -827,8 +828,6 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns()
     /// Initialize the initial data streams to which the query transforms are superimposed. Table or subquery?
     if (!interpreter_subquery)
     {
-        size_t max_streams = settings.max_threads;
-
         if (max_streams == 0)
             throw Exception("Logical error: zero number of streams requested", ErrorCodes::LOGICAL_ERROR);
@@ -846,8 +845,8 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns()
             actual_query_ptr = query_ptr;
 
         streams = storage->read(required_columns, actual_query_ptr,
-            context, settings_for_storage, from_stage,
-            settings.max_block_size, max_streams);
+            context, settings, from_stage,
+            max_block_size, max_streams);
 
         if (alias_actions)
             /// Wrap each stream returned from the table to calculate and add ALIAS columns
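Because the Settings object is no longer mutated before the read, the settings_for_storage copy that used to shield the storage from those mutations becomes redundant: storage->read() receives the caller's settings directly, and the locally computed max_block_size and max_streams are passed as explicit arguments.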
@@ -996,7 +995,7 @@ void InterpreterSelectQuery::executeMergeAggregated(bool overflow_row, bool fina
         executeUnion();
 
         /// Now merge the aggregated blocks
-        streams[0] = std::make_shared<MergingAggregatedBlockInputStream>(streams[0], params, final, original_max_threads);
+        streams[0] = std::make_shared<MergingAggregatedBlockInputStream>(streams[0], params, final, settings.max_threads);
     }
     else
     {
@@ -1004,7 +1003,7 @@ void InterpreterSelectQuery::executeMergeAggregated(bool overflow_row, bool fina
             settings.max_threads,
             settings.aggregation_memory_efficient_merge_threads
                 ? size_t(settings.aggregation_memory_efficient_merge_threads)
-                : original_max_threads);
+                : size_t(settings.max_threads));
 
         streams.resize(1);
    }
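Both merge paths can now read settings.max_threads directly. The substitution is safe precisely because executeFetchColumns() no longer bumps max_threads for remote queries, which was the reason the pristine value had to be stashed in original_max_threads. A self-contained sketch of the resulting thread selection, again with an illustrative Settings stand-in rather than the real class:

    #include <cstddef>

    struct Settings
    {
        size_t max_threads = 8;
        size_t aggregation_memory_efficient_merge_threads = 0;   // 0 means "use max_threads"
    };

    // Mirrors the ternary in the diff above: a non-zero override wins, otherwise
    // fall back to max_threads, which now always holds the user-supplied value.
    size_t mergeThreads(const Settings & settings)
    {
        return settings.aggregation_memory_efficient_merge_threads
            ? size_t(settings.aggregation_memory_efficient_merge_threads)
            : size_t(settings.max_threads);
    }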

InterpreterSelectQuery.h

@@ -167,7 +167,6 @@ private:
     ASTPtr query_ptr;
     ASTSelectQuery & query;
     Context context;
-    size_t original_max_threads; /// The max_threads setting can be changed in `settings`; original_max_threads keeps its initial value.
     QueryProcessingStage::Enum to_stage;
     size_t subquery_depth;
     std::unique_ptr<ExpressionAnalyzer> query_analyzer;