Fix integration test.

This commit is contained in:
Nikolai Kochetov 2022-12-30 16:20:06 +00:00
parent 3c02e208c8
commit 5f38e17941
3 changed files with 52 additions and 0 deletions

View File

@ -1295,6 +1295,11 @@ const ColumnDependencies & MutationsInterpreter::getColumnDependencies() const
return dependencies;
}
size_t MutationsInterpreter::evaluateCommandsSize()
{
return prepareQueryAffectedAST(commands, source.getStorage(), context)->size();
}
std::optional<SortDescription> MutationsInterpreter::getStorageSortDescriptionIfPossible(const Block & header) const
{
Names sort_columns = metadata_snapshot->getSortingKeyColumns();

View File

@ -62,6 +62,7 @@ public:
bool return_deleted_rows_ = false);
void validate();
size_t evaluateCommandsSize();
/// The resulting stream will return blocks containing only changed columns and columns, that we need to recalculate indices.
QueryPipelineBuilder execute();

View File

@ -978,6 +978,8 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate(
return {};
}
size_t max_ast_elements = getContext()->getSettingsRef().max_expanded_ast_elements;
auto future_part = std::make_shared<FutureMergedMutatedPart>();
if (storage_settings.get()->assign_part_uuids)
future_part->uuid = UUIDHelpers::generateV4();
@ -1019,6 +1021,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate(
}
auto commands = std::make_shared<MutationCommands>();
size_t current_ast_elements = 0;
auto last_mutation_to_apply = mutations_end_it;
for (auto it = mutations_begin_it; it != mutations_end_it; ++it)
{
@ -1026,6 +1029,49 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate(
if (first_mutation_tid != it->second.tid)
break;
size_t commands_size = 0;
MutationCommands commands_for_size_validation;
for (const auto & command : it->second.commands)
{
if (command.type != MutationCommand::Type::DROP_COLUMN
&& command.type != MutationCommand::Type::DROP_INDEX
&& command.type != MutationCommand::Type::DROP_PROJECTION
&& command.type != MutationCommand::Type::RENAME_COLUMN)
{
commands_for_size_validation.push_back(command);
}
else
{
commands_size += command.ast->size();
}
}
if (!commands_for_size_validation.empty())
{
try
{
auto fake_query_context = Context::createCopy(getContext());
fake_query_context->makeQueryContext();
fake_query_context->setCurrentQueryId("");
MutationsInterpreter interpreter(
shared_from_this(), metadata_snapshot, commands_for_size_validation, fake_query_context, false);
commands_size += interpreter.evaluateCommandsSize();
}
catch (...)
{
tryLogCurrentException(log);
MergeTreeMutationEntry & entry = it->second;
entry.latest_fail_time = time(nullptr);
entry.latest_fail_reason = getCurrentExceptionMessage(false);
/// NOTE we should not skip mutations, because exception may be retryable (e.g. MEMORY_LIMIT_EXCEEDED)
break;
}
}
if (current_ast_elements + commands_size >= max_ast_elements)
break;
current_ast_elements += commands_size;
commands->insert(commands->end(), it->second.commands.begin(), it->second.commands.end());
last_mutation_to_apply = it;
}