From 5f38e17941720505c861a3e6830523c27e6435db Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Fri, 30 Dec 2022 16:20:06 +0000 Subject: [PATCH] Fix integration test. --- src/Interpreters/MutationsInterpreter.cpp | 5 +++ src/Interpreters/MutationsInterpreter.h | 1 + src/Storages/StorageMergeTree.cpp | 46 +++++++++++++++++++++++ 3 files changed, 52 insertions(+) diff --git a/src/Interpreters/MutationsInterpreter.cpp b/src/Interpreters/MutationsInterpreter.cpp index b95cf913eff..e963a453d50 100644 --- a/src/Interpreters/MutationsInterpreter.cpp +++ b/src/Interpreters/MutationsInterpreter.cpp @@ -1295,6 +1295,11 @@ const ColumnDependencies & MutationsInterpreter::getColumnDependencies() const return dependencies; } +size_t MutationsInterpreter::evaluateCommandsSize() +{ + return prepareQueryAffectedAST(commands, source.getStorage(), context)->size(); +} + std::optional MutationsInterpreter::getStorageSortDescriptionIfPossible(const Block & header) const { Names sort_columns = metadata_snapshot->getSortingKeyColumns(); diff --git a/src/Interpreters/MutationsInterpreter.h b/src/Interpreters/MutationsInterpreter.h index d5727dd3e53..53a85c02ec5 100644 --- a/src/Interpreters/MutationsInterpreter.h +++ b/src/Interpreters/MutationsInterpreter.h @@ -62,6 +62,7 @@ public: bool return_deleted_rows_ = false); void validate(); + size_t evaluateCommandsSize(); /// The resulting stream will return blocks containing only changed columns and columns, that we need to recalculate indices. QueryPipelineBuilder execute(); diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 1e511b53e72..a57b4afda7d 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -978,6 +978,8 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate( return {}; } + size_t max_ast_elements = getContext()->getSettingsRef().max_expanded_ast_elements; + auto future_part = std::make_shared(); if (storage_settings.get()->assign_part_uuids) future_part->uuid = UUIDHelpers::generateV4(); @@ -1019,6 +1021,7 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate( } auto commands = std::make_shared(); + size_t current_ast_elements = 0; auto last_mutation_to_apply = mutations_end_it; for (auto it = mutations_begin_it; it != mutations_end_it; ++it) { @@ -1026,6 +1029,49 @@ MergeMutateSelectedEntryPtr StorageMergeTree::selectPartsToMutate( if (first_mutation_tid != it->second.tid) break; + size_t commands_size = 0; + MutationCommands commands_for_size_validation; + for (const auto & command : it->second.commands) + { + if (command.type != MutationCommand::Type::DROP_COLUMN + && command.type != MutationCommand::Type::DROP_INDEX + && command.type != MutationCommand::Type::DROP_PROJECTION + && command.type != MutationCommand::Type::RENAME_COLUMN) + { + commands_for_size_validation.push_back(command); + } + else + { + commands_size += command.ast->size(); + } + } + + if (!commands_for_size_validation.empty()) + { + try + { + auto fake_query_context = Context::createCopy(getContext()); + fake_query_context->makeQueryContext(); + fake_query_context->setCurrentQueryId(""); + MutationsInterpreter interpreter( + shared_from_this(), metadata_snapshot, commands_for_size_validation, fake_query_context, false); + commands_size += interpreter.evaluateCommandsSize(); + } + catch (...) + { + tryLogCurrentException(log); + MergeTreeMutationEntry & entry = it->second; + entry.latest_fail_time = time(nullptr); + entry.latest_fail_reason = getCurrentExceptionMessage(false); + /// NOTE we should not skip mutations, because exception may be retryable (e.g. MEMORY_LIMIT_EXCEEDED) + break; + } + } + + if (current_ast_elements + commands_size >= max_ast_elements) + break; + + current_ast_elements += commands_size; commands->insert(commands->end(), it->second.commands.begin(), it->second.commands.end()); last_mutation_to_apply = it; }