diff --git a/src/Storages/MergeTree/MergeTreeDataUtils.cpp b/src/Storages/MergeTree/MergeTreeDataUtils.cpp index 0da4b4dbed1..65dea9c901f 100644 --- a/src/Storages/MergeTree/MergeTreeDataUtils.cpp +++ b/src/Storages/MergeTree/MergeTreeDataUtils.cpp @@ -57,60 +57,74 @@ bool getQueryProcessingStageWithAggregateProjection( // Let's traverse backward to finish the check. // TODO what if there is a column with name sum(x) and an aggregate sum(x)? auto rewrite_before_where = - [&](ProjectionCandidate & candidate, const ProjectionDescription & projection, NameSet & required_columns, const Block & aggregates) + [&](ProjectionCandidate & candidate, const ProjectionDescription & projection, + NameSet & required_columns, const Block & source_block, const Block & aggregates) + { + if (analysis_result.before_where) { - if (analysis_result.before_where) - { - candidate.before_where = analysis_result.before_where->clone(); - required_columns = candidate.before_where->foldActionsByProjection( - required_columns, projection.sample_block_for_keys, query_ptr->as().where()->getColumnName()); - if (required_columns.empty()) - return false; - candidate.before_where->addAggregatesViaProjection(aggregates); - } + candidate.before_where = analysis_result.before_where->clone(); + required_columns = candidate.before_where->foldActionsByProjection( + required_columns, + projection.sample_block_for_keys, + query_ptr->as().where()->getColumnName()); + if (required_columns.empty()) + return false; + candidate.before_where->addAggregatesViaProjection(aggregates); + } - if (analysis_result.prewhere_info) - { - auto & prewhere_info = analysis_result.prewhere_info; - candidate.prewhere_info = std::make_shared(); - candidate.prewhere_info->prewhere_column_name = prewhere_info->prewhere_column_name; - candidate.prewhere_info->remove_prewhere_column = prewhere_info->remove_prewhere_column; - candidate.prewhere_info->row_level_column_name = prewhere_info->row_level_column_name; - candidate.prewhere_info->need_filter = prewhere_info->need_filter; + if (analysis_result.prewhere_info) + { + auto & prewhere_info = analysis_result.prewhere_info; + candidate.prewhere_info = std::make_shared(); + candidate.prewhere_info->prewhere_column_name = prewhere_info->prewhere_column_name; + candidate.prewhere_info->remove_prewhere_column = prewhere_info->remove_prewhere_column; + candidate.prewhere_info->row_level_column_name = prewhere_info->row_level_column_name; + candidate.prewhere_info->need_filter = prewhere_info->need_filter; - auto actions_settings = ExpressionActionsSettings::fromSettings(query_context->getSettingsRef()); - auto prewhere_actions = prewhere_info->prewhere_actions->clone(); - NameSet prewhere_required_columns; - prewhere_required_columns = prewhere_actions->foldActionsByProjection( - prewhere_required_columns, projection.sample_block_for_keys, prewhere_info->prewhere_column_name); + auto actions_settings = ExpressionActionsSettings::fromSettings(query_context->getSettingsRef()); + auto prewhere_actions = prewhere_info->prewhere_actions->clone(); + NameSet prewhere_required_columns; + prewhere_required_columns = prewhere_actions->foldActionsByProjection( + prewhere_required_columns, projection.sample_block_for_keys, prewhere_info->prewhere_column_name); + if (prewhere_required_columns.empty()) + return false; + candidate.prewhere_info->prewhere_actions = std::make_shared(prewhere_actions, actions_settings); + + if (prewhere_info->row_level_filter_actions) + { + auto row_level_filter_actions = prewhere_info->row_level_filter_actions->clone(); + prewhere_required_columns = row_level_filter_actions->foldActionsByProjection( + prewhere_required_columns, projection.sample_block_for_keys, prewhere_info->row_level_column_name); if (prewhere_required_columns.empty()) return false; - candidate.prewhere_info->prewhere_actions = std::make_shared(prewhere_actions, actions_settings); - - if (prewhere_info->row_level_filter_actions) - { - auto row_level_filter_actions = prewhere_info->row_level_filter_actions->clone(); - prewhere_required_columns = row_level_filter_actions->foldActionsByProjection( - prewhere_required_columns, projection.sample_block_for_keys, prewhere_info->row_level_column_name); - if (prewhere_required_columns.empty()) - return false; - candidate.prewhere_info->row_level_filter - = std::make_shared(row_level_filter_actions, actions_settings); - } - - if (prewhere_info->alias_actions) - { - auto alias_actions = prewhere_info->alias_actions->clone(); - prewhere_required_columns - = alias_actions->foldActionsByProjection(prewhere_required_columns, projection.sample_block_for_keys); - if (prewhere_required_columns.empty()) - return false; - candidate.prewhere_info->alias_actions = std::make_shared(alias_actions, actions_settings); - } - required_columns.insert(prewhere_required_columns.begin(), prewhere_required_columns.end()); + candidate.prewhere_info->row_level_filter + = std::make_shared(row_level_filter_actions, actions_settings); } - return true; - }; + + if (prewhere_info->alias_actions) + { + auto alias_actions = prewhere_info->alias_actions->clone(); + prewhere_required_columns + = alias_actions->foldActionsByProjection(prewhere_required_columns, projection.sample_block_for_keys); + if (prewhere_required_columns.empty()) + return false; + candidate.prewhere_info->alias_actions = std::make_shared(alias_actions, actions_settings); + } + required_columns.insert(prewhere_required_columns.begin(), prewhere_required_columns.end()); + } + + bool match = true; + for (const auto & column : required_columns) + { + /// There are still missing columns, fail to match + if (!source_block.has(column)) + { + match = false; + break; + } + } + return match; + }; for (const auto & projection : metadata_snapshot->projections) { @@ -152,7 +166,7 @@ bool getQueryProcessingStageWithAggregateProjection( // Attach aggregates candidate.before_aggregation->addAggregatesViaProjection(aggregates); - if (rewrite_before_where(candidate, projection, required_columns, aggregates)) + if (rewrite_before_where(candidate, projection, required_columns, projection.sample_block_for_keys, aggregates)) { candidate.required_columns = {required_columns.begin(), required_columns.end()}; for (const auto & aggregate : aggregates) @@ -174,7 +188,7 @@ bool getQueryProcessingStageWithAggregateProjection( for (const auto & column : analysis_result.prewhere_info->prewhere_actions->getResultColumns()) required_columns.insert(column.name); } - if (rewrite_before_where(candidate, projection, required_columns, {})) + if (rewrite_before_where(candidate, projection, required_columns, projection.sample_block, {})) { candidate.required_columns = {required_columns.begin(), required_columns.end()}; candidates.push_back(std::move(candidate)); diff --git a/tests/queries/0_stateless/01710_projections.sql b/tests/queries/0_stateless/01710_projections.sql index 5740bd05f5c..e1824bb4273 100644 --- a/tests/queries/0_stateless/01710_projections.sql +++ b/tests/queries/0_stateless/01710_projections.sql @@ -1,8 +1,8 @@ drop table if exists projection_test; -create table projection_test (datetime DateTime, domain LowCardinality(String), x_id String, y_id String, block_count Int64, retry_count Int64, duration Int64, kbytes Int64, buffer_time Int64, first_time Int64, total_bytes Nullable(UInt64), valid_bytes Nullable(UInt64), completed_bytes Nullable(UInt64), fixed_bytes Nullable(UInt64), force_bytes Nullable(UInt64), projection p (select toStartOfMinute(datetime) dt_m, countIf(first_time = 0) / count(), avg((kbytes * 8) / duration), count(), sum(block_count) / sum(duration), avg(block_count / duration), sum(buffer_time) / sum(duration), avg(buffer_time / duration), sum(valid_bytes) / sum(total_bytes), sum(completed_bytes) / sum(total_bytes), sum(fixed_bytes) / sum(total_bytes), sum(force_bytes) / sum(total_bytes), sum(valid_bytes) / sum(total_bytes), sum(retry_count) / sum(duration), avg(retry_count / duration), countIf(block_count > 0) / count(), countIf(first_time = 0) / count(), uniqHLL12(x_id), uniqHLL12(y_id) group by dt_m, domain) type aggregate) engine MergeTree partition by toDate(datetime) order by (toStartOfTenMinutes(datetime), domain); +create table projection_test (`sum(block_count)` UInt64, domain_alias UInt64 alias length(domain), datetime DateTime, domain LowCardinality(String), x_id String, y_id String, block_count Int64, retry_count Int64, duration Int64, kbytes Int64, buffer_time Int64, first_time Int64, total_bytes Nullable(UInt64), valid_bytes Nullable(UInt64), completed_bytes Nullable(UInt64), fixed_bytes Nullable(UInt64), force_bytes Nullable(UInt64), projection p (select toStartOfMinute(datetime) dt_m, countIf(first_time = 0) / count(), avg((kbytes * 8) / duration), count(), sum(block_count) / sum(duration), avg(block_count / duration), sum(buffer_time) / sum(duration), avg(buffer_time / duration), sum(valid_bytes) / sum(total_bytes), sum(completed_bytes) / sum(total_bytes), sum(fixed_bytes) / sum(total_bytes), sum(force_bytes) / sum(total_bytes), sum(valid_bytes) / sum(total_bytes), sum(retry_count) / sum(duration), avg(retry_count / duration), countIf(block_count > 0) / count(), countIf(first_time = 0) / count(), uniqHLL12(x_id), uniqHLL12(y_id) group by dt_m, domain) type aggregate) engine MergeTree partition by toDate(datetime) order by (toStartOfTenMinutes(datetime), domain); -insert into projection_test with rowNumberInAllBlocks() as id select toDateTime('2020-10-24 00:00:00') + (id / 20), toString(id % 100), * from generateRandom('x_id String, y_id String, block_count Int64, retry_count Int64, duration Int64, kbytes Int64, buffer_time Int64, first_time Int64, total_bytes Nullable(UInt64), valid_bytes Nullable(UInt64), completed_bytes Nullable(UInt64), fixed_bytes Nullable(UInt64), force_bytes Nullable(UInt64)', 10, 10, 1) limit 1000 settings max_threads = 1; +insert into projection_test with rowNumberInAllBlocks() as id select 1, toDateTime('2020-10-24 00:00:00') + (id / 20), toString(id % 100), * from generateRandom('x_id String, y_id String, block_count Int64, retry_count Int64, duration Int64, kbytes Int64, buffer_time Int64, first_time Int64, total_bytes Nullable(UInt64), valid_bytes Nullable(UInt64), completed_bytes Nullable(UInt64), fixed_bytes Nullable(UInt64), force_bytes Nullable(UInt64)', 10, 10, 1) limit 1000 settings max_threads = 1; set allow_experimental_projection_optimization = 1, force_optimize_projection = 1; @@ -11,8 +11,20 @@ select toStartOfMinute(datetime) dt_m, countIf(first_time = 0) from projection_t select toStartOfMinute(datetime) dt_m, countIf(first_time = 0) / count(), avg((kbytes * 8) / duration) from projection_test where domain = '1' group by dt_m order by dt_m; +-- prewhere with alias +select toStartOfMinute(datetime) dt_m, countIf(first_time = 0) / count(), avg((kbytes * 8) / duration) from projection_test prewhere domain_alias = 3 where domain = '1' group by dt_m order by dt_m; + +drop row policy if exists filter on projection_test; +create row policy filter on projection_test using (domain = 'non_existing_domain') to all; +-- prewhere with alias with row policy +select toStartOfMinute(datetime) dt_m, countIf(first_time = 0) / count(), avg((kbytes * 8) / duration) from projection_test prewhere domain_alias = 1 where domain = '1' group by dt_m order by dt_m; +drop row policy filter on projection_test; + select toStartOfMinute(datetime) dt_m, count(), sum(block_count) / sum(duration), avg(block_count / duration) from projection_test group by dt_m order by dt_m; +-- TODO figure out how to deal with conflict column names +-- select toStartOfMinute(datetime) dt_m, count(), sum(block_count) / sum(duration), avg(block_count / duration) from projection_test where `sum(block_count)` = 1 group by dt_m order by dt_m; + select toStartOfMinute(datetime) dt_m, sum(buffer_time) / sum(duration), avg(buffer_time / duration), sum(valid_bytes) / sum(total_bytes), sum(completed_bytes) / sum(total_bytes), sum(fixed_bytes) / sum(total_bytes), sum(force_bytes) / sum(total_bytes), sum(valid_bytes) / sum(total_bytes) from projection_test where domain in ('12', '14') group by dt_m order by dt_m; select toStartOfMinute(datetime) dt_m, domain, sum(retry_count) / sum(duration), avg(retry_count / duration), countIf(block_count > 0) / count(), countIf(first_time = 0) / count() from projection_test group by dt_m, domain having domain = '19' order by dt_m, domain;