mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-23 16:12:01 +00:00
Fix TTL with GROUP BY and fix test according to new logic
This commit is contained in:
parent
8ea4c2e26f
commit
0d79474acc
@ -88,7 +88,6 @@ TTLBlockInputStream::TTLBlockInputStream(
|
||||
descr.arguments.push_back(header.getPositionByName(name));
|
||||
|
||||
agg_aggregate_columns.resize(storage_rows_ttl.aggregate_descriptions.size());
|
||||
|
||||
const Settings & settings = storage.global_context.getSettingsRef();
|
||||
|
||||
Aggregator::Params params(header, keys, aggregates,
|
||||
@ -108,14 +107,15 @@ Block TTLBlockInputStream::readImpl()
|
||||
{
|
||||
/// Skip all data if table ttl is expired for part
|
||||
auto storage_rows_ttl = metadata_snapshot->getRowsTTL();
|
||||
if (metadata_snapshot->hasRowsTTL() && !storage_rows_ttl.where_expression && storage_rows_ttl.mode != TTLMode::GROUP_BY
|
||||
if (metadata_snapshot->hasRowsTTL()
|
||||
&& !storage_rows_ttl.where_expression
|
||||
&& storage_rows_ttl.mode != TTLMode::GROUP_BY
|
||||
&& isTTLExpired(old_ttl_infos.table_ttl.max))
|
||||
{
|
||||
rows_removed = data_part->rows_count;
|
||||
return {};
|
||||
}
|
||||
|
||||
|
||||
Block block = children.at(0)->read();
|
||||
if (!block)
|
||||
{
|
||||
@ -130,10 +130,9 @@ Block TTLBlockInputStream::readImpl()
|
||||
}
|
||||
|
||||
if (metadata_snapshot->hasRowsTTL() && (force || isTTLExpired(old_ttl_infos.table_ttl.min)))
|
||||
removeRowsWithExpiredTableTTL(block);
|
||||
executeRowsTTL(block);
|
||||
|
||||
removeValuesWithExpiredColumnTTL(block);
|
||||
|
||||
updateMovesTTL(block);
|
||||
updateRecompressionTTL(block);
|
||||
|
||||
@ -167,107 +166,117 @@ static ColumnPtr extractRequieredColumn(const ExpressionActions & expression, co
|
||||
return block_copy.getByName(result_column).column;
|
||||
}
|
||||
|
||||
void TTLBlockInputStream::removeRowsWithExpiredTableTTL(Block & block)
|
||||
void TTLBlockInputStream::executeRowsTTL(Block & block)
|
||||
{
|
||||
auto rows_ttl = metadata_snapshot->getRowsTTL();
|
||||
auto ttl_column = extractRequieredColumn(*rows_ttl.expression, block, rows_ttl.result_column);
|
||||
|
||||
auto where_result_column = rows_ttl.where_expression ?
|
||||
extractRequieredColumn(*rows_ttl.where_expression, block, rows_ttl.where_result_column) : nullptr;
|
||||
extractRequieredColumn(*rows_ttl.where_expression, block, rows_ttl.where_result_column): nullptr;
|
||||
|
||||
if (aggregator)
|
||||
aggregateRowsWithExpiredTTL(block, ttl_column, where_result_column);
|
||||
else
|
||||
removeRowsWithExpiredTTL(block, ttl_column, where_result_column);
|
||||
}
|
||||
|
||||
void TTLBlockInputStream::removeRowsWithExpiredTTL(Block & block, ColumnPtr ttl_column, ColumnPtr where_column)
|
||||
{
|
||||
MutableColumns result_columns;
|
||||
const auto & column_names = header.getNames();
|
||||
|
||||
if (!aggregator)
|
||||
result_columns.reserve(column_names.size());
|
||||
for (auto it = column_names.begin(); it != column_names.end(); ++it)
|
||||
{
|
||||
MutableColumns result_columns;
|
||||
result_columns.reserve(column_names.size());
|
||||
for (auto it = column_names.begin(); it != column_names.end(); ++it)
|
||||
{
|
||||
const IColumn * values_column = block.getByName(*it).column.get();
|
||||
MutableColumnPtr result_column = values_column->cloneEmpty();
|
||||
result_column->reserve(block.rows());
|
||||
const IColumn * values_column = block.getByName(*it).column.get();
|
||||
MutableColumnPtr result_column = values_column->cloneEmpty();
|
||||
result_column->reserve(block.rows());
|
||||
|
||||
for (size_t i = 0; i < block.rows(); ++i)
|
||||
{
|
||||
UInt32 cur_ttl = getTimestampByIndex(ttl_column.get(), i);
|
||||
bool where_filter_passed = !where_result_column || where_result_column->getBool(i);
|
||||
if (!isTTLExpired(cur_ttl) || !where_filter_passed)
|
||||
{
|
||||
new_ttl_infos.table_ttl.update(cur_ttl);
|
||||
result_column->insertFrom(*values_column, i);
|
||||
}
|
||||
else if (it == column_names.begin())
|
||||
++rows_removed;
|
||||
}
|
||||
result_columns.emplace_back(std::move(result_column));
|
||||
}
|
||||
block = header.cloneWithColumns(std::move(result_columns));
|
||||
}
|
||||
else
|
||||
{
|
||||
MutableColumns result_columns = header.cloneEmptyColumns();
|
||||
MutableColumns aggregate_columns = header.cloneEmptyColumns();
|
||||
|
||||
size_t rows_aggregated = 0;
|
||||
size_t current_key_start = 0;
|
||||
size_t rows_with_current_key = 0;
|
||||
auto storage_rows_ttl = metadata_snapshot->getRowsTTL();
|
||||
for (size_t i = 0; i < block.rows(); ++i)
|
||||
{
|
||||
UInt32 cur_ttl = getTimestampByIndex(ttl_column.get(), i);
|
||||
bool where_filter_passed = !where_result_column || where_result_column->getBool(i);
|
||||
bool ttl_expired = isTTLExpired(cur_ttl) && where_filter_passed;
|
||||
|
||||
bool same_as_current = true;
|
||||
for (size_t j = 0; j < storage_rows_ttl.group_by_keys.size(); ++j)
|
||||
{
|
||||
const String & key_column = storage_rows_ttl.group_by_keys[j];
|
||||
const IColumn * values_column = block.getByName(key_column).column.get();
|
||||
if (!same_as_current || (*values_column)[i] != current_key_value[j])
|
||||
{
|
||||
values_column->get(i, current_key_value[j]);
|
||||
same_as_current = false;
|
||||
}
|
||||
}
|
||||
|
||||
if (!same_as_current)
|
||||
{
|
||||
if (rows_with_current_key)
|
||||
calculateAggregates(aggregate_columns, current_key_start, rows_with_current_key);
|
||||
finalizeAggregates(result_columns);
|
||||
|
||||
current_key_start = rows_aggregated;
|
||||
rows_with_current_key = 0;
|
||||
}
|
||||
|
||||
if (ttl_expired)
|
||||
{
|
||||
++rows_with_current_key;
|
||||
++rows_aggregated;
|
||||
for (const auto & name : column_names)
|
||||
{
|
||||
const IColumn * values_column = block.getByName(name).column.get();
|
||||
auto & column = aggregate_columns[header.getPositionByName(name)];
|
||||
column->insertFrom(*values_column, i);
|
||||
}
|
||||
}
|
||||
else
|
||||
bool where_filter_passed = !where_column || where_column->getBool(i);
|
||||
if (!isTTLExpired(cur_ttl) || !where_filter_passed)
|
||||
{
|
||||
new_ttl_infos.table_ttl.update(cur_ttl);
|
||||
for (const auto & name : column_names)
|
||||
{
|
||||
const IColumn * values_column = block.getByName(name).column.get();
|
||||
auto & column = result_columns[header.getPositionByName(name)];
|
||||
column->insertFrom(*values_column, i);
|
||||
}
|
||||
result_column->insertFrom(*values_column, i);
|
||||
}
|
||||
else if (it == column_names.begin())
|
||||
++rows_removed;
|
||||
}
|
||||
|
||||
result_columns.emplace_back(std::move(result_column));
|
||||
}
|
||||
|
||||
block = header.cloneWithColumns(std::move(result_columns));
|
||||
}
|
||||
|
||||
void TTLBlockInputStream::aggregateRowsWithExpiredTTL(Block & block, ColumnPtr ttl_column, ColumnPtr where_column)
|
||||
{
|
||||
const auto & column_names = header.getNames();
|
||||
MutableColumns result_columns = header.cloneEmptyColumns();
|
||||
MutableColumns aggregate_columns = header.cloneEmptyColumns();
|
||||
|
||||
size_t rows_aggregated = 0;
|
||||
size_t current_key_start = 0;
|
||||
size_t rows_with_current_key = 0;
|
||||
auto storage_rows_ttl = metadata_snapshot->getRowsTTL();
|
||||
|
||||
for (size_t i = 0; i < block.rows(); ++i)
|
||||
{
|
||||
UInt32 cur_ttl = getTimestampByIndex(ttl_column.get(), i);
|
||||
bool where_filter_passed = !where_column || where_column->getBool(i);
|
||||
bool ttl_expired = isTTLExpired(cur_ttl) && where_filter_passed;
|
||||
|
||||
bool same_as_current = true;
|
||||
for (size_t j = 0; j < storage_rows_ttl.group_by_keys.size(); ++j)
|
||||
{
|
||||
const String & key_column = storage_rows_ttl.group_by_keys[j];
|
||||
const IColumn * values_column = block.getByName(key_column).column.get();
|
||||
if (!same_as_current || (*values_column)[i] != current_key_value[j])
|
||||
{
|
||||
values_column->get(i, current_key_value[j]);
|
||||
same_as_current = false;
|
||||
}
|
||||
}
|
||||
|
||||
if (rows_with_current_key)
|
||||
calculateAggregates(aggregate_columns, current_key_start, rows_with_current_key);
|
||||
if (!same_as_current)
|
||||
{
|
||||
if (rows_with_current_key)
|
||||
calculateAggregates(aggregate_columns, current_key_start, rows_with_current_key);
|
||||
finalizeAggregates(result_columns);
|
||||
|
||||
block = header.cloneWithColumns(std::move(result_columns));
|
||||
current_key_start = rows_aggregated;
|
||||
rows_with_current_key = 0;
|
||||
}
|
||||
|
||||
if (ttl_expired)
|
||||
{
|
||||
++rows_with_current_key;
|
||||
++rows_aggregated;
|
||||
for (const auto & name : column_names)
|
||||
{
|
||||
const IColumn * values_column = block.getByName(name).column.get();
|
||||
auto & column = aggregate_columns[header.getPositionByName(name)];
|
||||
column->insertFrom(*values_column, i);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
new_ttl_infos.table_ttl.update(cur_ttl);
|
||||
for (const auto & name : column_names)
|
||||
{
|
||||
const IColumn * values_column = block.getByName(name).column.get();
|
||||
auto & column = result_columns[header.getPositionByName(name)];
|
||||
column->insertFrom(*values_column, i);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (rows_with_current_key)
|
||||
calculateAggregates(aggregate_columns, current_key_start, rows_with_current_key);
|
||||
|
||||
block = header.cloneWithColumns(std::move(result_columns));
|
||||
}
|
||||
|
||||
void TTLBlockInputStream::calculateAggregates(const MutableColumns & aggregate_columns, size_t start_pos, size_t length)
|
||||
@ -294,12 +303,14 @@ void TTLBlockInputStream::finalizeAggregates(MutableColumns & result_columns)
|
||||
{
|
||||
for (const auto & it : storage_rows_ttl.set_parts)
|
||||
it.expression->execute(agg_block);
|
||||
|
||||
for (const auto & name : storage_rows_ttl.group_by_keys)
|
||||
{
|
||||
const IColumn * values_column = agg_block.getByName(name).column.get();
|
||||
auto & result_column = result_columns[header.getPositionByName(name)];
|
||||
result_column->insertRangeFrom(*values_column, 0, agg_block.rows());
|
||||
}
|
||||
|
||||
for (const auto & it : storage_rows_ttl.set_parts)
|
||||
{
|
||||
const IColumn * values_column = agg_block.getByName(it.expression_result_column_name).column.get();
|
||||
@ -308,6 +319,7 @@ void TTLBlockInputStream::finalizeAggregates(MutableColumns & result_columns)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
agg_result.invalidate();
|
||||
}
|
||||
|
||||
|
@ -67,8 +67,13 @@ private:
|
||||
/// Removes values with expired ttl and computes new_ttl_infos and empty_columns for part
|
||||
void removeValuesWithExpiredColumnTTL(Block & block);
|
||||
|
||||
void executeRowsTTL(Block & block);
|
||||
|
||||
/// Removes rows with expired table ttl and computes new ttl_infos for part
|
||||
void removeRowsWithExpiredTableTTL(Block & block);
|
||||
void removeRowsWithExpiredTTL(Block & block, ColumnPtr ttl_column, ColumnPtr where_column);
|
||||
|
||||
/// Aggregates rows with expired table ttl and computes new ttl_infos for part
|
||||
void aggregateRowsWithExpiredTTL(Block & block, ColumnPtr ttl_column, ColumnPtr where_column);
|
||||
|
||||
// Calculate aggregates of aggregate_columns into agg_result
|
||||
void calculateAggregates(const MutableColumns & aggregate_columns, size_t start_pos, size_t length);
|
||||
|
@ -184,11 +184,8 @@ TTLDescription TTLDescription::getTTLFromAST(
|
||||
if (ttl_element->group_by_key.size() > pk_columns.size())
|
||||
throw Exception("TTL Expression GROUP BY key should be a prefix of primary key", ErrorCodes::BAD_TTL_EXPRESSION);
|
||||
|
||||
NameSet primary_key_columns_set(pk_columns.begin(), pk_columns.end());
|
||||
NameSet aggregation_columns_set;
|
||||
|
||||
for (const auto & column : primary_key.expression->getRequiredColumns())
|
||||
primary_key_columns_set.insert(column);
|
||||
NameSet used_primary_key_columns_set;
|
||||
|
||||
for (size_t i = 0; i < ttl_element->group_by_key.size(); ++i)
|
||||
{
|
||||
@ -196,6 +193,8 @@ TTLDescription TTLDescription::getTTLFromAST(
|
||||
throw Exception(
|
||||
"TTL Expression GROUP BY key should be a prefix of primary key",
|
||||
ErrorCodes::BAD_TTL_EXPRESSION);
|
||||
|
||||
used_primary_key_columns_set.insert(pk_columns[i]);
|
||||
}
|
||||
|
||||
for (const auto & [name, _] : ttl_element->group_by_aggregations)
|
||||
@ -209,9 +208,17 @@ TTLDescription TTLDescription::getTTLFromAST(
|
||||
result.group_by_keys = Names(pk_columns.begin(), pk_columns.begin() + ttl_element->group_by_key.size());
|
||||
auto aggregations = ttl_element->group_by_aggregations;
|
||||
|
||||
const auto & primary_key_expressions = primary_key.expression_list_ast->children;
|
||||
for (size_t i = ttl_element->group_by_key.size(); i < primary_key_expressions.size(); ++i)
|
||||
{
|
||||
ASTPtr expr = makeASTFunction("any", primary_key_expressions[i]->clone());
|
||||
aggregations.emplace_back(pk_columns[i], std::move(expr));
|
||||
aggregation_columns_set.insert(pk_columns[i]);
|
||||
}
|
||||
|
||||
for (const auto & column : columns.getOrdinary())
|
||||
{
|
||||
if (!aggregation_columns_set.count(column.name))
|
||||
if (!aggregation_columns_set.count(column.name) && !used_primary_key_columns_set.count(column.name))
|
||||
{
|
||||
ASTPtr expr = makeASTFunction("any", std::make_shared<ASTIdentifier>(column.name));
|
||||
aggregations.emplace_back(column.name, std::move(expr));
|
||||
|
@ -1,20 +1,26 @@
|
||||
ttl_01280_1
|
||||
1 1 0 4
|
||||
1 2 3 7
|
||||
1 3 0 5
|
||||
2 1 0 1
|
||||
2 1 20 1
|
||||
ttl_01280_2
|
||||
1 1 [0,2,3] 4
|
||||
1 1 [5,4,1] 13
|
||||
1 3 [1,0,1,0] 17
|
||||
2 1 [3,1,0,3] 8
|
||||
3 1 [2,4,5] 8
|
||||
ttl_01280_3
|
||||
1 1 0 4
|
||||
1 3 10 6
|
||||
1 1 10 6
|
||||
2 1 0 3
|
||||
3 5 8 2
|
||||
3 1 8 2
|
||||
ttl_01280_4
|
||||
1 1 0 4
|
||||
3 3 13 9
|
||||
10 2 13 9
|
||||
ttl_01280_5
|
||||
1 2 7 5
|
||||
2 3 6 5
|
||||
1 2 3 5
|
||||
2 3 3 5
|
||||
ttl_01280_6
|
||||
1 5 3 5
|
||||
2 10 3 5
|
||||
|
@ -13,6 +13,7 @@ function optimize()
|
||||
done
|
||||
}
|
||||
|
||||
echo "ttl_01280_1"
|
||||
$CLICKHOUSE_CLIENT -n --query "
|
||||
create table ttl_01280_1 (a Int, b Int, x Int, y Int, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second delete where x % 10 == 0 and y > 5;
|
||||
insert into ttl_01280_1 values (1, 1, 0, 4, now() + 10);
|
||||
@ -29,6 +30,7 @@ $CLICKHOUSE_CLIENT --query "select a, b, x, y from ttl_01280_1 ORDER BY a, b, x,
|
||||
|
||||
$CLICKHOUSE_CLIENT --query "drop table if exists ttl_01280_2"
|
||||
|
||||
echo "ttl_01280_2"
|
||||
$CLICKHOUSE_CLIENT -n --query "
|
||||
create table ttl_01280_2 (a Int, b Int, x Array(Int32), y Double, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second group by a, b set x = minForEach(x), y = sum(y), d = max(d);
|
||||
insert into ttl_01280_2 values (1, 1, array(0, 2, 3), 4, now() + 10);
|
||||
@ -47,6 +49,7 @@ $CLICKHOUSE_CLIENT --query "select a, b, x, y from ttl_01280_2 ORDER BY a, b, x,
|
||||
|
||||
$CLICKHOUSE_CLIENT --query "drop table if exists ttl_01280_3"
|
||||
|
||||
echo "ttl_01280_3"
|
||||
$CLICKHOUSE_CLIENT -n --query "
|
||||
create table ttl_01280_3 (a Int, b Int, x Int64, y Int, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second group by a set x = argMax(x, d), y = argMax(y, d), d = max(d);
|
||||
insert into ttl_01280_3 values (1, 1, 0, 4, now() + 10);
|
||||
@ -65,6 +68,7 @@ $CLICKHOUSE_CLIENT --query "select a, b, x, y from ttl_01280_3 ORDER BY a, b, x,
|
||||
|
||||
$CLICKHOUSE_CLIENT --query "drop table if exists ttl_01280_4"
|
||||
|
||||
echo "ttl_01280_4"
|
||||
$CLICKHOUSE_CLIENT -n --query "
|
||||
create table ttl_01280_4 (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (toDate(d), -(a + b)) ttl d + interval 1 second group by toDate(d) set x = sum(x), y = max(y);
|
||||
insert into ttl_01280_4 values (1, 1, 0, 4, now() + 10);
|
||||
@ -79,7 +83,8 @@ $CLICKHOUSE_CLIENT --query "select a, b, x, y from ttl_01280_4 ORDER BY a, b, x,
|
||||
|
||||
$CLICKHOUSE_CLIENT --query "drop table if exists ttl_01280_5"
|
||||
|
||||
$CLICKHOUSE_CLIENT -n --query "create table ttl_01280_5 (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (toDate(d), a, -b) ttl d + interval 1 second group by toDate(d), a set x = sum(x);
|
||||
echo "ttl_01280_5"
|
||||
$CLICKHOUSE_CLIENT -n --query "create table ttl_01280_5 (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (toDate(d), a, -b) ttl d + interval 1 second group by toDate(d), a set x = sum(x), b = argMax(b, -b);
|
||||
insert into ttl_01280_5 values (1, 2, 3, 5, now());
|
||||
insert into ttl_01280_5 values (2, 10, 1, 5, now());
|
||||
insert into ttl_01280_5 values (2, 3, 5, 5, now());
|
||||
@ -91,6 +96,7 @@ $CLICKHOUSE_CLIENT --query "select a, b, x, y from ttl_01280_5 ORDER BY a, b, x,
|
||||
|
||||
$CLICKHOUSE_CLIENT --query "drop table if exists ttl_01280_6"
|
||||
|
||||
echo "ttl_01280_6"
|
||||
$CLICKHOUSE_CLIENT -n --query "
|
||||
create table ttl_01280_6 (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (toDate(d), a, -b) ttl d + interval 1 second group by toDate(d), a;
|
||||
insert into ttl_01280_6 values (1, 2, 3, 5, now());
|
||||
|
@ -1,7 +1,4 @@
|
||||
create table ttl_01280_error (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second group by x set y = max(y); -- { serverError 450}
|
||||
create table ttl_01280_error (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second group by b set y = max(y); -- { serverError 450}
|
||||
create table ttl_01280_error (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second group by a, b, x set y = max(y); -- { serverError 450}
|
||||
create table ttl_01280_error (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second group by a set b = min(b), y = max(y); -- { serverError 450}
|
||||
create table ttl_01280_error (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second group by a, b set y = max(y), y = max(y); -- { serverError 450}
|
||||
create table ttl_01280_error (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (toDate(d), a) ttl d + interval 1 second group by toDate(d), a set d = min(d), b = max(b); -- { serverError 450}
|
||||
create table ttl_01280_error (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (d, -(a + b)) ttl d + interval 1 second group by d, -(a + b) set a = sum(a), b = min(b); -- { serverError 450}
|
||||
|
Loading…
Reference in New Issue
Block a user