2014-09-10 11:34:26 +00:00
|
|
|
#pragma once
|
|
|
|
|
2020-10-26 12:40:55 +00:00
|
|
|
#include <Core/Names.h>
|
|
|
|
#include <Core/Field.h>
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <Common/Stopwatch.h>
|
|
|
|
#include <Common/CurrentMetrics.h>
|
|
|
|
#include <Common/MemoryTracker.h>
|
2020-09-03 13:00:13 +00:00
|
|
|
#include <Storages/MergeTree/MergeType.h>
|
2020-10-26 12:40:55 +00:00
|
|
|
#include <Storages/MergeTree/MergeAlgorithm.h>
|
2021-06-24 14:07:43 +00:00
|
|
|
#include <Storages/MergeTree/MergeTreePartInfo.h>
|
2020-10-26 12:40:55 +00:00
|
|
|
#include <Storages/MergeTree/BackgroundProcessList.h>
|
2021-06-06 12:24:49 +00:00
|
|
|
#include <Interpreters/StorageID.h>
|
2020-10-26 12:40:55 +00:00
|
|
|
#include <boost/noncopyable.hpp>
|
2015-02-10 21:10:58 +00:00
|
|
|
#include <memory>
|
2014-09-10 11:34:26 +00:00
|
|
|
#include <list>
|
|
|
|
#include <mutex>
|
|
|
|
#include <atomic>
|
|
|
|
|
2016-10-24 04:06:27 +00:00
|
|
|
|
|
|
|
namespace CurrentMetrics
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
extern const Metric Merge;
|
2016-10-24 04:06:27 +00:00
|
|
|
}
|
|
|
|
|
2014-09-10 11:34:26 +00:00
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
2015-04-16 06:12:35 +00:00
|
|
|
struct MergeInfo
|
2016-12-23 20:23:46 +00:00
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
std::string database;
|
|
|
|
std::string table;
|
|
|
|
std::string result_part_name;
|
2019-11-20 08:51:52 +00:00
|
|
|
std::string result_part_path;
|
2017-04-01 07:20:54 +00:00
|
|
|
Array source_part_names;
|
2019-11-20 08:51:52 +00:00
|
|
|
Array source_part_paths;
|
2018-09-11 11:16:40 +00:00
|
|
|
std::string partition_id;
|
2019-01-13 22:02:33 +00:00
|
|
|
bool is_mutation;
|
2017-04-01 07:20:54 +00:00
|
|
|
Float64 elapsed;
|
|
|
|
Float64 progress;
|
|
|
|
UInt64 num_parts;
|
|
|
|
UInt64 total_size_bytes_compressed;
|
|
|
|
UInt64 total_size_marks;
|
2019-03-26 12:37:42 +00:00
|
|
|
UInt64 total_rows_count;
|
2017-04-01 07:20:54 +00:00
|
|
|
UInt64 bytes_read_uncompressed;
|
|
|
|
UInt64 bytes_written_uncompressed;
|
|
|
|
UInt64 rows_read;
|
|
|
|
UInt64 rows_written;
|
|
|
|
UInt64 columns_written;
|
|
|
|
UInt64 memory_usage;
|
2020-02-02 20:01:13 +00:00
|
|
|
UInt64 thread_id;
|
2020-09-03 13:00:13 +00:00
|
|
|
std::string merge_type;
|
2020-09-10 14:56:15 +00:00
|
|
|
std::string merge_algorithm;
|
2016-12-23 20:23:46 +00:00
|
|
|
};
|
|
|
|
|
2019-01-13 22:02:33 +00:00
|
|
|
struct FutureMergedMutatedPart;
|
2021-09-16 21:19:58 +00:00
|
|
|
using FutureMergedMutatedPartPtr = std::shared_ptr<FutureMergedMutatedPart>;
|
|
|
|
|
2021-10-13 20:47:28 +00:00
|
|
|
struct MergeListElement;
|
|
|
|
using MergeListEntry = BackgroundProcessListEntry<MergeListElement, MergeInfo>;
|
|
|
|
|
Fix possible memory_tracker use-after-free for merges/mutations
There are two possible cases for execution merges/mutations:
1) from background thread
2) from OPTIMIZE TABLE query
1) is pretty simple; its memory tracking structure is as follows:
current_thread::memory_tracker = level=Thread / description="(for thread)" ==
background_thread_memory_tracker = level=Thread / description="(for thread)"
current_thread::memory_tracker.parent = level=Global / description="(total)"
So as you can see it is pretty simple and MemoryTrackerThreadSwitcher
does not do anything icky for this case.
2) is complex; its memory tracking structure is as follows:
current_thread::memory_tracker = level=Thread / description="(for thread)"
current_thread::memory_tracker.parent = level=Process / description="(for query)" ==
background_thread_memory_tracker = level=Process / description="(for query)"
Before this patch, tracking memory (and related things, like sampling,
profiling and so on) for an OPTIMIZE TABLE query required dirty hacks,
since the current_thread memory_tracker has Thread scope, which
does not have any limits.
So if its parent were changed to the Merge/Mutate memory tracker
(which also lacks some of these settings), memory would not be
tracked correctly.
To address this, Merge/Mutate was set as the parent not of the
current_thread memory_tracker but of its parent, since that tracker has
Process scope with all settings.
But that parent's memory_tracker is the memory_tracker of the
thread_group, and so if you will have nested ThreadPool inside
merge/mutate (this is the case for s3 async writes, which has been
added in #33291) you may get use-after-free of memory_tracker.
Consider the following example:
MemoryTrackerThreadSwitcher()
thread_group.memory_tracker.parent = merge_list_entry->memory_tracker
(see also background_thread_memory_tracker above)
CurrentThread::attachTo()
current_thread.memory_tracker.parent = thread_group.memory_tracker
CurrentThread::detachQuery()
current_thread.memory_tracker.parent = thread_group.memory_tracker.parent
# and this is equal to merge_list_entry->memory_tracker
~MemoryTrackerThreadSwitcher()
thread_group.memory_tracker = thread_group.memory_tracker.parent
So after this sequence the thread is left with an incorrect memory_tracker
(from the merge_list_entry). When the next job in that ThreadPool has no
thread_group, it will not try to update
current_thread.memory_tracker.parent, and a use-after-free will happen.
So to address the (2) issue, settings from the parent memory_tracker
should be copied to the merge_list_entry->memory_tracker, to avoid
playing with parent memory tracker.
Note that the settings from the query (OPTIMIZE TABLE) are not available at
that time, so they cannot be used instead of the parent's memory tracker
settings.
v2: remove memory_tracker.setOrRaiseHardLimit() from settings
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2022-02-18 07:45:29 +00:00
|
|
|
struct Settings;
|
|
|
|
|
2021-10-13 20:47:28 +00:00
|
|
|
|
2021-09-16 21:19:58 +00:00
|
|
|
/**
|
|
|
|
* Since merge is executed with multiple threads, this class
|
|
|
|
* switches the parent MemoryTracker to account all the memory used.
|
|
|
|
*/
|
|
|
|
class MemoryTrackerThreadSwitcher : boost::noncopyable
|
|
|
|
{
|
|
|
|
public:
|
2021-10-13 20:47:28 +00:00
|
|
|
explicit MemoryTrackerThreadSwitcher(MergeListEntry & merge_list_entry_);
|
2021-09-16 21:19:58 +00:00
|
|
|
~MemoryTrackerThreadSwitcher();
|
|
|
|
private:
|
2021-10-13 20:47:28 +00:00
|
|
|
MergeListEntry & merge_list_entry;
|
2021-09-16 21:19:58 +00:00
|
|
|
MemoryTracker * background_thread_memory_tracker;
|
|
|
|
MemoryTracker * background_thread_memory_tracker_prev_parent = nullptr;
|
2022-11-22 16:09:48 +00:00
|
|
|
Int64 prev_untracked_memory_limit;
|
|
|
|
Int64 prev_untracked_memory;
|
2021-10-13 20:47:28 +00:00
|
|
|
String prev_query_id;
|
2021-09-16 21:19:58 +00:00
|
|
|
};
|
|
|
|
|
|
|
|
using MemoryTrackerThreadSwitcherPtr = std::unique_ptr<MemoryTrackerThreadSwitcher>;
|
2016-12-23 20:23:46 +00:00
|
|
|
|
|
|
|
struct MergeListElement : boost::noncopyable
|
2014-09-10 11:34:26 +00:00
|
|
|
{
|
2021-06-06 12:24:49 +00:00
|
|
|
const StorageID table_id;
|
2018-09-11 11:16:40 +00:00
|
|
|
std::string partition_id;
|
2019-01-14 12:25:25 +00:00
|
|
|
|
|
|
|
const std::string result_part_name;
|
2019-11-20 08:51:52 +00:00
|
|
|
const std::string result_part_path;
|
2021-06-24 14:07:43 +00:00
|
|
|
MergeTreePartInfo result_part_info;
|
2019-01-14 12:25:25 +00:00
|
|
|
bool is_mutation{};
|
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
UInt64 num_parts{};
|
|
|
|
Names source_part_names;
|
2019-11-20 08:51:52 +00:00
|
|
|
Names source_part_paths;
|
2019-01-14 12:25:25 +00:00
|
|
|
Int64 source_data_version{};
|
|
|
|
|
|
|
|
Stopwatch watch;
|
|
|
|
std::atomic<Float64> progress{};
|
|
|
|
std::atomic<bool> is_cancelled{};
|
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
UInt64 total_size_bytes_compressed{};
|
|
|
|
UInt64 total_size_marks{};
|
2019-03-26 12:37:42 +00:00
|
|
|
UInt64 total_rows_count{};
|
2017-04-01 07:20:54 +00:00
|
|
|
std::atomic<UInt64> bytes_read_uncompressed{};
|
|
|
|
std::atomic<UInt64> bytes_written_uncompressed{};
|
2016-11-03 12:00:44 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
/// In case of Vertical algorithm they are actual only for primary key columns
|
|
|
|
std::atomic<UInt64> rows_read{};
|
|
|
|
std::atomic<UInt64> rows_written{};
|
2016-11-22 19:34:36 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
/// Updated only for Vertical algorithm
|
|
|
|
std::atomic<UInt64> columns_written{};
|
2016-11-03 12:00:44 +00:00
|
|
|
|
2021-10-13 20:47:28 +00:00
|
|
|
/// Used to adjust ThreadStatus::untracked_memory_limit
|
2021-10-03 08:21:54 +00:00
|
|
|
UInt64 max_untracked_memory;
|
2021-10-13 20:47:28 +00:00
|
|
|
/// Used to avoid losing any allocation context
|
|
|
|
UInt64 untracked_memory = 0;
|
|
|
|
/// Used for identifying mutations/merges in trace_log
|
2021-10-13 20:47:28 +00:00
|
|
|
std::string query_id;
|
2016-07-31 03:53:16 +00:00
|
|
|
|
2020-02-02 20:01:13 +00:00
|
|
|
UInt64 thread_id;
|
2020-09-03 13:00:13 +00:00
|
|
|
MergeType merge_type;
|
2020-10-27 15:27:12 +00:00
|
|
|
/// Detected after merge already started
|
|
|
|
std::atomic<MergeAlgorithm> merge_algorithm;
|
2016-12-23 20:23:46 +00:00
|
|
|
|
2022-07-15 09:40:58 +00:00
|
|
|
/// Description used for logging
|
|
|
|
/// Needs to outlive memory_tracker since it's used in its destructor
|
2022-07-14 14:22:06 +00:00
|
|
|
const String description{"Mutate/Merge"};
|
2022-07-15 09:40:58 +00:00
|
|
|
MemoryTracker memory_tracker{VariableContext::Process};
|
2022-07-14 14:22:06 +00:00
|
|
|
|
2021-10-03 08:21:54 +00:00
|
|
|
MergeListElement(
|
|
|
|
const StorageID & table_id_,
|
|
|
|
FutureMergedMutatedPartPtr future_part,
|
Fix possible memory_tracker use-after-free for merges/mutations
There are two possible cases for execution merges/mutations:
1) from background thread
2) from OPTIMIZE TABLE query
1) is pretty simple, it's memory tracking structure is as follow:
current_thread::memory_tracker = level=Thread / description="(for thread)" ==
background_thread_memory_tracker = level=Thread / description="(for thread)"
current_thread::memory_tracker.parent = level=Global / description="(total)"
So as you can see it is pretty simple and MemoryTrackerThreadSwitcher
does not do anything icky for this case.
2) is complex, it's memory tracking structure is as follow:
current_thread::memory_tracker = level=Thread / description="(for thread)"
current_thread::memory_tracker.parent = level=Process / description="(for query)" ==
background_thread_memory_tracker = level=Process / description="(for query)"
Before this patch to track memory (and related things, like sampling,
profiling and so on) for OPTIMIZE TABLE query dirty hacks was done to
do this, since current_thread memory_tracker was of Thread scope, that
does not have any limits.
And so if will change parent for it to Merge/Mutate memory tracker
(which also does not have some of settings) it will not be correctly
tracked.
To address this Merge/Mutate was set as parent not to the
current_thread memory_tracker but to it's parent, since it's scope is
Process with all settings.
But that parent's memory_tracker is the memory_tracker of the
thread_group, and so if you will have nested ThreadPool inside
merge/mutate (this is the case for s3 async writes, which has been
added in #33291) you may get use-after-free of memory_tracker.
Consider the following example:
MemoryTrackerThreadSwitcher()
thread_group.memory_tracker.parent = merge_list_entry->memory_tracker
(see also background_thread_memory_tracker above)
CurrentThread::attachTo()
current_thread.memory_tracker.parent = thread_group.memory_tracker
CurrentThread::detachQuery()
current_thread.memory_tracker.parent = thread_group.memory_tracker.parent
# and this is equal to merge_list_entry->memory_tracker
~MemoryTrackerThreadSwitcher()
thread_group.memory_tracker = thread_group.memory_tracker.parent
So after the following we will get incorrect memory_tracker (from the
mege_list_entry) when the next job in that ThreadPool will not have
thread_group, since in this case it will not try to update the
current_thread.memory_tracker.parent and use-after-free will happens.
So to address the (2) issue, settings from the parent memory_tracker
should be copied to the merge_list_entry->memory_tracker, to avoid
playing with parent memory tracker.
Note, that settings from the query (OPTIMIZE TABLE) is not available at
that time, so it cannot be used (instead of parent's memory tracker
settings).
v2: remove memory_tracker.setOrRaiseHardLimit() from settings
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2022-02-18 07:45:29 +00:00
|
|
|
const Settings & settings);
|
2016-12-23 20:23:46 +00:00
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
MergeInfo getInfo() const;
|
2016-12-23 20:23:46 +00:00
|
|
|
|
2021-09-24 13:57:44 +00:00
|
|
|
MergeListElement * ptr() { return this; }
|
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
~MergeListElement();
|
2021-11-05 11:55:30 +00:00
|
|
|
|
|
|
|
MergeListElement & ref() { return *this; }
|
2015-04-16 06:12:35 +00:00
|
|
|
};
|
2014-09-10 11:34:26 +00:00
|
|
|
|
2020-10-26 12:40:55 +00:00
|
|
|
/** Maintains a list of currently running merges.
|
|
|
|
* For implementation of system.merges table.
|
|
|
|
*/
|
|
|
|
class MergeList final : public BackgroundProcessList<MergeListElement, MergeInfo>
|
2015-04-16 06:12:35 +00:00
|
|
|
{
|
2020-10-26 16:38:35 +00:00
|
|
|
private:
|
2020-10-26 12:40:55 +00:00
|
|
|
using Parent = BackgroundProcessList<MergeListElement, MergeInfo>;
|
2020-09-04 06:55:19 +00:00
|
|
|
std::atomic<size_t> merges_with_ttl_counter = 0;
|
2015-04-16 06:12:35 +00:00
|
|
|
public:
|
2020-10-26 12:40:55 +00:00
|
|
|
MergeList()
|
|
|
|
: Parent(CurrentMetrics::Merge)
|
|
|
|
{}
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2020-10-26 12:40:55 +00:00
|
|
|
void onEntryDestroy(const Parent::Entry & entry) override
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
2020-10-26 12:40:55 +00:00
|
|
|
if (isTTLMergeType(entry->merge_type))
|
|
|
|
--merges_with_ttl_counter;
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
2019-01-14 12:25:25 +00:00
|
|
|
|
2021-06-06 12:24:49 +00:00
|
|
|
void cancelPartMutations(const StorageID & table_id, const String & partition_id, Int64 mutation_version)
|
2019-01-14 12:25:25 +00:00
|
|
|
{
|
|
|
|
std::lock_guard lock{mutex};
|
2020-10-26 12:40:55 +00:00
|
|
|
for (auto & merge_element : entries)
|
2019-01-14 12:25:25 +00:00
|
|
|
{
|
2019-02-04 12:53:25 +00:00
|
|
|
if ((partition_id.empty() || merge_element.partition_id == partition_id)
|
2021-06-06 12:24:49 +00:00
|
|
|
&& merge_element.table_id == table_id
|
2019-02-04 12:53:25 +00:00
|
|
|
&& merge_element.source_data_version < mutation_version
|
2021-06-24 14:07:43 +00:00
|
|
|
&& merge_element.result_part_info.getDataVersion() >= mutation_version)
|
|
|
|
merge_element.is_cancelled = true;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
void cancelInPartition(const StorageID & table_id, const String & partition_id, Int64 delimiting_block_number)
|
|
|
|
{
|
2021-06-30 12:55:26 +00:00
|
|
|
std::lock_guard lock{mutex};
|
2021-06-24 14:07:43 +00:00
|
|
|
for (auto & merge_element : entries)
|
|
|
|
{
|
|
|
|
if (merge_element.table_id == table_id
|
|
|
|
&& merge_element.partition_id == partition_id
|
|
|
|
&& merge_element.result_part_info.min_block < delimiting_block_number)
|
2019-01-14 12:25:25 +00:00
|
|
|
merge_element.is_cancelled = true;
|
|
|
|
}
|
|
|
|
}
|
2020-09-04 06:55:19 +00:00
|
|
|
|
2021-01-27 11:56:12 +00:00
|
|
|
/// Merge consists of two parts: assignment and execution. We add merge to
|
|
|
|
/// merge list on execution, but checking merge list during merge
|
|
|
|
/// assignment. This lead to the logical race condition (we can assign more
|
|
|
|
/// merges with TTL than allowed). So we "book" merge with ttl during
|
|
|
|
/// assignment, and remove from list after merge execution.
|
|
|
|
///
|
|
|
|
/// NOTE: Not important for replicated merge tree, we check count of merges twice:
|
|
|
|
/// in assignment and in queue before execution.
|
|
|
|
void bookMergeWithTTL()
|
|
|
|
{
|
|
|
|
++merges_with_ttl_counter;
|
|
|
|
}
|
|
|
|
|
2022-06-01 19:09:53 +00:00
|
|
|
void cancelMergeWithTTL()
|
|
|
|
{
|
|
|
|
--merges_with_ttl_counter;
|
|
|
|
}
|
|
|
|
|
2021-01-27 11:56:12 +00:00
|
|
|
size_t getMergesWithTTLCount() const
|
2020-09-04 06:55:19 +00:00
|
|
|
{
|
|
|
|
return merges_with_ttl_counter;
|
|
|
|
}
|
2014-09-10 11:34:26 +00:00
|
|
|
};
|
|
|
|
|
|
|
|
}
|