allow multiple rows TTL with WHERE expression

Anton Popov 2021-01-12 02:07:21 +03:00
parent a8f1786d95
commit 5822ee1f01
12 changed files with 120 additions and 55 deletions
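Before this change a table could declare only one DELETE TTL; the parser now accepts one unconditional DELETE TTL plus any number of conditional DELETE ... WHERE TTLs. A minimal sketch of the kind of definition this enables (the table and column names are illustrative; the same pattern is exercised by the test at the end of the diff):

CREATE TABLE ttl_example
(
    d Date,
    i UInt32
)
ENGINE = MergeTree
ORDER BY tuple()
TTL d + INTERVAL 1 MONTH DELETE,                 -- unconditional, at most one per table
    d + INTERVAL 1 WEEK DELETE WHERE i % 2 = 0,  -- conditional, any number allowed
    d + INTERVAL 2 WEEK DELETE WHERE i % 2 = 1;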

View File

@ -44,6 +44,10 @@ TTLBlockInputStream::TTLBlockInputStream(
algorithms.emplace_back(std::move(algorithm));
}
for (const auto & where_ttl : metadata_snapshot_->getRowsWhereTTL())
algorithms.emplace_back(std::make_unique<TTLDeleteAlgorithm>(
where_ttl, old_ttl_infos.rows_where_ttl[where_ttl.result_column], current_time_, force_));
for (const auto & group_by_ttl : metadata_snapshot_->getGroupByTTLs())
algorithms.emplace_back(std::make_unique<TTLAggregationAlgorithm>(
group_by_ttl, old_ttl_infos.group_by_ttl[group_by_ttl.result_column], current_time_, force_, header, storage_));

View File

@ -51,7 +51,11 @@ void TTLDeleteAlgorithm::execute(Block & block)
void TTLDeleteAlgorithm::finalize(const MutableDataPartPtr & data_part) const
{
if (description.where_expression)
data_part->ttl_infos.rows_where_ttl[description.result_column] = new_ttl_info;
else
data_part->ttl_infos.table_ttl = new_ttl_info;
data_part->ttl_infos.updatePartMinMaxTTL(new_ttl_info.min, new_ttl_info.max);
}

View File

@ -1142,6 +1142,12 @@ bool IMergeTreeDataPart::checkAllTTLCalculated(const StorageMetadataPtr & metada
return false;
}
for (const auto & rows_where_desc : metadata_snapshot->getRowsWhereTTL())
{
if (!ttl_infos.rows_where_ttl.count(rows_where_desc.result_column))
return false;
}
return true;
}

View File

@ -17,6 +17,12 @@ void MergeTreeDataPartTTLInfos::update(const MergeTreeDataPartTTLInfos & other_i
updatePartMinMaxTTL(ttl_info.min, ttl_info.max);
}
for (const auto & [name, ttl_info] : other_infos.rows_where_ttl)
{
rows_where_ttl[name].update(ttl_info);
updatePartMinMaxTTL(ttl_info.min, ttl_info.max);
}
for (const auto & [name, ttl_info] : other_infos.group_by_ttl)
{
group_by_ttl[name].update(ttl_info);
@ -91,6 +97,11 @@ void MergeTreeDataPartTTLInfos::read(ReadBuffer & in)
const JSON & group_by = json["group_by"];
fill_ttl_info_map(group_by, group_by_ttl);
}
if (json.has("rows_where"))
{
const JSON & rows_where = json["rows_where"];
fill_ttl_info_map(rows_where, rows_where_ttl);
}
}
@ -127,61 +138,41 @@ void MergeTreeDataPartTTLInfos::write(WriteBuffer & out) const
writeString("}", out);
}
auto write_infos = [&out](const auto & infos, const auto & type, bool is_first)
{
    if (!is_first)
        writeString(",", out);

    writeString(type, out);
    writeString(R"(:[)", out);
    for (auto it = infos.begin(); it != infos.end(); ++it)
    {
        if (it != infos.begin())
            writeString(",", out);

        writeString(R"({"expression":)", out);
        writeString(doubleQuoteString(it->first), out);
        writeString(R"(,"min":)", out);
        writeIntText(it->second.min, out);
        writeString(R"(,"max":)", out);
        writeIntText(it->second.max, out);
        writeString("}", out);
    }
    writeString("]", out);
};

bool is_first = columns_ttl.empty() && !table_ttl.min;
write_infos(moves_ttl, "moves", is_first);

is_first &= moves_ttl.empty();
write_infos(recompression_ttl, "recompression", is_first);

is_first &= recompression_ttl.empty();
write_infos(group_by_ttl, "group_by", is_first);

is_first &= group_by_ttl.empty();
write_infos(rows_where_ttl, "rows_where", is_first);
writeString("}", out);
}

View File

@ -45,6 +45,8 @@ struct MergeTreeDataPartTTLInfos
time_t part_min_ttl = 0;
time_t part_max_ttl = 0;
TTLInfoMap rows_where_ttl;
TTLInfoMap moves_ttl;
TTLInfoMap recompression_ttl;

View File

@ -379,6 +379,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
for (const auto & ttl_entry : metadata_snapshot->getGroupByTTLs())
updateTTL(ttl_entry, new_data_part->ttl_infos, new_data_part->ttl_infos.group_by_ttl[ttl_entry.result_column], block, true);
for (const auto & ttl_entry : metadata_snapshot->getRowsWhereTTL())
updateTTL(ttl_entry, new_data_part->ttl_infos, new_data_part->ttl_infos.rows_where_ttl[ttl_entry.result_column], block, true);
for (const auto & [name, ttl_entry] : metadata_snapshot->getColumnTTLs())
updateTTL(ttl_entry, new_data_part->ttl_infos, new_data_part->ttl_infos.columns_ttl[name], block, true);

View File

@ -148,6 +148,16 @@ bool StorageInMemoryMetadata::hasRowsTTL() const
return table_ttl.rows_ttl.expression != nullptr;
}
TTLDescriptions StorageInMemoryMetadata::getRowsWhereTTL() const
{
return table_ttl.rows_where_ttl;
}
bool StorageInMemoryMetadata::hasRowsWhereTTL() const
{
return !table_ttl.rows_where_ttl.empty();
}
TTLDescriptions StorageInMemoryMetadata::getMoveTTLs() const
{
return table_ttl.move_ttl;

View File

@ -109,6 +109,9 @@ struct StorageInMemoryMetadata
TTLDescription getRowsTTL() const;
bool hasRowsTTL() const;
TTLDescriptions getRowsWhereTTL() const;
bool hasRowsWhereTTL() const;
/// Just a wrapper for table TTLs; returns the move (to disks or volumes) parts of
/// the table TTL.
TTLDescriptions getMoveTTLs() const;

View File

@ -260,6 +260,7 @@ TTLDescription TTLDescription::getTTLFromAST(
TTLTableDescription::TTLTableDescription(const TTLTableDescription & other)
: definition_ast(other.definition_ast ? other.definition_ast->clone() : nullptr)
, rows_ttl(other.rows_ttl)
, rows_where_ttl(other.rows_where_ttl)
, move_ttl(other.move_ttl)
, recompression_ttl(other.recompression_ttl)
, group_by_ttl(other.group_by_ttl)
@ -277,6 +278,7 @@ TTLTableDescription & TTLTableDescription::operator=(const TTLTableDescription &
definition_ast.reset();
rows_ttl = other.rows_ttl;
rows_where_ttl = other.rows_where_ttl;
move_ttl = other.move_ttl;
recompression_ttl = other.recompression_ttl;
group_by_ttl = other.group_by_ttl;
@ -296,16 +298,24 @@ TTLTableDescription TTLTableDescription::getTTLForTableFromAST(
result.definition_ast = definition_ast->clone();
bool have_unconditional_delete_ttl = false;
for (const auto & ttl_element_ptr : definition_ast->children)
{
    auto ttl = TTLDescription::getTTLFromAST(ttl_element_ptr, columns, context, primary_key);
    if (ttl.mode == TTLMode::DELETE)
    {
        if (!ttl.where_expression)
        {
            if (have_unconditional_delete_ttl)
                throw Exception("More than one DELETE TTL expression without WHERE expression is not allowed", ErrorCodes::BAD_TTL_EXPRESSION);

            have_unconditional_delete_ttl = true;
            result.rows_ttl = ttl;
        }
        else
        {
            result.rows_where_ttl.emplace_back(std::move(ttl));
        }
    }
    else if (ttl.mode == TTLMode::RECOMPRESS)
    {

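With this reworked check, only the unconditional DELETE TTL stays limited to one per table, while any number of DELETE ... WHERE entries are accepted. A sketch of a definition that would still be rejected with the BAD_TTL_EXPRESSION error above (the table name is hypothetical):

CREATE TABLE ttl_bad (d Date) ENGINE = MergeTree ORDER BY tuple()
TTL d + INTERVAL 1 DAY,
    d + INTERVAL 2 DAY;  -- two DELETE TTLs without WHERE: still not allowed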
View File

@ -99,9 +99,12 @@ struct TTLTableDescription
/// ^~~~~~~~~~~~~~~definition~~~~~~~~~~~~~~~^
ASTPtr definition_ast;
/// Unconditional main TTL for removing rows. Only one is allowed per table.
TTLDescription rows_ttl;
/// Conditional (WHERE-based) TTLs for removing rows.
TTLDescriptions rows_where_ttl;
/// Moving data TTL (to other disks or volumes)
TTLDescriptions move_ttl;

View File

@ -0,0 +1,9 @@
1970-10-10 2
1970-10-10 5
1970-10-10 8
2000-10-10 1
2000-10-10 2
2000-10-10 4
2000-10-10 5
2000-10-10 7
2000-10-10 8

View File

@ -0,0 +1,20 @@
DROP TABLE IF EXISTS ttl_where;
CREATE TABLE ttl_where
(
`d` Date,
`i` UInt32
)
ENGINE = MergeTree
ORDER BY tuple()
TTL d + toIntervalYear(10) DELETE WHERE i % 3 = 0,
d + toIntervalYear(40) DELETE WHERE i % 3 = 1;
INSERT INTO ttl_where SELECT toDate('2000-10-10'), number FROM numbers(10);
INSERT INTO ttl_where SELECT toDate('1970-10-10'), number FROM numbers(10);
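-- d + 10 years has already passed for both inserted dates, so the first TTL drops rows with i % 3 = 0 from both parts;
-- d + 40 years has passed only for the 1970-10-10 rows, so the second TTL additionally drops i % 3 = 1 there.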
OPTIMIZE TABLE ttl_where FINAL;
SELECT * FROM ttl_where ORDER BY d, i;
DROP TABLE ttl_where;