Allow functions in group by keys; Add default aggregate function; Add more tests

This commit is contained in:
Nikolai Sorokin 2020-05-24 23:21:23 +03:00
parent 7d937b43c7
commit 141ed88751
8 changed files with 146 additions and 67 deletions

View File

@ -17,6 +17,8 @@ ASTPtr ASTTTLElement::clone() const
clone->setExpression(clone->ttl_expr_pos, getExpression(ttl_expr_pos, true)); clone->setExpression(clone->ttl_expr_pos, getExpression(ttl_expr_pos, true));
clone->setExpression(clone->where_expr_pos, getExpression(where_expr_pos, true)); clone->setExpression(clone->where_expr_pos, getExpression(where_expr_pos, true));
for (auto & expr : clone->group_by_key)
expr = expr->clone();
for (auto & [name, expr] : clone->group_by_aggregations) for (auto & [name, expr] : clone->group_by_aggregations)
expr = expr->clone(); expr = expr->clone();
@ -37,19 +39,22 @@ void ASTTTLElement::formatImpl(const FormatSettings & settings, FormatState & st
else if (mode == TTLMode::GROUP_BY) else if (mode == TTLMode::GROUP_BY)
{ {
settings.ostr << " GROUP BY "; settings.ostr << " GROUP BY ";
for (auto it = group_by_key_columns.begin(); it != group_by_key_columns.end(); ++it) for (auto it = group_by_key.begin(); it != group_by_key.end(); ++it)
{ {
if (it != group_by_key_columns.begin()) if (it != group_by_key.begin())
settings.ostr << ", "; settings.ostr << ", ";
settings.ostr << *it; (*it)->formatImpl(settings, state, frame);
} }
settings.ostr << " SET "; if (!group_by_aggregations.empty())
for (auto it = group_by_aggregations.begin(); it != group_by_aggregations.end(); ++it)
{ {
if (it != group_by_aggregations.begin()) settings.ostr << " SET ";
settings.ostr << ", "; for (auto it = group_by_aggregations.begin(); it != group_by_aggregations.end(); ++it)
settings.ostr << it->first << " = "; {
it->second->formatImpl(settings, state, frame); if (it != group_by_aggregations.begin())
settings.ostr << ", ";
settings.ostr << it->first << " = ";
it->second->formatImpl(settings, state, frame);
}
} }
} }
else if (mode == TTLMode::DELETE) else if (mode == TTLMode::DELETE)

View File

@ -17,7 +17,7 @@ public:
PartDestinationType destination_type; PartDestinationType destination_type;
String destination_name; String destination_name;
Strings group_by_key_columns; ASTs group_by_key;
std::vector<std::pair<String, ASTPtr>> group_by_aggregations; std::vector<std::pair<String, ASTPtr>> group_by_aggregations;
ASTTTLElement(TTLMode mode_, PartDestinationType destination_type_, const String & destination_name_) ASTTTLElement(TTLMode mode_, PartDestinationType destination_type_, const String & destination_name_)

View File

@ -1464,7 +1464,7 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserIdentifier parser_identifier; ParserIdentifier parser_identifier;
ParserStringLiteral parser_string_literal; ParserStringLiteral parser_string_literal;
ParserExpression parser_exp; ParserExpression parser_exp;
ParserIdentifierList parser_identifier_list; ParserExpressionList parser_expression_list(false);
ASTPtr ttl_expr; ASTPtr ttl_expr;
if (!parser_exp.parse(pos, ttl_expr, expected)) if (!parser_exp.parse(pos, ttl_expr, expected))
@ -1495,7 +1495,7 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
} }
ASTPtr where_expr; ASTPtr where_expr;
std::vector<String> group_by_key_columns; ASTPtr ast_group_by_key;
std::vector<std::pair<String, ASTPtr>> group_by_aggregations; std::vector<std::pair<String, ASTPtr>> group_by_aggregations;
if (mode == TTLMode::MOVE) if (mode == TTLMode::MOVE)
@ -1508,37 +1508,30 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
} }
else if (mode == TTLMode::GROUP_BY) else if (mode == TTLMode::GROUP_BY)
{ {
ASTPtr ast_group_by_key_columns; if (!parser_expression_list.parse(pos, ast_group_by_key, expected))
if (!parser_identifier_list.parse(pos, ast_group_by_key_columns, expected))
return false; return false;
for (const auto & identifier : ast_group_by_key_columns->children)
if (s_set.ignore(pos))
{ {
String identifier_str; while (true)
if (!tryGetIdentifierNameInto(identifier, identifier_str)) {
return false; if (!group_by_aggregations.empty() && !s_comma.ignore(pos))
group_by_key_columns.emplace_back(std::move(identifier_str)); break;
}
if (!s_set.ignore(pos)) ASTPtr name;
return false; ASTPtr value;
while (true) if (!parser_identifier.parse(pos, name, expected))
{ return false;
if (!group_by_aggregations.empty() && !s_comma.ignore(pos)) if (!s_eq.ignore(pos))
break; return false;
if (!parser_exp.parse(pos, value, expected))
return false;
ASTPtr name; String name_str;
ASTPtr value; if (!tryGetIdentifierNameInto(name, name_str))
if (!parser_identifier.parse(pos, name, expected)) return false;
return false; group_by_aggregations.emplace_back(name_str, std::move(value));
if (!s_eq.ignore(pos)) }
return false;
if (!parser_exp.parse(pos, value, expected))
return false;
String name_str;
if (!tryGetIdentifierNameInto(name, name_str))
return false;
group_by_aggregations.emplace_back(name_str, std::move(value));
} }
} }
else if (mode == TTLMode::DELETE && s_where.ignore(pos)) else if (mode == TTLMode::DELETE && s_where.ignore(pos))
@ -1552,8 +1545,11 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
if (where_expr) if (where_expr)
ttl_element->setWhere(std::move(where_expr)); ttl_element->setWhere(std::move(where_expr));
ttl_element->group_by_key_columns = std::move(group_by_key_columns); if (mode == TTLMode::GROUP_BY)
ttl_element->group_by_aggregations = std::move(group_by_aggregations); {
ttl_element->group_by_key = std::move(ast_group_by_key->children);
ttl_element->group_by_aggregations = std::move(group_by_aggregations);
}
node = ttl_element; node = ttl_element;
return true; return true;

View File

@ -742,13 +742,4 @@ bool ParserKeyValuePairsList::parseImpl(Pos & pos, ASTPtr & node, Expected & exp
return parser.parse(pos, node, expected); return parser.parse(pos, node, expected);
} }
bool ParserIdentifierList::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
return ParserList(
std::make_unique<ParserIdentifier>(),
std::make_unique<ParserToken>(TokenType::Comma))
.parse(pos, node, expected);
}
} }

View File

@ -421,13 +421,4 @@ protected:
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override; bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
}; };
/** A comma-separated list of identifiers, probably empty. */
class ParserIdentifierList : public IParserBase
{
protected:
const char * getName() const override { return "list of identifiers"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
} }

View File

@ -646,22 +646,70 @@ void MergeTreeData::setTTLExpressions(const ColumnsDescription & new_columns,
} }
else if (ttl_element->mode == TTLMode::GROUP_BY) else if (ttl_element->mode == TTLMode::GROUP_BY)
{ {
if (ttl_element->group_by_key_columns.size() > this->primary_key_columns.size()) if (ttl_element->group_by_key.size() > this->primary_key_columns.size())
throw Exception("TTL Expression GROUP BY key should be a prefix of primary key", ErrorCodes::BAD_TTL_EXPRESSION); throw Exception("TTL Expression GROUP BY key should be a prefix of primary key", ErrorCodes::BAD_TTL_EXPRESSION);
for (size_t i = 0; i < ttl_element->group_by_key_columns.size(); ++i)
NameSet primary_key_columns_set(this->primary_key_columns.begin(), this->primary_key_columns.end());
NameSet aggregation_columns_set;
for (const auto & column : this->primary_key_expr->getRequiredColumns())
primary_key_columns_set.insert(column);
for (size_t i = 0; i < ttl_element->group_by_key.size(); ++i)
{ {
if (ttl_element->group_by_key_columns[i] != this->primary_key_columns[i]) if (ttl_element->group_by_key[i]->getColumnName() != this->primary_key_columns[i])
throw Exception("TTL Expression GROUP BY key should be a prefix of primary key", ErrorCodes::BAD_TTL_EXPRESSION); throw Exception("TTL Expression GROUP BY key should be a prefix of primary key", ErrorCodes::BAD_TTL_EXPRESSION);
} }
for (const auto & [name, value] : ttl_element->group_by_aggregations)
{
if (primary_key_columns_set.contains(name))
throw Exception("Can not set custom aggregation for column in primary key in TTL Expression", ErrorCodes::BAD_TTL_EXPRESSION);
aggregation_columns_set.insert(name);
}
if (aggregation_columns_set.size() != ttl_element->group_by_aggregations.size())
throw Exception("Multiple aggregations set for one column in TTL Expression", ErrorCodes::BAD_TTL_EXPRESSION);
result.group_by_keys = ttl_element->group_by_key_columns; result.group_by_keys = Names(this->primary_key_columns.begin(), this->primary_key_columns.begin() + ttl_element->group_by_key.size());
auto aggregations = ttl_element->group_by_aggregations; auto aggregations = ttl_element->group_by_aggregations;
for (size_t i = ttl_element->group_by_key_columns.size(); i < this->primary_key_columns.size(); ++i) for (size_t i = 0; i < this->primary_key_columns.size(); ++i)
{ {
ASTPtr expr = makeASTFunction("max", std::make_shared<ASTIdentifier>(this->primary_key_columns[i])); ASTPtr value = this->primary_key_expr_ast->children[i]->clone();
aggregations.emplace_back(this->primary_key_columns[i], std::move(expr));
if (i >= ttl_element->group_by_key.size())
{
ASTPtr value_max = makeASTFunction("max", value->clone());
aggregations.emplace_back(value->getColumnName(), std::move(value_max));
}
if (value->as<ASTFunction>())
{
auto syntax_result = SyntaxAnalyzer(global_context).analyze(value, new_columns.getAllPhysical(), {}, true);
auto expr_actions = ExpressionAnalyzer(value, syntax_result, global_context).getActions(false);
for (const auto & column : expr_actions->getRequiredColumns())
{
if (i < ttl_element->group_by_key.size())
{
ASTPtr expr = makeASTFunction("any", std::make_shared<ASTIdentifier>(column));
aggregations.emplace_back(column, std::move(expr));
}
else
{
ASTPtr expr = makeASTFunction("argMax", std::make_shared<ASTIdentifier>(column), value->clone());
aggregations.emplace_back(column, std::move(expr));
}
}
}
} }
for (const auto & column : new_columns.getAllPhysical())
{
if (!primary_key_columns_set.contains(column.name) && !aggregation_columns_set.contains(column.name))
{
ASTPtr expr = makeASTFunction("any", std::make_shared<ASTIdentifier>(column.name));
aggregations.emplace_back(column.name, std::move(expr));
}
}
for (auto [name, value] : aggregations) for (auto [name, value] : aggregations)
{ {
auto syntax_result = SyntaxAnalyzer(global_context).analyze(value, new_columns.getAllPhysical(), {}, true); auto syntax_result = SyntaxAnalyzer(global_context).analyze(value, new_columns.getAllPhysical(), {}, true);

View File

@ -12,3 +12,9 @@
1 3 10 6 1 3 10 6
2 1 0 3 2 1 0 3
3 5 8 2 3 5 8 2
1 1 0 4
3 3 13 9
1 2 7 5
2 3 6 5
1 2 3 5
2 3 3 5

View File

@ -43,3 +43,45 @@ insert into ttl_01280_3 values (3, 5, 5, 8, now());
select sleep(2.1) format Null; select sleep(2.1) format Null;
optimize table ttl_01280_3 final; optimize table ttl_01280_3 final;
select a, b, x, y from ttl_01280_3; select a, b, x, y from ttl_01280_3;
drop table if exists ttl_01280_4;
create table ttl_01280_4 (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (toDate(d), -(a + b)) ttl d + interval 1 second group by toDate(d) set x = sum(x), y = max(y);
insert into ttl_01280_4 values (1, 1, 0, 4, now() + 10);
insert into ttl_01280_4 values (10, 2, 3, 3, now());
insert into ttl_01280_4 values (2, 10, 1, 7, now());
insert into ttl_01280_4 values (3, 3, 5, 2, now());
insert into ttl_01280_4 values (1, 5, 4, 9, now());
select sleep(1.1) format Null;
optimize table ttl_01280_4 final;
select a, b, x, y from ttl_01280_4;
drop table if exists ttl_01280_5;
create table ttl_01280_5 (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (toDate(d), a, -b) ttl d + interval 1 second group by toDate(d), a set x = sum(x);
insert into ttl_01280_5 values (1, 2, 3, 5, now());
insert into ttl_01280_5 values (2, 10, 1, 5, now());
insert into ttl_01280_5 values (2, 3, 5, 5, now());
insert into ttl_01280_5 values (1, 5, 4, 5, now());
select sleep(1.1) format Null;
optimize table ttl_01280_5 final;
select a, b, x, y from ttl_01280_5;
drop table if exists ttl_01280_6;
create table ttl_01280_6 (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (toDate(d), a, -b) ttl d + interval 1 second group by toDate(d), a;
insert into ttl_01280_6 values (1, 2, 3, 5, now());
insert into ttl_01280_6 values (2, 10, 3, 5, now());
insert into ttl_01280_6 values (2, 3, 3, 5, now());
insert into ttl_01280_6 values (1, 5, 3, 5, now());
select sleep(1.1) format Null;
optimize table ttl_01280_6 final;
select a, b, x, y from ttl_01280_6;
create table ttl_01280_error (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second group by x set y = max(y); -- { serverError 450}
create table ttl_01280_error (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second group by b set y = max(y); -- { serverError 450}
create table ttl_01280_error (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second group by a, b, x set y = max(y); -- { serverError 450}
create table ttl_01280_error (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second group by a set b = min(b), y = max(y); -- { serverError 450}
create table ttl_01280_error (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second group by a, b set y = max(y), y = max(y); -- { serverError 450}
create table ttl_01280_error (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (toDate(d), a) ttl d + interval 1 second group by toDate(d), a set d = min(d), b = max(b); -- { serverError 450}
create table ttl_01280_error (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (d, -(a + b)) ttl d + interval 1 second group by d, -(a + b) set a = sum(a), b = min(b); -- { serverError 450}