From 16e839ac71065c27567ed1cda94c98bea1b69d8c Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Wed, 25 May 2022 14:54:49 +0000 Subject: [PATCH] add profile events for introspection of part types --- src/Common/ProfileEvents.cpp | 7 +++ src/Interpreters/PartLog.cpp | 3 ++ src/Interpreters/PartLog.h | 3 ++ .../MergeTree/MergeFromLogEntryTask.cpp | 1 + .../MergeTree/MergePlainMergeTreeTask.cpp | 1 + src/Storages/MergeTree/MergeTreeData.cpp | 44 +++++++++++++++++++ src/Storages/MergeTree/MergeTreeData.h | 2 + .../MergeTree/MergeTreeDataWriter.cpp | 3 +- src/Storages/MergeTree/MergeTreeSink.cpp | 1 + .../MergeTree/ReplicatedMergeTreeSink.cpp | 1 + .../02306_part_types_profile_events.reference | 7 +++ .../02306_part_types_profile_events.sql | 44 +++++++++++++++++++ 12 files changed, 115 insertions(+), 2 deletions(-) create mode 100644 tests/queries/0_stateless/02306_part_types_profile_events.reference create mode 100644 tests/queries/0_stateless/02306_part_types_profile_events.sql diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index 7f3b9788c1f..9fa47ff959c 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -144,6 +144,13 @@ M(MergeTreeDataWriterBlocks, "Number of blocks INSERTed to MergeTree tables. Each block forms a data part of level zero.") \ M(MergeTreeDataWriterBlocksAlreadySorted, "Number of blocks INSERTed to MergeTree tables that appeared to be already sorted.") \ \ + M(InsertedWideParts, "Number of parts inserted in Wide format.") \ + M(InsertedCompactParts, "Number of parts inserted in Compact format.") \ + M(InsertedInMemoryParts, "Number of parts inserted in InMemory format.") \ + M(MergedIntoWideParts, "Number of parts merged into Wide format.") \ + M(MergedIntoCompactParts, "Number of parts merged into Compact format.") \ + M(MergedIntoInMemoryParts, "Number of parts in merged into InMemory format.") \ + \ M(MergeTreeDataProjectionWriterRows, "Number of rows INSERTed to MergeTree tables projection.") \ M(MergeTreeDataProjectionWriterUncompressedBytes, "Uncompressed bytes (for columns as they stored in memory) INSERTed to MergeTree tables projection.") \ M(MergeTreeDataProjectionWriterCompressedBytes, "Bytes written to filesystem for data INSERTed to MergeTree tables projection.") \ diff --git a/src/Interpreters/PartLog.cpp b/src/Interpreters/PartLog.cpp index ce9aa0c03d1..6d57f6b7045 100644 --- a/src/Interpreters/PartLog.cpp +++ b/src/Interpreters/PartLog.cpp @@ -46,6 +46,7 @@ NamesAndTypesList PartLogElement::getNamesAndTypes() {"table", std::make_shared()}, {"part_name", std::make_shared()}, {"partition_id", std::make_shared()}, + {"part_type", std::make_shared()}, {"disk_name", std::make_shared()}, {"path_on_disk", std::make_shared()}, @@ -80,6 +81,7 @@ void PartLogElement::appendToBlock(MutableColumns & columns) const columns[i++]->insert(table_name); columns[i++]->insert(part_name); columns[i++]->insert(partition_id); + columns[i++]->insert(part_type.toString()); columns[i++]->insert(disk_name); columns[i++]->insert(path_on_disk); @@ -159,6 +161,7 @@ bool PartLog::addNewParts( elem.part_name = part->name; elem.disk_name = part->volume->getDisk()->getName(); elem.path_on_disk = part->getFullPath(); + elem.part_type = part->getType(); elem.bytes_compressed_on_disk = part->getBytesOnDisk(); elem.rows = part->rows_count; diff --git a/src/Interpreters/PartLog.h b/src/Interpreters/PartLog.h index 7582f6fe9e6..470dce09fa0 100644 --- a/src/Interpreters/PartLog.h +++ b/src/Interpreters/PartLog.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -35,6 +36,8 @@ struct PartLogElement String disk_name; String path_on_disk; + MergeTreeDataPartType part_type; + /// Size of the part UInt64 rows = 0; diff --git a/src/Storages/MergeTree/MergeFromLogEntryTask.cpp b/src/Storages/MergeTree/MergeFromLogEntryTask.cpp index 4b8860aa51d..66abe32ac25 100644 --- a/src/Storages/MergeTree/MergeFromLogEntryTask.cpp +++ b/src/Storages/MergeTree/MergeFromLogEntryTask.cpp @@ -322,6 +322,7 @@ bool MergeFromLogEntryTask::finalize(ReplicatedMergeMutateTaskBase::PartLogWrite ProfileEvents::increment(ProfileEvents::ReplicatedPartMerges); write_part_log({}); + storage.incrementMergedPartsProfileEvent(part->getType()); return true; } diff --git a/src/Storages/MergeTree/MergePlainMergeTreeTask.cpp b/src/Storages/MergeTree/MergePlainMergeTreeTask.cpp index 0146ce4c7b3..c6a719fbc67 100644 --- a/src/Storages/MergeTree/MergePlainMergeTreeTask.cpp +++ b/src/Storages/MergeTree/MergePlainMergeTreeTask.cpp @@ -117,6 +117,7 @@ void MergePlainMergeTreeTask::finish() new_part = merge_task->getFuture().get(); storage.merger_mutator.renameMergedTemporaryPart(new_part, future_part->parts, txn, nullptr); write_part_log({}); + storage.incrementMergedPartsProfileEvent(new_part->getType()); } } diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 50811daa4ab..62c11a31f68 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -93,6 +93,12 @@ namespace ProfileEvents extern const Event DelayedInserts; extern const Event DelayedInsertsMilliseconds; extern const Event DuplicatedInsertedBlocks; + extern const Event InsertedWideParts; + extern const Event InsertedCompactParts; + extern const Event InsertedInMemoryParts; + extern const Event MergedIntoWideParts; + extern const Event MergedIntoCompactParts; + extern const Event MergedIntoInMemoryParts; } namespace CurrentMetrics @@ -1716,6 +1722,7 @@ void MergeTreeData::removePartsFinally(const MergeTreeData::DataPartsVector & pa part_log_elem.part_name = part->name; part_log_elem.bytes_compressed_on_disk = part->getBytesOnDisk(); part_log_elem.rows = part->rows_count; + part_log_elem.part_type = part->getType(); part_log->add(part_log_elem); } @@ -6190,6 +6197,7 @@ try part_log_elem.path_on_disk = result_part->getFullPath(); part_log_elem.bytes_compressed_on_disk = result_part->getBytesOnDisk(); part_log_elem.rows = result_part->rows_count; + part_log_elem.part_type = result_part->getType(); } part_log_elem.source_part_names.reserve(source_parts.size()); @@ -6755,6 +6763,42 @@ StorageSnapshotPtr MergeTreeData::getStorageSnapshot(const StorageMetadataPtr & return std::make_shared(*this, metadata_snapshot, object_columns, std::move(snapshot_data)); } +#define FOR_EACH_PART_TYPE(M) \ + M(Wide) \ + M(Compact) \ + M(InMemory) + +#define DECLARE_INCREMENT_EVENT_CASE(Event, Type) \ + case MergeTreeDataPartType::Type: \ + ProfileEvents::increment(ProfileEvents::Event##Type##Parts); \ + break; + +#define DECLARE_INCREMENT_EVENT(value, CASE) \ + switch (value) \ + { \ + FOR_EACH_PART_TYPE(CASE) \ + default: \ + break; \ + } + +void MergeTreeData::incrementInsertedPartsProfileEvent(MergeTreeDataPartType type) +{ + #define DECLARE_INSERTED_EVENT_CASE(Type) DECLARE_INCREMENT_EVENT_CASE(Inserted, Type) + DECLARE_INCREMENT_EVENT(type.getValue(), DECLARE_INSERTED_EVENT_CASE) + #undef DECLARE_INSERTED_EVENT +} + +void MergeTreeData::incrementMergedPartsProfileEvent(MergeTreeDataPartType type) +{ + #define DECLARE_MERGED_EVENT_CASE(Type) DECLARE_INCREMENT_EVENT_CASE(MergedInto, Type) + DECLARE_INCREMENT_EVENT(type.getValue(), DECLARE_MERGED_EVENT_CASE) + #undef DECLARE_MERGED_EVENT +} + +#undef FOR_EACH_PART_TYPE +#undef DECLARE_INCREMENT_EVENT_CASE +#undef DECLARE_INCREMENT_EVENT + CurrentlySubmergingEmergingTagger::~CurrentlySubmergingEmergingTagger() { std::lock_guard lock(storage.currently_submerging_emerging_mutex); diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index df37cd000e4..1ba09251f6f 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -1224,6 +1224,8 @@ protected: /// Moves part to specified space, used in ALTER ... MOVE ... queries bool movePartsToSpace(const DataPartsVector & parts, SpacePtr space); + static void incrementInsertedPartsProfileEvent(MergeTreeDataPartType type); + static void incrementMergedPartsProfileEvent(MergeTreeDataPartType type); private: /// RAII Wrapper for atomic work with currently moving parts diff --git a/src/Storages/MergeTree/MergeTreeDataWriter.cpp b/src/Storages/MergeTree/MergeTreeDataWriter.cpp index bf247074f57..7e08fb0ccfc 100644 --- a/src/Storages/MergeTree/MergeTreeDataWriter.cpp +++ b/src/Storages/MergeTree/MergeTreeDataWriter.cpp @@ -451,6 +451,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPart( temp_part.streams.emplace_back(std::move(stream)); } } + auto finalizer = out->finalizePartAsync( new_data_part, data_settings->fsync_after_insert, @@ -460,8 +461,6 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPart( temp_part.part = new_data_part; temp_part.streams.emplace_back(TemporaryPart::Stream{.stream = std::move(out), .finalizer = std::move(finalizer)}); - /// out.finish(new_data_part, std::move(written_files), sync_on_insert); - ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterRows, block.rows()); ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterUncompressedBytes, block.bytes()); ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterCompressedBytes, new_data_part->getBytesOnDisk()); diff --git a/src/Storages/MergeTree/MergeTreeSink.cpp b/src/Storages/MergeTree/MergeTreeSink.cpp index 93b9f356595..4dc4d62c2a2 100644 --- a/src/Storages/MergeTree/MergeTreeSink.cpp +++ b/src/Storages/MergeTree/MergeTreeSink.cpp @@ -137,6 +137,7 @@ void MergeTreeSink::finishDelayedChunk() if (storage.renameTempPartAndAdd(part, context->getCurrentTransaction().get(), &storage.increment, nullptr, storage.getDeduplicationLog(), partition.block_dedup_token)) { PartLog::addNewPart(storage.getContext(), part, partition.elapsed_ns); + storage.incrementInsertedPartsProfileEvent(part->getType()); /// Initiate async merge - it will be done if it's good time for merge and if there are space in 'background_pool'. storage.background_operations_assignee.trigger(); diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp index 126d34bcc1d..de893d59b05 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp @@ -261,6 +261,7 @@ void ReplicatedMergeTreeSink::finishDelayedChunk(zkutil::ZooKeeperPtr & zookeepe /// Set a special error code if the block is duplicate int error = (deduplicate && part->is_duplicate) ? ErrorCodes::INSERT_WAS_DEDUPLICATED : 0; PartLog::addNewPart(storage.getContext(), part, partition.elapsed_ns, ExecutionStatus(error)); + storage.incrementInsertedPartsProfileEvent(part->getType()); } catch (...) { diff --git a/tests/queries/0_stateless/02306_part_types_profile_events.reference b/tests/queries/0_stateless/02306_part_types_profile_events.reference new file mode 100644 index 00000000000..7b5495f39fe --- /dev/null +++ b/tests/queries/0_stateless/02306_part_types_profile_events.reference @@ -0,0 +1,7 @@ +3 1 2 +2 1 1 +Compact +Compact +Wide +Compact 1 +Wide 1 diff --git a/tests/queries/0_stateless/02306_part_types_profile_events.sql b/tests/queries/0_stateless/02306_part_types_profile_events.sql new file mode 100644 index 00000000000..0ec13bc3827 --- /dev/null +++ b/tests/queries/0_stateless/02306_part_types_profile_events.sql @@ -0,0 +1,44 @@ +DROP TABLE IF EXISTS t_parts_profile_events; + +CREATE TABLE t_parts_profile_events (a UInt32) +ENGINE = MergeTree ORDER BY tuple() +SETTINGS min_rows_for_wide_part = 10, min_bytes_for_wide_part = 0; + +SYSTEM STOP MERGES t_parts_profile_events; + +SET log_comment = '02306_part_types_profile_events'; + +INSERT INTO t_parts_profile_events VALUES (1); +INSERT INTO t_parts_profile_events VALUES (1); + +SYSTEM START MERGES t_parts_profile_events; +OPTIMIZE TABLE t_parts_profile_events FINAL; +SYSTEM STOP MERGES t_parts_profile_events; + +INSERT INTO t_parts_profile_events SELECT number FROM numbers(20); + +SYSTEM START MERGES t_parts_profile_events; +OPTIMIZE TABLE t_parts_profile_events FINAL; +SYSTEM STOP MERGES t_parts_profile_events; + +SYSTEM FLUSH LOGS; + +SELECT count(), sum(ProfileEvents['InsertedWideParts']), sum(ProfileEvents['InsertedCompactParts']) + FROM system.query_log WHERE has(databases, currentDatabase()) + AND log_comment = '02306_part_types_profile_events' + AND query ILIKE 'INSERT INTO%' AND type = 'QueryFinish'; + +SELECT count(), sum(ProfileEvents['MergedIntoWideParts']), sum(ProfileEvents['MergedIntoCompactParts']) + FROM system.query_log WHERE has(databases, currentDatabase()) + AND log_comment = '02306_part_types_profile_events' + AND query ILIKE 'OPTIMIZE TABLE%' AND type = 'QueryFinish'; + +SELECT part_type FROM system.part_log WHERE database = currentDatabase() + AND table = 't_parts_profile_events' AND event_type = 'NewPart' + ORDER BY event_time_microseconds; + +SELECT part_type, count() > 0 FROM system.part_log WHERE database = currentDatabase() + AND table = 't_parts_profile_events' AND event_type = 'MergeParts' + GROUP BY part_type; + +DROP TABLE t_parts_profile_events;