Fix missing columns

This commit is contained in:
Amos Bird 2021-04-29 20:02:44 +08:00
parent 35961c0c5d
commit e1e560765c
No known key found for this signature in database
GPG Key ID: 80D430DCBECFEDB4
2 changed files with 78 additions and 52 deletions

View File

@ -57,60 +57,74 @@ bool getQueryProcessingStageWithAggregateProjection(
// Let's traverse backward to finish the check.
// TODO what if there is a column with name sum(x) and an aggregate sum(x)?
auto rewrite_before_where =
[&](ProjectionCandidate & candidate, const ProjectionDescription & projection, NameSet & required_columns, const Block & aggregates)
[&](ProjectionCandidate & candidate, const ProjectionDescription & projection,
NameSet & required_columns, const Block & source_block, const Block & aggregates)
{
if (analysis_result.before_where)
{
if (analysis_result.before_where)
{
candidate.before_where = analysis_result.before_where->clone();
required_columns = candidate.before_where->foldActionsByProjection(
required_columns, projection.sample_block_for_keys, query_ptr->as<const ASTSelectQuery &>().where()->getColumnName());
if (required_columns.empty())
return false;
candidate.before_where->addAggregatesViaProjection(aggregates);
}
candidate.before_where = analysis_result.before_where->clone();
required_columns = candidate.before_where->foldActionsByProjection(
required_columns,
projection.sample_block_for_keys,
query_ptr->as<const ASTSelectQuery &>().where()->getColumnName());
if (required_columns.empty())
return false;
candidate.before_where->addAggregatesViaProjection(aggregates);
}
if (analysis_result.prewhere_info)
{
auto & prewhere_info = analysis_result.prewhere_info;
candidate.prewhere_info = std::make_shared<PrewhereInfo>();
candidate.prewhere_info->prewhere_column_name = prewhere_info->prewhere_column_name;
candidate.prewhere_info->remove_prewhere_column = prewhere_info->remove_prewhere_column;
candidate.prewhere_info->row_level_column_name = prewhere_info->row_level_column_name;
candidate.prewhere_info->need_filter = prewhere_info->need_filter;
if (analysis_result.prewhere_info)
{
auto & prewhere_info = analysis_result.prewhere_info;
candidate.prewhere_info = std::make_shared<PrewhereInfo>();
candidate.prewhere_info->prewhere_column_name = prewhere_info->prewhere_column_name;
candidate.prewhere_info->remove_prewhere_column = prewhere_info->remove_prewhere_column;
candidate.prewhere_info->row_level_column_name = prewhere_info->row_level_column_name;
candidate.prewhere_info->need_filter = prewhere_info->need_filter;
auto actions_settings = ExpressionActionsSettings::fromSettings(query_context->getSettingsRef());
auto prewhere_actions = prewhere_info->prewhere_actions->clone();
NameSet prewhere_required_columns;
prewhere_required_columns = prewhere_actions->foldActionsByProjection(
prewhere_required_columns, projection.sample_block_for_keys, prewhere_info->prewhere_column_name);
auto actions_settings = ExpressionActionsSettings::fromSettings(query_context->getSettingsRef());
auto prewhere_actions = prewhere_info->prewhere_actions->clone();
NameSet prewhere_required_columns;
prewhere_required_columns = prewhere_actions->foldActionsByProjection(
prewhere_required_columns, projection.sample_block_for_keys, prewhere_info->prewhere_column_name);
if (prewhere_required_columns.empty())
return false;
candidate.prewhere_info->prewhere_actions = std::make_shared<ExpressionActions>(prewhere_actions, actions_settings);
if (prewhere_info->row_level_filter_actions)
{
auto row_level_filter_actions = prewhere_info->row_level_filter_actions->clone();
prewhere_required_columns = row_level_filter_actions->foldActionsByProjection(
prewhere_required_columns, projection.sample_block_for_keys, prewhere_info->row_level_column_name);
if (prewhere_required_columns.empty())
return false;
candidate.prewhere_info->prewhere_actions = std::make_shared<ExpressionActions>(prewhere_actions, actions_settings);
if (prewhere_info->row_level_filter_actions)
{
auto row_level_filter_actions = prewhere_info->row_level_filter_actions->clone();
prewhere_required_columns = row_level_filter_actions->foldActionsByProjection(
prewhere_required_columns, projection.sample_block_for_keys, prewhere_info->row_level_column_name);
if (prewhere_required_columns.empty())
return false;
candidate.prewhere_info->row_level_filter
= std::make_shared<ExpressionActions>(row_level_filter_actions, actions_settings);
}
if (prewhere_info->alias_actions)
{
auto alias_actions = prewhere_info->alias_actions->clone();
prewhere_required_columns
= alias_actions->foldActionsByProjection(prewhere_required_columns, projection.sample_block_for_keys);
if (prewhere_required_columns.empty())
return false;
candidate.prewhere_info->alias_actions = std::make_shared<ExpressionActions>(alias_actions, actions_settings);
}
required_columns.insert(prewhere_required_columns.begin(), prewhere_required_columns.end());
candidate.prewhere_info->row_level_filter
= std::make_shared<ExpressionActions>(row_level_filter_actions, actions_settings);
}
return true;
};
if (prewhere_info->alias_actions)
{
auto alias_actions = prewhere_info->alias_actions->clone();
prewhere_required_columns
= alias_actions->foldActionsByProjection(prewhere_required_columns, projection.sample_block_for_keys);
if (prewhere_required_columns.empty())
return false;
candidate.prewhere_info->alias_actions = std::make_shared<ExpressionActions>(alias_actions, actions_settings);
}
required_columns.insert(prewhere_required_columns.begin(), prewhere_required_columns.end());
}
bool match = true;
for (const auto & column : required_columns)
{
/// There are still missing columns, fail to match
if (!source_block.has(column))
{
match = false;
break;
}
}
return match;
};
for (const auto & projection : metadata_snapshot->projections)
{
@ -152,7 +166,7 @@ bool getQueryProcessingStageWithAggregateProjection(
// Attach aggregates
candidate.before_aggregation->addAggregatesViaProjection(aggregates);
if (rewrite_before_where(candidate, projection, required_columns, aggregates))
if (rewrite_before_where(candidate, projection, required_columns, projection.sample_block_for_keys, aggregates))
{
candidate.required_columns = {required_columns.begin(), required_columns.end()};
for (const auto & aggregate : aggregates)
@ -174,7 +188,7 @@ bool getQueryProcessingStageWithAggregateProjection(
for (const auto & column : analysis_result.prewhere_info->prewhere_actions->getResultColumns())
required_columns.insert(column.name);
}
if (rewrite_before_where(candidate, projection, required_columns, {}))
if (rewrite_before_where(candidate, projection, required_columns, projection.sample_block, {}))
{
candidate.required_columns = {required_columns.begin(), required_columns.end()};
candidates.push_back(std::move(candidate));

View File

@ -1,8 +1,8 @@
drop table if exists projection_test;
create table projection_test (datetime DateTime, domain LowCardinality(String), x_id String, y_id String, block_count Int64, retry_count Int64, duration Int64, kbytes Int64, buffer_time Int64, first_time Int64, total_bytes Nullable(UInt64), valid_bytes Nullable(UInt64), completed_bytes Nullable(UInt64), fixed_bytes Nullable(UInt64), force_bytes Nullable(UInt64), projection p (select toStartOfMinute(datetime) dt_m, countIf(first_time = 0) / count(), avg((kbytes * 8) / duration), count(), sum(block_count) / sum(duration), avg(block_count / duration), sum(buffer_time) / sum(duration), avg(buffer_time / duration), sum(valid_bytes) / sum(total_bytes), sum(completed_bytes) / sum(total_bytes), sum(fixed_bytes) / sum(total_bytes), sum(force_bytes) / sum(total_bytes), sum(valid_bytes) / sum(total_bytes), sum(retry_count) / sum(duration), avg(retry_count / duration), countIf(block_count > 0) / count(), countIf(first_time = 0) / count(), uniqHLL12(x_id), uniqHLL12(y_id) group by dt_m, domain) type aggregate) engine MergeTree partition by toDate(datetime) order by (toStartOfTenMinutes(datetime), domain);
create table projection_test (`sum(block_count)` UInt64, domain_alias UInt64 alias length(domain), datetime DateTime, domain LowCardinality(String), x_id String, y_id String, block_count Int64, retry_count Int64, duration Int64, kbytes Int64, buffer_time Int64, first_time Int64, total_bytes Nullable(UInt64), valid_bytes Nullable(UInt64), completed_bytes Nullable(UInt64), fixed_bytes Nullable(UInt64), force_bytes Nullable(UInt64), projection p (select toStartOfMinute(datetime) dt_m, countIf(first_time = 0) / count(), avg((kbytes * 8) / duration), count(), sum(block_count) / sum(duration), avg(block_count / duration), sum(buffer_time) / sum(duration), avg(buffer_time / duration), sum(valid_bytes) / sum(total_bytes), sum(completed_bytes) / sum(total_bytes), sum(fixed_bytes) / sum(total_bytes), sum(force_bytes) / sum(total_bytes), sum(valid_bytes) / sum(total_bytes), sum(retry_count) / sum(duration), avg(retry_count / duration), countIf(block_count > 0) / count(), countIf(first_time = 0) / count(), uniqHLL12(x_id), uniqHLL12(y_id) group by dt_m, domain) type aggregate) engine MergeTree partition by toDate(datetime) order by (toStartOfTenMinutes(datetime), domain);
insert into projection_test with rowNumberInAllBlocks() as id select toDateTime('2020-10-24 00:00:00') + (id / 20), toString(id % 100), * from generateRandom('x_id String, y_id String, block_count Int64, retry_count Int64, duration Int64, kbytes Int64, buffer_time Int64, first_time Int64, total_bytes Nullable(UInt64), valid_bytes Nullable(UInt64), completed_bytes Nullable(UInt64), fixed_bytes Nullable(UInt64), force_bytes Nullable(UInt64)', 10, 10, 1) limit 1000 settings max_threads = 1;
insert into projection_test with rowNumberInAllBlocks() as id select 1, toDateTime('2020-10-24 00:00:00') + (id / 20), toString(id % 100), * from generateRandom('x_id String, y_id String, block_count Int64, retry_count Int64, duration Int64, kbytes Int64, buffer_time Int64, first_time Int64, total_bytes Nullable(UInt64), valid_bytes Nullable(UInt64), completed_bytes Nullable(UInt64), fixed_bytes Nullable(UInt64), force_bytes Nullable(UInt64)', 10, 10, 1) limit 1000 settings max_threads = 1;
set allow_experimental_projection_optimization = 1, force_optimize_projection = 1;
@ -11,8 +11,20 @@ select toStartOfMinute(datetime) dt_m, countIf(first_time = 0) from projection_t
select toStartOfMinute(datetime) dt_m, countIf(first_time = 0) / count(), avg((kbytes * 8) / duration) from projection_test where domain = '1' group by dt_m order by dt_m;
-- prewhere with alias
select toStartOfMinute(datetime) dt_m, countIf(first_time = 0) / count(), avg((kbytes * 8) / duration) from projection_test prewhere domain_alias = 3 where domain = '1' group by dt_m order by dt_m;
drop row policy if exists filter on projection_test;
create row policy filter on projection_test using (domain = 'non_existing_domain') to all;
-- prewhere with alias with row policy
select toStartOfMinute(datetime) dt_m, countIf(first_time = 0) / count(), avg((kbytes * 8) / duration) from projection_test prewhere domain_alias = 1 where domain = '1' group by dt_m order by dt_m;
drop row policy filter on projection_test;
select toStartOfMinute(datetime) dt_m, count(), sum(block_count) / sum(duration), avg(block_count / duration) from projection_test group by dt_m order by dt_m;
-- TODO figure out how to deal with conflict column names
-- select toStartOfMinute(datetime) dt_m, count(), sum(block_count) / sum(duration), avg(block_count / duration) from projection_test where `sum(block_count)` = 1 group by dt_m order by dt_m;
select toStartOfMinute(datetime) dt_m, sum(buffer_time) / sum(duration), avg(buffer_time / duration), sum(valid_bytes) / sum(total_bytes), sum(completed_bytes) / sum(total_bytes), sum(fixed_bytes) / sum(total_bytes), sum(force_bytes) / sum(total_bytes), sum(valid_bytes) / sum(total_bytes) from projection_test where domain in ('12', '14') group by dt_m order by dt_m;
select toStartOfMinute(datetime) dt_m, domain, sum(retry_count) / sum(duration), avg(retry_count / duration), countIf(block_count > 0) / count(), countIf(first_time = 0) / count() from projection_test group by dt_m, domain having domain = '19' order by dt_m, domain;