Add asynchronous metrics on MergeTree tables data volume (#17639)

Co-authored-by: Alexander Kazakov <Akazz@users.noreply.github.com>
This commit is contained in:
flynn 2020-12-22 18:34:35 +08:00 committed by GitHub
parent 0e807d0647
commit 4e580f7b7c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 111 additions and 25 deletions

View File

@ -247,6 +247,10 @@ void AsynchronousMetrics::update()
size_t number_of_databases = databases.size();
size_t total_number_of_tables = 0;
size_t total_number_of_bytes = 0;
size_t total_number_of_rows = 0;
size_t total_number_of_parts = 0;
for (const auto & db : databases)
{
/// Check if database can contain MergeTree tables
@ -295,6 +299,17 @@ void AsynchronousMetrics::update()
if (table_merge_tree)
{
calculateMax(max_part_count_for_partition, table_merge_tree->getMaxPartsCountForPartition());
const auto & settings = global_context.getSettingsRef();
total_number_of_bytes += table_merge_tree->totalBytes(settings).value();
total_number_of_rows += table_merge_tree->totalRows(settings).value();
total_number_of_parts += table_merge_tree->getPartsCount();
}
if (table_replicated_merge_tree)
{
const auto & settings = global_context.getSettingsRef();
total_number_of_bytes += table_replicated_merge_tree->totalBytes(settings).value();
total_number_of_rows += table_replicated_merge_tree->totalRows(settings).value();
total_number_of_parts += table_replicated_merge_tree->getPartsCount();
}
}
}
@ -315,6 +330,10 @@ void AsynchronousMetrics::update()
new_values["NumberOfDatabases"] = number_of_databases;
new_values["NumberOfTables"] = total_number_of_tables;
new_values["TotalBytesOfMergeTreeTables"] = total_number_of_bytes;
new_values["TotalRowsOfMergeTreeTables"] = total_number_of_rows;
new_values["TotalPartsOfMergeTreeTables"] = total_number_of_parts;
auto get_metric_name = [](const String & name) -> const char *
{
static std::map<String, const char *> metric_map = {

View File

@ -877,6 +877,8 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
std::lock_guard loading_lock(mutex);
if (!data_parts_indexes.insert(part).second)
throw Exception("Part " + part->name + " already exists", ErrorCodes::DUPLICATE_DATA_PART);
addPartContributionToDataVolume(part);
});
}
@ -893,6 +895,8 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
if (!data_parts_indexes.insert(part).second)
throw Exception("Part " + part->name + " already exists", ErrorCodes::DUPLICATE_DATA_PART);
addPartContributionToDataVolume(part);
}
if (has_non_adaptive_parts && has_adaptive_parts && !settings->enable_mixed_granularity_parts)
@ -924,6 +928,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
{
(*it)->remove_time.store((*it)->modification_time, std::memory_order_relaxed);
modifyPartState(it, DataPartState::Outdated);
removePartContributionToDataVolume(*it);
};
(*prev_jt)->assertState({DataPartState::Committed});
@ -1292,6 +1297,8 @@ void MergeTreeData::dropAllData()
}
}
setDataVolume(0, 0, 0);
LOG_TRACE(log, "dropAllData: done.");
}
@ -1987,16 +1994,25 @@ bool MergeTreeData::renameTempPartAndReplace(
}
else
{
size_t reduce_bytes = 0;
size_t reduce_rows = 0;
size_t reduce_parts = 0;
auto current_time = time(nullptr);
for (const DataPartPtr & covered_part : covered_parts)
{
covered_part->remove_time.store(current_time, std::memory_order_relaxed);
modifyPartState(covered_part, DataPartState::Outdated);
removePartContributionToColumnSizes(covered_part);
reduce_bytes += covered_part->getBytesOnDisk();
reduce_rows += covered_part->rows_count;
++reduce_parts;
}
decreaseDataVolume(reduce_bytes, reduce_rows, reduce_parts);
modifyPartState(part_it, DataPartState::Committed);
addPartContributionToColumnSizes(part);
addPartContributionToDataVolume(part);
}
auto part_in_memory = asInMemoryPart(part);
@ -2037,7 +2053,10 @@ void MergeTreeData::removePartsFromWorkingSet(const MergeTreeData::DataPartsVect
for (const DataPartPtr & part : remove)
{
if (part->state == IMergeTreeDataPart::State::Committed)
{
removePartContributionToColumnSizes(part);
removePartContributionToDataVolume(part);
}
if (part->state == IMergeTreeDataPart::State::Committed || clear_without_timeout)
part->remove_time.store(remove_time, std::memory_order_relaxed);
@ -2150,7 +2169,10 @@ restore_covered)
DataPartPtr part = *it_part;
if (part->state == DataPartState::Committed)
{
removePartContributionToDataVolume(part);
removePartContributionToColumnSizes(part);
}
modifyPartState(it_part, DataPartState::Deleting);
part->renameToDetached(prefix);
@ -2198,6 +2220,7 @@ restore_covered)
if ((*it)->state != DataPartState::Committed)
{
addPartContributionToColumnSizes(*it);
addPartContributionToDataVolume(*it);
modifyPartState(it, DataPartState::Committed); // iterator is not invalidated here
}
@ -2228,6 +2251,7 @@ restore_covered)
if ((*it)->state != DataPartState::Committed)
{
addPartContributionToColumnSizes(*it);
addPartContributionToDataVolume(*it);
modifyPartState(it, DataPartState::Committed);
}
@ -2289,41 +2313,19 @@ void MergeTreeData::tryRemovePartImmediately(DataPartPtr && part)
size_t MergeTreeData::getTotalActiveSizeInBytes() const
{
size_t res = 0;
{
auto lock = lockParts();
for (const auto & part : getDataPartsStateRange(DataPartState::Committed))
res += part->getBytesOnDisk();
}
return res;
return total_active_size_bytes.load(std::memory_order_acquire);
}
size_t MergeTreeData::getTotalActiveSizeInRows() const
{
size_t res = 0;
{
auto lock = lockParts();
for (const auto & part : getDataPartsStateRange(DataPartState::Committed))
res += part->rows_count;
}
return res;
return total_active_size_rows.load(std::memory_order_acquire);
}
size_t MergeTreeData::getPartsCount() const
{
auto lock = lockParts();
size_t res = 0;
for (const auto & part [[maybe_unused]] : getDataPartsStateRange(DataPartState::Committed))
++res;
return res;
return total_active_size_parts.load(std::memory_order_acquire);
}
@ -2452,6 +2454,9 @@ void MergeTreeData::swapActivePart(MergeTreeData::DataPartPtr part_copy)
auto part_it = data_parts_indexes.insert(part_copy).first;
modifyPartState(part_it, DataPartState::Committed);
removePartContributionToDataVolume(original_active_part);
addPartContributionToDataVolume(part_copy);
auto disk = original_active_part->volume->getDisk();
String marker_path = original_active_part->getFullRelativePath() + IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME;
try
@ -3349,6 +3354,15 @@ MergeTreeData::DataPartsVector MergeTreeData::Transaction::commit(MergeTreeData:
auto * owing_parts_lock = acquired_parts_lock ? acquired_parts_lock : &parts_lock;
auto current_time = time(nullptr);
size_t add_bytes = 0;
size_t add_rows = 0;
size_t add_parts = 0;
size_t reduce_bytes = 0;
size_t reduce_rows = 0;
size_t reduce_parts = 0;
for (const DataPartPtr & part : precommitted_parts)
{
DataPartPtr covering_part;
@ -3366,14 +3380,25 @@ MergeTreeData::DataPartsVector MergeTreeData::Transaction::commit(MergeTreeData:
for (const DataPartPtr & covered_part : covered_parts)
{
covered_part->remove_time.store(current_time, std::memory_order_relaxed);
reduce_bytes += covered_part->getBytesOnDisk();
reduce_rows += covered_part->rows_count;
data.modifyPartState(covered_part, DataPartState::Outdated);
data.removePartContributionToColumnSizes(covered_part);
}
reduce_parts += covered_parts.size();
add_bytes += part->getBytesOnDisk();
add_rows += part->rows_count;
++add_parts;
data.modifyPartState(part, DataPartState::Committed);
data.addPartContributionToColumnSizes(part);
}
}
data.decreaseDataVolume(reduce_bytes, reduce_rows, reduce_parts);
data.increaseDataVolume(add_bytes, add_rows, add_parts);
}
clear();
@ -3918,4 +3943,34 @@ size_t MergeTreeData::getTotalMergesWithTTLInMergeList() const
return global_context.getMergeList().getExecutingMergesWithTTLCount();
}
void MergeTreeData::addPartContributionToDataVolume(const DataPartPtr & part)
{
increaseDataVolume(part->getBytesOnDisk(), part->rows_count, 1);
}
void MergeTreeData::removePartContributionToDataVolume(const DataPartPtr & part)
{
decreaseDataVolume(part->getBytesOnDisk(), part->rows_count, 1);
}
void MergeTreeData::increaseDataVolume(size_t bytes, size_t rows, size_t parts)
{
total_active_size_bytes.fetch_add(bytes, std::memory_order_acq_rel);
total_active_size_rows.fetch_add(rows, std::memory_order_acq_rel);
total_active_size_parts.fetch_add(parts, std::memory_order_acq_rel);
}
void MergeTreeData::decreaseDataVolume(size_t bytes, size_t rows, size_t parts)
{
total_active_size_bytes.fetch_sub(bytes, std::memory_order_acq_rel);
total_active_size_rows.fetch_sub(rows, std::memory_order_acq_rel);
total_active_size_parts.fetch_sub(parts, std::memory_order_acq_rel);
}
void MergeTreeData::setDataVolume(size_t bytes, size_t rows, size_t parts)
{
total_active_size_bytes.store(bytes, std::memory_order_release);
total_active_size_rows.store(rows, std::memory_order_release);
total_active_size_parts.store(parts, std::memory_order_release);
}
}

View File

@ -945,6 +945,18 @@ private:
virtual void startBackgroundMovesIfNeeded() = 0;
bool allow_nullable_key{};
void addPartContributionToDataVolume(const DataPartPtr & part);
void removePartContributionToDataVolume(const DataPartPtr & part);
void increaseDataVolume(size_t bytes, size_t rows, size_t parts);
void decreaseDataVolume(size_t bytes, size_t rows, size_t parts);
void setDataVolume(size_t bytes, size_t rows, size_t parts);
std::atomic<size_t> total_active_size_bytes = 0;
std::atomic<size_t> total_active_size_rows = 0;
std::atomic<size_t> total_active_size_parts = 0;
};
}