diff --git a/src/Common/ThreadStatus.h b/src/Common/ThreadStatus.h index 16a47a21184..d22f6e1574b 100644 --- a/src/Common/ThreadStatus.h +++ b/src/Common/ThreadStatus.h @@ -41,6 +41,7 @@ class TaskStatsInfoGetter; class InternalTextLogsQueue; struct ViewRuntimeData; class QueryViewsLog; +class MemoryTrackerThreadSwitcher; using InternalTextLogsQueuePtr = std::shared_ptr; using InternalTextLogsQueueWeakPtr = std::weak_ptr; @@ -166,6 +167,13 @@ protected: /// It is used to avoid enabling the query profiler when you have multiple ThreadStatus in the same thread bool query_profiled_enabled = true; + /// Requires access to query_id. + friend class MemoryTrackerThreadSwitcher; + void setQueryId(const String & query_id_) + { + query_id = query_id_; + } + public: ThreadStatus(); ~ThreadStatus(); diff --git a/src/Interpreters/StorageID.cpp b/src/Interpreters/StorageID.cpp index bfb969cab25..2e76618e4c0 100644 --- a/src/Interpreters/StorageID.cpp +++ b/src/Interpreters/StorageID.cpp @@ -110,7 +110,7 @@ StorageID StorageID::fromDictionaryConfig(const Poco::Util::AbstractConfiguratio return res; } -String StorageID::getInternalDictionaryName() const +String StorageID::getShortName() const { assertNotEmpty(); if (hasUUID()) diff --git a/src/Interpreters/StorageID.h b/src/Interpreters/StorageID.h index 623f481fbc5..f1fcfde25c0 100644 --- a/src/Interpreters/StorageID.h +++ b/src/Interpreters/StorageID.h @@ -91,7 +91,9 @@ struct StorageID /// If dictionary has UUID, then use it as dictionary name in ExternalLoader to allow dictionary renaming. /// ExternalDictnariesLoader::resolveDictionaryName(...) should be used to access such dictionaries by name. - String getInternalDictionaryName() const; + String getInternalDictionaryName() const { return getShortName(); } + /// Get short, but unique, name. + String getShortName() const; private: StorageID() = default; diff --git a/src/Storages/MergeTree/MergeList.cpp b/src/Storages/MergeTree/MergeList.cpp index 07aabf64dfd..366834b8f09 100644 --- a/src/Storages/MergeTree/MergeList.cpp +++ b/src/Storages/MergeTree/MergeList.cpp @@ -10,7 +10,8 @@ namespace DB { -MemoryTrackerThreadSwitcher::MemoryTrackerThreadSwitcher(MemoryTracker * memory_tracker_ptr, UInt64 untracked_memory_limit) +MemoryTrackerThreadSwitcher::MemoryTrackerThreadSwitcher(MergeListEntry & merge_list_entry_) + : merge_list_entry(merge_list_entry_) { // Each merge is executed into separate background processing pool thread background_thread_memory_tracker = CurrentThread::getMemoryTracker(); @@ -29,11 +30,19 @@ MemoryTrackerThreadSwitcher::MemoryTrackerThreadSwitcher(MemoryTracker * memory_ } background_thread_memory_tracker_prev_parent = background_thread_memory_tracker->getParent(); - background_thread_memory_tracker->setParent(memory_tracker_ptr); + background_thread_memory_tracker->setParent(&merge_list_entry->memory_tracker); } prev_untracked_memory_limit = current_thread->untracked_memory_limit; - current_thread->untracked_memory_limit = untracked_memory_limit; + current_thread->untracked_memory_limit = merge_list_entry->max_untracked_memory; + + /// Avoid accounting memory from another mutation/merge + /// (NOTE: consider moving such code to ThreadFromGlobalPool and related places) + prev_untracked_memory = current_thread->untracked_memory; + current_thread->untracked_memory = merge_list_entry->untracked_memory; + + prev_query_id = current_thread->getQueryId().toString(); + current_thread->setQueryId(merge_list_entry->query_id); } @@ -45,6 +54,11 @@ MemoryTrackerThreadSwitcher::~MemoryTrackerThreadSwitcher() background_thread_memory_tracker->setParent(background_thread_memory_tracker_prev_parent); current_thread->untracked_memory_limit = prev_untracked_memory_limit; + + merge_list_entry->untracked_memory = current_thread->untracked_memory; + current_thread->untracked_memory = prev_untracked_memory; + + current_thread->setQueryId(prev_query_id); } MergeListElement::MergeListElement( @@ -60,6 +74,7 @@ MergeListElement::MergeListElement( , result_part_info{future_part->part_info} , num_parts{future_part->parts.size()} , max_untracked_memory(max_untracked_memory_) + , query_id(table_id.getShortName() + "::" + result_part_name) , thread_id{getThreadId()} , merge_type{future_part->merge_type} , merge_algorithm{MergeAlgorithm::Undecided} diff --git a/src/Storages/MergeTree/MergeList.h b/src/Storages/MergeTree/MergeList.h index 95e96b5a73e..4139e9599aa 100644 --- a/src/Storages/MergeTree/MergeList.h +++ b/src/Storages/MergeTree/MergeList.h @@ -55,6 +55,10 @@ struct MergeInfo struct FutureMergedMutatedPart; using FutureMergedMutatedPartPtr = std::shared_ptr; +struct MergeListElement; +using MergeListEntry = BackgroundProcessListEntry; + + /** * Since merge is executed with multiple threads, this class * switches the parent MemoryTracker to account all the memory used. @@ -62,12 +66,15 @@ using FutureMergedMutatedPartPtr = std::shared_ptr; class MemoryTrackerThreadSwitcher : boost::noncopyable { public: - explicit MemoryTrackerThreadSwitcher(MemoryTracker * memory_tracker_ptr, UInt64 untracked_memory_limit); + explicit MemoryTrackerThreadSwitcher(MergeListEntry & merge_list_entry_); ~MemoryTrackerThreadSwitcher(); private: + MergeListEntry & merge_list_entry; MemoryTracker * background_thread_memory_tracker; MemoryTracker * background_thread_memory_tracker_prev_parent = nullptr; UInt64 prev_untracked_memory_limit; + UInt64 prev_untracked_memory; + String prev_query_id; }; using MemoryTrackerThreadSwitcherPtr = std::unique_ptr; @@ -105,7 +112,12 @@ struct MergeListElement : boost::noncopyable std::atomic columns_written{}; MemoryTracker memory_tracker{VariableContext::Process}; + /// Used to adjust ThreadStatus::untracked_memory_limit UInt64 max_untracked_memory; + /// Used to avoid losing any allocation context + UInt64 untracked_memory = 0; + /// Used for identifying mutations/merges in trace_log + std::string query_id; UInt64 thread_id; MergeType merge_type; @@ -126,8 +138,6 @@ struct MergeListElement : boost::noncopyable ~MergeListElement(); }; -using MergeListEntry = BackgroundProcessListEntry; - /** Maintains a list of currently running merges. * For implementation of system.merges table. */ diff --git a/src/Storages/MergeTree/MergePlainMergeTreeTask.cpp b/src/Storages/MergeTree/MergePlainMergeTreeTask.cpp index 3129ae9aa35..1cbf928047c 100644 --- a/src/Storages/MergeTree/MergePlainMergeTreeTask.cpp +++ b/src/Storages/MergeTree/MergePlainMergeTreeTask.cpp @@ -30,7 +30,7 @@ bool MergePlainMergeTreeTask::executeStep() /// Make out memory tracker a parent of current thread memory tracker MemoryTrackerThreadSwitcherPtr switcher; if (merge_list_entry) - switcher = std::make_unique(&(*merge_list_entry)->memory_tracker, (*merge_list_entry)->max_untracked_memory); + switcher = std::make_unique(*merge_list_entry); switch (state) { @@ -87,6 +87,7 @@ void MergePlainMergeTreeTask::prepare() write_part_log = [this] (const ExecutionStatus & execution_status) { + merge_task.reset(); storage.writePartLog( PartLogElement::MERGE_PARTS, execution_status, diff --git a/src/Storages/MergeTree/MutatePlainMergeTreeTask.cpp b/src/Storages/MergeTree/MutatePlainMergeTreeTask.cpp index 1f4b387a3a3..3359693fa22 100644 --- a/src/Storages/MergeTree/MutatePlainMergeTreeTask.cpp +++ b/src/Storages/MergeTree/MutatePlainMergeTreeTask.cpp @@ -40,6 +40,7 @@ void MutatePlainMergeTreeTask::prepare() write_part_log = [this] (const ExecutionStatus & execution_status) { + mutate_task.reset(); storage.writePartLog( PartLogElement::MUTATE_PART, execution_status, @@ -61,7 +62,7 @@ bool MutatePlainMergeTreeTask::executeStep() /// Make out memory tracker a parent of current thread memory tracker MemoryTrackerThreadSwitcherPtr switcher; if (merge_list_entry) - switcher = std::make_unique(&(*merge_list_entry)->memory_tracker, (*merge_list_entry)->max_untracked_memory); + switcher = std::make_unique(*merge_list_entry); switch (state) { diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index 66d0f5f7e49..40037c38779 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -766,7 +766,6 @@ private: State state{State::NEED_PREPARE}; MutationContextPtr ctx; - Block block; size_t block_num = 0; using ProjectionNameToItsBlocks = std::map; @@ -986,6 +985,7 @@ private: ctx->mutating_pipeline.reset(); static_pointer_cast(ctx->out)->writeSuffixAndFinalizePart(ctx->new_data_part, ctx->need_sync); + ctx->out.reset(); } enum class State diff --git a/src/Storages/MergeTree/ReplicatedMergeMutateTaskBase.cpp b/src/Storages/MergeTree/ReplicatedMergeMutateTaskBase.cpp index 32071566bbe..5fe7de70a20 100644 --- a/src/Storages/MergeTree/ReplicatedMergeMutateTaskBase.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeMutateTaskBase.cpp @@ -122,7 +122,7 @@ bool ReplicatedMergeMutateTaskBase::executeImpl() { MemoryTrackerThreadSwitcherPtr switcher; if (merge_mutate_entry) - switcher = std::make_unique(&(*merge_mutate_entry)->memory_tracker, (*merge_mutate_entry)->max_untracked_memory); + switcher = std::make_unique(*merge_mutate_entry); auto remove_processed_entry = [&] () -> bool { diff --git a/tests/queries/bugs/01200_mutations_memory_consumption.reference b/tests/queries/0_stateless/01200_mutations_memory_consumption.reference similarity index 100% rename from tests/queries/bugs/01200_mutations_memory_consumption.reference rename to tests/queries/0_stateless/01200_mutations_memory_consumption.reference diff --git a/tests/queries/bugs/01200_mutations_memory_consumption.sql b/tests/queries/0_stateless/01200_mutations_memory_consumption.sql similarity index 98% rename from tests/queries/bugs/01200_mutations_memory_consumption.sql rename to tests/queries/0_stateless/01200_mutations_memory_consumption.sql index d3eb5dd165c..de9c2df7f08 100644 --- a/tests/queries/bugs/01200_mutations_memory_consumption.sql +++ b/tests/queries/0_stateless/01200_mutations_memory_consumption.sql @@ -1,4 +1,4 @@ --- Tags: no-debug, no-parallel, no-fasttest +-- Tags: no-debug, no-parallel, long DROP TABLE IF EXISTS table_with_single_pk;