Avoid losing any allocations context from merges/mutations

This commit is contained in:
Azat Khuzhin 2021-10-13 23:47:28 +03:00
parent 8a209a78d7
commit 2a7a1d8df5
5 changed files with 22 additions and 20 deletions

View File

@ -10,7 +10,8 @@ namespace DB
{
MemoryTrackerThreadSwitcher::MemoryTrackerThreadSwitcher(MemoryTracker * memory_tracker_ptr, UInt64 untracked_memory_limit, const std::string & query_id)
MemoryTrackerThreadSwitcher::MemoryTrackerThreadSwitcher(MergeListEntry & merge_list_entry_)
: merge_list_entry(merge_list_entry_)
{
// Each merge is executed into separate background processing pool thread
background_thread_memory_tracker = CurrentThread::getMemoryTracker();
@ -29,19 +30,19 @@ MemoryTrackerThreadSwitcher::MemoryTrackerThreadSwitcher(MemoryTracker * memory_
}
background_thread_memory_tracker_prev_parent = background_thread_memory_tracker->getParent();
background_thread_memory_tracker->setParent(memory_tracker_ptr);
background_thread_memory_tracker->setParent(&merge_list_entry->memory_tracker);
}
prev_untracked_memory_limit = current_thread->untracked_memory_limit;
current_thread->untracked_memory_limit = untracked_memory_limit;
current_thread->untracked_memory_limit = merge_list_entry->max_untracked_memory;
/// Avoid accounting memory from another mutation/merge
/// (NOTE: consider moving such code to ThreadFromGlobalPool and related places)
prev_untracked_memory = current_thread->untracked_memory;
current_thread->untracked_memory = 0;
current_thread->untracked_memory = merge_list_entry->untracked_memory;
prev_query_id = current_thread->getQueryId().toString();
current_thread->setQueryId(query_id);
current_thread->setQueryId(merge_list_entry->query_id);
}
@ -53,7 +54,10 @@ MemoryTrackerThreadSwitcher::~MemoryTrackerThreadSwitcher()
background_thread_memory_tracker->setParent(background_thread_memory_tracker_prev_parent);
current_thread->untracked_memory_limit = prev_untracked_memory_limit;
merge_list_entry->untracked_memory = current_thread->untracked_memory;
current_thread->untracked_memory = prev_untracked_memory;
current_thread->setQueryId(prev_query_id);
}

View File

@ -55,6 +55,10 @@ struct MergeInfo
struct FutureMergedMutatedPart;
using FutureMergedMutatedPartPtr = std::shared_ptr<FutureMergedMutatedPart>;
struct MergeListElement;
using MergeListEntry = BackgroundProcessListEntry<MergeListElement, MergeInfo>;
/**
* Since merge is executed with multiple threads, this class
* switches the parent MemoryTracker to account all the memory used.
@ -62,9 +66,10 @@ using FutureMergedMutatedPartPtr = std::shared_ptr<FutureMergedMutatedPart>;
class MemoryTrackerThreadSwitcher : boost::noncopyable
{
public:
explicit MemoryTrackerThreadSwitcher(MemoryTracker * memory_tracker_ptr, UInt64 untracked_memory_limit, const std::string & query_id);
explicit MemoryTrackerThreadSwitcher(MergeListEntry & merge_list_entry_);
~MemoryTrackerThreadSwitcher();
private:
MergeListEntry & merge_list_entry;
MemoryTracker * background_thread_memory_tracker;
MemoryTracker * background_thread_memory_tracker_prev_parent = nullptr;
UInt64 prev_untracked_memory_limit;
@ -107,7 +112,11 @@ struct MergeListElement : boost::noncopyable
std::atomic<UInt64> columns_written{};
MemoryTracker memory_tracker{VariableContext::Process};
/// Used to adjust ThreadStatus::untracked_memory_limit
UInt64 max_untracked_memory;
/// Used to avoid losing any allocation context
UInt64 untracked_memory = 0;
/// Used for identifying mutations/merges in trace_log
std::string query_id;
UInt64 thread_id;
@ -129,8 +138,6 @@ struct MergeListElement : boost::noncopyable
~MergeListElement();
};
using MergeListEntry = BackgroundProcessListEntry<MergeListElement, MergeInfo>;
/** Maintains a list of currently running merges.
* For implementation of system.merges table.
*/

View File

@ -30,10 +30,7 @@ bool MergePlainMergeTreeTask::executeStep()
/// Make out memory tracker a parent of current thread memory tracker
MemoryTrackerThreadSwitcherPtr switcher;
if (merge_list_entry)
switcher = std::make_unique<MemoryTrackerThreadSwitcher>(
&(*merge_list_entry)->memory_tracker,
(*merge_list_entry)->max_untracked_memory,
(*merge_list_entry)->query_id);
switcher = std::make_unique<MemoryTrackerThreadSwitcher>(*merge_list_entry);
switch (state)
{

View File

@ -62,10 +62,7 @@ bool MutatePlainMergeTreeTask::executeStep()
/// Make out memory tracker a parent of current thread memory tracker
MemoryTrackerThreadSwitcherPtr switcher;
if (merge_list_entry)
switcher = std::make_unique<MemoryTrackerThreadSwitcher>(
&(*merge_list_entry)->memory_tracker,
(*merge_list_entry)->max_untracked_memory,
(*merge_list_entry)->query_id);
switcher = std::make_unique<MemoryTrackerThreadSwitcher>(*merge_list_entry);
switch (state)
{

View File

@ -122,10 +122,7 @@ bool ReplicatedMergeMutateTaskBase::executeImpl()
{
MemoryTrackerThreadSwitcherPtr switcher;
if (merge_mutate_entry)
switcher = std::make_unique<MemoryTrackerThreadSwitcher>(
&(*merge_mutate_entry)->memory_tracker,
(*merge_mutate_entry)->max_untracked_memory,
(*merge_mutate_entry)->query_id);
switcher = std::make_unique<MemoryTrackerThreadSwitcher>(*merge_mutate_entry);
auto remove_processed_entry = [&] () -> bool
{