ClickHouse/src/Storages/MergeTree/MergeList.h

193 lines
4.8 KiB
C++
Raw Normal View History

2014-09-10 11:34:26 +00:00
#pragma once
#include <Common/Stopwatch.h>
#include <Common/CurrentMetrics.h>
#include <Common/MemoryTracker.h>
2020-09-10 14:56:15 +00:00
#include <Storages/MergeTree/MergeAlgorithm.h>
#include <Storages/MergeTree/MergeTreeData.h>
2020-09-03 13:00:13 +00:00
#include <Storages/MergeTree/MergeType.h>
2015-02-10 21:10:58 +00:00
#include <memory>
2014-09-10 11:34:26 +00:00
#include <list>
#include <mutex>
#include <atomic>
/** Maintains a list of currently running merges.
  * It is used to implement the system.merges table.
  */
/// Declaration of the Merge metric; it is only declared (extern) in this header.
/// MergeListEntry holds a CurrentMetrics::Increment constructed from it.
namespace CurrentMetrics
{
extern const Metric Merge;
}
2014-09-10 11:34:26 +00:00
namespace DB
{
2015-04-16 06:12:35 +00:00
/// Plain-value description of a single merge, as returned by MergeListElement::getInfo()
/// and collected by MergeList::get().
///
/// Every scalar member is value-initialized, so a default-constructed MergeInfo
/// never contains indeterminate values even if a producer forgets to fill a field.
struct MergeInfo
{
    std::string database;
    std::string table;
    std::string result_part_name;
    std::string result_part_path;
    Array source_part_names;
    Array source_part_paths;
    std::string partition_id;
    bool is_mutation{};
    Float64 elapsed{};
    Float64 progress{};
    UInt64 num_parts{};
    UInt64 total_size_bytes_compressed{};
    UInt64 total_size_marks{};
    UInt64 total_rows_count{};
    UInt64 bytes_read_uncompressed{};
    UInt64 bytes_written_uncompressed{};
    UInt64 rows_read{};
    UInt64 rows_written{};
    UInt64 columns_written{};
    UInt64 memory_usage{};
    UInt64 thread_id{};
    /// Textual forms of the merge type and merge algorithm.
    std::string merge_type;
    std::string merge_algorithm;
};
struct FutureMergedMutatedPart;
struct MergeListElement : boost::noncopyable
2014-09-10 11:34:26 +00:00
{
const std::string database;
const std::string table;
2018-09-11 11:16:40 +00:00
std::string partition_id;
const std::string result_part_name;
const std::string result_part_path;
Int64 result_data_version{};
bool is_mutation{};
UInt64 num_parts{};
Names source_part_names;
Names source_part_paths;
Int64 source_data_version{};
Stopwatch watch;
std::atomic<Float64> progress{};
std::atomic<bool> is_cancelled{};
UInt64 total_size_bytes_compressed{};
UInt64 total_size_marks{};
2019-03-26 12:37:42 +00:00
UInt64 total_rows_count{};
std::atomic<UInt64> bytes_read_uncompressed{};
std::atomic<UInt64> bytes_written_uncompressed{};
/// In case of Vertical algorithm they are actual only for primary key columns
std::atomic<UInt64> rows_read{};
std::atomic<UInt64> rows_written{};
/// Updated only for Vertical algorithm
std::atomic<UInt64> columns_written{};
MemoryTracker memory_tracker{VariableContext::Process};
MemoryTracker * background_thread_memory_tracker;
MemoryTracker * background_thread_memory_tracker_prev_parent = nullptr;
2016-07-31 03:53:16 +00:00
UInt64 thread_id;
2020-09-03 13:00:13 +00:00
MergeType merge_type;
2020-09-10 14:56:15 +00:00
MergeAlgorithm merge_algorithm;
MergeListElement(const std::string & database, const std::string & table, const FutureMergedMutatedPart & future_part);
MergeInfo getInfo() const;
~MergeListElement();
2015-04-16 06:12:35 +00:00
};
2014-09-10 11:34:26 +00:00
2015-04-16 06:12:35 +00:00
class MergeList;
class MergeListEntry
{
MergeList & list;
2015-04-16 06:12:35 +00:00
using container_t = std::list<MergeListElement>;
container_t::iterator it;
2014-09-10 11:34:26 +00:00
CurrentMetrics::Increment num_merges {CurrentMetrics::Merge};
2014-09-10 11:34:26 +00:00
public:
MergeListEntry(const MergeListEntry &) = delete;
MergeListEntry & operator=(const MergeListEntry &) = delete;
2015-04-16 06:12:35 +00:00
2019-08-03 11:02:40 +00:00
MergeListEntry(MergeList & list_, const container_t::iterator it_) : list(list_), it{it_} {}
~MergeListEntry();
2014-09-10 11:34:26 +00:00
MergeListElement * operator->() { return &*it; }
const MergeListElement * operator->() const { return &*it; }
2015-04-16 06:12:35 +00:00
};
2014-09-10 11:34:26 +00:00
2015-04-16 06:12:35 +00:00
class MergeList
{
friend class MergeListEntry;
2014-09-10 11:34:26 +00:00
using container_t = std::list<MergeListElement>;
using info_container_t = std::list<MergeInfo>;
2015-04-16 06:12:35 +00:00
mutable std::mutex mutex;
container_t merges;
2015-04-16 06:12:35 +00:00
2020-09-04 06:55:19 +00:00
std::atomic<size_t> merges_with_ttl_counter = 0;
2015-04-16 06:12:35 +00:00
public:
using Entry = MergeListEntry;
using EntryPtr = std::unique_ptr<Entry>;
template <typename... Args>
EntryPtr insert(Args &&... args)
{
2019-01-02 06:44:36 +00:00
std::lock_guard lock{mutex};
2020-09-04 06:55:19 +00:00
auto entry = std::make_unique<Entry>(*this, merges.emplace(merges.end(), std::forward<Args>(args)...));
2020-09-04 11:27:27 +00:00
if (isTTLMergeType((*entry)->merge_type))
++merges_with_ttl_counter;
2020-09-04 06:55:19 +00:00
return entry;
}
info_container_t get() const
{
2019-01-02 06:44:36 +00:00
std::lock_guard lock{mutex};
info_container_t res;
for (const auto & merge_element : merges)
res.emplace_back(merge_element.getInfo());
return res;
}
void cancelPartMutations(const String & partition_id, Int64 mutation_version)
{
std::lock_guard lock{mutex};
for (auto & merge_element : merges)
{
if ((partition_id.empty() || merge_element.partition_id == partition_id)
&& merge_element.source_data_version < mutation_version
&& merge_element.result_data_version >= mutation_version)
merge_element.is_cancelled = true;
}
}
2020-09-04 06:55:19 +00:00
size_t getExecutingMergesWithTTLCount() const
{
return merges_with_ttl_counter;
}
2014-09-10 11:34:26 +00:00
};
2015-04-16 06:12:35 +00:00
/// Removes the referenced element from the owning list while holding the list's mutex.
/// The TTL counter is decremented first if the element's merge type satisfies isTTLMergeType().
inline MergeListEntry::~MergeListEntry()
{
    std::lock_guard guard{list.mutex};

    const MergeListElement & element = *it;
    if (isTTLMergeType(element.merge_type))
        list.merges_with_ttl_counter.fetch_sub(1);

    /// `element` must not be used past this point: erase() destroys it.
    list.merges.erase(it);
}
2014-09-10 11:34:26 +00:00
}