Merge pull request #34717 from azat/merge-mutate-memory-tracker

Fix possible memory_tracker use-after-free (for async s3 writes) for merges/mutations
This commit is contained in:
Nikolai Kochetov 2022-02-18 19:28:43 +01:00 committed by GitHub
commit e4d5db6161
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 45 additions and 53 deletions

View File

@ -117,6 +117,10 @@ public:
void setSoftLimit(Int64 value);
void setHardLimit(Int64 value);
Int64 getHardLimit() const
{
return hard_limit.load(std::memory_order_relaxed);
}
Int64 getSoftLimit() const
{
return soft_limit.load(std::memory_order_relaxed);

View File

@ -187,9 +187,7 @@ std::pair<bool, ReplicatedMergeMutateTaskBase::PartLogWriter> MergeFromLogEntryT
merge_mutate_entry = storage.getContext()->getMergeList().insert(
storage.getStorageID(),
future_merged_part,
settings.memory_profiler_step,
settings.memory_profiler_sample_probability,
settings.max_untracked_memory);
settings);
transaction_ptr = std::make_unique<MergeTreeData::Transaction>(storage);
stopwatch_ptr = std::make_unique<Stopwatch>();

View File

@ -16,23 +16,8 @@ MemoryTrackerThreadSwitcher::MemoryTrackerThreadSwitcher(MergeListEntry & merge_
{
// Each merge is executed into separate background processing pool thread
background_thread_memory_tracker = CurrentThread::getMemoryTracker();
if (background_thread_memory_tracker)
{
/// From the query context it will be ("for thread") memory tracker with VariableContext::Thread level,
/// which does not have any limits and sampling settings configured.
/// And parent for this memory tracker should be ("(for query)") with VariableContext::Process level,
/// that has limits and sampling configured.
MemoryTracker * parent;
if (background_thread_memory_tracker->level == VariableContext::Thread &&
(parent = background_thread_memory_tracker->getParent()) &&
parent != &total_memory_tracker)
{
background_thread_memory_tracker = parent;
}
background_thread_memory_tracker_prev_parent = background_thread_memory_tracker->getParent();
background_thread_memory_tracker->setParent(&merge_list_entry->memory_tracker);
}
background_thread_memory_tracker_prev_parent = background_thread_memory_tracker->getParent();
background_thread_memory_tracker->setParent(&merge_list_entry->memory_tracker);
prev_untracked_memory_limit = current_thread->untracked_memory_limit;
current_thread->untracked_memory_limit = merge_list_entry->max_untracked_memory;
@ -50,9 +35,7 @@ MemoryTrackerThreadSwitcher::MemoryTrackerThreadSwitcher(MergeListEntry & merge_
MemoryTrackerThreadSwitcher::~MemoryTrackerThreadSwitcher()
{
// Unplug memory_tracker from current background processing pool thread
if (background_thread_memory_tracker)
background_thread_memory_tracker->setParent(background_thread_memory_tracker_prev_parent);
background_thread_memory_tracker->setParent(background_thread_memory_tracker_prev_parent);
current_thread->untracked_memory_limit = prev_untracked_memory_limit;
@ -65,16 +48,14 @@ MemoryTrackerThreadSwitcher::~MemoryTrackerThreadSwitcher()
MergeListElement::MergeListElement(
const StorageID & table_id_,
FutureMergedMutatedPartPtr future_part,
UInt64 memory_profiler_step,
UInt64 memory_profiler_sample_probability,
UInt64 max_untracked_memory_)
const Settings & settings)
: table_id{table_id_}
, partition_id{future_part->part_info.partition_id}
, result_part_name{future_part->name}
, result_part_path{future_part->path}
, result_part_info{future_part->part_info}
, num_parts{future_part->parts.size()}
, max_untracked_memory(max_untracked_memory_)
, max_untracked_memory(settings.max_untracked_memory)
, query_id(table_id.getShortName() + "::" + result_part_name)
, thread_id{getThreadId()}
, merge_type{future_part->merge_type}
@ -97,8 +78,33 @@ MergeListElement::MergeListElement(
}
memory_tracker.setDescription("Mutate/Merge");
memory_tracker.setProfilerStep(memory_profiler_step);
memory_tracker.setSampleProbability(memory_profiler_sample_probability);
/// MemoryTracker settings should be set here, because
/// later (see MemoryTrackerThreadSwitcher)
/// parent memory tracker will be changed, and if merge executed from the
/// query (OPTIMIZE TABLE), all settings will be lost (since
/// current_thread::memory_tracker will have Thread level MemoryTracker,
/// which does not have any settings itself, it relies on the settings of the
/// thread_group::memory_tracker, but MemoryTrackerThreadSwitcher will reset parent).
memory_tracker.setProfilerStep(settings.memory_profiler_step);
memory_tracker.setSampleProbability(settings.memory_profiler_sample_probability);
memory_tracker.setSoftLimit(settings.max_guaranteed_memory_usage);
if (settings.memory_tracker_fault_probability)
memory_tracker.setFaultProbability(settings.memory_tracker_fault_probability);
/// Let's try to copy memory related settings from the query,
/// since settings that we have here is not from query, but global, from the table.
///
/// NOTE: Remember, that Thread level MemoryTracker does not have any settings,
/// so it's parent is required.
MemoryTracker * query_memory_tracker = CurrentThread::getMemoryTracker();
MemoryTracker * parent_query_memory_tracker;
if (query_memory_tracker->level == VariableContext::Thread &&
(parent_query_memory_tracker = query_memory_tracker->getParent()) &&
parent_query_memory_tracker != &total_memory_tracker)
{
memory_tracker.setOrRaiseHardLimit(parent_query_memory_tracker->getHardLimit());
}
}
MergeInfo MergeListElement::getInfo() const

View File

@ -58,6 +58,8 @@ using FutureMergedMutatedPartPtr = std::shared_ptr<FutureMergedMutatedPart>;
struct MergeListElement;
using MergeListEntry = BackgroundProcessListEntry<MergeListElement, MergeInfo>;
struct Settings;
/**
* Since merge is executed with multiple threads, this class
@ -127,9 +129,7 @@ struct MergeListElement : boost::noncopyable
MergeListElement(
const StorageID & table_id_,
FutureMergedMutatedPartPtr future_part,
UInt64 memory_profiler_step,
UInt64 memory_profiler_sample_probability,
UInt64 max_untracked_memory_);
const Settings & settings);
MergeInfo getInfo() const;

View File

@ -81,9 +81,7 @@ void MergePlainMergeTreeTask::prepare()
merge_list_entry = storage.getContext()->getMergeList().insert(
storage.getStorageID(),
future_part,
settings.memory_profiler_step,
settings.memory_profiler_sample_probability,
settings.max_untracked_memory);
settings);
write_part_log = [this] (const ExecutionStatus & execution_status)
{

View File

@ -585,12 +585,7 @@ bool MergeTask::MergeProjectionsStage::mergeMinMaxIndexAndPrepareProjections() c
projection_future_part,
projection.metadata,
global_ctx->merge_entry,
std::make_unique<MergeListElement>(
(*global_ctx->merge_entry)->table_id,
projection_future_part,
settings.memory_profiler_step,
settings.memory_profiler_sample_probability,
settings.max_untracked_memory),
std::make_unique<MergeListElement>((*global_ctx->merge_entry)->table_id, projection_future_part, settings),
global_ctx->time_of_merge,
global_ctx->context,
global_ctx->space_reservation,

View File

@ -116,9 +116,7 @@ std::pair<bool, ReplicatedMergeMutateTaskBase::PartLogWriter> MutateFromLogEntry
merge_mutate_entry = storage.getContext()->getMergeList().insert(
storage.getStorageID(),
future_mutated_part,
settings.memory_profiler_step,
settings.memory_profiler_sample_probability,
settings.max_untracked_memory);
settings);
stopwatch_ptr = std::make_unique<Stopwatch>();

View File

@ -32,9 +32,7 @@ void MutatePlainMergeTreeTask::prepare()
merge_list_entry = storage.getContext()->getMergeList().insert(
storage.getStorageID(),
future_part,
settings.memory_profiler_step,
settings.memory_profiler_sample_probability,
settings.max_untracked_memory);
settings);
stopwatch = std::make_unique<Stopwatch>();

View File

@ -633,12 +633,7 @@ public:
projection_future_part,
projection.metadata,
ctx->mutate_entry,
std::make_unique<MergeListElement>(
(*ctx->mutate_entry)->table_id,
projection_future_part,
settings.memory_profiler_step,
settings.memory_profiler_sample_probability,
settings.max_untracked_memory),
std::make_unique<MergeListElement>((*ctx->mutate_entry)->table_id, projection_future_part, settings),
*ctx->holder,
ctx->time_of_mutation,
ctx->context,