ClickHouse/dbms/src/Storages/MergeTree/MergeList.cpp

93 lines
3.4 KiB
C++
Raw Normal View History

#include <Storages/MergeTree/MergeList.h>
#include <Storages/MergeTree/MergeTreeDataMergerMutator.h>
#include <Common/CurrentMetrics.h>
#include <common/getThreadNumber.h>
#include <Common/CurrentThread.h>
namespace CurrentMetrics
{
extern const Metric MemoryTrackingForMerges;
}
namespace DB
{
2019-08-03 11:02:40 +00:00
MergeListElement::MergeListElement(const std::string & database_, const std::string & table_, const FutureMergedMutatedPart & future_part)
: database{database_}, table{table_}, partition_id{future_part.part_info.partition_id}
, result_part_name{future_part.name}
2019-11-24 05:47:39 +00:00
, result_part_path{future_part.path}
, result_data_version{future_part.part_info.getDataVersion()}
, num_parts{future_part.parts.size()}
2020-02-02 02:27:15 +00:00
, thread_id{getThreadNumber()}
{
for (const auto & source_part : future_part.parts)
{
source_part_names.emplace_back(source_part->name);
source_part_paths.emplace_back(source_part->getFullPath());
2018-09-13 03:34:58 +00:00
std::shared_lock<std::shared_mutex> part_lock(source_part->columns_lock);
total_size_bytes_compressed += source_part->bytes_on_disk;
2019-03-25 13:55:24 +00:00
total_size_marks += source_part->getMarksCount();
2019-03-26 12:37:42 +00:00
total_rows_count += source_part->index_granularity.getTotalRows();
}
if (!future_part.parts.empty())
{
source_data_version = future_part.parts[0]->info.getDataVersion();
is_mutation = (result_data_version != source_data_version);
}
/// Each merge is executed into separate background processing pool thread
background_thread_memory_tracker = CurrentThread::getMemoryTracker();
if (background_thread_memory_tracker)
{
memory_tracker.setMetric(CurrentMetrics::MemoryTrackingForMerges);
background_thread_memory_tracker_prev_parent = background_thread_memory_tracker->getParent();
background_thread_memory_tracker->setParent(&memory_tracker);
}
}
MergeInfo MergeListElement::getInfo() const
{
MergeInfo res;
res.database = database;
res.table = table;
res.result_part_name = result_part_name;
res.result_part_path = result_part_path;
2018-09-11 11:16:40 +00:00
res.partition_id = partition_id;
res.is_mutation = is_mutation;
res.elapsed = watch.elapsedSeconds();
res.progress = progress.load(std::memory_order_relaxed);
res.num_parts = num_parts;
res.total_size_bytes_compressed = total_size_bytes_compressed;
res.total_size_marks = total_size_marks;
2019-03-26 12:37:42 +00:00
res.total_rows_count = total_rows_count;
res.bytes_read_uncompressed = bytes_read_uncompressed.load(std::memory_order_relaxed);
res.bytes_written_uncompressed = bytes_written_uncompressed.load(std::memory_order_relaxed);
res.rows_read = rows_read.load(std::memory_order_relaxed);
res.rows_written = rows_written.load(std::memory_order_relaxed);
res.columns_written = columns_written.load(std::memory_order_relaxed);
res.memory_usage = memory_tracker.get();
2020-02-02 02:27:15 +00:00
res.thread_id = thread_id;
for (const auto & source_part_name : source_part_names)
res.source_part_names.emplace_back(source_part_name);
for (const auto & source_part_path : source_part_paths)
res.source_part_paths.emplace_back(source_part_path);
return res;
}
MergeListElement::~MergeListElement()
{
/// Unplug memory_tracker from current background processing pool thread
if (background_thread_memory_tracker)
background_thread_memory_tracker->setParent(background_thread_memory_tracker_prev_parent);
}
}