Merge pull request #24874 from amosbird/rowpolicyfix1

Try reusing built sets during multi-pass analysis
Nikolai Kochetov authored 2021-06-04 13:22:25 +03:00; committed by GitHub
commit 1850f411cc
7 changed files with 199 additions and 228 deletions
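
At a high level: InterpreterSelectQuery can run its analyze step more than once (the need_analyze_again path further down), and before this change the subquery sets built by the first pass were simply cleared. The diff threads them back into the next SelectQueryExpressionAnalyzer instead. A minimal sketch of that flow, not part of the diff, with simplified stand-in types for SubqueriesForSets, PreparedSets and the analyzer:

#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical stand-ins for ClickHouse's SubqueriesForSets / PreparedSets.
using SubqueriesForSets = std::unordered_map<std::string, std::string>;
using PreparedSets = std::unordered_map<std::string, std::vector<int>>;

// Hypothetical stand-in for SelectQueryExpressionAnalyzer: it receives any
// previously built sets and keeps them as members, so a repeated analysis
// pass does not rebuild them.
struct Analyzer
{
    Analyzer(SubqueriesForSets subqueries, PreparedSets prepared)
        : subqueries_for_sets(std::move(subqueries)), prepared_sets(std::move(prepared)) {}

    SubqueriesForSets & getSubqueriesForSets() { return subqueries_for_sets; }
    PreparedSets & getPreparedSets() { return prepared_sets; }

    SubqueriesForSets subqueries_for_sets;
    PreparedSets prepared_sets;
};

int main()
{
    SubqueriesForSets subquery_for_sets;
    PreparedSets prepared_sets;

    auto analyze = [&]
    {
        // The analyzer is recreated on every pass, but starts from whatever
        // sets the previous pass already built.
        return Analyzer(std::move(subquery_for_sets), std::move(prepared_sets));
    };

    auto analyzer = analyze();      // first pass
    bool need_analyze_again = true; // e.g. a PREWHERE fallback forces a second pass
    if (need_analyze_again)
    {
        // Salvage the sets from the first pass instead of clearing them.
        subquery_for_sets = std::move(analyzer.getSubqueriesForSets());
        prepared_sets = std::move(analyzer.getPreparedSets());
        analyzer = analyze();       // second pass reuses the sets
    }
}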

View File

@ -1038,7 +1038,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
/// After the system database is created, attach virtual system tables (in addition to query_log and part_log)
attachSystemTablesServer(*database_catalog.getSystemDatabase(), has_zookeeper);
/// We load temporary database first, because projections need it.
database_catalog.loadTemporaryDatabase();
database_catalog.initializeAndLoadTemporaryDatabase();
/// Then, load remaining databases
loadMetadata(global_context, default_database);
database_catalog.loadDatabases();

View File

@ -132,7 +132,7 @@ StoragePtr TemporaryTableHolder::getTable() const
}
void DatabaseCatalog::loadTemporaryDatabase()
void DatabaseCatalog::initializeAndLoadTemporaryDatabase()
{
drop_delay_sec = getContext()->getConfigRef().getInt("database_atomic_delay_before_drop_table_sec", default_drop_delay_sec);

View File

@ -127,7 +127,7 @@ public:
static DatabaseCatalog & instance();
static void shutdown();
void loadTemporaryDatabase();
void initializeAndLoadTemporaryDatabase();
void loadDatabases();
/// Get an object that protects the table from concurrently executing multiple DDL operations.

View File

@ -136,13 +136,16 @@ ExpressionAnalyzer::ExpressionAnalyzer(
ContextPtr context_,
size_t subquery_depth_,
bool do_global,
SubqueriesForSets subqueries_for_sets_)
SubqueriesForSets subqueries_for_sets_,
PreparedSets prepared_sets_)
: WithContext(context_)
, query(query_), settings(getContext()->getSettings())
, subquery_depth(subquery_depth_)
, syntax(syntax_analyzer_result_)
{
/// Cache prepared sets because we might run analysis multiple times
subqueries_for_sets = std::move(subqueries_for_sets_);
prepared_sets = std::move(prepared_sets_);
/// external_tables, subqueries_for_sets for global subqueries.
/// Replaces global subqueries with the generated names of temporary tables that will be sent to remote servers.
@ -395,8 +398,7 @@ void SelectQueryExpressionAnalyzer::makeSetsForIndex(const ASTPtr & node)
getRootActions(left_in_operand, true, temp_actions);
if (temp_actions->tryFindInIndex(left_in_operand->getColumnName()))
makeExplicitSet(func, *temp_actions, true, getContext(),
settings.size_limits_for_set, prepared_sets);
makeExplicitSet(func, *temp_actions, true, getContext(), settings.size_limits_for_set, prepared_sets);
}
}
}
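
What the cached prepared_sets buys: makeExplicitSet above fills prepared_sets, and assuming it looks up an existing entry before building a new set (the usual pattern for such caches), a repeated analysis pass over the same IN expressions does no extra work. A hedged sketch of the build-once-by-key idea; the string key and the buildSetOnce helper are illustrative only, ClickHouse uses a dedicated key type:

#include <memory>
#include <set>
#include <string>
#include <unordered_map>

// Illustrative only: a "set" built from an IN clause, keyed by a string
// identifying the IN expression.
using Set = std::set<int>;
using SetPtr = std::shared_ptr<Set>;
using PreparedSets = std::unordered_map<std::string, SetPtr>;

SetPtr buildSetOnce(PreparedSets & prepared_sets, const std::string & key)
{
    // If an earlier analysis pass already built this set, reuse it.
    if (auto it = prepared_sets.find(key); it != prepared_sets.end())
        return it->second;

    // Otherwise do the (potentially expensive) construction once and cache it.
    auto set = std::make_shared<Set>(Set{1, 2, 3});
    prepared_sets.emplace(key, set);
    return set;
}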

View File

@ -96,12 +96,10 @@ private:
public:
/// Ctor for non-select queries. Generally its usage is:
/// auto actions = ExpressionAnalyzer(query, syntax, context).getActions();
ExpressionAnalyzer(
const ASTPtr & query_,
const TreeRewriterResultPtr & syntax_analyzer_result_,
ContextPtr context_)
: ExpressionAnalyzer(query_, syntax_analyzer_result_, context_, 0, false, {})
{}
ExpressionAnalyzer(const ASTPtr & query_, const TreeRewriterResultPtr & syntax_analyzer_result_, ContextPtr context_)
: ExpressionAnalyzer(query_, syntax_analyzer_result_, context_, 0, false, {}, {})
{
}
~ExpressionAnalyzer();
@ -125,6 +123,8 @@ public:
*/
SubqueriesForSets & getSubqueriesForSets() { return subqueries_for_sets; }
PreparedSets & getPreparedSets() { return prepared_sets; }
/// Get intermediates for tests
const ExpressionAnalyzerData & getAnalyzedData() const { return *this; }
@ -153,7 +153,8 @@ protected:
ContextPtr context_,
size_t subquery_depth_,
bool do_global_,
SubqueriesForSets subqueries_for_sets_);
SubqueriesForSets subqueries_for_sets_,
PreparedSets prepared_sets_);
ASTPtr query;
const ExtractedSettings settings;
@ -285,8 +286,16 @@ public:
const NameSet & required_result_columns_ = {},
bool do_global_ = false,
const SelectQueryOptions & options_ = {},
SubqueriesForSets subqueries_for_sets_ = {})
: ExpressionAnalyzer(query_, syntax_analyzer_result_, context_, options_.subquery_depth, do_global_, std::move(subqueries_for_sets_))
SubqueriesForSets subqueries_for_sets_ = {},
PreparedSets prepared_sets_ = {})
: ExpressionAnalyzer(
query_,
syntax_analyzer_result_,
context_,
options_.subquery_depth,
do_global_,
std::move(subqueries_for_sets_),
std::move(prepared_sets_))
, metadata_snapshot(metadata_snapshot_)
, required_result_columns(required_result_columns_)
, query_options(options_)

View File

@ -242,9 +242,11 @@ static void checkAccessRightsForSelect(
if (access->isGranted(AccessType::SELECT, table_id.database_name, table_id.table_name, column.name))
return;
}
throw Exception(context->getUserName() + ": Not enough privileges. "
"To execute this query it's necessary to have grant SELECT for at least one column on " + table_id.getFullTableName(),
ErrorCodes::ACCESS_DENIED);
throw Exception(
ErrorCodes::ACCESS_DENIED,
"{}: Not enough privileges. To execute this query it's necessary to have grant SELECT for at least one column on {}",
context->getUserName(),
table_id.getFullTableName());
}
/// General check.
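
Side note on the throw sites touched in this file: they move from string concatenation with the error code last to the error-code-first overload that takes a format string. A small stand-alone sketch of that style using the fmt library directly; the Exception type and the ACCESS_DENIED value here are stand-ins, not ClickHouse's:

#include <fmt/core.h>
#include <stdexcept>
#include <string>
#include <utility>

constexpr int ACCESS_DENIED = 42; // placeholder value, not ClickHouse's real error code

struct Exception : std::runtime_error
{
    template <typename... Args>
    Exception(int code_, fmt::format_string<Args...> fmt_str, Args &&... args)
        : std::runtime_error(fmt::format(fmt_str, std::forward<Args>(args)...)), code(code_)
    {
    }

    int code;
};

void checkAccess(const std::string & user, const std::string & table, bool granted)
{
    if (granted)
        return;
    // Error code first, then a format string: no manual concatenation, and the
    // code is visible up front at every throw site.
    throw Exception(ACCESS_DENIED,
        "{}: Not enough privileges. To execute this query it's necessary to have grant SELECT "
        "for at least one column on {}",
        user, table);
}

int main()
{
    try
    {
        checkAccess("default", "db.table", /*granted=*/ false);
    }
    catch (const Exception & e)
    {
        fmt::print("{} (code {})\n", e.what(), e.code);
    }
}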
@ -369,7 +371,9 @@ InterpreterSelectQuery::InterpreterSelectQuery(
if (storage)
view = dynamic_cast<StorageView *>(storage.get());
/// Reuse already built sets for multiple passes of analysis
SubqueriesForSets subquery_for_sets;
PreparedSets prepared_sets;
auto analyze = [&] (bool try_move_to_prewhere)
{
@ -429,9 +433,15 @@ InterpreterSelectQuery::InterpreterSelectQuery(
}
query_analyzer = std::make_unique<SelectQueryExpressionAnalyzer>(
query_ptr, syntax_analyzer_result, context, metadata_snapshot,
NameSet(required_result_column_names.begin(), required_result_column_names.end()),
!options.only_analyze, options, std::move(subquery_for_sets));
query_ptr,
syntax_analyzer_result,
context,
metadata_snapshot,
NameSet(required_result_column_names.begin(), required_result_column_names.end()),
!options.only_analyze,
options,
std::move(subquery_for_sets),
std::move(prepared_sets));
if (!options.only_analyze)
{
@ -439,10 +449,14 @@ InterpreterSelectQuery::InterpreterSelectQuery(
throw Exception("Illegal SAMPLE: table doesn't support sampling", ErrorCodes::SAMPLING_NOT_SUPPORTED);
if (query.final() && (input || input_pipe || !storage || !storage->supportsFinal()))
throw Exception((!input && !input_pipe && storage) ? "Storage " + storage->getName() + " doesn't support FINAL" : "Illegal FINAL", ErrorCodes::ILLEGAL_FINAL);
throw Exception(
(!input && !input_pipe && storage) ? "Storage " + storage->getName() + " doesn't support FINAL" : "Illegal FINAL",
ErrorCodes::ILLEGAL_FINAL);
if (query.prewhere() && (input || input_pipe || !storage || !storage->supportsPrewhere()))
throw Exception((!input && !input_pipe && storage) ? "Storage " + storage->getName() + " doesn't support PREWHERE" : "Illegal PREWHERE", ErrorCodes::ILLEGAL_PREWHERE);
throw Exception(
(!input && !input_pipe && storage) ? "Storage " + storage->getName() + " doesn't support PREWHERE" : "Illegal PREWHERE",
ErrorCodes::ILLEGAL_PREWHERE);
/// Save the new temporary tables in the query context
for (const auto & it : query_analyzer->getExternalTables())
@ -515,8 +529,10 @@ InterpreterSelectQuery::InterpreterSelectQuery(
if (need_analyze_again)
{
LOG_TRACE(log, "Running 'analyze' second time");
query_analyzer->getSubqueriesForSets().clear();
subquery_for_sets = SubqueriesForSets();
/// Reuse already built sets for multiple passes of analysis
subquery_for_sets = std::move(query_analyzer->getSubqueriesForSets());
prepared_sets = std::move(query_analyzer->getPreparedSets());
/// Do not try move conditions to PREWHERE for the second time.
/// Otherwise, we won't be able to fallback from inefficient PREWHERE to WHERE later.
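
Mechanically, the reuse works because getSubqueriesForSets() and getPreparedSets() return non-const references to the analyzer's members, so std::move through them steals the containers and leaves the old query_analyzer with valid but (in practice) empty members, which is harmless since it is rebuilt immediately. A small self-contained illustration with a simplified PreparedSets:

#include <cassert>
#include <string>
#include <unordered_map>
#include <utility>

using PreparedSets = std::unordered_map<std::string, int>;

struct Analyzer
{
    PreparedSets & getPreparedSets() { return prepared_sets; }
    PreparedSets prepared_sets;
};

int main()
{
    Analyzer first;
    first.prepared_sets = {{"x IN (1, 2, 3)", 42}};

    // Salvage the sets built by the first pass. std::move on the returned
    // reference moves the member itself; `first` keeps a valid moved-from map.
    PreparedSets prepared_sets = std::move(first.getPreparedSets());
    assert(prepared_sets.size() == 1);

    // The moved-from member is valid but unspecified (in practice empty); the
    // old analyzer is about to be replaced anyway, so this is fine.
    Analyzer second;
    second.prepared_sets = std::move(prepared_sets); // fed into the second pass
    assert(second.getPreparedSets().count("x IN (1, 2, 3)") == 1);
}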
@ -565,10 +581,10 @@ void InterpreterSelectQuery::buildQueryPlan(QueryPlan & query_plan)
if (!options.ignore_aggregation && !blocksHaveEqualStructure(query_plan.getCurrentDataStream().header, result_header))
{
auto convert_actions_dag = ActionsDAG::makeConvertingActions(
query_plan.getCurrentDataStream().header.getColumnsWithTypeAndName(),
result_header.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Name,
true);
query_plan.getCurrentDataStream().header.getColumnsWithTypeAndName(),
result_header.getColumnsWithTypeAndName(),
ActionsDAG::MatchColumnsMode::Name,
true);
auto converting = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), convert_actions_dag);
query_plan.addStep(std::move(converting));
@ -583,8 +599,7 @@ BlockIO InterpreterSelectQuery::execute()
buildQueryPlan(query_plan);
res.pipeline = std::move(*query_plan.buildQueryPipeline(
QueryPlanOptimizationSettings::fromContext(context),
BuildQueryPipelineSettings::fromContext(context)));
QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context)));
return res;
}
@ -623,13 +638,7 @@ Block InterpreterSelectQuery::getSampleBlockImpl()
&& options.to_stage > QueryProcessingStage::WithMergeableState;
analysis_result = ExpressionAnalysisResult(
*query_analyzer,
metadata_snapshot,
first_stage,
second_stage,
options.only_analyze,
filter_info,
source_header);
*query_analyzer, metadata_snapshot, first_stage, second_stage, options.only_analyze, filter_info, source_header);
if (options.to_stage == QueryProcessingStage::Enum::FetchColumns)
{
@ -668,9 +677,8 @@ Block InterpreterSelectQuery::getSampleBlockImpl()
// with this code. See
// https://github.com/ClickHouse/ClickHouse/issues/19857 for details.
if (analysis_result.before_window)
{
return analysis_result.before_window->getResultColumns();
}
return analysis_result.before_order_by->getResultColumns();
}
@ -701,9 +709,7 @@ Block InterpreterSelectQuery::getSampleBlockImpl()
// It's different from selected_columns, see the comment above for
// WithMergeableState stage.
if (analysis_result.before_window)
{
return analysis_result.before_window->getResultColumns();
}
return analysis_result.before_order_by->getResultColumns();
}
@ -782,8 +788,7 @@ static SortDescription getSortDescription(const ASTSelectQuery & query, ContextP
if (order_by_elem.with_fill)
{
FillColumnDescription fill_desc = getWithFillDescription(order_by_elem, context);
order_descr.emplace_back(name, order_by_elem.direction,
order_by_elem.nulls_direction, collator, true, fill_desc);
order_descr.emplace_back(name, order_by_elem.direction, order_by_elem.nulls_direction, collator, true, fill_desc);
}
else
order_descr.emplace_back(name, order_by_elem.direction, order_by_elem.nulls_direction, collator);
@ -811,11 +816,14 @@ static UInt64 getLimitUIntValue(const ASTPtr & node, ContextPtr context, const s
const auto & [field, type] = evaluateConstantExpression(node, context);
if (!isNativeNumber(type))
throw Exception("Illegal type " + type->getName() + " of " + expr + " expression, must be numeric type", ErrorCodes::INVALID_LIMIT_EXPRESSION);
throw Exception(
"Illegal type " + type->getName() + " of " + expr + " expression, must be numeric type", ErrorCodes::INVALID_LIMIT_EXPRESSION);
Field converted = convertFieldToType(field, DataTypeUInt64());
if (converted.isNull())
throw Exception("The value " + applyVisitor(FieldVisitorToString(), field) + " of " + expr + " expression is not representable as UInt64", ErrorCodes::INVALID_LIMIT_EXPRESSION);
throw Exception(
"The value " + applyVisitor(FieldVisitorToString(), field) + " of " + expr + " expression is not representable as UInt64",
ErrorCodes::INVALID_LIMIT_EXPRESSION);
return converted.safeGet<UInt64>();
}
@ -962,10 +970,10 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
}
auto prewhere_step = std::make_unique<FilterStep>(
query_plan.getCurrentDataStream(),
expressions.prewhere_info->prewhere_actions,
expressions.prewhere_info->prewhere_column_name,
expressions.prewhere_info->remove_prewhere_column);
query_plan.getCurrentDataStream(),
expressions.prewhere_info->prewhere_actions,
expressions.prewhere_info->prewhere_column_name,
expressions.prewhere_info->remove_prewhere_column);
prewhere_step->setStepDescription("PREWHERE");
query_plan.addStep(std::move(prewhere_step));
@ -976,8 +984,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
if (expressions.prewhere_info->remove_columns_actions)
{
auto remove_columns = std::make_unique<ExpressionStep>(
query_plan.getCurrentDataStream(),
expressions.prewhere_info->remove_columns_actions);
query_plan.getCurrentDataStream(), expressions.prewhere_info->remove_columns_actions);
remove_columns->setStepDescription("Remove unnecessary columns after PREWHERE");
query_plan.addStep(std::move(remove_columns));
@ -988,8 +995,8 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
{
if (prepared_input)
{
auto prepared_source_step = std::make_unique<ReadFromPreparedSource>(
Pipe(std::make_shared<SourceFromInputStream>(prepared_input)), context);
auto prepared_source_step
= std::make_unique<ReadFromPreparedSource>(Pipe(std::make_shared<SourceFromInputStream>(prepared_input)), context);
query_plan.addStep(std::move(prepared_source_step));
}
else if (prepared_pipe)
@ -1073,10 +1080,10 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
if (!query_info.projection && expressions.filter_info)
{
auto row_level_security_step = std::make_unique<FilterStep>(
query_plan.getCurrentDataStream(),
expressions.filter_info->actions,
expressions.filter_info->column_name,
expressions.filter_info->do_remove_column);
query_plan.getCurrentDataStream(),
expressions.filter_info->actions,
expressions.filter_info->column_name,
expressions.filter_info->do_remove_column);
row_level_security_step->setStepDescription("Row-level security filter");
query_plan.addStep(std::move(row_level_security_step));
@ -1084,18 +1091,16 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
if (expressions.before_array_join)
{
QueryPlanStepPtr before_array_join_step = std::make_unique<ExpressionStep>(
query_plan.getCurrentDataStream(),
expressions.before_array_join);
QueryPlanStepPtr before_array_join_step
= std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), expressions.before_array_join);
before_array_join_step->setStepDescription("Before ARRAY JOIN");
query_plan.addStep(std::move(before_array_join_step));
}
if (expressions.array_join)
{
QueryPlanStepPtr array_join_step = std::make_unique<ArrayJoinStep>(
query_plan.getCurrentDataStream(),
expressions.array_join);
QueryPlanStepPtr array_join_step
= std::make_unique<ArrayJoinStep>(query_plan.getCurrentDataStream(), expressions.array_join);
array_join_step->setStepDescription("ARRAY JOIN");
query_plan.addStep(std::move(array_join_step));
@ -1228,7 +1233,8 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
if (query.group_by_with_totals)
{
bool final = !query.group_by_with_rollup && !query.group_by_with_cube;
executeTotalsAndHaving(query_plan, expressions.hasHaving(), expressions.before_having, aggregate_overflow_row, final);
executeTotalsAndHaving(
query_plan, expressions.hasHaving(), expressions.before_having, aggregate_overflow_row, final);
}
if (query.group_by_with_rollup)
@ -1239,7 +1245,9 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
if ((query.group_by_with_rollup || query.group_by_with_cube) && expressions.hasHaving())
{
if (query.group_by_with_totals)
throw Exception("WITH TOTALS and WITH ROLLUP or CUBE are not supported together in presence of HAVING", ErrorCodes::NOT_IMPLEMENTED);
throw Exception(
"WITH TOTALS and WITH ROLLUP or CUBE are not supported together in presence of HAVING",
ErrorCodes::NOT_IMPLEMENTED);
executeHaving(query_plan, expressions.before_having);
}
}
@ -1259,7 +1267,9 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
if (from_aggregation_stage)
{
if (query_analyzer->hasWindow())
throw Exception("Window functions does not support processing from WithMergeableStateAfterAggregation", ErrorCodes::NOT_IMPLEMENTED);
throw Exception(
"Window functions does not support processing from WithMergeableStateAfterAggregation",
ErrorCodes::NOT_IMPLEMENTED);
}
else if (expressions.need_aggregate)
{
@ -1384,8 +1394,7 @@ static StreamLocalLimits getLimitsForStorage(const Settings & settings, const Se
{
StreamLocalLimits limits;
limits.mode = LimitsMode::LIMITS_TOTAL;
limits.size_limits = SizeLimits(settings.max_rows_to_read, settings.max_bytes_to_read,
settings.read_overflow_mode);
limits.size_limits = SizeLimits(settings.max_rows_to_read, settings.max_bytes_to_read, settings.read_overflow_mode);
limits.speed_limits.max_execution_time = settings.max_execution_time;
limits.timeout_overflow_mode = settings.timeout_overflow_mode;
@ -1446,11 +1455,11 @@ static void executeMergeAggregatedImpl(
auto transform_params = std::make_shared<AggregatingTransformParams>(params, final);
auto merging_aggregated = std::make_unique<MergingAggregatedStep>(
query_plan.getCurrentDataStream(),
std::move(transform_params),
settings.distributed_aggregation_memory_efficient && is_remote_storage,
settings.max_threads,
settings.aggregation_memory_efficient_merge_threads);
query_plan.getCurrentDataStream(),
std::move(transform_params),
settings.distributed_aggregation_memory_efficient && is_remote_storage,
settings.max_threads,
settings.aggregation_memory_efficient_merge_threads);
query_plan.addStep(std::move(merging_aggregated));
}
@ -1467,33 +1476,22 @@ void InterpreterSelectQuery::addEmptySourceToQueryPlan(
if (prewhere_info.alias_actions)
{
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ExpressionTransform>(
header,
prewhere_info.alias_actions);
});
pipe.addSimpleTransform(
[&](const Block & header) { return std::make_shared<ExpressionTransform>(header, prewhere_info.alias_actions); });
}
if (prewhere_info.row_level_filter)
{
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<FilterTransform>(
header,
prewhere_info.row_level_filter,
prewhere_info.row_level_column_name,
true);
return std::make_shared<FilterTransform>(header, prewhere_info.row_level_filter, prewhere_info.row_level_column_name, true);
});
}
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<FilterTransform>(
header,
prewhere_info.prewhere_actions,
prewhere_info.prewhere_column_name,
prewhere_info.remove_prewhere_column);
header, prewhere_info.prewhere_actions, prewhere_info.prewhere_column_name, prewhere_info.remove_prewhere_column);
});
// To remove additional columns
@ -1502,12 +1500,8 @@ void InterpreterSelectQuery::addEmptySourceToQueryPlan(
// This leads to mismatched header in distributed table
if (prewhere_info.remove_columns_actions)
{
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ExpressionTransform>(
header,
prewhere_info.remove_columns_actions);
});
pipe.addSimpleTransform(
[&](const Block & header) { return std::make_shared<ExpressionTransform>(header, prewhere_info.remove_columns_actions); });
}
}
@ -1703,12 +1697,7 @@ void InterpreterSelectQuery::addPrewhereAliasActions()
prewhere_info->remove_prewhere_column = false;
/// Remove columns which will be added by prewhere.
required_columns.erase(
std::remove_if(
required_columns.begin(),
required_columns.end(),
[&](const String & name) { return required_columns_after_prewhere_set.count(name) != 0; }),
required_columns.end());
std::erase_if(required_columns, [&](const String & name) { return required_columns_after_prewhere_set.count(name) != 0; });
if (prewhere_info)
{
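
The erase-remove idiom above collapses into C++20 std::erase_if, which does the same thing in one call. A quick equivalence check on simplified types (required_columns here is just a std::vector<std::string>):

#include <cassert>
#include <string>
#include <unordered_set>
#include <vector>

int main()
{
    std::vector<std::string> required_columns = {"a", "b", "c", "b"};
    std::unordered_set<std::string> required_columns_after_prewhere_set = {"b"};

    auto pred = [&](const std::string & name)
    {
        return required_columns_after_prewhere_set.count(name) != 0;
    };

    // C++20: removes every element matching the predicate and erases it.
    std::erase_if(required_columns, pred);

    // Equivalent pre-C++20 spelling (what the old code did):
    // required_columns.erase(
    //     std::remove_if(required_columns.begin(), required_columns.end(), pred),
    //     required_columns.end());

    assert((required_columns == std::vector<std::string>{"a", "c"}));
}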
@ -1813,10 +1802,11 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
/// Limitation on the number of columns to read.
/// It's not applied in 'only_analyze' mode, because the query could be analyzed without removal of unnecessary columns.
if (!options.only_analyze && settings.max_columns_to_read && required_columns.size() > settings.max_columns_to_read)
throw Exception("Limit for number of columns to read exceeded. "
"Requested: " + toString(required_columns.size())
+ ", maximum: " + settings.max_columns_to_read.toString(),
ErrorCodes::TOO_MANY_COLUMNS);
throw Exception(
ErrorCodes::TOO_MANY_COLUMNS,
"Limit for number of columns to read exceeded. Requested: {}, maximum: {}",
required_columns.size(),
settings.max_columns_to_read);
/// General limit for the number of threads.
size_t max_threads_execute_query = settings.max_threads;
@ -1909,14 +1899,18 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
auto actions_settings = ExpressionActionsSettings::fromContext(context, CompileExpressions::yes);
query_info.prewhere_info = std::make_shared<PrewhereInfo>();
query_info.prewhere_info->prewhere_actions = std::make_shared<ExpressionActions>(prewhere_info->prewhere_actions, actions_settings);
query_info.prewhere_info->prewhere_actions
= std::make_shared<ExpressionActions>(prewhere_info->prewhere_actions, actions_settings);
if (prewhere_info->row_level_filter_actions)
query_info.prewhere_info->row_level_filter = std::make_shared<ExpressionActions>(prewhere_info->row_level_filter_actions, actions_settings);
query_info.prewhere_info->row_level_filter
= std::make_shared<ExpressionActions>(prewhere_info->row_level_filter_actions, actions_settings);
if (prewhere_info->alias_actions)
query_info.prewhere_info->alias_actions = std::make_shared<ExpressionActions>(prewhere_info->alias_actions, actions_settings);
query_info.prewhere_info->alias_actions
= std::make_shared<ExpressionActions>(prewhere_info->alias_actions, actions_settings);
if (prewhere_info->remove_columns_actions)
query_info.prewhere_info->remove_columns_actions = std::make_shared<ExpressionActions>(prewhere_info->remove_columns_actions, actions_settings);
query_info.prewhere_info->remove_columns_actions
= std::make_shared<ExpressionActions>(prewhere_info->remove_columns_actions, actions_settings);
query_info.prewhere_info->prewhere_column_name = prewhere_info->prewhere_column_name;
query_info.prewhere_info->remove_prewhere_column = prewhere_info->remove_prewhere_column;
@ -1976,15 +1970,13 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
if (!options.ignore_limits)
{
limits = getLimitsForStorage(settings, options);
leaf_limits = SizeLimits(settings.max_rows_to_read_leaf, settings.max_bytes_to_read_leaf,
settings.read_overflow_mode_leaf);
leaf_limits = SizeLimits(settings.max_rows_to_read_leaf, settings.max_bytes_to_read_leaf, settings.read_overflow_mode_leaf);
}
if (!options.ignore_quota && (options.to_stage == QueryProcessingStage::Complete))
quota = context->getQuota();
storage->read(query_plan, required_columns, metadata_snapshot,
query_info, context, processing_stage, max_block_size, max_streams);
storage->read(query_plan, required_columns, metadata_snapshot, query_info, context, processing_stage, max_block_size, max_streams);
if (context->hasQueryContext() && !options.is_internal)
{
@ -2009,13 +2001,7 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
/// Extend lifetime of context, table lock, storage. Set limits and quota.
auto adding_limits_and_quota = std::make_unique<SettingQuotaAndLimitsStep>(
query_plan.getCurrentDataStream(),
storage,
std::move(table_lock),
limits,
leaf_limits,
std::move(quota),
context);
query_plan.getCurrentDataStream(), storage, std::move(table_lock), limits, leaf_limits, std::move(quota), context);
adding_limits_and_quota->setStepDescription("Set limits and quota after reading from storage");
query_plan.addStep(std::move(adding_limits_and_quota));
}
@ -2044,10 +2030,7 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
void InterpreterSelectQuery::executeWhere(QueryPlan & query_plan, const ActionsDAGPtr & expression, bool remove_filter)
{
auto where_step = std::make_unique<FilterStep>(
query_plan.getCurrentDataStream(),
expression,
getSelectQuery().where()->getColumnName(),
remove_filter);
query_plan.getCurrentDataStream(), expression, getSelectQuery().where()->getColumnName(), remove_filter);
where_step->setStepDescription("WHERE");
query_plan.addStep(std::move(where_step));
@ -2076,15 +2059,20 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
const Settings & settings = context->getSettingsRef();
Aggregator::Params params(header_before_aggregation, keys, aggregates,
overflow_row, settings.max_rows_to_group_by, settings.group_by_overflow_mode,
settings.group_by_two_level_threshold,
settings.group_by_two_level_threshold_bytes,
settings.max_bytes_before_external_group_by,
settings.empty_result_for_aggregation_by_empty_set,
context->getTemporaryVolume(),
settings.max_threads,
settings.min_free_disk_space_for_temporary_data);
Aggregator::Params params(
header_before_aggregation,
keys,
aggregates,
overflow_row,
settings.max_rows_to_group_by,
settings.group_by_overflow_mode,
settings.group_by_two_level_threshold,
settings.group_by_two_level_threshold_bytes,
settings.max_bytes_before_external_group_by,
settings.empty_result_for_aggregation_by_empty_set,
context->getTemporaryVolume(),
settings.max_threads,
settings.min_free_disk_space_for_temporary_data);
SortDescription group_by_sort_description;
@ -2095,20 +2083,21 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
auto merge_threads = max_streams;
auto temporary_data_merge_threads = settings.aggregation_memory_efficient_merge_threads
? static_cast<size_t>(settings.aggregation_memory_efficient_merge_threads)
: static_cast<size_t>(settings.max_threads);
? static_cast<size_t>(settings.aggregation_memory_efficient_merge_threads)
: static_cast<size_t>(settings.max_threads);
bool storage_has_evenly_distributed_read = storage && storage->hasEvenlyDistributedRead();
auto aggregating_step = std::make_unique<AggregatingStep>(
query_plan.getCurrentDataStream(),
params, final,
settings.max_block_size,
merge_threads,
temporary_data_merge_threads,
storage_has_evenly_distributed_read,
std::move(group_by_info),
std::move(group_by_sort_description));
query_plan.getCurrentDataStream(),
params,
final,
settings.max_block_size,
merge_threads,
temporary_data_merge_threads,
storage_has_evenly_distributed_read,
std::move(group_by_info),
std::move(group_by_sort_description));
query_plan.addStep(std::move(aggregating_step));
}
@ -2135,24 +2124,27 @@ void InterpreterSelectQuery::executeMergeAggregated(QueryPlan & query_plan, bool
void InterpreterSelectQuery::executeHaving(QueryPlan & query_plan, const ActionsDAGPtr & expression)
{
auto having_step = std::make_unique<FilterStep>(
query_plan.getCurrentDataStream(),
expression, getSelectQuery().having()->getColumnName(), false);
auto having_step
= std::make_unique<FilterStep>(query_plan.getCurrentDataStream(), expression, getSelectQuery().having()->getColumnName(), false);
having_step->setStepDescription("HAVING");
query_plan.addStep(std::move(having_step));
}
void InterpreterSelectQuery::executeTotalsAndHaving(QueryPlan & query_plan, bool has_having, const ActionsDAGPtr & expression, bool overflow_row, bool final)
void InterpreterSelectQuery::executeTotalsAndHaving(
QueryPlan & query_plan, bool has_having, const ActionsDAGPtr & expression, bool overflow_row, bool final)
{
const Settings & settings = context->getSettingsRef();
auto totals_having_step = std::make_unique<TotalsHavingStep>(
query_plan.getCurrentDataStream(),
overflow_row, expression,
has_having ? getSelectQuery().having()->getColumnName() : "",
settings.totals_mode, settings.totals_auto_threshold, final);
query_plan.getCurrentDataStream(),
overflow_row,
expression,
has_having ? getSelectQuery().having()->getColumnName() : "",
settings.totals_mode,
settings.totals_auto_threshold,
final);
query_plan.addStep(std::move(totals_having_step));
}
@ -2169,10 +2161,20 @@ void InterpreterSelectQuery::executeRollupOrCube(QueryPlan & query_plan, Modific
const Settings & settings = context->getSettingsRef();
Aggregator::Params params(header_before_transform, keys, query_analyzer->aggregates(),
false, settings.max_rows_to_group_by, settings.group_by_overflow_mode, 0, 0,
settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
context->getTemporaryVolume(), settings.max_threads, settings.min_free_disk_space_for_temporary_data);
Aggregator::Params params(
header_before_transform,
keys,
query_analyzer->aggregates(),
false,
settings.max_rows_to_group_by,
settings.group_by_overflow_mode,
0,
0,
settings.max_bytes_before_external_group_by,
settings.empty_result_for_aggregation_by_empty_set,
context->getTemporaryVolume(),
settings.max_threads,
settings.min_free_disk_space_for_temporary_data);
auto transform_params = std::make_shared<AggregatingTransformParams>(params, true);
@ -2189,9 +2191,7 @@ void InterpreterSelectQuery::executeRollupOrCube(QueryPlan & query_plan, Modific
void InterpreterSelectQuery::executeExpression(QueryPlan & query_plan, const ActionsDAGPtr & expression, const std::string & description)
{
if (!expression)
{
return;
}
auto expression_step = std::make_unique<ExpressionStep>(query_plan.getCurrentDataStream(), expression);
@ -2199,8 +2199,7 @@ void InterpreterSelectQuery::executeExpression(QueryPlan & query_plan, const Act
query_plan.addStep(std::move(expression_step));
}
static bool windowDescriptionComparator(const WindowDescription * _left,
const WindowDescription * _right)
static bool windowDescriptionComparator(const WindowDescription * _left, const WindowDescription * _right)
{
const auto & left = _left->full_sort_description;
const auto & right = _right->full_sort_description;
@ -2208,37 +2207,21 @@ static bool windowDescriptionComparator(const WindowDescription * _left,
for (size_t i = 0; i < std::min(left.size(), right.size()); ++i)
{
if (left[i].column_name < right[i].column_name)
{
return true;
}
else if (left[i].column_name > right[i].column_name)
{
return false;
}
else if (left[i].column_number < right[i].column_number)
{
return true;
}
else if (left[i].column_number > right[i].column_number)
{
return false;
}
else if (left[i].direction < right[i].direction)
{
return true;
}
else if (left[i].direction > right[i].direction)
{
return false;
}
else if (left[i].nulls_direction < right[i].nulls_direction)
{
return true;
}
else if (left[i].nulls_direction > right[i].nulls_direction)
{
return false;
}
assert(left[i] == right[i]);
}
@ -2255,16 +2238,12 @@ static bool sortIsPrefix(const WindowDescription & _prefix,
const auto & full = _full.full_sort_description;
if (prefix.size() > full.size())
{
return false;
}
for (size_t i = 0; i < prefix.size(); ++i)
{
if (full[i] != prefix[i])
{
return false;
}
}
return true;
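
These two helpers let windows that share a sort prefix share one sort: once the descriptions are ordered, a window whose sort description is a prefix of the previous one gets no sorting steps of its own. A tiny standalone check of the prefix test, with plain strings standing in for SortColumnDescription:

#include <cassert>
#include <string>
#include <vector>

using SortDescription = std::vector<std::string>; // simplified: column names only

static bool sortIsPrefix(const SortDescription & prefix, const SortDescription & full)
{
    if (prefix.size() > full.size())
        return false;
    for (size_t i = 0; i < prefix.size(); ++i)
        if (full[i] != prefix[i])
            return false;
    return true;
}

int main()
{
    SortDescription over_a_b = {"a", "b"}; // e.g. OVER (ORDER BY a, b)
    SortDescription over_a = {"a"};        // e.g. OVER (ORDER BY a)

    // Once the data is sorted by (a, b), a window ordered by (a) alone can
    // reuse that sorting, so executeWindow skips its PartialSorting/MergeSorting.
    assert(sortIsPrefix(over_a, over_a_b));
    assert(!sortIsPrefix(over_a_b, over_a));
}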
@ -2276,12 +2255,9 @@ void InterpreterSelectQuery::executeWindow(QueryPlan & query_plan)
// sort description goes first, and all window that use its prefixes follow.
std::vector<const WindowDescription *> windows_sorted;
for (const auto & [_, w] : query_analyzer->windowDescriptions())
{
windows_sorted.push_back(&w);
}
std::sort(windows_sorted.begin(), windows_sorted.end(),
windowDescriptionComparator);
std::sort(windows_sorted.begin(), windows_sorted.end(), windowDescriptionComparator);
const Settings & settings = context->getSettingsRef();
for (size_t i = 0; i < windows_sorted.size(); ++i)
@ -2292,17 +2268,14 @@ void InterpreterSelectQuery::executeWindow(QueryPlan & query_plan)
// has suitable sorting. Also don't create sort steps when there are no
// columns to sort by, because the sort nodes are confused by this. It
// happens in case of `over ()`.
if (!w.full_sort_description.empty()
&& (i == 0 || !sortIsPrefix(w, *windows_sorted[i - 1])))
if (!w.full_sort_description.empty() && (i == 0 || !sortIsPrefix(w, *windows_sorted[i - 1])))
{
auto partial_sorting = std::make_unique<PartialSortingStep>(
query_plan.getCurrentDataStream(),
w.full_sort_description,
0 /* LIMIT */,
SizeLimits(settings.max_rows_to_sort, settings.max_bytes_to_sort,
settings.sort_overflow_mode));
partial_sorting->setStepDescription("Sort each block for window '"
+ w.window_name + "'");
SizeLimits(settings.max_rows_to_sort, settings.max_bytes_to_sort, settings.sort_overflow_mode));
partial_sorting->setStepDescription("Sort each block for window '" + w.window_name + "'");
query_plan.addStep(std::move(partial_sorting));
auto merge_sorting_step = std::make_unique<MergeSortingStep>(
@ -2315,8 +2288,7 @@ void InterpreterSelectQuery::executeWindow(QueryPlan & query_plan)
settings.max_bytes_before_external_sort,
context->getTemporaryVolume(),
settings.min_free_disk_space_for_temporary_data);
merge_sorting_step->setStepDescription(
"Merge sorted blocks for window '" + w.window_name + "'");
merge_sorting_step->setStepDescription("Merge sorted blocks for window '" + w.window_name + "'");
query_plan.addStep(std::move(merge_sorting_step));
// First MergeSorted, now MergingSorted.
@ -2325,17 +2297,12 @@ void InterpreterSelectQuery::executeWindow(QueryPlan & query_plan)
w.full_sort_description,
settings.max_block_size,
0 /* LIMIT */);
merging_sorted->setStepDescription(
"Merge sorted streams for window '" + w.window_name + "'");
merging_sorted->setStepDescription("Merge sorted streams for window '" + w.window_name + "'");
query_plan.addStep(std::move(merging_sorted));
}
auto window_step = std::make_unique<WindowStep>(
query_plan.getCurrentDataStream(),
w,
w.window_functions);
window_step->setStepDescription("Window step for window '"
+ w.window_name + "'");
auto window_step = std::make_unique<WindowStep>(query_plan.getCurrentDataStream(), w, w.window_functions);
window_step->setStepDescription("Window step for window '" + w.window_name + "'");
query_plan.addStep(std::move(window_step));
}
@ -2347,11 +2314,7 @@ void InterpreterSelectQuery::executeOrderOptimized(QueryPlan & query_plan, Input
const Settings & settings = context->getSettingsRef();
auto finish_sorting_step = std::make_unique<FinishSortingStep>(
query_plan.getCurrentDataStream(),
input_sorting_info->order_key_prefix_descr,
output_order_descr,
settings.max_block_size,
limit);
query_plan.getCurrentDataStream(), input_sorting_info->order_key_prefix_descr, output_order_descr, settings.max_block_size, limit);
query_plan.addStep(std::move(finish_sorting_step));
}
@ -2377,25 +2340,25 @@ void InterpreterSelectQuery::executeOrder(QueryPlan & query_plan, InputOrderInfo
const Settings & settings = context->getSettingsRef();
auto partial_sorting = std::make_unique<PartialSortingStep>(
query_plan.getCurrentDataStream(),
output_order_descr,
limit,
SizeLimits(settings.max_rows_to_sort, settings.max_bytes_to_sort, settings.sort_overflow_mode));
query_plan.getCurrentDataStream(),
output_order_descr,
limit,
SizeLimits(settings.max_rows_to_sort, settings.max_bytes_to_sort, settings.sort_overflow_mode));
partial_sorting->setStepDescription("Sort each block for ORDER BY");
query_plan.addStep(std::move(partial_sorting));
/// Merge the sorted blocks.
auto merge_sorting_step = std::make_unique<MergeSortingStep>(
query_plan.getCurrentDataStream(),
output_order_descr,
settings.max_block_size,
limit,
settings.max_bytes_before_remerge_sort,
settings.remerge_sort_lowered_memory_bytes_ratio,
settings.max_bytes_before_external_sort,
context->getTemporaryVolume(),
settings.min_free_disk_space_for_temporary_data);
query_plan.getCurrentDataStream(),
output_order_descr,
settings.max_block_size,
limit,
settings.max_bytes_before_remerge_sort,
settings.remerge_sort_lowered_memory_bytes_ratio,
settings.max_bytes_before_external_sort,
context->getTemporaryVolume(),
settings.min_free_disk_space_for_temporary_data);
merge_sorting_step->setStepDescription("Merge sorted blocks for ORDER BY");
query_plan.addStep(std::move(merge_sorting_step));
@ -2418,10 +2381,8 @@ void InterpreterSelectQuery::executeMergeSorted(QueryPlan & query_plan, const So
{
const Settings & settings = context->getSettingsRef();
auto merging_sorted = std::make_unique<MergingSortedStep>(
query_plan.getCurrentDataStream(),
sort_description,
settings.max_block_size, limit);
auto merging_sorted
= std::make_unique<MergingSortedStep>(query_plan.getCurrentDataStream(), sort_description, settings.max_block_size, limit);
merging_sorted->setStepDescription("Merge sorted streams " + description);
query_plan.addStep(std::move(merging_sorted));
@ -2453,9 +2414,8 @@ void InterpreterSelectQuery::executeDistinct(QueryPlan & query_plan, bool before
SizeLimits limits(settings.max_rows_in_distinct, settings.max_bytes_in_distinct, settings.distinct_overflow_mode);
auto distinct_step = std::make_unique<DistinctStep>(
query_plan.getCurrentDataStream(),
limits, limit_for_distinct, columns, pre_distinct);
auto distinct_step
= std::make_unique<DistinctStep>(query_plan.getCurrentDataStream(), limits, limit_for_distinct, columns, pre_distinct);
if (pre_distinct)
distinct_step->setStepDescription("Preliminary DISTINCT");

View File

@ -61,10 +61,10 @@ public:
/// Read data not from the table specified in the query, but from the prepared pipe `input`.
InterpreterSelectQuery(
const ASTPtr & query_ptr_,
ContextPtr context_,
Pipe input_pipe_,
const SelectQueryOptions & = {});
const ASTPtr & query_ptr_,
ContextPtr context_,
Pipe input_pipe_,
const SelectQueryOptions & = {});
/// Read data not from the table specified in the query, but from the specified `storage_`.
InterpreterSelectQuery(