diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp index c48e76e1d98..14fdf74b5e0 100644 --- a/src/Common/CurrentMetrics.cpp +++ b/src/Common/CurrentMetrics.cpp @@ -5,6 +5,9 @@ #define APPLY_FOR_METRICS(M) \ M(Query, "Number of executing queries") \ M(Merge, "Number of executing background merges") \ + M(Parts, "Total number of data parts") \ + M(PartsActive, "Number of active data parts") \ + M(PartsInactive, "Number of inactive data parts") \ M(PartMutation, "Number of mutations (ALTER DELETE/UPDATE)") \ M(ReplicatedFetch, "Number of data parts being fetched from replica") \ M(ReplicatedSend, "Number of data parts being sent to replicas") \ diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index b213bb7b6f9..af3e323763e 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -72,6 +72,9 @@ namespace CurrentMetrics { extern const Metric DelayedInserts; extern const Metric BackgroundMovePoolTask; + extern const Metric Parts; + extern const Metric PartsActive; + extern const Metric PartsInactive; } @@ -877,6 +880,9 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) std::lock_guard loading_lock(mutex); if (!data_parts_indexes.insert(part).second) throw Exception("Part " + part->name + " already exists", ErrorCodes::DUPLICATE_DATA_PART); + + CurrentMetrics::add(CurrentMetrics::Parts); + CurrentMetrics::add(CurrentMetrics::PartsActive); }); } @@ -893,6 +899,9 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) if (!data_parts_indexes.insert(part).second) throw Exception("Part " + part->name + " already exists", ErrorCodes::DUPLICATE_DATA_PART); + + CurrentMetrics::add(CurrentMetrics::Parts); + CurrentMetrics::add(CurrentMetrics::PartsActive); } if (has_non_adaptive_parts && has_adaptive_parts && !settings->enable_mixed_granularity_parts) @@ -924,6 +933,9 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) { (*it)->remove_time.store((*it)->modification_time, std::memory_order_relaxed); modifyPartState(it, DataPartState::Outdated); + + CurrentMetrics::sub(CurrentMetrics::PartsActive); + CurrentMetrics::add(CurrentMetrics::PartsInactive); }; (*prev_jt)->assertState({DataPartState::Committed}); @@ -1091,6 +1103,9 @@ void MergeTreeData::removePartsFinally(const MergeTreeData::DataPartsVector & pa (*it)->assertState({DataPartState::Deleting}); data_parts_indexes.erase(it); + + CurrentMetrics::sub(CurrentMetrics::Parts); + CurrentMetrics::sub(CurrentMetrics::PartsInactive); } } @@ -1265,10 +1280,15 @@ void MergeTreeData::dropAllData() LOG_TRACE(log, "dropAllData: removing data from memory."); DataPartsVector all_parts(data_parts_by_info.begin(), data_parts_by_info.end()); + DataPartsVector committed_parts = getDataPartsVector({DataPartState::Committed}); data_parts_indexes.clear(); column_sizes.clear(); + CurrentMetrics::sub(CurrentMetrics::Parts, all_parts.size()); + CurrentMetrics::sub(CurrentMetrics::PartsActive, committed_parts.size()); + CurrentMetrics::sub(CurrentMetrics::PartsInactive, all_parts.size() - committed_parts.size()); + /// Tables in atomic databases have UUID and stored in persistent locations. /// No need to drop caches (that are keyed by filesystem path) because collision is not possible. if (!getStorageID().hasUUID()) @@ -1981,6 +2001,9 @@ bool MergeTreeData::renameTempPartAndReplace( auto part_it = data_parts_indexes.insert(part).first; + CurrentMetrics::add(CurrentMetrics::Parts); + CurrentMetrics::add(CurrentMetrics::PartsInactive); + if (out_transaction) { out_transaction->precommitted_parts.insert(part); @@ -1993,10 +2016,16 @@ bool MergeTreeData::renameTempPartAndReplace( covered_part->remove_time.store(current_time, std::memory_order_relaxed); modifyPartState(covered_part, DataPartState::Outdated); removePartContributionToColumnSizes(covered_part); + + CurrentMetrics::sub(CurrentMetrics::PartsActive); + CurrentMetrics::add(CurrentMetrics::PartsInactive); } modifyPartState(part_it, DataPartState::Committed); addPartContributionToColumnSizes(part); + + CurrentMetrics::add(CurrentMetrics::PartsActive); + CurrentMetrics::sub(CurrentMetrics::PartsInactive); } auto part_in_memory = asInMemoryPart(part); @@ -2037,7 +2066,11 @@ void MergeTreeData::removePartsFromWorkingSet(const MergeTreeData::DataPartsVect for (const DataPartPtr & part : remove) { if (part->state == IMergeTreeDataPart::State::Committed) + { removePartContributionToColumnSizes(part); + CurrentMetrics::sub(CurrentMetrics::PartsActive); + CurrentMetrics::add(CurrentMetrics::PartsInactive); + } if (part->state == IMergeTreeDataPart::State::Committed || clear_without_timeout) part->remove_time.store(remove_time, std::memory_order_relaxed); @@ -2063,6 +2096,9 @@ void MergeTreeData::removePartsFromWorkingSetImmediatelyAndSetTemporaryState(con modifyPartState(part, IMergeTreeDataPart::State::Temporary); /// Erase immediately data_parts_indexes.erase(it_part); + + CurrentMetrics::sub(CurrentMetrics::Parts); + CurrentMetrics::sub(CurrentMetrics::PartsInactive); } } @@ -2150,13 +2186,21 @@ restore_covered) DataPartPtr part = *it_part; if (part->state == DataPartState::Committed) + { removePartContributionToColumnSizes(part); + CurrentMetrics::sub(CurrentMetrics::PartsActive); + CurrentMetrics::add(CurrentMetrics::PartsInactive); + } + modifyPartState(it_part, DataPartState::Deleting); part->renameToDetached(prefix); data_parts_indexes.erase(it_part); + CurrentMetrics::sub(CurrentMetrics::Parts); + CurrentMetrics::sub(CurrentMetrics::PartsInactive); + if (restore_covered && part->info.level == 0) { LOG_WARNING(log, "Will not recover parts covered by zero-level part {}", part->name); @@ -3385,10 +3429,16 @@ MergeTreeData::DataPartsVector MergeTreeData::Transaction::commit(MergeTreeData: covered_part->remove_time.store(current_time, std::memory_order_relaxed); data.modifyPartState(covered_part, DataPartState::Outdated); data.removePartContributionToColumnSizes(covered_part); + + CurrentMetrics::sub(CurrentMetrics::PartsActive); + CurrentMetrics::add(CurrentMetrics::PartsInactive); } data.modifyPartState(part, DataPartState::Committed); data.addPartContributionToColumnSizes(part); + + CurrentMetrics::add(CurrentMetrics::PartsActive); + CurrentMetrics::sub(CurrentMetrics::PartsInactive); } } }