TTL Expressions WHERE and GROUP BY draft

This commit is contained in:
Nikolai Sorokin 2020-04-27 17:47:59 +03:00
parent 3399e573d1
commit 61974e0047
12 changed files with 479 additions and 44 deletions

View File

@ -67,6 +67,30 @@ TTLBlockInputStream::TTLBlockInputStream(
default_expr_list, storage.getColumns().getAllPhysical()); default_expr_list, storage.getColumns().getAllPhysical());
defaults_expression = ExpressionAnalyzer{default_expr_list, syntax_result, storage.global_context}.getActions(true); defaults_expression = ExpressionAnalyzer{default_expr_list, syntax_result, storage.global_context}.getActions(true);
} }
if (storage.hasRowsTTL() && !storage.rows_ttl_entry.group_by_keys.empty())
{
ColumnNumbers keys;
for (const auto & key : storage.rows_ttl_entry.group_by_keys)
keys.push_back(header.getPositionByName(key));
agg_key_columns.resize(storage.rows_ttl_entry.group_by_keys.size());
AggregateDescriptions aggregates = storage.rows_ttl_entry.aggregate_descriptions;
for (auto & descr : aggregates)
if (descr.arguments.empty())
for (const auto & name : descr.argument_names)
descr.arguments.push_back(header.getPositionByName(name));
agg_aggregate_columns.resize(storage.rows_ttl_entry.aggregate_descriptions.size());
const Settings & settings = storage.global_context.getSettingsRef();
Aggregator::Params params(header, keys, aggregates,
false, settings.max_rows_to_group_by, settings.group_by_overflow_mode,
SettingUInt64(0), SettingUInt64(0),
settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
storage.global_context.getTemporaryVolume(), settings.max_threads, settings.min_free_disk_space_for_temporary_data);
aggregator = std::make_unique<Aggregator>(params);
}
} }
bool TTLBlockInputStream::isTTLExpired(time_t ttl) const bool TTLBlockInputStream::isTTLExpired(time_t ttl) const
@ -77,7 +101,8 @@ bool TTLBlockInputStream::isTTLExpired(time_t ttl) const
Block TTLBlockInputStream::readImpl() Block TTLBlockInputStream::readImpl()
{ {
/// Skip all data if table ttl is expired for part /// Skip all data if table ttl is expired for part
if (storage.hasRowsTTL() && isTTLExpired(old_ttl_infos.table_ttl.max)) if (storage.hasRowsTTL() && !storage.rows_ttl_entry.where_expression &&
storage.rows_ttl_entry.group_by_keys.empty() && isTTLExpired(old_ttl_infos.table_ttl.max))
{ {
rows_removed = data_part->rows_count; rows_removed = data_part->rows_count;
return {}; return {};
@ -85,7 +110,43 @@ Block TTLBlockInputStream::readImpl()
Block block = children.at(0)->read(); Block block = children.at(0)->read();
if (!block) if (!block)
{
if (aggregator && !agg_result.empty())
{
MutableColumns result_columns;
const auto & column_names = header.getNames();
for (const auto & column_name : column_names)
{
const IColumn * values_column = header.getByName(column_name).column.get();
MutableColumnPtr result_column = values_column->cloneEmpty();
result_columns.emplace_back(std::move(result_column));
}
auto aggregated_res = aggregator->convertToBlocks(agg_result, true, 1);
for (auto & agg_block : aggregated_res)
{
for (const auto & [name, actions] : storage.rows_ttl_entry.group_by_aggregations)
actions->execute(agg_block);
for (const auto & name : storage.rows_ttl_entry.group_by_keys)
{
const IColumn * values_column = agg_block.getByName(name).column.get();
auto & result_column = result_columns[header.getPositionByName(name)];
result_column->insertRangeFrom(*values_column, 0, agg_block.rows());
}
for (const auto & [name, res_column] : storage.rows_ttl_entry.group_by_aggregations_res_column)
{
const IColumn * values_column = agg_block.getByName(res_column).column.get();
auto & result_column = result_columns[header.getPositionByName(name)];
result_column->insertRangeFrom(*values_column, 0, agg_block.rows());
}
}
block = header.cloneWithColumns(std::move(result_columns));
agg_result.invalidate();
}
return block; return block;
}
if (storage.hasRowsTTL() && (force || isTTLExpired(old_ttl_infos.table_ttl.min))) if (storage.hasRowsTTL() && (force || isTTLExpired(old_ttl_infos.table_ttl.min)))
removeRowsWithExpiredTableTTL(block); removeRowsWithExpiredTableTTL(block);
@ -114,14 +175,21 @@ void TTLBlockInputStream::readSuffixImpl()
void TTLBlockInputStream::removeRowsWithExpiredTableTTL(Block & block) void TTLBlockInputStream::removeRowsWithExpiredTableTTL(Block & block)
{ {
storage.rows_ttl_entry.expression->execute(block); storage.rows_ttl_entry.expression->execute(block);
if (storage.rows_ttl_entry.where_expression)
storage.rows_ttl_entry.where_expression->execute(block);
const IColumn * ttl_column = const IColumn * ttl_column =
block.getByName(storage.rows_ttl_entry.result_column).column.get(); block.getByName(storage.rows_ttl_entry.result_column).column.get();
const IColumn * where_filter_column = storage.rows_ttl_entry.where_expression ?
block.getByName(storage.rows_ttl_entry.where_filter_column).column.get() : nullptr;
const auto & column_names = header.getNames(); const auto & column_names = header.getNames();
if (!aggregator)
{
MutableColumns result_columns; MutableColumns result_columns;
result_columns.reserve(column_names.size()); result_columns.reserve(column_names.size());
for (auto it = column_names.begin(); it != column_names.end(); ++it) for (auto it = column_names.begin(); it != column_names.end(); ++it)
{ {
const IColumn * values_column = block.getByName(*it).column.get(); const IColumn * values_column = block.getByName(*it).column.get();
@ -131,7 +199,8 @@ void TTLBlockInputStream::removeRowsWithExpiredTableTTL(Block & block)
for (size_t i = 0; i < block.rows(); ++i) for (size_t i = 0; i < block.rows(); ++i)
{ {
UInt32 cur_ttl = getTimestampByIndex(ttl_column, i); UInt32 cur_ttl = getTimestampByIndex(ttl_column, i);
if (!isTTLExpired(cur_ttl)) bool where_filter_passed = !where_filter_column || where_filter_column->getBool(i);
if (!isTTLExpired(cur_ttl) || !where_filter_passed)
{ {
new_ttl_infos.table_ttl.update(cur_ttl); new_ttl_infos.table_ttl.update(cur_ttl);
result_column->insertFrom(*values_column, i); result_column->insertFrom(*values_column, i);
@ -141,9 +210,137 @@ void TTLBlockInputStream::removeRowsWithExpiredTableTTL(Block & block)
} }
result_columns.emplace_back(std::move(result_column)); result_columns.emplace_back(std::move(result_column));
} }
block = header.cloneWithColumns(std::move(result_columns));
}
else
{
MutableColumns result_columns;
MutableColumns aggregate_columns;
for (const auto & column_name : column_names)
{
const IColumn * values_column = block.getByName(column_name).column.get();
MutableColumnPtr result_column = values_column->cloneEmpty();
result_column->reserve(block.rows());
result_columns.emplace_back(std::move(result_column));
MutableColumnPtr aggregate_column = values_column->cloneEmpty();
aggregate_column->reserve(block.rows());
aggregate_columns.emplace_back(std::move(aggregate_column));
}
size_t rows_aggregated = 0;
size_t current_key_start = 0;
size_t rows_with_current_key = 0;
for (size_t i = 0; i < block.rows(); ++i)
{
UInt32 cur_ttl = getTimestampByIndex(ttl_column, i);
bool where_filter_passed = !where_filter_column || where_filter_column->getBool(i);
bool ttl_expired = isTTLExpired(cur_ttl) && where_filter_passed;
bool same_as_current = true;
if (current_key_value.empty())
{
same_as_current = false;
current_key_value.resize(storage.rows_ttl_entry.group_by_keys.size());
}
for (size_t j = 0; j < storage.rows_ttl_entry.group_by_keys.size(); ++j)
{
const String & key_column = storage.rows_ttl_entry.group_by_keys[j];
const IColumn * values_column = block.getByName(key_column).column.get();
if (!same_as_current)
values_column->get(i, current_key_value[j]);
else
{
Field value;
values_column->get(i, value);
if (value != current_key_value[j])
{
current_key_value[j] = value;
same_as_current = false;
}
}
}
if (!same_as_current)
{
if (rows_with_current_key)
{
Columns aggregate_chunk;
aggregate_chunk.reserve(aggregate_columns.size());
for (const auto & name : column_names)
{
const auto & column = aggregate_columns[header.getPositionByName(name)];
ColumnPtr chunk_column = column->cut(current_key_start, rows_with_current_key);
aggregate_chunk.emplace_back(std::move(chunk_column));
}
aggregator->executeOnBlock(aggregate_chunk, rows_with_current_key, agg_result, agg_key_columns,
agg_aggregate_columns, agg_no_more_keys);
}
if (!agg_result.empty())
{
auto aggregated_res = aggregator->convertToBlocks(agg_result, true, 1);
for (auto & agg_block : aggregated_res)
{
for (const auto & [name, actions] : storage.rows_ttl_entry.group_by_aggregations)
actions->execute(agg_block);
for (const auto & name : storage.rows_ttl_entry.group_by_keys)
{
const IColumn * values_column = agg_block.getByName(name).column.get();
auto & result_column = result_columns[header.getPositionByName(name)];
result_column->insertRangeFrom(*values_column, 0, agg_block.rows());
}
for (const auto & [name, res_column] : storage.rows_ttl_entry.group_by_aggregations_res_column)
{
const IColumn * values_column = agg_block.getByName(res_column).column.get();
auto & result_column = result_columns[header.getPositionByName(name)];
result_column->insertRangeFrom(*values_column, 0, agg_block.rows());
}
}
}
agg_result.invalidate();
current_key_start = rows_aggregated;
rows_with_current_key = 0;
}
if (ttl_expired)
{
++rows_with_current_key;
++rows_aggregated;
for (const auto & name : column_names)
{
const IColumn * values_column = block.getByName(name).column.get();
auto & column = aggregate_columns[header.getPositionByName(name)];
column->insertFrom(*values_column, i);
}
}
else
{
new_ttl_infos.table_ttl.update(cur_ttl);
for (const auto & name : column_names)
{
const IColumn * values_column = block.getByName(name).column.get();
auto & column = result_columns[header.getPositionByName(name)];
column->insertFrom(*values_column, i);
}
}
}
if (rows_with_current_key)
{
Columns aggregate_chunk;
aggregate_chunk.reserve(aggregate_columns.size());
for (const auto & name : column_names)
{
const auto & column = aggregate_columns[header.getPositionByName(name)];
ColumnPtr chunk_column = column->cut(current_key_start, rows_with_current_key);
aggregate_chunk.emplace_back(std::move(chunk_column));
}
aggregator->executeOnBlock(aggregate_chunk, rows_with_current_key, agg_result, agg_key_columns,
agg_aggregate_columns, agg_no_more_keys);
}
block = header.cloneWithColumns(std::move(result_columns)); block = header.cloneWithColumns(std::move(result_columns));
} }
}
void TTLBlockInputStream::removeValuesWithExpiredColumnTTL(Block & block) void TTLBlockInputStream::removeValuesWithExpiredColumnTTL(Block & block)
{ {

View File

@ -3,6 +3,7 @@
#include <Storages/MergeTree/MergeTreeData.h> #include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h> #include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Core/Block.h> #include <Core/Block.h>
#include <Interpreters/Aggregator.h>
#include <common/DateLUT.h> #include <common/DateLUT.h>
@ -39,6 +40,13 @@ private:
time_t current_time; time_t current_time;
bool force; bool force;
std::unique_ptr<Aggregator> aggregator;
std::vector<Field> current_key_value;
AggregatedDataVariants agg_result;
ColumnRawPtrs agg_key_columns;
Aggregator::AggregateColumns agg_aggregate_columns;
bool agg_no_more_keys;
IMergeTreeDataPart::TTLInfos old_ttl_infos; IMergeTreeDataPart::TTLInfos old_ttl_infos;
IMergeTreeDataPart::TTLInfos new_ttl_infos; IMergeTreeDataPart::TTLInfos new_ttl_infos;
NameSet empty_columns; NameSet empty_columns;

View File

@ -192,10 +192,10 @@ void ExpressionAnalyzer::analyzeAggregation()
if (has_aggregation) if (has_aggregation)
{ {
getSelectQuery(); /// assertSelect() // getSelectQuery(); /// assertSelect()
/// Find out aggregation keys. /// Find out aggregation keys.
if (select_query->groupBy()) if (select_query && select_query->groupBy())
{ {
NameSet unique_keys; NameSet unique_keys;
ASTs & group_asts = select_query->groupBy()->children; ASTs & group_asts = select_query->groupBy()->children;
@ -926,7 +926,10 @@ void ExpressionAnalyzer::appendExpression(ExpressionActionsChain & chain, const
ExpressionActionsPtr ExpressionAnalyzer::getActions(bool add_aliases, bool project_result) ExpressionActionsPtr ExpressionAnalyzer::getActions(bool add_aliases, bool project_result)
{ {
ExpressionActionsPtr actions = std::make_shared<ExpressionActions>(sourceColumns(), context); NamesAndTypesList columns(sourceColumns());
for (const auto & col : aggregated_columns)
columns.push_back(col);
ExpressionActionsPtr actions = std::make_shared<ExpressionActions>(columns, context);
NamesWithAliases result_columns; NamesWithAliases result_columns;
Names result_names; Names result_names;

View File

@ -839,7 +839,7 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyzeSelect(
return std::make_shared<const SyntaxAnalyzerResult>(result); return std::make_shared<const SyntaxAnalyzerResult>(result);
} }
SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(ASTPtr & query, const NamesAndTypesList & source_columns, ConstStoragePtr storage) const SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(ASTPtr & query, const NamesAndTypesList & source_columns, ConstStoragePtr storage, bool allow_aggregates) const
{ {
if (query->as<ASTSelectQuery>()) if (query->as<ASTSelectQuery>())
throw Exception("Not select analyze for select asts.", ErrorCodes::LOGICAL_ERROR); throw Exception("Not select analyze for select asts.", ErrorCodes::LOGICAL_ERROR);
@ -855,7 +855,20 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(ASTPtr & query, const NamesAndTy
optimizeIf(query, result.aliases, settings.optimize_if_chain_to_miltiif); optimizeIf(query, result.aliases, settings.optimize_if_chain_to_miltiif);
if (allow_aggregates)
{
GetAggregatesVisitor::Data data;
GetAggregatesVisitor(data).visit(query);
/// There can not be other aggregate functions within the aggregate functions.
for (const ASTFunction * node : data.aggregates)
for (auto & arg : node->arguments->children)
assertNoAggregates(arg, "inside another aggregate function");
result.aggregates = data.aggregates;
}
else
assertNoAggregates(query, "in wrong place"); assertNoAggregates(query, "in wrong place");
result.collectUsedColumns(query); result.collectUsedColumns(query);
return std::make_shared<const SyntaxAnalyzerResult>(result); return std::make_shared<const SyntaxAnalyzerResult>(result);
} }

View File

@ -86,7 +86,7 @@ public:
{} {}
/// Analyze and rewrite not select query /// Analyze and rewrite not select query
SyntaxAnalyzerResultPtr analyze(ASTPtr & query, const NamesAndTypesList & source_columns_, ConstStoragePtr storage = {}) const; SyntaxAnalyzerResultPtr analyze(ASTPtr & query, const NamesAndTypesList & source_columns_, ConstStoragePtr storage = {}, bool allow_aggregations = false) const;
/// Analyze and rewrite select query /// Analyze and rewrite select query
SyntaxAnalyzerResultPtr analyzeSelect( SyntaxAnalyzerResultPtr analyzeSelect(

View File

@ -7,21 +7,80 @@
namespace DB namespace DB
{ {
ASTPtr ASTTTLElement::clone() const
{
auto clone = std::make_shared<ASTTTLElement>(*this);
clone->children.clear();
clone->positions.clear();
for (auto expr : {Expression::TTL, Expression::WHERE})
clone->setExpression(expr, getExpression(expr, true));
for (auto & [name, expr] : clone->group_by_aggregations)
expr = expr->clone();
return clone;
}
void ASTTTLElement::formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const void ASTTTLElement::formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const
{ {
children.front()->formatImpl(settings, state, frame); ttl()->formatImpl(settings, state, frame);
if (destination_type == PartDestinationType::DISK) if (mode == Mode::MOVE && destination_type == PartDestinationType::DISK)
{ {
settings.ostr << " TO DISK " << quoteString(destination_name); settings.ostr << " TO DISK " << quoteString(destination_name);
} }
else if (destination_type == PartDestinationType::VOLUME) else if (mode == Mode::MOVE && destination_type == PartDestinationType::VOLUME)
{ {
settings.ostr << " TO VOLUME " << quoteString(destination_name); settings.ostr << " TO VOLUME " << quoteString(destination_name);
} }
else if (destination_type == PartDestinationType::DELETE) else if (mode == Mode::GROUP_BY)
{
settings.ostr << " GROUP BY ";
for (size_t i = 0; i < group_by_key_columns.size(); ++i)
{
settings.ostr << group_by_key_columns[i];
if (i + 1 != group_by_key_columns.size())
settings.ostr << ", ";
}
settings.ostr << " SET ";
for (size_t i = 0; i < group_by_aggregations.size(); ++i)
{
settings.ostr << group_by_aggregations[i].first << " = ";
group_by_aggregations[i].second->formatImpl(settings, state, frame);
if (i + 1 != group_by_aggregations.size())
settings.ostr << ", ";
}
}
else if (mode == Mode::DELETE)
{ {
/// It would be better to output "DELETE" here but that will break compatibility with earlier versions. /// It would be better to output "DELETE" here but that will break compatibility with earlier versions.
} }
if (where())
{
settings.ostr << " WHERE ";
where()->formatImpl(settings, state, frame);
}
}
void ASTTTLElement::setExpression(Expression expr, ASTPtr && ast)
{
auto it = positions.find(expr);
if (it == positions.end())
{
positions[expr] = children.size();
children.emplace_back(ast);
}
else
children[it->second] = ast;
}
ASTPtr ASTTTLElement::getExpression(Expression expr, bool clone) const
{
auto it = positions.find(expr);
if (it != positions.end())
return clone ? children[it->second]->clone() : children[it->second];
return {};
} }
} }

View File

@ -6,31 +6,53 @@
namespace DB namespace DB
{ {
/** Element of TTL expression. /** Element of TTL expression.
*/ */
class ASTTTLElement : public IAST class ASTTTLElement : public IAST
{ {
public: public:
enum class Expression : uint8_t
{
TTL,
WHERE
};
enum class Mode : uint8_t
{
DELETE,
MOVE,
GROUP_BY
};
Mode mode;
PartDestinationType destination_type; PartDestinationType destination_type;
String destination_name; String destination_name;
std::vector<String> group_by_key_columns;
std::vector<std::pair<String, ASTPtr>> group_by_aggregations;
ASTTTLElement(PartDestinationType destination_type_, const String & destination_name_) ASTTTLElement(Mode mode_, PartDestinationType destination_type_, const String & destination_name_)
: destination_type(destination_type_) : mode(mode_)
, destination_type(destination_type_)
, destination_name(destination_name_) , destination_name(destination_name_)
{ {
} }
String getID(char) const override { return "TTLElement"; } String getID(char) const override { return "TTLElement"; }
ASTPtr clone() const override ASTPtr clone() const override;
{
auto clone = std::make_shared<ASTTTLElement>(*this); const ASTPtr ttl() const { return getExpression(Expression::TTL); }
clone->cloneChildren(); const ASTPtr where() const { return getExpression(Expression::WHERE); }
return clone;
} void setExpression(Expression expr, ASTPtr && ast);
ASTPtr getExpression(Expression expr, bool clone = false) const;
protected: protected:
void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override; void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;
private:
std::unordered_map<Expression, size_t> positions;
}; };
} }

View File

@ -1455,23 +1455,50 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserKeyword s_to_disk("TO DISK"); ParserKeyword s_to_disk("TO DISK");
ParserKeyword s_to_volume("TO VOLUME"); ParserKeyword s_to_volume("TO VOLUME");
ParserKeyword s_delete("DELETE"); ParserKeyword s_delete("DELETE");
ParserKeyword s_where("WHERE");
ParserKeyword s_group_by("GROUP BY");
ParserKeyword s_set("SET");
ParserToken s_comma(TokenType::Comma);
ParserToken s_eq(TokenType::Equals);
ParserIdentifier parser_identifier;
ParserStringLiteral parser_string_literal; ParserStringLiteral parser_string_literal;
ParserExpression parser_exp; ParserExpression parser_exp;
ParserIdentifierList parser_identifier_list;
ASTPtr expr_elem;
if (!parser_exp.parse(pos, expr_elem, expected)) ASTPtr ttl_expr;
if (!parser_exp.parse(pos, ttl_expr, expected))
return false; return false;
ASTPtr where_expr;
std::vector<String> group_by_key_columns;
std::vector<std::pair<String, ASTPtr>> group_by_aggregations;
ASTTTLElement::Mode mode;
PartDestinationType destination_type = PartDestinationType::DELETE; PartDestinationType destination_type = PartDestinationType::DELETE;
String destination_name; String destination_name;
if (s_to_disk.ignore(pos)) if (s_to_disk.ignore(pos))
{
mode = ASTTTLElement::Mode::MOVE;
destination_type = PartDestinationType::DISK; destination_type = PartDestinationType::DISK;
}
else if (s_to_volume.ignore(pos)) else if (s_to_volume.ignore(pos))
{
mode = ASTTTLElement::Mode::MOVE;
destination_type = PartDestinationType::VOLUME; destination_type = PartDestinationType::VOLUME;
}
else if (s_group_by.ignore(pos))
{
mode = ASTTTLElement::Mode::GROUP_BY;
}
else else
{
s_delete.ignore(pos); s_delete.ignore(pos);
mode = ASTTTLElement::Mode::DELETE;
}
if (destination_type == PartDestinationType::DISK || destination_type == PartDestinationType::VOLUME) if (mode == ASTTTLElement::Mode::MOVE)
{ {
ASTPtr ast_space_name; ASTPtr ast_space_name;
if (!parser_string_literal.parse(pos, ast_space_name, expected)) if (!parser_string_literal.parse(pos, ast_space_name, expected))
@ -1479,10 +1506,57 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
destination_name = ast_space_name->as<ASTLiteral &>().value.get<const String &>(); destination_name = ast_space_name->as<ASTLiteral &>().value.get<const String &>();
} }
else if (mode == ASTTTLElement::Mode::GROUP_BY)
{
ASTPtr ast_group_by_key_columns;
if (!parser_identifier_list.parse(pos, ast_group_by_key_columns, expected))
return false;
for (const auto identifier : ast_group_by_key_columns->children)
{
String identifier_str;
if (!tryGetIdentifierNameInto(identifier, identifier_str))
return false;
group_by_key_columns.emplace_back(std::move(identifier_str));
}
node = std::make_shared<ASTTTLElement>(destination_type, destination_name); if (!s_set.ignore(pos))
node->children.push_back(expr_elem); return false;
while (true)
{
if (!group_by_aggregations.empty() && !s_comma.ignore(pos))
break;
ASTPtr name;
ASTPtr value;
if (!parser_identifier.parse(pos, name, expected))
return false;
if (!s_eq.ignore(pos))
return false;
if (!parser_exp.parse(pos, value, expected))
return false;
String name_str;
if (!tryGetIdentifierNameInto(name, name_str))
return false;
group_by_aggregations.emplace_back(name_str, std::move(value));
}
}
if ((mode == ASTTTLElement::Mode::MOVE || mode == ASTTTLElement::Mode::DELETE) && s_where.ignore(pos))
{
if (!parser_exp.parse(pos, where_expr, expected))
return false;
}
auto ttl_element = std::make_shared<ASTTTLElement>(mode, destination_type, destination_name);
ttl_element->setExpression(ASTTTLElement::Expression::TTL, std::move(ttl_expr));
if (where_expr)
ttl_element->setExpression(ASTTTLElement::Expression::WHERE, std::move(where_expr));
ttl_element->group_by_key_columns = std::move(group_by_key_columns);
ttl_element->group_by_aggregations = std::move(group_by_aggregations);
node = ttl_element;
return true; return true;
} }

View File

@ -742,4 +742,13 @@ bool ParserKeyValuePairsList::parseImpl(Pos & pos, ASTPtr & node, Expected & exp
return parser.parse(pos, node, expected); return parser.parse(pos, node, expected);
} }
bool ParserIdentifierList::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
return ParserList(
std::make_unique<ParserIdentifier>(),
std::make_unique<ParserToken>(TokenType::Comma))
.parse(pos, node, expected);
}
} }

View File

@ -421,4 +421,13 @@ protected:
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override; bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
}; };
/** A comma-separated list of identifiers, probably empty. */
class ParserIdentifierList : public IParserBase
{
protected:
const char * getName() const override { return "list of identifiers"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
} }

View File

@ -615,19 +615,50 @@ void MergeTreeData::setTTLExpressions(const ColumnsDescription & new_columns,
auto new_column_ttls = new_columns.getColumnTTLs(); auto new_column_ttls = new_columns.getColumnTTLs();
auto create_ttl_entry = [this, &new_columns](ASTPtr ttl_ast) auto create_ttl_entry = [this, &new_columns](ASTPtr ttl_expr_ast)
{ {
TTLEntry result; TTLEntry result;
auto syntax_result = SyntaxAnalyzer(global_context).analyze(ttl_ast, new_columns.getAllPhysical()); auto ttl_syntax_result = SyntaxAnalyzer(global_context).analyze(ttl_expr_ast, new_columns.getAllPhysical());
result.expression = ExpressionAnalyzer(ttl_ast, syntax_result, global_context).getActions(false); result.expression = ExpressionAnalyzer(ttl_expr_ast, ttl_syntax_result, global_context).getActions(false);
result.result_column = ttl_expr_ast->getColumnName();
result.destination_type = PartDestinationType::DELETE; result.destination_type = PartDestinationType::DELETE;
result.result_column = ttl_ast->getColumnName();
checkTTLExpression(result.expression, result.result_column); checkTTLExpression(result.expression, result.result_column);
return result; return result;
}; };
auto create_rows_ttl_entry = [this, &new_columns, &create_ttl_entry](const ASTTTLElement * ttl_element)
{
auto result = create_ttl_entry(ttl_element->ttl());
if (ttl_element->mode == ASTTTLElement::Mode::DELETE || ttl_element->mode ==ASTTTLElement::Mode::MOVE)
{
if (ASTPtr where_expr_ast = ttl_element->where())
{
auto where_syntax_result = SyntaxAnalyzer(global_context).analyze(where_expr_ast, new_columns.getAllPhysical());
result.where_expression = ExpressionAnalyzer(where_expr_ast, where_syntax_result, global_context).getActions(false);
result.where_filter_column = where_expr_ast->getColumnName();
}
}
else if (ttl_element->mode == ASTTTLElement::Mode::GROUP_BY)
{
result.group_by_keys = ttl_element->group_by_key_columns;
for (auto [name, value] : ttl_element->group_by_aggregations)
{
auto syntax_result = SyntaxAnalyzer(global_context).analyze(value, new_columns.getAllPhysical(), {}, true);
auto expr_analyzer = ExpressionAnalyzer(value, syntax_result, global_context);
result.group_by_aggregations.emplace_back(name, expr_analyzer.getActions(false));
result.group_by_aggregations_res_column.emplace_back(name, value->getColumnName());
for (const auto descr : expr_analyzer.getAnalyzedData().aggregate_descriptions)
result.aggregate_descriptions.push_back(descr);
}
}
return result;
};
if (!new_column_ttls.empty()) if (!new_column_ttls.empty())
{ {
NameSet columns_ttl_forbidden; NameSet columns_ttl_forbidden;
@ -672,7 +703,8 @@ void MergeTreeData::setTTLExpressions(const ColumnsDescription & new_columns,
throw Exception("More than one DELETE TTL expression is not allowed", ErrorCodes::BAD_TTL_EXPRESSION); throw Exception("More than one DELETE TTL expression is not allowed", ErrorCodes::BAD_TTL_EXPRESSION);
} }
auto new_rows_ttl_entry = create_ttl_entry(ttl_element->children[0]); LOG_DEBUG(log, "ttl_element->size is " << ttl_element->size());
auto new_rows_ttl_entry = create_rows_ttl_entry(ttl_element);
if (!only_check) if (!only_check)
update_rows_ttl_entry = new_rows_ttl_entry; update_rows_ttl_entry = new_rows_ttl_entry;
@ -680,7 +712,7 @@ void MergeTreeData::setTTLExpressions(const ColumnsDescription & new_columns,
} }
else else
{ {
auto new_ttl_entry = create_ttl_entry(ttl_element->children[0]); auto new_ttl_entry = create_rows_ttl_entry(ttl_element);
new_ttl_entry.entry_ast = ttl_element_ptr; new_ttl_entry.entry_ast = ttl_element_ptr;
new_ttl_entry.destination_type = ttl_element->destination_type; new_ttl_entry.destination_type = ttl_element->destination_type;

View File

@ -20,6 +20,7 @@
#include <Storages/MergeTree/MergeTreePartsMover.h> #include <Storages/MergeTree/MergeTreePartsMover.h>
#include <Interpreters/PartLog.h> #include <Interpreters/PartLog.h>
#include <Disks/StoragePolicy.h> #include <Disks/StoragePolicy.h>
#include <Interpreters/Aggregator.h>
#include <boost/multi_index_container.hpp> #include <boost/multi_index_container.hpp>
#include <boost/multi_index/ordered_index.hpp> #include <boost/multi_index/ordered_index.hpp>
@ -652,6 +653,14 @@ public:
ExpressionActionsPtr expression; ExpressionActionsPtr expression;
String result_column; String result_column;
ExpressionActionsPtr where_expression;
String where_filter_column;
Names group_by_keys;
std::vector<std::pair<String, ExpressionActionsPtr>> group_by_aggregations;
std::vector<std::pair<String, String>> group_by_aggregations_res_column;
AggregateDescriptions aggregate_descriptions;
/// Name and type of a destination are only valid in table-level context. /// Name and type of a destination are only valid in table-level context.
PartDestinationType destination_type; PartDestinationType destination_type;
String destination_name; String destination_name;