Merge pull request #29691 from azat/fix-merge-mutate-memory-tracking

Fix memory tracking for merges and mutations
This commit is contained in:
Nikita Mikhaylov 2021-10-15 13:53:52 +03:00 committed by GitHub
commit d1138a8a25
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 50 additions and 13 deletions

View File

@ -41,6 +41,7 @@ class TaskStatsInfoGetter;
class InternalTextLogsQueue;
struct ViewRuntimeData;
class QueryViewsLog;
class MemoryTrackerThreadSwitcher;
using InternalTextLogsQueuePtr = std::shared_ptr<InternalTextLogsQueue>;
using InternalTextLogsQueueWeakPtr = std::weak_ptr<InternalTextLogsQueue>;
@ -166,6 +167,13 @@ protected:
/// It is used to avoid enabling the query profiler when you have multiple ThreadStatus in the same thread
bool query_profiled_enabled = true;
/// Requires access to query_id.
friend class MemoryTrackerThreadSwitcher;
void setQueryId(const String & query_id_)
{
query_id = query_id_;
}
public:
ThreadStatus();
~ThreadStatus();

View File

@ -110,7 +110,7 @@ StorageID StorageID::fromDictionaryConfig(const Poco::Util::AbstractConfiguratio
return res;
}
String StorageID::getInternalDictionaryName() const
String StorageID::getShortName() const
{
assertNotEmpty();
if (hasUUID())

View File

@ -91,7 +91,9 @@ struct StorageID
/// If dictionary has UUID, then use it as dictionary name in ExternalLoader to allow dictionary renaming.
/// ExternalDictnariesLoader::resolveDictionaryName(...) should be used to access such dictionaries by name.
String getInternalDictionaryName() const;
String getInternalDictionaryName() const { return getShortName(); }
/// Get short, but unique, name.
String getShortName() const;
private:
StorageID() = default;

View File

@ -10,7 +10,8 @@ namespace DB
{
MemoryTrackerThreadSwitcher::MemoryTrackerThreadSwitcher(MemoryTracker * memory_tracker_ptr, UInt64 untracked_memory_limit)
MemoryTrackerThreadSwitcher::MemoryTrackerThreadSwitcher(MergeListEntry & merge_list_entry_)
: merge_list_entry(merge_list_entry_)
{
// Each merge is executed into separate background processing pool thread
background_thread_memory_tracker = CurrentThread::getMemoryTracker();
@ -29,11 +30,19 @@ MemoryTrackerThreadSwitcher::MemoryTrackerThreadSwitcher(MemoryTracker * memory_
}
background_thread_memory_tracker_prev_parent = background_thread_memory_tracker->getParent();
background_thread_memory_tracker->setParent(memory_tracker_ptr);
background_thread_memory_tracker->setParent(&merge_list_entry->memory_tracker);
}
prev_untracked_memory_limit = current_thread->untracked_memory_limit;
current_thread->untracked_memory_limit = untracked_memory_limit;
current_thread->untracked_memory_limit = merge_list_entry->max_untracked_memory;
/// Avoid accounting memory from another mutation/merge
/// (NOTE: consider moving such code to ThreadFromGlobalPool and related places)
prev_untracked_memory = current_thread->untracked_memory;
current_thread->untracked_memory = merge_list_entry->untracked_memory;
prev_query_id = current_thread->getQueryId().toString();
current_thread->setQueryId(merge_list_entry->query_id);
}
@ -45,6 +54,11 @@ MemoryTrackerThreadSwitcher::~MemoryTrackerThreadSwitcher()
background_thread_memory_tracker->setParent(background_thread_memory_tracker_prev_parent);
current_thread->untracked_memory_limit = prev_untracked_memory_limit;
merge_list_entry->untracked_memory = current_thread->untracked_memory;
current_thread->untracked_memory = prev_untracked_memory;
current_thread->setQueryId(prev_query_id);
}
MergeListElement::MergeListElement(
@ -60,6 +74,7 @@ MergeListElement::MergeListElement(
, result_part_info{future_part->part_info}
, num_parts{future_part->parts.size()}
, max_untracked_memory(max_untracked_memory_)
, query_id(table_id.getShortName() + "::" + result_part_name)
, thread_id{getThreadId()}
, merge_type{future_part->merge_type}
, merge_algorithm{MergeAlgorithm::Undecided}

View File

@ -55,6 +55,10 @@ struct MergeInfo
struct FutureMergedMutatedPart;
using FutureMergedMutatedPartPtr = std::shared_ptr<FutureMergedMutatedPart>;
struct MergeListElement;
using MergeListEntry = BackgroundProcessListEntry<MergeListElement, MergeInfo>;
/**
* Since merge is executed with multiple threads, this class
* switches the parent MemoryTracker to account all the memory used.
@ -62,12 +66,15 @@ using FutureMergedMutatedPartPtr = std::shared_ptr<FutureMergedMutatedPart>;
class MemoryTrackerThreadSwitcher : boost::noncopyable
{
public:
explicit MemoryTrackerThreadSwitcher(MemoryTracker * memory_tracker_ptr, UInt64 untracked_memory_limit);
explicit MemoryTrackerThreadSwitcher(MergeListEntry & merge_list_entry_);
~MemoryTrackerThreadSwitcher();
private:
MergeListEntry & merge_list_entry;
MemoryTracker * background_thread_memory_tracker;
MemoryTracker * background_thread_memory_tracker_prev_parent = nullptr;
UInt64 prev_untracked_memory_limit;
UInt64 prev_untracked_memory;
String prev_query_id;
};
using MemoryTrackerThreadSwitcherPtr = std::unique_ptr<MemoryTrackerThreadSwitcher>;
@ -105,7 +112,12 @@ struct MergeListElement : boost::noncopyable
std::atomic<UInt64> columns_written{};
MemoryTracker memory_tracker{VariableContext::Process};
/// Used to adjust ThreadStatus::untracked_memory_limit
UInt64 max_untracked_memory;
/// Used to avoid losing any allocation context
UInt64 untracked_memory = 0;
/// Used for identifying mutations/merges in trace_log
std::string query_id;
UInt64 thread_id;
MergeType merge_type;
@ -126,8 +138,6 @@ struct MergeListElement : boost::noncopyable
~MergeListElement();
};
using MergeListEntry = BackgroundProcessListEntry<MergeListElement, MergeInfo>;
/** Maintains a list of currently running merges.
* For implementation of system.merges table.
*/

View File

@ -30,7 +30,7 @@ bool MergePlainMergeTreeTask::executeStep()
/// Make out memory tracker a parent of current thread memory tracker
MemoryTrackerThreadSwitcherPtr switcher;
if (merge_list_entry)
switcher = std::make_unique<MemoryTrackerThreadSwitcher>(&(*merge_list_entry)->memory_tracker, (*merge_list_entry)->max_untracked_memory);
switcher = std::make_unique<MemoryTrackerThreadSwitcher>(*merge_list_entry);
switch (state)
{
@ -87,6 +87,7 @@ void MergePlainMergeTreeTask::prepare()
write_part_log = [this] (const ExecutionStatus & execution_status)
{
merge_task.reset();
storage.writePartLog(
PartLogElement::MERGE_PARTS,
execution_status,

View File

@ -40,6 +40,7 @@ void MutatePlainMergeTreeTask::prepare()
write_part_log = [this] (const ExecutionStatus & execution_status)
{
mutate_task.reset();
storage.writePartLog(
PartLogElement::MUTATE_PART,
execution_status,
@ -61,7 +62,7 @@ bool MutatePlainMergeTreeTask::executeStep()
/// Make out memory tracker a parent of current thread memory tracker
MemoryTrackerThreadSwitcherPtr switcher;
if (merge_list_entry)
switcher = std::make_unique<MemoryTrackerThreadSwitcher>(&(*merge_list_entry)->memory_tracker, (*merge_list_entry)->max_untracked_memory);
switcher = std::make_unique<MemoryTrackerThreadSwitcher>(*merge_list_entry);
switch (state)
{

View File

@ -766,7 +766,6 @@ private:
State state{State::NEED_PREPARE};
MutationContextPtr ctx;
Block block;
size_t block_num = 0;
using ProjectionNameToItsBlocks = std::map<String, MergeTreeData::MutableDataPartsVector>;
@ -986,6 +985,7 @@ private:
ctx->mutating_pipeline.reset();
static_pointer_cast<MergedBlockOutputStream>(ctx->out)->writeSuffixAndFinalizePart(ctx->new_data_part, ctx->need_sync);
ctx->out.reset();
}
enum class State

View File

@ -122,7 +122,7 @@ bool ReplicatedMergeMutateTaskBase::executeImpl()
{
MemoryTrackerThreadSwitcherPtr switcher;
if (merge_mutate_entry)
switcher = std::make_unique<MemoryTrackerThreadSwitcher>(&(*merge_mutate_entry)->memory_tracker, (*merge_mutate_entry)->max_untracked_memory);
switcher = std::make_unique<MemoryTrackerThreadSwitcher>(*merge_mutate_entry);
auto remove_processed_entry = [&] () -> bool
{

View File

@ -1,4 +1,4 @@
-- Tags: no-debug, no-parallel, no-fasttest
-- Tags: no-debug, no-parallel, long
DROP TABLE IF EXISTS table_with_single_pk;