ClickHouse/dbms/include/DB/Storages/MergeTree/MergeList.h

121 lines
2.7 KiB
C++
Raw Normal View History

2014-09-10 11:34:26 +00:00
#pragma once
2015-10-05 00:44:40 +00:00
#include <DB/Common/Stopwatch.h>
#include <DB/Common/CurrentMetrics.h>
2015-02-10 21:10:58 +00:00
#include <memory>
2014-09-10 11:34:26 +00:00
#include <list>
#include <mutex>
#include <atomic>
/** Maintains a list of currently running merges.
* Used to implement the system.merges table.
*/
/// Declaration only: the Merge metric is defined in another translation unit.
/// In this header it is used to initialize the CurrentMetrics::Increment member of MergeListEntry.
namespace CurrentMetrics
{
extern const Metric Merge;
}
2014-09-10 11:34:26 +00:00
namespace DB
{
2015-04-16 06:12:35 +00:00
struct MergeInfo
2014-09-10 11:34:26 +00:00
{
2015-04-16 06:12:35 +00:00
const std::string database;
const std::string table;
const std::string result_part_name;
Stopwatch watch;
Float64 progress{};
2016-07-31 03:53:16 +00:00
uint64_t num_parts{};
uint64_t total_size_bytes_compressed{};
uint64_t total_size_marks{};
std::atomic<uint64_t> bytes_read_uncompressed{};
std::atomic<uint64_t> rows_read{};
std::atomic<uint64_t> bytes_written_uncompressed{};
std::atomic<uint64_t> rows_written{};
2015-04-16 06:12:35 +00:00
MergeInfo(const std::string & database, const std::string & table, const std::string & result_part_name)
: database{database}, table{table}, result_part_name{result_part_name}
2014-09-10 11:34:26 +00:00
{
2015-04-16 06:12:35 +00:00
}
2016-07-31 03:53:16 +00:00
MergeInfo(const MergeInfo & other)
: database(other.database),
table(other.table),
result_part_name(other.result_part_name),
watch(other.watch),
progress(other.progress),
num_parts(other.num_parts),
total_size_bytes_compressed(other.total_size_bytes_compressed),
total_size_marks(other.total_size_marks),
bytes_read_uncompressed(other.bytes_read_uncompressed.load(std::memory_order_relaxed)),
rows_read(other.rows_read.load(std::memory_order_relaxed)),
bytes_written_uncompressed(other.bytes_written_uncompressed.load(std::memory_order_relaxed)),
rows_written(other.rows_written.load(std::memory_order_relaxed))
{
}
2015-04-16 06:12:35 +00:00
};
2014-09-10 11:34:26 +00:00
2015-04-16 06:12:35 +00:00
class MergeList;
class MergeListEntry
{
MergeList & list;
using container_t = std::list<MergeInfo>;
container_t::iterator it;
2014-09-10 11:34:26 +00:00
CurrentMetrics::Increment num_merges {CurrentMetrics::Merge};
2014-09-10 11:34:26 +00:00
public:
2015-04-16 06:12:35 +00:00
MergeListEntry(const MergeListEntry &) = delete;
MergeListEntry & operator=(const MergeListEntry &) = delete;
MergeListEntry(MergeList & list, const container_t::iterator it) : list(list), it{it} {}
~MergeListEntry();
2014-09-10 11:34:26 +00:00
2015-04-16 06:12:35 +00:00
MergeInfo * operator->() { return &*it; }
};
2014-09-10 11:34:26 +00:00
2015-04-16 06:12:35 +00:00
class MergeList
{
friend class MergeListEntry;
2014-09-10 11:34:26 +00:00
2015-04-16 06:12:35 +00:00
using container_t = std::list<MergeInfo>;
mutable std::mutex mutex;
container_t merges;
public:
using Entry = MergeListEntry;
2014-09-10 11:34:26 +00:00
using EntryPtr = std::unique_ptr<Entry>;
template <typename... Args>
EntryPtr insert(Args &&... args)
{
std::lock_guard<std::mutex> lock{mutex};
2015-02-10 21:10:58 +00:00
return std::make_unique<Entry>(*this, merges.emplace(merges.end(), std::forward<Args>(args)...));
2014-09-10 11:34:26 +00:00
}
container_t get() const
{
std::lock_guard<std::mutex> lock{mutex};
return merges;
}
};
2015-04-16 06:12:35 +00:00
/// Erases this entry's element from the owning list while holding the list's mutex.
inline MergeListEntry::~MergeListEntry()
{
    const std::lock_guard<std::mutex> guard(list.mutex);
    list.merges.erase(it);
}
2014-09-10 11:34:26 +00:00
}