This commit is contained in:
Andrey Zvonov 2024-09-19 08:20:07 +02:00 committed by GitHub
commit ea82de6959
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 115 additions and 4 deletions

View File

@ -857,7 +857,10 @@ void MutationsInterpreter::prepare(bool dry_run)
else if (command.type == MutationCommand::MATERIALIZE_TTL)
{
mutation_kind.set(MutationKind::MUTATE_OTHER);
if (materialize_ttl_recalculate_only)
bool suitable_for_ttl_optimization = source.getMergeTreeData()->getSettings()->ttl_only_drop_parts
&& metadata_snapshot->hasOnlyRowsTTL();
if (materialize_ttl_recalculate_only || suitable_for_ttl_optimization)
{
// just recalculate ttl_infos without remove expired data
auto all_columns_vec = all_columns.getNames();

View File

@ -119,6 +119,7 @@ static void splitAndModifyMutationCommands(
const MutationCommands & commands,
MutationCommands & for_interpreter,
MutationCommands & for_file_renames,
bool suitable_for_ttl_optimization,
LoggerPtr log)
{
auto part_columns = part->getColumnsDescription();
@ -128,6 +129,7 @@ static void splitAndModifyMutationCommands(
{
NameSet mutated_columns;
NameSet dropped_columns;
NameSet ignored_columns;
for (const auto & command : commands)
{
@ -153,6 +155,15 @@ static void splitAndModifyMutationCommands(
for_interpreter.push_back(command);
for (const auto & [column_name, expr] : command.column_to_update_expression)
mutated_columns.emplace(column_name);
if (command.type == MutationCommand::Type::MATERIALIZE_TTL && suitable_for_ttl_optimization)
{
for (const auto & col : part_columns)
{
if (!mutated_columns.contains(col.name))
ignored_columns.emplace(col.name);
}
}
}
else if (command.type == MutationCommand::Type::DROP_INDEX
|| command.type == MutationCommand::Type::DROP_PROJECTION
@ -213,7 +224,7 @@ static void splitAndModifyMutationCommands(
/// from disk we just don't read dropped columns
for (const auto & column : part_columns)
{
if (!mutated_columns.contains(column.name))
if (!mutated_columns.contains(column.name) && !ignored_columns.contains(column.name))
{
if (!metadata_snapshot->getColumns().has(column.name) && !part->storage.getVirtualsPtr()->has(column.name))
{
@ -1884,6 +1895,82 @@ private:
std::unique_ptr<PartMergerWriter> part_merger_writer_task{nullptr};
};
/*
* Decorator that'll drop expired parts by replacing them with empty ones.
* Main use case (only use case for now) is to decorate `MutateSomePartColumnsTask`,
* which is used to recalculate TTL. If the part is expired, this class will replace it with
* an empty one.
*
* Triggered when `ttl_only_drop_parts` is set and the only TTL is rows TTL.
* */
class ExecutableTaskDropTTLExpiredPartsDecorator : public IExecutableTask
{
public:
explicit ExecutableTaskDropTTLExpiredPartsDecorator(
std::unique_ptr<IExecutableTask> executable_task_,
MutationContextPtr ctx_
)
: executable_task(std::move(executable_task_)), ctx(ctx_) {}
void onCompleted() override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); }
StorageID getStorageID() const override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); }
Priority getPriority() const override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); }
String getQueryId() const override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); }
bool executeStep() override
{
switch (state)
{
case State::NEED_EXECUTE:
{
if (executable_task->executeStep())
return true;
if (isRowsMaxTTLExpired())
replacePartWithEmpty();
state = State::SUCCESS;
return true;
}
case State::SUCCESS:
{
return false;
}
}
return false;
}
private:
enum class State
{
NEED_EXECUTE,
SUCCESS
};
State state{State::NEED_EXECUTE};
std::unique_ptr<IExecutableTask> executable_task;
MutationContextPtr ctx;
bool isRowsMaxTTLExpired() const
{
const auto ttl = ctx->new_data_part->ttl_infos.table_ttl;
return ttl.max && ttl.max <= ctx->time_of_mutation;
}
void replacePartWithEmpty()
{
MergeTreePartInfo part_info = ctx->new_data_part->info;
part_info.level += 1;
MergeTreePartition partition = ctx->new_data_part->partition;
std::string part_name = ctx->new_data_part->getNewName(part_info);
auto [mutable_empty_part, _] = ctx->data->createEmptyPart(part_info, partition, part_name, ctx->txn);
ctx->new_data_part = std::move(mutable_empty_part);
}
};
MutateTask::MutateTask(
FutureMergedMutatedPartPtr future_part_,
@ -2122,6 +2209,7 @@ bool MutateTask::prepare()
context_for_reading->setSetting("max_streams_for_merge_tree_reading", Field(0));
context_for_reading->setSetting("read_from_filesystem_cache_if_exists_otherwise_bypass_cache", 1);
bool suitable_for_ttl_optimization = ctx->metadata_snapshot->hasOnlyRowsTTL() && ctx->data->getSettings()->ttl_only_drop_parts;
MutationHelpers::splitAndModifyMutationCommands(
ctx->source_part,
ctx->metadata_snapshot,
@ -2129,6 +2217,7 @@ bool MutateTask::prepare()
ctx->commands_for_part,
ctx->for_interpreter,
ctx->for_file_renames,
suitable_for_ttl_optimization,
ctx->log);
ctx->stage_progress = std::make_unique<MergeStageProgress>(1.0);
@ -2235,7 +2324,12 @@ bool MutateTask::prepare()
/// The blobs have to be removed along with the part, this temporary part owns them and does not share them yet.
ctx->new_data_part->remove_tmp_policy = IMergeTreeDataPart::BlobsRemovalPolicyForTemporaryParts::REMOVE_BLOBS;
task = std::make_unique<MutateAllPartColumnsTask>(ctx);
bool drop_expired_parts = suitable_for_ttl_optimization && !ctx->data->getSettings()->materialize_ttl_recalculate_only;
if (drop_expired_parts)
task = std::make_unique<ExecutableTaskDropTTLExpiredPartsDecorator>(std::make_unique<MutateAllPartColumnsTask>(ctx), ctx);
else
task = std::make_unique<MutateAllPartColumnsTask>(ctx);
ProfileEvents::increment(ProfileEvents::MutationAllPartColumns);
}
else /// TODO: check that we modify only non-key columns in this case.
@ -2295,7 +2389,12 @@ bool MutateTask::prepare()
/// Keeper has to be asked with unlock request to release the references to the blobs
ctx->new_data_part->remove_tmp_policy = IMergeTreeDataPart::BlobsRemovalPolicyForTemporaryParts::ASK_KEEPER;
task = std::make_unique<MutateSomePartColumnsTask>(ctx);
bool drop_expired_parts = suitable_for_ttl_optimization && !ctx->data->getSettings()->materialize_ttl_recalculate_only;
if (drop_expired_parts)
task = std::make_unique<ExecutableTaskDropTTLExpiredPartsDecorator>(std::make_unique<MutateSomePartColumnsTask>(ctx), ctx);
else
task = std::make_unique<MutateSomePartColumnsTask>(ctx);
ProfileEvents::increment(ProfileEvents::MutationSomePartColumns);
}

View File

@ -260,6 +260,12 @@ bool StorageInMemoryMetadata::hasAnyTableTTL() const
return hasAnyMoveTTL() || hasRowsTTL() || hasAnyRecompressionTTL() || hasAnyGroupByTTL() || hasAnyRowsWhereTTL();
}
bool StorageInMemoryMetadata::hasOnlyRowsTTL() const
{
bool has_any_other_ttl = hasAnyMoveTTL() || hasAnyRecompressionTTL() || hasAnyGroupByTTL() || hasAnyRowsWhereTTL() || hasAnyColumnTTL();
return hasRowsTTL() && !has_any_other_ttl;
}
TTLColumnsDescription StorageInMemoryMetadata::getColumnTTLs() const
{
return column_ttls_by_name;

View File

@ -144,6 +144,9 @@ struct StorageInMemoryMetadata
/// Returns true if there is set table TTL, any column TTL or any move TTL.
bool hasAnyTTL() const { return hasAnyColumnTTL() || hasAnyTableTTL(); }
/// Returns true if only rows TTL is set, not even rows where.
bool hasOnlyRowsTTL() const;
/// Common tables TTLs (for rows and moves).
TTLTableDescription getTableTTLs() const;
bool hasAnyTableTTL() const;