Preparation [#CLICKHOUSE-31].

This commit is contained in:
Alexey Milovidov 2017-05-24 23:13:04 +03:00
parent 8d570e2768
commit cb83b200cb
2 changed files with 22 additions and 4 deletions

View File

@ -70,6 +70,7 @@ void InterpreterSelectQuery::init(BlockInputStreamPtr input, const Names & requi
ProfileEvents::increment(ProfileEvents::SelectQuery);
initSettings();
const Settings & settings = context.getSettingsRef();
original_max_threads = settings.max_threads;
@ -369,6 +370,8 @@ BlockIO InterpreterSelectQuery::execute()
/// Constraints apply only to the final result.
if (to_stage == QueryProcessingStage::Complete)
{
const Settings & settings = context.getSettingsRef();
IProfilingBlockInputStream::LocalLimits limits;
limits.mode = IProfilingBlockInputStream::LIMITS_CURRENT;
limits.max_rows_to_read = settings.limits.max_result_rows;
@ -431,6 +434,8 @@ void InterpreterSelectQuery::executeSingleQuery()
LOG_TRACE(log, QueryProcessingStage::toString(from_stage) << " -> " << QueryProcessingStage::toString(to_stage));
const Settings & settings = context.getSettingsRef();
if (to_stage > QueryProcessingStage::FetchColumns)
{
bool has_join = false;
@ -777,6 +782,7 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns()
* and there must be an original value of max_threads, not an increased value.
*/
bool is_remote = false;
Settings & settings = context.getSettingsRef();
Settings settings_for_storage = settings;
if (storage && storage->isRemote())
{
@ -913,6 +919,8 @@ void InterpreterSelectQuery::executeAggregation(ExpressionActionsPtr expression,
AggregateDescriptions aggregates;
query_analyzer->getAggregateInfo(key_names, aggregates);
const Settings & settings = context.getSettingsRef();
/** Two-level aggregation is useful in two cases:
* 1. Parallel aggregation is done, and the results should be measured in parallel.
* 2. An aggregation is done with store of temporary data on the disk, and they need to be merged memory efficient.
@ -980,6 +988,8 @@ void InterpreterSelectQuery::executeMergeAggregated(bool overflow_row, bool fina
Aggregator::Params params(key_names, aggregates, overflow_row);
const Settings & settings = context.getSettingsRef();
if (!settings.distributed_aggregation_memory_efficient)
{
/// We union several sources into one, parallelizing the work.
@ -1014,6 +1024,8 @@ void InterpreterSelectQuery::executeTotalsAndHaving(bool has_having, ExpressionA
{
executeUnion();
const Settings & settings = context.getSettingsRef();
streams[0] = std::make_shared<TotalsHavingBlockInputStream>(
streams[0], overflow_row, expression,
has_having ? query.having_expression->getColumnName() : "", settings.totals_mode, settings.totals_auto_threshold);
@ -1069,6 +1081,8 @@ void InterpreterSelectQuery::executeOrder()
SortDescription order_descr = getSortDescription(query);
size_t limit = getLimitForSorting(query);
const Settings & settings = context.getSettingsRef();
transformStreams([&](auto & stream)
{
auto sorting_stream = std::make_shared<PartialSortingBlockInputStream>(stream, order_descr, limit);
@ -1099,6 +1113,8 @@ void InterpreterSelectQuery::executeMergeSorted()
SortDescription order_descr = getSortDescription(query);
size_t limit = getLimitForSorting(query);
const Settings & settings = context.getSettingsRef();
/// If there are several streams, then we merge them into one
if (hasMoreThanOneStream())
{
@ -1130,6 +1146,8 @@ void InterpreterSelectQuery::executeDistinct(bool before_order, Names columns)
{
if (query.distinct)
{
const Settings & settings = context.getSettingsRef();
size_t limit_length = 0;
size_t limit_offset = 0;
getLimitLengthAndOffset(query, limit_length, limit_offset);
@ -1159,6 +1177,8 @@ void InterpreterSelectQuery::executeUnion()
/// If there are still several streams, then we combine them into one
if (hasMoreThanOneStream())
{
const Settings & settings = context.getSettingsRef();
streams[0] = std::make_shared<UnionBlockInputStream<>>(streams, stream_with_non_joined_data, settings.max_threads);
stream_with_non_joined_data = nullptr;
streams.resize(1);
@ -1281,6 +1301,8 @@ void InterpreterSelectQuery::executeSubqueriesInSetsAndJoins(SubqueriesForSets &
for (auto & elem : subqueries_for_sets)
elem.second.table.reset();
const Settings & settings = context.getSettingsRef();
executeUnion();
streams[0] = std::make_shared<CreatingSetsBlockInputStream>(streams[0], subqueries_for_sets, settings.limits);
}
@ -1318,8 +1340,6 @@ void InterpreterSelectQuery::initSettings()
{
if (query.settings)
InterpreterSetQuery(query.settings, context).executeForCurrentContext();
settings = context.getSettings();
}
}

View File

@ -157,7 +157,6 @@ private:
void ignoreWithTotals();
/** Если в запросе SELECT есть секция SETTINGS, то применить настройки из неё.
* Затем достать настройки из context и поместить их в settings.
*
* Секция SETTINGS - настройки для конкретного запроса.
* Обычно настройки могут быть переданы другими способами, не внутри запроса.
@ -168,7 +167,6 @@ private:
ASTPtr query_ptr;
ASTSelectQuery & query;
Context context;
Settings settings;
size_t original_max_threads; /// В settings настройка max_threads может быть изменена. В original_max_threads сохраняется изначальное значение.
QueryProcessingStage::Enum to_stage;
size_t subquery_depth;