From 0d79474acc3cfb6f2c8dfbed26aa5d5f0346fc4f Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Wed, 30 Sep 2020 19:10:15 +0300 Subject: [PATCH] Fix TTL with GROUP BY and fix test according to new logic --- src/DataStreams/TTLBlockInputStream.cpp | 184 ++++++++++-------- src/DataStreams/TTLBlockInputStream.h | 7 +- src/Storages/TTLDescription.cpp | 17 +- .../01280_ttl_where_group_by.reference | 16 +- .../0_stateless/01280_ttl_where_group_by.sh | 8 +- .../01280_ttl_where_group_by_negative.sql | 3 - 6 files changed, 134 insertions(+), 101 deletions(-) diff --git a/src/DataStreams/TTLBlockInputStream.cpp b/src/DataStreams/TTLBlockInputStream.cpp index 6dba8968f79..5c49b9f11c2 100644 --- a/src/DataStreams/TTLBlockInputStream.cpp +++ b/src/DataStreams/TTLBlockInputStream.cpp @@ -88,7 +88,6 @@ TTLBlockInputStream::TTLBlockInputStream( descr.arguments.push_back(header.getPositionByName(name)); agg_aggregate_columns.resize(storage_rows_ttl.aggregate_descriptions.size()); - const Settings & settings = storage.global_context.getSettingsRef(); Aggregator::Params params(header, keys, aggregates, @@ -108,14 +107,15 @@ Block TTLBlockInputStream::readImpl() { /// Skip all data if table ttl is expired for part auto storage_rows_ttl = metadata_snapshot->getRowsTTL(); - if (metadata_snapshot->hasRowsTTL() && !storage_rows_ttl.where_expression && storage_rows_ttl.mode != TTLMode::GROUP_BY + if (metadata_snapshot->hasRowsTTL() + && !storage_rows_ttl.where_expression + && storage_rows_ttl.mode != TTLMode::GROUP_BY && isTTLExpired(old_ttl_infos.table_ttl.max)) { rows_removed = data_part->rows_count; return {}; } - Block block = children.at(0)->read(); if (!block) { @@ -130,10 +130,9 @@ Block TTLBlockInputStream::readImpl() } if (metadata_snapshot->hasRowsTTL() && (force || isTTLExpired(old_ttl_infos.table_ttl.min))) - removeRowsWithExpiredTableTTL(block); + executeRowsTTL(block); removeValuesWithExpiredColumnTTL(block); - updateMovesTTL(block); updateRecompressionTTL(block); @@ -167,107 
+166,117 @@ static ColumnPtr extractRequieredColumn(const ExpressionActions & expression, co return block_copy.getByName(result_column).column; } -void TTLBlockInputStream::removeRowsWithExpiredTableTTL(Block & block) +void TTLBlockInputStream::executeRowsTTL(Block & block) { auto rows_ttl = metadata_snapshot->getRowsTTL(); auto ttl_column = extractRequieredColumn(*rows_ttl.expression, block, rows_ttl.result_column); auto where_result_column = rows_ttl.where_expression ? - extractRequieredColumn(*rows_ttl.where_expression, block, rows_ttl.where_result_column) : nullptr; + extractRequieredColumn(*rows_ttl.where_expression, block, rows_ttl.where_result_column): nullptr; + if (aggregator) + aggregateRowsWithExpiredTTL(block, ttl_column, where_result_column); + else + removeRowsWithExpiredTTL(block, ttl_column, where_result_column); +} + +void TTLBlockInputStream::removeRowsWithExpiredTTL(Block & block, ColumnPtr ttl_column, ColumnPtr where_column) +{ + MutableColumns result_columns; const auto & column_names = header.getNames(); - if (!aggregator) + result_columns.reserve(column_names.size()); + for (auto it = column_names.begin(); it != column_names.end(); ++it) { - MutableColumns result_columns; - result_columns.reserve(column_names.size()); - for (auto it = column_names.begin(); it != column_names.end(); ++it) - { - const IColumn * values_column = block.getByName(*it).column.get(); - MutableColumnPtr result_column = values_column->cloneEmpty(); - result_column->reserve(block.rows()); + const IColumn * values_column = block.getByName(*it).column.get(); + MutableColumnPtr result_column = values_column->cloneEmpty(); + result_column->reserve(block.rows()); - for (size_t i = 0; i < block.rows(); ++i) - { - UInt32 cur_ttl = getTimestampByIndex(ttl_column.get(), i); - bool where_filter_passed = !where_result_column || where_result_column->getBool(i); - if (!isTTLExpired(cur_ttl) || !where_filter_passed) - { - new_ttl_infos.table_ttl.update(cur_ttl); - 
result_column->insertFrom(*values_column, i); - } - else if (it == column_names.begin()) - ++rows_removed; - } - result_columns.emplace_back(std::move(result_column)); - } - block = header.cloneWithColumns(std::move(result_columns)); - } - else - { - MutableColumns result_columns = header.cloneEmptyColumns(); - MutableColumns aggregate_columns = header.cloneEmptyColumns(); - - size_t rows_aggregated = 0; - size_t current_key_start = 0; - size_t rows_with_current_key = 0; - auto storage_rows_ttl = metadata_snapshot->getRowsTTL(); for (size_t i = 0; i < block.rows(); ++i) { UInt32 cur_ttl = getTimestampByIndex(ttl_column.get(), i); - bool where_filter_passed = !where_result_column || where_result_column->getBool(i); - bool ttl_expired = isTTLExpired(cur_ttl) && where_filter_passed; - - bool same_as_current = true; - for (size_t j = 0; j < storage_rows_ttl.group_by_keys.size(); ++j) - { - const String & key_column = storage_rows_ttl.group_by_keys[j]; - const IColumn * values_column = block.getByName(key_column).column.get(); - if (!same_as_current || (*values_column)[i] != current_key_value[j]) - { - values_column->get(i, current_key_value[j]); - same_as_current = false; - } - } - - if (!same_as_current) - { - if (rows_with_current_key) - calculateAggregates(aggregate_columns, current_key_start, rows_with_current_key); - finalizeAggregates(result_columns); - - current_key_start = rows_aggregated; - rows_with_current_key = 0; - } - - if (ttl_expired) - { - ++rows_with_current_key; - ++rows_aggregated; - for (const auto & name : column_names) - { - const IColumn * values_column = block.getByName(name).column.get(); - auto & column = aggregate_columns[header.getPositionByName(name)]; - column->insertFrom(*values_column, i); - } - } - else + bool where_filter_passed = !where_column || where_column->getBool(i); + if (!isTTLExpired(cur_ttl) || !where_filter_passed) { new_ttl_infos.table_ttl.update(cur_ttl); - for (const auto & name : column_names) - { - const IColumn * 
values_column = block.getByName(name).column.get(); - auto & column = result_columns[header.getPositionByName(name)]; - column->insertFrom(*values_column, i); - } + result_column->insertFrom(*values_column, i); + } + else if (it == column_names.begin()) + ++rows_removed; + } + + result_columns.emplace_back(std::move(result_column)); + } + + block = header.cloneWithColumns(std::move(result_columns)); +} + +void TTLBlockInputStream::aggregateRowsWithExpiredTTL(Block & block, ColumnPtr ttl_column, ColumnPtr where_column) +{ + const auto & column_names = header.getNames(); + MutableColumns result_columns = header.cloneEmptyColumns(); + MutableColumns aggregate_columns = header.cloneEmptyColumns(); + + size_t rows_aggregated = 0; + size_t current_key_start = 0; + size_t rows_with_current_key = 0; + auto storage_rows_ttl = metadata_snapshot->getRowsTTL(); + + for (size_t i = 0; i < block.rows(); ++i) + { + UInt32 cur_ttl = getTimestampByIndex(ttl_column.get(), i); + bool where_filter_passed = !where_column || where_column->getBool(i); + bool ttl_expired = isTTLExpired(cur_ttl) && where_filter_passed; + + bool same_as_current = true; + for (size_t j = 0; j < storage_rows_ttl.group_by_keys.size(); ++j) + { + const String & key_column = storage_rows_ttl.group_by_keys[j]; + const IColumn * values_column = block.getByName(key_column).column.get(); + if (!same_as_current || (*values_column)[i] != current_key_value[j]) + { + values_column->get(i, current_key_value[j]); + same_as_current = false; } } - if (rows_with_current_key) - calculateAggregates(aggregate_columns, current_key_start, rows_with_current_key); + if (!same_as_current) + { + if (rows_with_current_key) + calculateAggregates(aggregate_columns, current_key_start, rows_with_current_key); + finalizeAggregates(result_columns); - block = header.cloneWithColumns(std::move(result_columns)); + current_key_start = rows_aggregated; + rows_with_current_key = 0; + } + + if (ttl_expired) + { + ++rows_with_current_key; + 
++rows_aggregated; + for (const auto & name : column_names) + { + const IColumn * values_column = block.getByName(name).column.get(); + auto & column = aggregate_columns[header.getPositionByName(name)]; + column->insertFrom(*values_column, i); + } + } + else + { + new_ttl_infos.table_ttl.update(cur_ttl); + for (const auto & name : column_names) + { + const IColumn * values_column = block.getByName(name).column.get(); + auto & column = result_columns[header.getPositionByName(name)]; + column->insertFrom(*values_column, i); + } + } } + + if (rows_with_current_key) + calculateAggregates(aggregate_columns, current_key_start, rows_with_current_key); + + block = header.cloneWithColumns(std::move(result_columns)); } void TTLBlockInputStream::calculateAggregates(const MutableColumns & aggregate_columns, size_t start_pos, size_t length) @@ -294,12 +303,14 @@ void TTLBlockInputStream::finalizeAggregates(MutableColumns & result_columns) { for (const auto & it : storage_rows_ttl.set_parts) it.expression->execute(agg_block); + for (const auto & name : storage_rows_ttl.group_by_keys) { const IColumn * values_column = agg_block.getByName(name).column.get(); auto & result_column = result_columns[header.getPositionByName(name)]; result_column->insertRangeFrom(*values_column, 0, agg_block.rows()); } + for (const auto & it : storage_rows_ttl.set_parts) { const IColumn * values_column = agg_block.getByName(it.expression_result_column_name).column.get(); @@ -308,6 +319,7 @@ void TTLBlockInputStream::finalizeAggregates(MutableColumns & result_columns) } } } + agg_result.invalidate(); } diff --git a/src/DataStreams/TTLBlockInputStream.h b/src/DataStreams/TTLBlockInputStream.h index 1d3b69f61c5..bbe1f8782a4 100644 --- a/src/DataStreams/TTLBlockInputStream.h +++ b/src/DataStreams/TTLBlockInputStream.h @@ -67,8 +67,13 @@ private: /// Removes values with expired ttl and computes new_ttl_infos and empty_columns for part void removeValuesWithExpiredColumnTTL(Block & block); + void 
executeRowsTTL(Block & block); + /// Removes rows with expired table ttl and computes new ttl_infos for part - void removeRowsWithExpiredTableTTL(Block & block); + void removeRowsWithExpiredTTL(Block & block, ColumnPtr ttl_column, ColumnPtr where_column); + + /// Aggregates rows with expired table ttl and computes new ttl_infos for part + void aggregateRowsWithExpiredTTL(Block & block, ColumnPtr ttl_column, ColumnPtr where_column); // Calculate aggregates of aggregate_columns into agg_result void calculateAggregates(const MutableColumns & aggregate_columns, size_t start_pos, size_t length); diff --git a/src/Storages/TTLDescription.cpp b/src/Storages/TTLDescription.cpp index 7499f1de292..e412653a972 100644 --- a/src/Storages/TTLDescription.cpp +++ b/src/Storages/TTLDescription.cpp @@ -184,11 +184,8 @@ TTLDescription TTLDescription::getTTLFromAST( if (ttl_element->group_by_key.size() > pk_columns.size()) throw Exception("TTL Expression GROUP BY key should be a prefix of primary key", ErrorCodes::BAD_TTL_EXPRESSION); - NameSet primary_key_columns_set(pk_columns.begin(), pk_columns.end()); NameSet aggregation_columns_set; - - for (const auto & column : primary_key.expression->getRequiredColumns()) - primary_key_columns_set.insert(column); + NameSet used_primary_key_columns_set; for (size_t i = 0; i < ttl_element->group_by_key.size(); ++i) { @@ -196,6 +193,8 @@ TTLDescription TTLDescription::getTTLFromAST( throw Exception( "TTL Expression GROUP BY key should be a prefix of primary key", ErrorCodes::BAD_TTL_EXPRESSION); + + used_primary_key_columns_set.insert(pk_columns[i]); } for (const auto & [name, _] : ttl_element->group_by_aggregations) @@ -209,9 +208,17 @@ TTLDescription TTLDescription::getTTLFromAST( result.group_by_keys = Names(pk_columns.begin(), pk_columns.begin() + ttl_element->group_by_key.size()); auto aggregations = ttl_element->group_by_aggregations; + const auto & primary_key_expressions = primary_key.expression_list_ast->children; + for (size_t i = 
ttl_element->group_by_key.size(); i < primary_key_expressions.size(); ++i) + { + ASTPtr expr = makeASTFunction("any", primary_key_expressions[i]->clone()); + aggregations.emplace_back(pk_columns[i], std::move(expr)); + aggregation_columns_set.insert(pk_columns[i]); + } + for (const auto & column : columns.getOrdinary()) { - if (!aggregation_columns_set.count(column.name)) + if (!aggregation_columns_set.count(column.name) && !used_primary_key_columns_set.count(column.name)) { ASTPtr expr = makeASTFunction("any", std::make_shared<ASTIdentifier>(column.name)); aggregations.emplace_back(column.name, std::move(expr)); diff --git a/tests/queries/0_stateless/01280_ttl_where_group_by.reference b/tests/queries/0_stateless/01280_ttl_where_group_by.reference index ad20d38f2e6..7fe00709dee 100644 --- a/tests/queries/0_stateless/01280_ttl_where_group_by.reference +++ b/tests/queries/0_stateless/01280_ttl_where_group_by.reference @@ -1,20 +1,26 @@ +ttl_01280_1 1 1 0 4 1 2 3 7 1 3 0 5 2 1 0 1 2 1 20 1 +ttl_01280_2 1 1 [0,2,3] 4 1 1 [5,4,1] 13 1 3 [1,0,1,0] 17 2 1 [3,1,0,3] 8 3 1 [2,4,5] 8 +ttl_01280_3 1 1 0 4 -1 3 10 6 +1 1 10 6 2 1 0 3 -3 5 8 2 +3 1 8 2 +ttl_01280_4 1 1 0 4 -3 3 13 9 +10 2 13 9 +ttl_01280_5 1 2 7 5 2 3 6 5 -1 2 3 5 -2 3 3 5 +ttl_01280_6 +1 5 3 5 +2 10 3 5 diff --git a/tests/queries/0_stateless/01280_ttl_where_group_by.sh b/tests/queries/0_stateless/01280_ttl_where_group_by.sh index 9b05606f928..531f2951d36 100755 --- a/tests/queries/0_stateless/01280_ttl_where_group_by.sh +++ b/tests/queries/0_stateless/01280_ttl_where_group_by.sh @@ -13,6 +13,7 @@ function optimize() done } +echo "ttl_01280_1" $CLICKHOUSE_CLIENT -n --query " create table ttl_01280_1 (a Int, b Int, x Int, y Int, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second delete where x % 10 == 0 and y > 5; insert into ttl_01280_1 values (1, 1, 0, 4, now() + 10); @@ -29,6 +30,7 @@ $CLICKHOUSE_CLIENT --query "select a, b, x, y from ttl_01280_1 ORDER BY a, b, x, $CLICKHOUSE_CLIENT --query "drop table
if exists ttl_01280_2" +echo "ttl_01280_2" $CLICKHOUSE_CLIENT -n --query " create table ttl_01280_2 (a Int, b Int, x Array(Int32), y Double, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second group by a, b set x = minForEach(x), y = sum(y), d = max(d); insert into ttl_01280_2 values (1, 1, array(0, 2, 3), 4, now() + 10); @@ -47,6 +49,7 @@ $CLICKHOUSE_CLIENT --query "select a, b, x, y from ttl_01280_2 ORDER BY a, b, x, $CLICKHOUSE_CLIENT --query "drop table if exists ttl_01280_3" +echo "ttl_01280_3" $CLICKHOUSE_CLIENT -n --query " create table ttl_01280_3 (a Int, b Int, x Int64, y Int, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second group by a set x = argMax(x, d), y = argMax(y, d), d = max(d); insert into ttl_01280_3 values (1, 1, 0, 4, now() + 10); @@ -65,6 +68,7 @@ $CLICKHOUSE_CLIENT --query "select a, b, x, y from ttl_01280_3 ORDER BY a, b, x, $CLICKHOUSE_CLIENT --query "drop table if exists ttl_01280_4" +echo "ttl_01280_4" $CLICKHOUSE_CLIENT -n --query " create table ttl_01280_4 (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (toDate(d), -(a + b)) ttl d + interval 1 second group by toDate(d) set x = sum(x), y = max(y); insert into ttl_01280_4 values (1, 1, 0, 4, now() + 10); @@ -79,7 +83,8 @@ $CLICKHOUSE_CLIENT --query "select a, b, x, y from ttl_01280_4 ORDER BY a, b, x, $CLICKHOUSE_CLIENT --query "drop table if exists ttl_01280_5" -$CLICKHOUSE_CLIENT -n --query "create table ttl_01280_5 (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (toDate(d), a, -b) ttl d + interval 1 second group by toDate(d), a set x = sum(x); +echo "ttl_01280_5" +$CLICKHOUSE_CLIENT -n --query "create table ttl_01280_5 (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (toDate(d), a, -b) ttl d + interval 1 second group by toDate(d), a set x = sum(x), b = argMax(b, -b); insert into ttl_01280_5 values (1, 2, 3, 5, now()); insert into ttl_01280_5 values (2, 10, 1, 5, 
now()); insert into ttl_01280_5 values (2, 3, 5, 5, now()); @@ -91,6 +96,7 @@ $CLICKHOUSE_CLIENT --query "select a, b, x, y from ttl_01280_5 ORDER BY a, b, x, $CLICKHOUSE_CLIENT --query "drop table if exists ttl_01280_6" +echo "ttl_01280_6" $CLICKHOUSE_CLIENT -n --query " create table ttl_01280_6 (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (toDate(d), a, -b) ttl d + interval 1 second group by toDate(d), a; insert into ttl_01280_6 values (1, 2, 3, 5, now()); diff --git a/tests/queries/0_stateless/01280_ttl_where_group_by_negative.sql b/tests/queries/0_stateless/01280_ttl_where_group_by_negative.sql index f2c26a3d495..b273e065bcc 100644 --- a/tests/queries/0_stateless/01280_ttl_where_group_by_negative.sql +++ b/tests/queries/0_stateless/01280_ttl_where_group_by_negative.sql @@ -1,7 +1,4 @@ create table ttl_01280_error (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second group by x set y = max(y); -- { serverError 450} create table ttl_01280_error (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second group by b set y = max(y); -- { serverError 450} create table ttl_01280_error (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second group by a, b, x set y = max(y); -- { serverError 450} -create table ttl_01280_error (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second group by a set b = min(b), y = max(y); -- { serverError 450} create table ttl_01280_error (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second group by a, b set y = max(y), y = max(y); -- { serverError 450} -create table ttl_01280_error (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (toDate(d), a) ttl d + interval 1 second group by toDate(d), a set d = min(d), b = max(b); -- { serverError 450} 
-create table ttl_01280_error (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (d, -(a + b)) ttl d + interval 1 second group by d, -(a + b) set a = sum(a), b = min(b); -- { serverError 450}