better TTL calculations with mutations

This commit is contained in:
CurtizJ 2020-02-18 22:03:40 +03:00
parent 49c80c88ef
commit b0f7905567
12 changed files with 169 additions and 64 deletions

View File

@ -282,6 +282,8 @@ private:
auto modify_ttl = std::make_unique<Node>("MODIFY TTL", next_flag++, TABLE_LEVEL);
modify_ttl->aliases.push_back("ALTER MODIFY TTL");
auto materialize_ttl = std::make_unique<Node>("MATERIALIZE TTL", next_flag++, TABLE_LEVEL);
materialize_ttl->aliases.push_back("ALTER MATERIALIZE TTL");
auto modify_setting = std::make_unique<Node>("MODIFY SETTING", next_flag++, TABLE_LEVEL);
modify_setting->aliases.push_back("ALTER MODIFY SETTING");
@ -293,7 +295,7 @@ private:
auto freeze_partition = std::make_unique<Node>("FREEZE PARTITION", next_flag++, TABLE_LEVEL);
ext::push_back(freeze_partition->aliases, "ALTER FREEZE PARTITION");
auto alter_table = std::make_unique<Node>("ALTER TABLE", std::move(update), std::move(delet), std::move(alter_column), std::move(index), std::move(alter_constraint), std::move(modify_ttl), std::move(modify_setting), std::move(move_partition), std::move(fetch_partition), std::move(freeze_partition));
auto alter_table = std::make_unique<Node>("ALTER TABLE", std::move(update), std::move(delet), std::move(alter_column), std::move(index), std::move(alter_constraint), std::move(modify_ttl), std::move(materialize_ttl), std::move(modify_setting), std::move(move_partition), std::move(fetch_partition), std::move(freeze_partition));
auto refresh_view = std::make_unique<Node>("REFRESH VIEW", next_flag++, VIEW_LEVEL);
ext::push_back(refresh_view->aliases, "ALTER LIVE VIEW REFRESH");

View File

@ -45,6 +45,7 @@ enum class AccessType
ALTER_CONSTRAINT, /// allows to execute ALTER {ADD|DROP} CONSTRAINT
MODIFY_TTL, /// allows to execute ALTER MODIFY TTL
MATERIALIZE_TTL, /// allows to execute ALTER MATERIALIZE TTL
MODIFY_SETTING, /// allows to execute ALTER MODIFY SETTING
MOVE_PARTITION,
@ -214,6 +215,7 @@ namespace impl
ACCESS_TYPE_TO_KEYWORD_CASE(ALTER_CONSTRAINT);
ACCESS_TYPE_TO_KEYWORD_CASE(MODIFY_TTL);
ACCESS_TYPE_TO_KEYWORD_CASE(MATERIALIZE_TTL);
ACCESS_TYPE_TO_KEYWORD_CASE(MODIFY_SETTING);
ACCESS_TYPE_TO_KEYWORD_CASE(MOVE_PARTITION);

View File

@ -214,6 +214,11 @@ AccessRightsElements InterpreterAlterQuery::getRequiredAccess() const
required_access.emplace_back(AccessType::MODIFY_TTL, alter.database, alter.table);
break;
}
case ASTAlterCommand::MATERIALIZE_TTL:
{
required_access.emplace_back(AccessType::MATERIALIZE_TTL, alter.database, alter.table);
break;
}
case ASTAlterCommand::MODIFY_SETTING:
{
required_access.emplace_back(AccessType::MODIFY_SETTING, alter.database, alter.table);

View File

@ -315,7 +315,7 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
validateUpdateColumns(storage, updated_columns, column_to_affected_materialized);
}
/// Columns, that we need to read for calculation of skip indices or TTL expressions
/// Columns, that we need to read for calculation of skip indices or TTL expressions.
auto dependencies = getAllColumnDependencies(storage, updated_columns);
/// First, break a sequence of commands into stages.
@ -394,18 +394,38 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
}
else if (command.type == MutationCommand::MATERIALIZE_TTL)
{
if (stages.empty() || !stages.back().column_to_updated.empty())
stages.emplace_back(context);
if (stages.size() == 1) /// First stage only supports filtering and can't update columns.
stages.emplace_back(context);
if (storage->hasRowsTTL())
{
for (const auto & column : all_columns)
dependencies.emplace(column.name, ColumnDependency::TTL_TARGET);
}
else
{
NameSet new_updated_columns;
auto column_ttls = storage->getColumns().getColumnTTLs();
for (const auto & elem : column_ttls)
{
dependencies.emplace(elem.first, ColumnDependency::TTL_TARGET);
new_updated_columns.insert(elem.first);
}
auto all_columns_vec = all_columns.getNames();
dependencies = getAllColumnDependencies(storage, NameSet(all_columns_vec.begin(), all_columns_vec.end()));
auto all_columns_vec = all_columns.getNames();
auto all_dependencies = getAllColumnDependencies(storage, NameSet(all_columns_vec.begin(), all_columns_vec.end()));
for (const auto & dependency : dependencies)
if (dependency.kind == ColumnDependency::TTL_TARGET)
stages.back().column_to_updated.emplace(
dependency.column_name, std::make_shared<ASTIdentifier>(dependency.column_name));
for (const auto & dependency : all_dependencies)
{
if (dependency.kind == ColumnDependency::TTL_EXPRESSION)
dependencies.insert(dependency);
}
/// Recalculate only the skip indices of columns that could be updated by TTL.
auto new_dependencies = storage->getColumnDependencies(new_updated_columns);
for (const auto & dependency : new_dependencies)
{
if (dependency.kind == ColumnDependency::SKIP_INDEX)
dependencies.insert(dependency);
}
}
}
else
throw Exception("Unknown mutation command type: " + DB::toString<int>(command.type), ErrorCodes::UNKNOWN_MUTATION_COMMAND);
@ -413,35 +433,58 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
/// We care about affected indices because we also need to rewrite them
/// when one of the index columns is updated or filtered with delete.
/// The same about colums, that are need for calculation of TTL expressions.
/// The same applies to columns that are needed for calculation of TTL expressions.
if (!dependencies.empty())
{
if (!stages.empty())
{
std::vector<Stage> stages_copy;
/// Copy all filled stages except index calculation stage.
for (const auto & stage : stages)
{
stages_copy.emplace_back(context);
stages_copy.back().column_to_updated = stage.column_to_updated;
stages_copy.back().output_columns = stage.output_columns;
stages_copy.back().filters = stage.filters;
}
const ASTPtr select_query = prepareInterpreterSelectQuery(stages_copy, /* dry_run = */ true);
InterpreterSelectQuery interpreter{select_query, context, storage, SelectQueryOptions().analyze(/* dry_run = */ false).ignoreLimits()};
auto first_stage_header = interpreter.getSampleBlock();
auto in = std::make_shared<NullBlockInputStream>(first_stage_header);
updated_header = std::make_unique<Block>(addStreamsForLaterStages(stages_copy, in)->getHeader());
}
/// Special step to recalculate affected indices and TTL expressions.
stages.emplace_back(context);
NameSet changed_columns;
NameSet unchanged_columns;
for (const auto & dependency : dependencies)
{
if (dependency.isReadOnly())
unchanged_columns.insert(dependency.column_name);
else
changed_columns.insert(dependency.column_name);
}
if (!changed_columns.empty())
{
if (stages.empty() || !stages.back().column_to_updated.empty())
stages.emplace_back(context);
if (stages.size() == 1) /// First stage only supports filtering and can't update columns.
stages.emplace_back(context);
for (const auto & column : changed_columns)
stages.back().column_to_updated.emplace(
dependency.column_name, std::make_shared<ASTIdentifier>(dependency.column_name));
column, std::make_shared<ASTIdentifier>(column));
}
if (!unchanged_columns.empty())
{
if (!stages.empty())
{
std::vector<Stage> stages_copy;
/// Copy all filled stages except index calculation stage.
for (const auto & stage : stages)
{
stages_copy.emplace_back(context);
stages_copy.back().column_to_updated = stage.column_to_updated;
stages_copy.back().output_columns = stage.output_columns;
stages_copy.back().filters = stage.filters;
}
const ASTPtr select_query = prepareInterpreterSelectQuery(stages_copy, /* dry_run = */ true);
InterpreterSelectQuery interpreter{select_query, context, storage, SelectQueryOptions().analyze(/* dry_run = */ false).ignoreLimits()};
auto first_stage_header = interpreter.getSampleBlock();
auto in = std::make_shared<NullBlockInputStream>(first_stage_header);
updated_header = std::make_unique<Block>(addStreamsForLaterStages(stages_copy, in)->getHeader());
}
/// Special step to recalculate affected indices and TTL expressions.
stages.emplace_back(context);
for (const auto & column : unchanged_columns)
stages.back().column_to_updated.emplace(
column, std::make_shared<ASTIdentifier>(column));
}
}

View File

@ -9,11 +9,11 @@ namespace DB
struct ColumnDependency
{
enum Kind
enum Kind : UInt8
{
SKIP_INDEX,
TTL_EXPRESSION,
TTL_TARGET
SKIP_INDEX = 1,
TTL_EXPRESSION = 2,
TTL_TARGET = 4
};
ColumnDependency(const String & column_name_, Kind kind_)

View File

@ -129,9 +129,12 @@ public:
/// Example is StorageSystemNumbers.
virtual bool hasEvenlyDistributedRead() const { return false; }
/// Returns true if there is set table TTL, any column TTL or any move TTL
/// Returns true if a table TTL, any column TTL or any move TTL is set.
virtual bool hasAnyTTL() const { return false; }
/// Returns true if a TTL for rows is set.
virtual bool hasRowsTTL() const { return false; }
/// Optional size information of each physical column.
/// Currently it's only used by the MergeTree family for query optimizations.
using ColumnSizeByName = std::unordered_map<std::string, ColumnSize>;
@ -459,7 +462,7 @@ public:
/// Returns columns that could be updated by applying TTL rules
virtual Names getColumnsUpdatedByTTL() const { return {}; }
/// Retuns columns, whose dependencies (skip indices, TTL expressions)
/// Returns columns whose dependencies (skip indices, TTL expressions)
/// would be affected if we update the @updated_columns set of columns.
virtual ColumnDependencies getColumnDependencies(const NameSet & /* updated_columns */) const { return {}; }

View File

@ -3955,11 +3955,6 @@ ColumnDependencies MergeTreeData::getColumnDependencies(const NameSet & updated_
{
if (add_dependent_columns(entry.expression, required_ttl_columns))
updated_ttl_columns.insert(name);
else if (updated_columns.count(name))
{
updated_ttl_columns.insert(name);
add_dependent_columns(entry.expression, required_ttl_columns);
}
}
for (const auto & entry : move_ttl_entries)

View File

@ -583,7 +583,7 @@ public:
bool hasAnyColumnTTL() const { return !column_ttl_entries_by_name.empty(); }
bool hasAnyMoveTTL() const { return !move_ttl_entries.empty(); }
bool hasRowsTTL() const { return !rows_ttl_entry.isEmpty(); }
bool hasRowsTTL() const override { return !rows_ttl_entry.isEmpty(); }
bool hasAnyTTL() const override { return hasRowsTTL() || hasAnyMoveTTL() || hasAnyColumnTTL(); }
/// Check that the part is not broken and calculate the checksums for it if they are not present.

View File

@ -998,21 +998,24 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
MergeStageProgress stage_progress(1.0);
in->setProgressCallback(MergeProgressCallback(merge_entry, watch_prev_elapsed, stage_progress));
bool need_remove_expired_values = false;
if (data.hasAnyTTL())
auto should_execute_ttl = [&](const auto & in_columns)
{
auto in_columns = in_header.getNames();
if (!data.hasAnyTTL())
return false;
for (const auto & command : commands_for_part)
if (command.type == MutationCommand::MATERIALIZE_TTL)
return true;
auto dependencies = data.getColumnDependencies(NameSet(in_columns.begin(), in_columns.end()));
for (const auto & dependency : dependencies)
{
if (dependency.kind == ColumnDependency::TTL_EXPRESSION
|| dependency.kind == ColumnDependency::TTL_TARGET)
{
need_remove_expired_values = true;
break;
}
}
}
if (dependency.kind == ColumnDependency::TTL_EXPRESSION || dependency.kind == ColumnDependency::TTL_TARGET)
return true;
return false;
};
bool need_remove_expired_values = should_execute_ttl(in_header.getNamesAndTypesList().getNames());
if (updated_header.columns() == all_columns.size())
{

View File

@ -48,6 +48,7 @@ public:
}
bool hasAnyTTL() const override { return part->storage.hasAnyTTL(); }
bool hasRowsTTL() const override { return part->storage.hasRowsTTL(); }
ColumnDependencies getColumnDependencies(const NameSet & updated_columns) const override
{

View File

@ -1,2 +1,18 @@
2100-10-10 3
2100-10-10 4
1 a
3 c
1 a
2
3 c
4
1
2
3
4
1 a
2 b
4 d
1
2
4 d

View File

@ -1,5 +1,3 @@
set mutations_sync = 2;
drop table if exists ttl;
create table ttl (d Date, a Int) engine = MergeTree order by a partition by toDayOfMonth(d);
@ -11,9 +9,46 @@ insert into ttl values (toDateTime('2100-10-10 00:00:00'), 4);
alter table ttl materialize ttl; -- { serverError 80 }
alter table ttl modify ttl d + interval 1 day;
alter table ttl materialize ttl;
alter table ttl materialize ttl settings mutations_sync=2;
select * from ttl order by a;
drop table if exists ttl;
create table ttl (i Int, s String) engine = MergeTree order by i;
insert into ttl values (1, 'a') (2, 'b') (3, 'c') (4, 'd');
alter table ttl modify ttl i % 2 = 0 ? today() - 10 : toDate('2100-01-01');
alter table ttl materialize ttl settings mutations_sync=2;
select * from ttl order by i;
alter table ttl modify ttl toDate('2000-01-01');
alter table ttl materialize ttl settings mutations_sync=2;
select * from ttl order by i;
drop table if exists ttl;
create table ttl (i Int, s String) engine = MergeTree order by i;
insert into ttl values (1, 'a') (2, 'b') (3, 'c') (4, 'd');
alter table ttl modify column s String ttl i % 2 = 0 ? today() - 10 : toDate('2100-01-01');
alter table ttl materialize ttl settings mutations_sync=2;
select * from ttl order by i;
alter table ttl modify column s String ttl toDate('2000-01-01');
alter table ttl materialize ttl settings mutations_sync=2;
select * from ttl order by i;
drop table if exists ttl;
create table ttl (d Date, i Int, s String) engine = MergeTree order by i;
insert into ttl values (toDate('2000-01-02'), 1, 'a') (toDate('2000-01-03'), 2, 'b') (toDate('2080-01-01'), 3, 'c') (toDate('2080-01-03'), 4, 'd');
alter table ttl modify ttl i % 3 = 0 ? today() - 10 : toDate('2100-01-01');
alter table ttl materialize ttl settings mutations_sync=2;
select i, s from ttl order by i;
alter table ttl modify column s String ttl d + interval 1 month;
alter table ttl materialize ttl settings mutations_sync=2;
select i, s from ttl order by i;
drop table if exists ttl;