Merge pull request #10537 from expl0si0nn/ttl_expr_data_rollup

Support of WHERE and GROUP BY in TTL expressions
This commit is contained in:
alexey-milovidov 2020-05-27 21:35:55 +03:00 committed by GitHub
commit dffeec8637
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 651 additions and 87 deletions

View File

@ -5,6 +5,8 @@
#include <Interpreters/ExpressionAnalyzer.h>
#include <Columns/ColumnConst.h>
#include <Interpreters/addTypeConversionToAST.h>
#include <Storages/MergeTree/TTLMode.h>
#include <Interpreters/Context.h>
namespace DB
{
@ -67,6 +69,32 @@ TTLBlockInputStream::TTLBlockInputStream(
default_expr_list, storage.getColumns().getAllPhysical());
defaults_expression = ExpressionAnalyzer{default_expr_list, syntax_result, storage.global_context}.getActions(true);
}
if (storage.hasRowsTTL() && storage.rows_ttl_entry.mode == TTLMode::GROUP_BY)
{
current_key_value.resize(storage.rows_ttl_entry.group_by_keys.size());
ColumnNumbers keys;
for (const auto & key : storage.rows_ttl_entry.group_by_keys)
keys.push_back(header.getPositionByName(key));
agg_key_columns.resize(storage.rows_ttl_entry.group_by_keys.size());
AggregateDescriptions aggregates = storage.rows_ttl_entry.aggregate_descriptions;
for (auto & descr : aggregates)
if (descr.arguments.empty())
for (const auto & name : descr.argument_names)
descr.arguments.push_back(header.getPositionByName(name));
agg_aggregate_columns.resize(storage.rows_ttl_entry.aggregate_descriptions.size());
const Settings & settings = storage.global_context.getSettingsRef();
Aggregator::Params params(header, keys, aggregates,
false, settings.max_rows_to_group_by, settings.group_by_overflow_mode,
SettingUInt64(0), SettingUInt64(0),
settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
storage.global_context.getTemporaryVolume(), settings.max_threads, settings.min_free_disk_space_for_temporary_data);
aggregator = std::make_unique<Aggregator>(params);
}
}
bool TTLBlockInputStream::isTTLExpired(time_t ttl) const
@ -77,7 +105,8 @@ bool TTLBlockInputStream::isTTLExpired(time_t ttl) const
Block TTLBlockInputStream::readImpl()
{
/// Skip all data if table ttl is expired for part
if (storage.hasRowsTTL() && isTTLExpired(old_ttl_infos.table_ttl.max))
if (storage.hasRowsTTL() && !storage.rows_ttl_entry.where_expression &&
storage.rows_ttl_entry.mode != TTLMode::GROUP_BY && isTTLExpired(old_ttl_infos.table_ttl.max))
{
rows_removed = data_part->rows_count;
return {};
@ -85,7 +114,16 @@ Block TTLBlockInputStream::readImpl()
Block block = children.at(0)->read();
if (!block)
{
if (aggregator && !agg_result.empty())
{
MutableColumns result_columns = header.cloneEmptyColumns();
finalizeAggregates(result_columns);
block = header.cloneWithColumns(std::move(result_columns));
}
return block;
}
if (storage.hasRowsTTL() && (force || isTTLExpired(old_ttl_infos.table_ttl.min)))
removeRowsWithExpiredTableTTL(block);
@ -114,35 +152,146 @@ void TTLBlockInputStream::readSuffixImpl()
void TTLBlockInputStream::removeRowsWithExpiredTableTTL(Block & block)
{
storage.rows_ttl_entry.expression->execute(block);
if (storage.rows_ttl_entry.where_expression)
storage.rows_ttl_entry.where_expression->execute(block);
const IColumn * ttl_column =
block.getByName(storage.rows_ttl_entry.result_column).column.get();
const IColumn * where_result_column = storage.rows_ttl_entry.where_expression ?
block.getByName(storage.rows_ttl_entry.where_result_column).column.get() : nullptr;
const auto & column_names = header.getNames();
MutableColumns result_columns;
result_columns.reserve(column_names.size());
for (auto it = column_names.begin(); it != column_names.end(); ++it)
if (!aggregator)
{
const IColumn * values_column = block.getByName(*it).column.get();
MutableColumnPtr result_column = values_column->cloneEmpty();
result_column->reserve(block.rows());
MutableColumns result_columns;
result_columns.reserve(column_names.size());
for (auto it = column_names.begin(); it != column_names.end(); ++it)
{
const IColumn * values_column = block.getByName(*it).column.get();
MutableColumnPtr result_column = values_column->cloneEmpty();
result_column->reserve(block.rows());
for (size_t i = 0; i < block.rows(); ++i)
{
UInt32 cur_ttl = getTimestampByIndex(ttl_column, i);
bool where_filter_passed = !where_result_column || where_result_column->getBool(i);
if (!isTTLExpired(cur_ttl) || !where_filter_passed)
{
new_ttl_infos.table_ttl.update(cur_ttl);
result_column->insertFrom(*values_column, i);
}
else if (it == column_names.begin())
++rows_removed;
}
result_columns.emplace_back(std::move(result_column));
}
block = header.cloneWithColumns(std::move(result_columns));
}
else
{
MutableColumns result_columns = header.cloneEmptyColumns();
MutableColumns aggregate_columns = header.cloneEmptyColumns();
size_t rows_aggregated = 0;
size_t current_key_start = 0;
size_t rows_with_current_key = 0;
for (size_t i = 0; i < block.rows(); ++i)
{
UInt32 cur_ttl = getTimestampByIndex(ttl_column, i);
if (!isTTLExpired(cur_ttl))
bool where_filter_passed = !where_result_column || where_result_column->getBool(i);
bool ttl_expired = isTTLExpired(cur_ttl) && where_filter_passed;
bool same_as_current = true;
for (size_t j = 0; j < storage.rows_ttl_entry.group_by_keys.size(); ++j)
{
const String & key_column = storage.rows_ttl_entry.group_by_keys[j];
const IColumn * values_column = block.getByName(key_column).column.get();
if (!same_as_current || (*values_column)[i] != current_key_value[j])
{
values_column->get(i, current_key_value[j]);
same_as_current = false;
}
}
if (!same_as_current)
{
if (rows_with_current_key)
calculateAggregates(aggregate_columns, current_key_start, rows_with_current_key);
finalizeAggregates(result_columns);
current_key_start = rows_aggregated;
rows_with_current_key = 0;
}
if (ttl_expired)
{
++rows_with_current_key;
++rows_aggregated;
for (const auto & name : column_names)
{
const IColumn * values_column = block.getByName(name).column.get();
auto & column = aggregate_columns[header.getPositionByName(name)];
column->insertFrom(*values_column, i);
}
}
else
{
new_ttl_infos.table_ttl.update(cur_ttl);
result_column->insertFrom(*values_column, i);
for (const auto & name : column_names)
{
const IColumn * values_column = block.getByName(name).column.get();
auto & column = result_columns[header.getPositionByName(name)];
column->insertFrom(*values_column, i);
}
}
else if (it == column_names.begin())
++rows_removed;
}
result_columns.emplace_back(std::move(result_column));
}
block = header.cloneWithColumns(std::move(result_columns));
if (rows_with_current_key)
calculateAggregates(aggregate_columns, current_key_start, rows_with_current_key);
block = header.cloneWithColumns(std::move(result_columns));
}
}
void TTLBlockInputStream::calculateAggregates(const MutableColumns & aggregate_columns, size_t start_pos, size_t length)
{
Columns aggregate_chunk;
aggregate_chunk.reserve(aggregate_columns.size());
for (const auto & name : header.getNames())
{
const auto & column = aggregate_columns[header.getPositionByName(name)];
ColumnPtr chunk_column = column->cut(start_pos, length);
aggregate_chunk.emplace_back(std::move(chunk_column));
}
aggregator->executeOnBlock(aggregate_chunk, length, agg_result, agg_key_columns,
agg_aggregate_columns, agg_no_more_keys);
}
/// Flushes the aggregation state accumulated in agg_result into result_columns.
///
/// If agg_result is not empty it is converted to blocks; for every block the
/// expressions of storage.rows_ttl_entry.group_by_aggregations are executed, and then
/// the GROUP BY key columns and the aggregation result columns are appended to the
/// result column that has the matching position in the header.
/// agg_result is invalidated afterwards in any case.
void TTLBlockInputStream::finalizeAggregates(MutableColumns & result_columns)
{
    if (!agg_result.empty())
    {
        auto aggregated_res = aggregator->convertToBlocks(agg_result, true, 1);
        for (auto & agg_block : aggregated_res)
        {
            /// Tuple layout: <0> destination column name, <1> name of the computed column, <2> actions.
            for (const auto & it : storage.rows_ttl_entry.group_by_aggregations)
                std::get<2>(it)->execute(agg_block);

            const size_t rows = agg_block.rows();

            for (const auto & name : storage.rows_ttl_entry.group_by_keys)
            {
                const IColumn * values_column = agg_block.getByName(name).column.get();
                auto & result_column = result_columns[header.getPositionByName(name)];
                result_column->insertRangeFrom(*values_column, 0, rows);
            }

            for (const auto & it : storage.rows_ttl_entry.group_by_aggregations)
            {
                /// Qualified std::get, consistent with the other tuple accesses in this function.
                const IColumn * values_column = agg_block.getByName(std::get<1>(it)).column.get();
                auto & result_column = result_columns[header.getPositionByName(std::get<0>(it))];
                result_column->insertRangeFrom(*values_column, 0, rows);
            }
        }
    }

    agg_result.invalidate();
}
void TTLBlockInputStream::removeValuesWithExpiredColumnTTL(Block & block)

View File

@ -3,6 +3,7 @@
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Core/Block.h>
#include <Interpreters/Aggregator.h>
#include <common/DateLUT.h>
@ -39,6 +40,13 @@ private:
time_t current_time;
bool force;
std::unique_ptr<Aggregator> aggregator;
std::vector<Field> current_key_value;
AggregatedDataVariants agg_result;
ColumnRawPtrs agg_key_columns;
Aggregator::AggregateColumns agg_aggregate_columns;
bool agg_no_more_keys = false;
IMergeTreeDataPart::TTLInfos old_ttl_infos;
IMergeTreeDataPart::TTLInfos new_ttl_infos;
NameSet empty_columns;
@ -59,6 +67,12 @@ private:
/// Removes rows with expired table ttl and computes new ttl_infos for part
void removeRowsWithExpiredTableTTL(Block & block);
/// Aggregates rows [start_pos, start_pos + length) of aggregate_columns, accumulating the state in agg_result.
void calculateAggregates(const MutableColumns & aggregate_columns, size_t start_pos, size_t length);
/// Finalize agg_result into result_columns: appends the aggregated rows (if any) and invalidates agg_result.
void finalizeAggregates(MutableColumns & result_columns);
/// Updates TTL for moves
void updateMovesTTL(Block & block);

View File

@ -192,61 +192,65 @@ void ExpressionAnalyzer::analyzeAggregation()
if (has_aggregation)
{
getSelectQuery(); /// assertSelect()
/// Find out aggregation keys.
if (select_query->groupBy())
if (select_query)
{
NameSet unique_keys;
ASTs & group_asts = select_query->groupBy()->children;
for (ssize_t i = 0; i < ssize_t(group_asts.size()); ++i)
if (select_query->groupBy())
{
ssize_t size = group_asts.size();
getRootActionsNoMakeSet(group_asts[i], true, temp_actions, false);
const auto & column_name = group_asts[i]->getColumnName();
const auto & block = temp_actions->getSampleBlock();
if (!block.has(column_name))
throw Exception("Unknown identifier (in GROUP BY): " + column_name, ErrorCodes::UNKNOWN_IDENTIFIER);
const auto & col = block.getByName(column_name);
/// Constant expressions have non-null column pointer at this stage.
if (col.column && isColumnConst(*col.column))
NameSet unique_keys;
ASTs & group_asts = select_query->groupBy()->children;
for (ssize_t i = 0; i < ssize_t(group_asts.size()); ++i)
{
/// But don't remove last key column if no aggregate functions, otherwise aggregation will not work.
if (!aggregate_descriptions.empty() || size > 1)
ssize_t size = group_asts.size();
getRootActionsNoMakeSet(group_asts[i], true, temp_actions, false);
const auto & column_name = group_asts[i]->getColumnName();
const auto & block = temp_actions->getSampleBlock();
if (!block.has(column_name))
throw Exception("Unknown identifier (in GROUP BY): " + column_name, ErrorCodes::UNKNOWN_IDENTIFIER);
const auto & col = block.getByName(column_name);
/// Constant expressions have non-null column pointer at this stage.
if (col.column && isColumnConst(*col.column))
{
if (i + 1 < static_cast<ssize_t>(size))
group_asts[i] = std::move(group_asts.back());
/// But don't remove last key column if no aggregate functions, otherwise aggregation will not work.
if (!aggregate_descriptions.empty() || size > 1)
{
if (i + 1 < static_cast<ssize_t>(size))
group_asts[i] = std::move(group_asts.back());
group_asts.pop_back();
group_asts.pop_back();
--i;
continue;
--i;
continue;
}
}
NameAndTypePair key{column_name, col.type};
/// Aggregation keys are uniqued.
if (!unique_keys.count(key.name))
{
unique_keys.insert(key.name);
aggregation_keys.push_back(key);
/// Key is no longer needed, therefore we can save a little by moving it.
aggregated_columns.push_back(std::move(key));
}
}
NameAndTypePair key{column_name, col.type};
/// Aggregation keys are uniqued.
if (!unique_keys.count(key.name))
if (group_asts.empty())
{
unique_keys.insert(key.name);
aggregation_keys.push_back(key);
/// Key is no longer needed, therefore we can save a little by moving it.
aggregated_columns.push_back(std::move(key));
select_query->setExpression(ASTSelectQuery::Expression::GROUP_BY, {});
has_aggregation = select_query->having() || !aggregate_descriptions.empty();
}
}
if (group_asts.empty())
{
select_query->setExpression(ASTSelectQuery::Expression::GROUP_BY, {});
has_aggregation = select_query->having() || !aggregate_descriptions.empty();
}
}
else
aggregated_columns = temp_actions->getSampleBlock().getNamesAndTypesList();
for (const auto & desc : aggregate_descriptions)
aggregated_columns.emplace_back(desc.column_name, desc.function->getReturnType());
@ -926,7 +930,7 @@ void ExpressionAnalyzer::appendExpression(ExpressionActionsChain & chain, const
ExpressionActionsPtr ExpressionAnalyzer::getActions(bool add_aliases, bool project_result)
{
ExpressionActionsPtr actions = std::make_shared<ExpressionActions>(sourceColumns(), context);
ExpressionActionsPtr actions = std::make_shared<ExpressionActions>(aggregated_columns, context);
NamesWithAliases result_columns;
Names result_names;

View File

@ -839,7 +839,7 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyzeSelect(
return std::make_shared<const SyntaxAnalyzerResult>(result);
}
SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(ASTPtr & query, const NamesAndTypesList & source_columns, ConstStoragePtr storage) const
SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(ASTPtr & query, const NamesAndTypesList & source_columns, ConstStoragePtr storage, bool allow_aggregations) const
{
if (query->as<ASTSelectQuery>())
throw Exception("Not select analyze for select asts.", ErrorCodes::LOGICAL_ERROR);
@ -855,7 +855,20 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(ASTPtr & query, const NamesAndTy
optimizeIf(query, result.aliases, settings.optimize_if_chain_to_miltiif);
assertNoAggregates(query, "in wrong place");
if (allow_aggregations)
{
GetAggregatesVisitor::Data data;
GetAggregatesVisitor(data).visit(query);
/// There can not be other aggregate functions within the aggregate functions.
for (const ASTFunction * node : data.aggregates)
for (auto & arg : node->arguments->children)
assertNoAggregates(arg, "inside another aggregate function");
result.aggregates = data.aggregates;
}
else
assertNoAggregates(query, "in wrong place");
result.collectUsedColumns(query);
return std::make_shared<const SyntaxAnalyzerResult>(result);
}

View File

@ -86,7 +86,7 @@ public:
{}
/// Analyze and rewrite not select query
SyntaxAnalyzerResultPtr analyze(ASTPtr & query, const NamesAndTypesList & source_columns_, ConstStoragePtr storage = {}) const;
SyntaxAnalyzerResultPtr analyze(ASTPtr & query, const NamesAndTypesList & source_columns_, ConstStoragePtr storage = {}, bool allow_aggregations = false) const;
/// Analyze and rewrite select query
SyntaxAnalyzerResultPtr analyzeSelect(

View File

@ -7,21 +7,90 @@
namespace DB
{
/// Deep copy of the element.
/// The copy starts with an empty children list; the TTL and WHERE expressions are
/// cloned and registered again, and the GROUP BY keys and SET expressions are cloned in place.
ASTPtr ASTTTLElement::clone() const
{
    auto res = std::make_shared<ASTTTLElement>(*this);

    /// The copy constructor shared the children with the source; rebuild the slots from scratch.
    res->children.clear();
    res->ttl_expr_pos = -1;
    res->where_expr_pos = -1;
    res->setTTL(getExpression(ttl_expr_pos, true));
    res->setWhere(getExpression(where_expr_pos, true));

    for (auto & key : res->group_by_key)
        key = key->clone();

    for (auto & aggregation : res->group_by_aggregations)
        aggregation.second = aggregation.second->clone();

    return res;
}
void ASTTTLElement::formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const
{
children.front()->formatImpl(settings, state, frame);
if (destination_type == PartDestinationType::DISK)
ttl()->formatImpl(settings, state, frame);
if (mode == TTLMode::MOVE && destination_type == PartDestinationType::DISK)
{
settings.ostr << " TO DISK " << quoteString(destination_name);
}
else if (destination_type == PartDestinationType::VOLUME)
else if (mode == TTLMode::MOVE && destination_type == PartDestinationType::VOLUME)
{
settings.ostr << " TO VOLUME " << quoteString(destination_name);
}
else if (destination_type == PartDestinationType::DELETE)
else if (mode == TTLMode::GROUP_BY)
{
settings.ostr << " GROUP BY ";
for (auto it = group_by_key.begin(); it != group_by_key.end(); ++it)
{
if (it != group_by_key.begin())
settings.ostr << ", ";
(*it)->formatImpl(settings, state, frame);
}
if (!group_by_aggregations.empty())
{
settings.ostr << " SET ";
for (auto it = group_by_aggregations.begin(); it != group_by_aggregations.end(); ++it)
{
if (it != group_by_aggregations.begin())
settings.ostr << ", ";
settings.ostr << it->first << " = ";
it->second->formatImpl(settings, state, frame);
}
}
}
else if (mode == TTLMode::DELETE)
{
/// It would be better to output "DELETE" here but that will break compatibility with earlier versions.
}
if (where())
{
settings.ostr << " WHERE ";
where()->formatImpl(settings, state, frame);
}
}
void ASTTTLElement::setExpression(int & pos, ASTPtr && ast)
{
if (ast)
{
if (pos == -1)
{
pos = children.size();
children.emplace_back(ast);
}
else
children[pos] = ast;
}
else if (pos != -1)
{
children[pos] = ASTPtr{};
pos = -1;
}
}
/// Returns the child stored at `pos`, or a null pointer when the slot is unset (pos == -1).
/// With `clone` set, a clone of the child is returned instead of the child itself.
ASTPtr ASTTTLElement::getExpression(int pos, bool clone) const
{
    if (pos == -1)
        return ASTPtr{};

    const ASTPtr & child = children[pos];
    if (clone)
        return child->clone();

    return child;
}
}

View File

@ -2,35 +2,53 @@
#include <Parsers/IAST.h>
#include <Storages/MergeTree/PartDestinationType.h>
#include <Storages/MergeTree/TTLMode.h>
namespace DB
{
/** Element of TTL expression.
*/
class ASTTTLElement : public IAST
{
public:
TTLMode mode;
PartDestinationType destination_type;
String destination_name;
ASTTTLElement(PartDestinationType destination_type_, const String & destination_name_)
: destination_type(destination_type_)
ASTs group_by_key;
std::vector<std::pair<String, ASTPtr>> group_by_aggregations;
ASTTTLElement(TTLMode mode_, PartDestinationType destination_type_, const String & destination_name_)
: mode(mode_)
, destination_type(destination_type_)
, destination_name(destination_name_)
, ttl_expr_pos(-1)
, where_expr_pos(-1)
{
}
String getID(char) const override { return "TTLElement"; }
ASTPtr clone() const override
{
auto clone = std::make_shared<ASTTTLElement>(*this);
clone->cloneChildren();
return clone;
}
ASTPtr clone() const override;
const ASTPtr ttl() const { return getExpression(ttl_expr_pos); }
const ASTPtr where() const { return getExpression(where_expr_pos); }
void setTTL(ASTPtr && ast) { setExpression(ttl_expr_pos, std::forward<ASTPtr>(ast)); }
void setWhere(ASTPtr && ast) { setExpression(where_expr_pos, std::forward<ASTPtr>(ast)); }
protected:
void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;
private:
int ttl_expr_pos;
int where_expr_pos;
private:
void setExpression(int & pos, ASTPtr && ast);
ASTPtr getExpression(int pos, bool clone = false) const;
};
}

View File

@ -1455,23 +1455,50 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserKeyword s_to_disk("TO DISK");
ParserKeyword s_to_volume("TO VOLUME");
ParserKeyword s_delete("DELETE");
ParserKeyword s_where("WHERE");
ParserKeyword s_group_by("GROUP BY");
ParserKeyword s_set("SET");
ParserToken s_comma(TokenType::Comma);
ParserToken s_eq(TokenType::Equals);
ParserIdentifier parser_identifier;
ParserStringLiteral parser_string_literal;
ParserExpression parser_exp;
ParserExpressionList parser_expression_list(false);
ASTPtr expr_elem;
if (!parser_exp.parse(pos, expr_elem, expected))
ASTPtr ttl_expr;
if (!parser_exp.parse(pos, ttl_expr, expected))
return false;
TTLMode mode;
PartDestinationType destination_type = PartDestinationType::DELETE;
String destination_name;
if (s_to_disk.ignore(pos))
destination_type = PartDestinationType::DISK;
else if (s_to_volume.ignore(pos))
destination_type = PartDestinationType::VOLUME;
else
s_delete.ignore(pos);
if (destination_type == PartDestinationType::DISK || destination_type == PartDestinationType::VOLUME)
if (s_to_disk.ignore(pos))
{
mode = TTLMode::MOVE;
destination_type = PartDestinationType::DISK;
}
else if (s_to_volume.ignore(pos))
{
mode = TTLMode::MOVE;
destination_type = PartDestinationType::VOLUME;
}
else if (s_group_by.ignore(pos))
{
mode = TTLMode::GROUP_BY;
}
else
{
s_delete.ignore(pos);
mode = TTLMode::DELETE;
}
ASTPtr where_expr;
ASTPtr ast_group_by_key;
std::vector<std::pair<String, ASTPtr>> group_by_aggregations;
if (mode == TTLMode::MOVE)
{
ASTPtr ast_space_name;
if (!parser_string_literal.parse(pos, ast_space_name, expected))
@ -1479,10 +1506,52 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
destination_name = ast_space_name->as<ASTLiteral &>().value.get<const String &>();
}
else if (mode == TTLMode::GROUP_BY)
{
if (!parser_expression_list.parse(pos, ast_group_by_key, expected))
return false;
node = std::make_shared<ASTTTLElement>(destination_type, destination_name);
node->children.push_back(expr_elem);
if (s_set.ignore(pos))
{
while (true)
{
if (!group_by_aggregations.empty() && !s_comma.ignore(pos))
break;
ASTPtr name;
ASTPtr value;
if (!parser_identifier.parse(pos, name, expected))
return false;
if (!s_eq.ignore(pos))
return false;
if (!parser_exp.parse(pos, value, expected))
return false;
String name_str;
if (!tryGetIdentifierNameInto(name, name_str))
return false;
group_by_aggregations.emplace_back(name_str, std::move(value));
}
}
}
else if (mode == TTLMode::DELETE && s_where.ignore(pos))
{
if (!parser_exp.parse(pos, where_expr, expected))
return false;
}
auto ttl_element = std::make_shared<ASTTTLElement>(mode, destination_type, destination_name);
ttl_element->setTTL(std::move(ttl_expr));
if (where_expr)
ttl_element->setWhere(std::move(where_expr));
if (mode == TTLMode::GROUP_BY)
{
ttl_element->group_by_key = std::move(ast_group_by_key->children);
ttl_element->group_by_aggregations = std::move(group_by_aggregations);
}
node = ttl_element;
return true;
}

View File

@ -615,19 +615,115 @@ void MergeTreeData::setTTLExpressions(const ColumnsDescription & new_columns,
auto new_column_ttls = new_columns.getColumnTTLs();
auto create_ttl_entry = [this, &new_columns](ASTPtr ttl_ast)
auto create_ttl_entry = [this, &new_columns](ASTPtr ttl_expr_ast)
{
TTLEntry result;
auto syntax_result = SyntaxAnalyzer(global_context).analyze(ttl_ast, new_columns.getAllPhysical());
result.expression = ExpressionAnalyzer(ttl_ast, syntax_result, global_context).getActions(false);
auto ttl_syntax_result = SyntaxAnalyzer(global_context).analyze(ttl_expr_ast, new_columns.getAllPhysical());
result.expression = ExpressionAnalyzer(ttl_expr_ast, ttl_syntax_result, global_context).getActions(false);
result.result_column = ttl_expr_ast->getColumnName();
result.destination_type = PartDestinationType::DELETE;
result.result_column = ttl_ast->getColumnName();
result.mode = TTLMode::DELETE;
checkTTLExpression(result.expression, result.result_column);
return result;
};
/// Builds a TTLEntry for a table-level TTL element.
/// Starts from create_ttl_entry() for the TTL expression itself, then fills the fields that depend on the mode:
///  - DELETE: the optional WHERE expression;
///  - GROUP_BY: validated group keys plus the full list of aggregations (explicit SET ones and generated ones).
/// Throws BAD_TTL_EXPRESSION when the GROUP BY part does not satisfy the checks below.
auto create_rows_ttl_entry = [this, &new_columns, &create_ttl_entry](const ASTTTLElement * ttl_element)
{
auto result = create_ttl_entry(ttl_element->ttl());
/// create_ttl_entry() sets mode to DELETE; override it with the mode of the element.
result.mode = ttl_element->mode;
if (ttl_element->mode == TTLMode::DELETE)
{
/// WHERE is optional: where_expression stays empty when the element has none.
if (ASTPtr where_expr_ast = ttl_element->where())
{
auto where_syntax_result = SyntaxAnalyzer(global_context).analyze(where_expr_ast, new_columns.getAllPhysical());
result.where_expression = ExpressionAnalyzer(where_expr_ast, where_syntax_result, global_context).getActions(false);
result.where_result_column = where_expr_ast->getColumnName();
}
}
else if (ttl_element->mode == TTLMode::GROUP_BY)
{
/// The GROUP BY key must be a prefix of the primary key: first check the length, the names are compared below.
if (ttl_element->group_by_key.size() > this->getPrimaryKey().column_names.size())
throw Exception("TTL Expression GROUP BY key should be a prefix of primary key", ErrorCodes::BAD_TTL_EXPRESSION);
/// Primary key column names together with the columns required to compute the primary key expression.
NameSet primary_key_columns_set(this->getPrimaryKey().column_names.begin(), this->getPrimaryKey().column_names.end());
/// Names of the columns that have an explicit aggregation in the SET clause.
NameSet aggregation_columns_set;
for (const auto & column : this->getPrimaryKey().expression->getRequiredColumns())
primary_key_columns_set.insert(column);
for (size_t i = 0; i < ttl_element->group_by_key.size(); ++i)
{
if (ttl_element->group_by_key[i]->getColumnName() != this->getPrimaryKey().column_names[i])
throw Exception("TTL Expression GROUP BY key should be a prefix of primary key", ErrorCodes::BAD_TTL_EXPRESSION);
}
for (const auto & [name, value] : ttl_element->group_by_aggregations)
{
if (primary_key_columns_set.count(name))
throw Exception("Can not set custom aggregation for column in primary key in TTL Expression", ErrorCodes::BAD_TTL_EXPRESSION);
aggregation_columns_set.insert(name);
}
/// A set smaller than the list means that some column was assigned more than once.
if (aggregation_columns_set.size() != ttl_element->group_by_aggregations.size())
throw Exception("Multiple aggregations set for one column in TTL Expression", ErrorCodes::BAD_TTL_EXPRESSION);
result.group_by_keys = Names(this->getPrimaryKey().column_names.begin(), this->getPrimaryKey().column_names.begin() + ttl_element->group_by_key.size());
/// Start from the explicit SET aggregations and append generated ones.
auto aggregations = ttl_element->group_by_aggregations;
for (size_t i = 0; i < this->getPrimaryKey().column_names.size(); ++i)
{
ASTPtr value = this->getPrimaryKey().expression_list_ast->children[i]->clone();
/// Primary key elements outside of the GROUP BY prefix are aggregated with max().
if (i >= ttl_element->group_by_key.size())
{
ASTPtr value_max = makeASTFunction("max", value->clone());
aggregations.emplace_back(value->getColumnName(), std::move(value_max));
}
/// When the key element is a function, the columns it requires get an aggregation too:
/// any(column) for elements inside the GROUP BY prefix, argMax(column, key element) for the others.
if (value->as<ASTFunction>())
{
auto syntax_result = SyntaxAnalyzer(global_context).analyze(value, new_columns.getAllPhysical(), {}, true);
auto expr_actions = ExpressionAnalyzer(value, syntax_result, global_context).getActions(false);
for (const auto & column : expr_actions->getRequiredColumns())
{
if (i < ttl_element->group_by_key.size())
{
ASTPtr expr = makeASTFunction("any", std::make_shared<ASTIdentifier>(column));
aggregations.emplace_back(column, std::move(expr));
}
else
{
ASTPtr expr = makeASTFunction("argMax", std::make_shared<ASTIdentifier>(column), value->clone());
aggregations.emplace_back(column, std::move(expr));
}
}
}
}
/// Every remaining physical column (not related to the primary key, not listed in SET) is aggregated with any().
for (const auto & column : new_columns.getAllPhysical())
{
if (!primary_key_columns_set.count(column.name) && !aggregation_columns_set.count(column.name))
{
ASTPtr expr = makeASTFunction("any", std::make_shared<ASTIdentifier>(column.name));
aggregations.emplace_back(column.name, std::move(expr));
}
}
/// Elements are taken by value: analyze() receives the AST by non-const reference.
/// For each aggregation store (destination column, name of the computed column, actions)
/// and collect the aggregate descriptions found by the analyzer.
for (auto [name, value] : aggregations)
{
auto syntax_result = SyntaxAnalyzer(global_context).analyze(value, new_columns.getAllPhysical(), {}, true);
auto expr_analyzer = ExpressionAnalyzer(value, syntax_result, global_context);
result.group_by_aggregations.emplace_back(name, value->getColumnName(), expr_analyzer.getActions(false));
for (const auto & descr : expr_analyzer.getAnalyzedData().aggregate_descriptions)
result.aggregate_descriptions.push_back(descr);
}
}
return result;
};
if (!new_column_ttls.empty())
{
NameSet columns_ttl_forbidden;
@ -672,7 +768,7 @@ void MergeTreeData::setTTLExpressions(const ColumnsDescription & new_columns,
throw Exception("More than one DELETE TTL expression is not allowed", ErrorCodes::BAD_TTL_EXPRESSION);
}
auto new_rows_ttl_entry = create_ttl_entry(ttl_element->children[0]);
auto new_rows_ttl_entry = create_rows_ttl_entry(ttl_element);
if (!only_check)
update_rows_ttl_entry = new_rows_ttl_entry;
@ -680,7 +776,7 @@ void MergeTreeData::setTTLExpressions(const ColumnsDescription & new_columns,
}
else
{
auto new_ttl_entry = create_ttl_entry(ttl_element->children[0]);
auto new_ttl_entry = create_rows_ttl_entry(ttl_element);
new_ttl_entry.entry_ast = ttl_element_ptr;
new_ttl_entry.destination_type = ttl_element->destination_type;

View File

@ -20,6 +20,8 @@
#include <Storages/MergeTree/MergeTreePartsMover.h>
#include <Interpreters/PartLog.h>
#include <Disks/StoragePolicy.h>
#include <Interpreters/Aggregator.h>
#include <Storages/MergeTree/TTLMode.h>
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/ordered_index.hpp>
@ -649,9 +651,18 @@ public:
struct TTLEntry
{
TTLMode mode;
ExpressionActionsPtr expression;
String result_column;
ExpressionActionsPtr where_expression;
String where_result_column;
Names group_by_keys;
std::vector<std::tuple<String, String, ExpressionActionsPtr>> group_by_aggregations;
AggregateDescriptions aggregate_descriptions;
/// Name and type of a destination are only valid in table-level context.
PartDestinationType destination_type;
String destination_name;

View File

@ -0,0 +1,14 @@
#pragma once
namespace DB
{

/// Kind of action attached to a TTL element.
enum class TTLMode
{
    /// Default mode, used when no other clause follows the TTL expression (the DELETE keyword is optional).
    DELETE = 0,
    /// Selected by the TO DISK / TO VOLUME clauses.
    MOVE = 1,
    /// Selected by the GROUP BY clause.
    GROUP_BY = 2,
};

}

View File

@ -0,0 +1,20 @@
1 1 0 4
1 2 3 7
1 3 0 5
2 1 20 1
2 1 0 1
1 1 [0,2,3] 4
1 1 [5,4,1] 13
1 3 [1,0,1,0] 17
2 1 [3,1,0,3] 8
3 1 [2,4,5] 8
1 1 0 4
1 3 10 6
2 1 0 3
3 5 8 2
1 1 0 4
3 3 13 9
1 2 7 5
2 3 6 5
1 2 3 5
2 3 3 5

View File

@ -0,0 +1,87 @@
-- Case 1: TTL ... DELETE WHERE.
-- The TTL is d + 1 second and the delete condition is x % 10 == 0 and y > 5.
-- The first row has d = now() + 10, all the others have d = now().
drop table if exists ttl_01280_1;
create table ttl_01280_1 (a Int, b Int, x Int, y Int, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second delete where x % 10 == 0 and y > 5;
insert into ttl_01280_1 values (1, 1, 0, 4, now() + 10);
insert into ttl_01280_1 values (1, 1, 10, 6, now());
insert into ttl_01280_1 values (1, 2, 3, 7, now());
insert into ttl_01280_1 values (1, 3, 0, 5, now());
insert into ttl_01280_1 values (2, 1, 20, 1, now());
insert into ttl_01280_1 values (2, 1, 0, 1, now());
insert into ttl_01280_1 values (3, 1, 0, 8, now());
-- Wait longer than the 1 second TTL interval before forcing the merge.
select sleep(1.1) format Null;
optimize table ttl_01280_1 final;
select a, b, x, y from ttl_01280_1;
-- Case 2: TTL ... GROUP BY over the whole primary key (a, b) with explicit SET aggregations
-- for every other column: x = minForEach(x), y = sum(y), d = max(d).
-- The first row has d = now() + 10, all the others have d = now().
drop table if exists ttl_01280_2;
create table ttl_01280_2 (a Int, b Int, x Array(Int32), y Double, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second group by a, b set x = minForEach(x), y = sum(y), d = max(d);
insert into ttl_01280_2 values (1, 1, array(0, 2, 3), 4, now() + 10);
insert into ttl_01280_2 values (1, 1, array(5, 4, 3), 6, now());
insert into ttl_01280_2 values (1, 1, array(5, 5, 1), 7, now());
insert into ttl_01280_2 values (1, 3, array(3, 0, 4), 5, now());
insert into ttl_01280_2 values (1, 3, array(1, 1, 2, 1), 9, now());
insert into ttl_01280_2 values (1, 3, array(3, 2, 1, 0), 3, now());
insert into ttl_01280_2 values (2, 1, array(3, 3, 3), 7, now());
insert into ttl_01280_2 values (2, 1, array(11, 1, 0, 3), 1, now());
insert into ttl_01280_2 values (3, 1, array(2, 4, 5), 8, now());
-- Wait longer than the 1 second TTL interval before forcing the merge.
select sleep(1.1) format Null;
optimize table ttl_01280_2 final;
select a, b, x, y from ttl_01280_2;
-- Case 3: GROUP BY a, which is a proper prefix of the primary key (a, b).
-- x and y are taken from the row with the greatest d via argMax; b is not listed in SET.
-- Some rows use d = now() + 1, hence the longer sleep below.
drop table if exists ttl_01280_3;
create table ttl_01280_3 (a Int, b Int, x Int64, y Int, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second group by a set x = argMax(x, d), y = argMax(y, d), d = max(d);
insert into ttl_01280_3 values (1, 1, 0, 4, now() + 10);
insert into ttl_01280_3 values (1, 1, 10, 6, now() + 1);
insert into ttl_01280_3 values (1, 2, 3, 7, now());
insert into ttl_01280_3 values (1, 3, 0, 5, now());
insert into ttl_01280_3 values (2, 1, 20, 1, now());
insert into ttl_01280_3 values (2, 1, 0, 3, now() + 1);
insert into ttl_01280_3 values (3, 1, 0, 3, now());
insert into ttl_01280_3 values (3, 2, 8, 2, now() + 1);
insert into ttl_01280_3 values (3, 5, 5, 8, now());
-- 1 second of TTL interval plus the 1 second offset of the "now() + 1" rows.
select sleep(2.1) format Null;
optimize table ttl_01280_3 final;
select a, b, x, y from ttl_01280_3;
-- Case 4: the primary key consists of expressions (toDate(d), -(a + b)); GROUP BY uses the first of them.
-- Only x and y are listed in SET; a, b and d take part in the primary key expressions.
drop table if exists ttl_01280_4;
create table ttl_01280_4 (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (toDate(d), -(a + b)) ttl d + interval 1 second group by toDate(d) set x = sum(x), y = max(y);
insert into ttl_01280_4 values (1, 1, 0, 4, now() + 10);
insert into ttl_01280_4 values (10, 2, 3, 3, now());
insert into ttl_01280_4 values (2, 10, 1, 7, now());
insert into ttl_01280_4 values (3, 3, 5, 2, now());
insert into ttl_01280_4 values (1, 5, 4, 9, now());
-- Wait longer than the 1 second TTL interval before forcing the merge.
select sleep(1.1) format Null;
optimize table ttl_01280_4 final;
select a, b, x, y from ttl_01280_4;
-- Case 5: GROUP BY toDate(d), a - a two-element prefix of the primary key (toDate(d), a, -b).
-- Only x has an explicit aggregation (sum); y is not listed in SET.
drop table if exists ttl_01280_5;
create table ttl_01280_5 (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (toDate(d), a, -b) ttl d + interval 1 second group by toDate(d), a set x = sum(x);
insert into ttl_01280_5 values (1, 2, 3, 5, now());
insert into ttl_01280_5 values (2, 10, 1, 5, now());
insert into ttl_01280_5 values (2, 3, 5, 5, now());
insert into ttl_01280_5 values (1, 5, 4, 5, now());
-- Wait longer than the 1 second TTL interval before forcing the merge.
select sleep(1.1) format Null;
optimize table ttl_01280_5 final;
select a, b, x, y from ttl_01280_5;
-- Case 6: same table layout as the previous case, but GROUP BY has no SET clause at all.
-- All rows share the same x and y values.
drop table if exists ttl_01280_6;
create table ttl_01280_6 (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (toDate(d), a, -b) ttl d + interval 1 second group by toDate(d), a;
insert into ttl_01280_6 values (1, 2, 3, 5, now());
insert into ttl_01280_6 values (2, 10, 3, 5, now());
insert into ttl_01280_6 values (2, 3, 3, 5, now());
insert into ttl_01280_6 values (1, 5, 3, 5, now());
-- Wait longer than the 1 second TTL interval before forcing the merge.
select sleep(1.1) format Null;
optimize table ttl_01280_6 final;
select a, b, x, y from ttl_01280_6;
-- Negative cases: every definition below is expected to be rejected with server error 450.
-- In order: GROUP BY a column outside of the primary key; GROUP BY a non-leading key column;
-- GROUP BY longer than the primary key; SET for a primary key column; the same column set twice;
-- SET for columns used inside primary key expressions (last two statements).
create table ttl_01280_error (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second group by x set y = max(y); -- { serverError 450}
create table ttl_01280_error (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second group by b set y = max(y); -- { serverError 450}
create table ttl_01280_error (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second group by a, b, x set y = max(y); -- { serverError 450}
create table ttl_01280_error (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second group by a set b = min(b), y = max(y); -- { serverError 450}
create table ttl_01280_error (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second group by a, b set y = max(y), y = max(y); -- { serverError 450}
create table ttl_01280_error (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (toDate(d), a) ttl d + interval 1 second group by toDate(d), a set d = min(d), b = max(b); -- { serverError 450}
create table ttl_01280_error (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (d, -(a + b)) ttl d + interval 1 second group by d, -(a + b) set a = sum(a), b = min(b); -- { serverError 450}

-- Cleanup: do not leave the tables created by this test behind (these statements produce no output).
drop table if exists ttl_01280_1;
drop table if exists ttl_01280_2;
drop table if exists ttl_01280_3;
drop table if exists ttl_01280_4;
drop table if exists ttl_01280_5;
drop table if exists ttl_01280_6;