From 42aa967311a55d3da0e1230595b0e0ca9928e777 Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Thu, 8 Aug 2024 00:38:05 +0000 Subject: [PATCH] add profile events for merges --- src/Common/ProfileEvents.cpp | 25 +++++- .../Merges/AggregatingSortedTransform.h | 10 +++ .../Algorithms/AggregatingSortedAlgorithm.h | 2 + .../FinishAggregatingInOrderAlgorithm.cpp | 3 + .../FinishAggregatingInOrderAlgorithm.h | 5 ++ .../GraphiteRollupSortedAlgorithm.h | 2 + .../Merges/Algorithms/IMergingAlgorithm.h | 11 ++- .../IMergingAlgorithmWithSharedChunks.h | 2 + src/Processors/Merges/Algorithms/MergedData.h | 2 + .../Algorithms/MergingSortedAlgorithm.h | 2 +- .../Algorithms/SummingSortedAlgorithm.h | 2 + .../Merges/CollapsingSortedTransform.h | 10 +++ src/Processors/Merges/IMergingTransform.h | 35 +++++++- .../Merges/MergingSortedTransform.cpp | 26 ++---- .../Merges/MergingSortedTransform.h | 4 - .../Merges/ReplacingSortedTransform.h | 9 ++ .../Merges/SummingSortedTransform.h | 10 +++ .../Merges/VersionedCollapsingTransform.h | 9 ++ .../Transforms/ColumnGathererTransform.cpp | 57 ++++++------- .../Transforms/ColumnGathererTransform.h | 11 ++- .../Transforms/MergeJoinTransform.cpp | 12 ++- .../Transforms/MergeJoinTransform.h | 2 + .../Transforms/MergeSortingTransform.cpp | 2 - .../Transforms/PasteJoinTransform.cpp | 10 +++ .../Transforms/PasteJoinTransform.h | 3 +- .../gtest_blocks_size_merging_streams.cpp | 4 +- src/Storages/MergeTree/MergeList.h | 1 + src/Storages/MergeTree/MergeProgress.h | 27 +++--- src/Storages/MergeTree/MergeTask.cpp | 84 +++++++++++++++---- src/Storages/MergeTree/MergeTask.h | 20 ++++- src/Storages/MergeTree/MutateTask.cpp | 10 ++- 31 files changed, 308 insertions(+), 104 deletions(-) diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index ccdce7ff584..857a08d8a5d 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -210,7 +210,29 @@ M(Merge, "Number of launched background merges.") \ M(MergedRows, "Rows read for background merges. This is the number of rows before merge.") \ M(MergedUncompressedBytes, "Uncompressed bytes (for columns as they stored in memory) that was read for background merges. This is the number before merge.") \ - M(MergesTimeMilliseconds, "Total time spent for background merges.")\ + M(MergeTotalMilliseconds, "Total time spent for background merges") \ + M(MergeExecuteMilliseconds, "Total busy time spent for execution of background merges") \ + M(MergeHorizontalStageTotalMilliseconds, "Total time spent for horizontal stage of background merges") \ + M(MergeHorizontalStageExecuteMilliseconds, "Total busy time spent for execution of horizontal stage of background merges") \ + M(MergeVerticalStageTotalMilliseconds, "Total time spent for vertical stage of background merges") \ + M(MergeVerticalStageExecuteMilliseconds, "Total busy time spent for execution of vertical stage of background merges") \ + M(MergeProjectionStageTotalMilliseconds, "Total time spent for projection stage of background merges") \ + M(MergeProjectionStageExecuteMilliseconds, "Total busy time spent for execution of projection stage of background merges") \ + \ + M(MergingSortedMilliseconds, "Total time spent while merging sorted columns") \ + M(AggregatingSortedMilliseconds, "Total time spent while aggregating sorted columns") \ + M(CollapsingSortedMilliseconds, "Total time spent while collapsing sorted columns") \ + M(ReplacingSortedMilliseconds, "Total time spent while replacing sorted columns") \ + M(SummingSortedMilliseconds, "Total time spent while summing sorted columns") \ + M(VersionedCollapsingSortedMilliseconds, "Total time spent while version collapsing sorted columns") \ + M(GatheringColumnMilliseconds, "Total time spent while gathering columns for vertical merge") \ + \ + M(MutationTotalParts, "Number of total parts for which mutations tried to be applied") \ + M(MutationUntouchedParts, "Number of total parts for which mutations tried to be applied but which was completely skipped according to predicate") \ + M(MutatedRows, "Rows read for mutations. This is the number of rows before mutation") \ + M(MutatedUncompressedBytes, "Uncompressed bytes (for columns as they stored in memory) that was read for mutations. This is the number before mutation.") \ + M(MutationTimeMilliseconds, "Total time spent for mutations.") \ + M(MutateTaskProjectionsCalculationMicroseconds, "Time spent calculating projections") \ \ M(MergeTreeDataWriterRows, "Number of rows INSERTed to MergeTree tables.") \ M(MergeTreeDataWriterUncompressedBytes, "Uncompressed bytes (for columns as they stored in memory) INSERTed to MergeTree tables.") \ @@ -225,7 +247,6 @@ M(MergeTreeDataWriterProjectionsCalculationMicroseconds, "Time spent calculating projections") \ M(MergeTreeDataProjectionWriterSortingBlocksMicroseconds, "Time spent sorting blocks (for projection it might be a key different from table's sorting key)") \ M(MergeTreeDataProjectionWriterMergingBlocksMicroseconds, "Time spent merging blocks") \ - M(MutateTaskProjectionsCalculationMicroseconds, "Time spent calculating projections") \ \ M(InsertedWideParts, "Number of parts inserted in Wide format.") \ M(InsertedCompactParts, "Number of parts inserted in Compact format.") \ diff --git a/src/Processors/Merges/AggregatingSortedTransform.h b/src/Processors/Merges/AggregatingSortedTransform.h index c6d7e844c65..c96ad3db525 100644 --- a/src/Processors/Merges/AggregatingSortedTransform.h +++ b/src/Processors/Merges/AggregatingSortedTransform.h @@ -3,6 +3,11 @@ #include #include +namespace ProfileEvents +{ + extern const Event AggregatingSortedMilliseconds; +} + namespace DB { @@ -29,6 +34,11 @@ public: } String getName() const override { return "AggregatingSortedTransform"; } + + void onFinish() override + { + logMergedStats(ProfileEvents::AggregatingSortedMilliseconds, "Aggregated sorted", getLogger("AggregatingSortedTransform")); + } }; } diff --git a/src/Processors/Merges/Algorithms/AggregatingSortedAlgorithm.h b/src/Processors/Merges/Algorithms/AggregatingSortedAlgorithm.h index 53c103e7038..908994e1851 100644 --- a/src/Processors/Merges/Algorithms/AggregatingSortedAlgorithm.h +++ b/src/Processors/Merges/Algorithms/AggregatingSortedAlgorithm.h @@ -30,6 +30,8 @@ public: void consume(Input & input, size_t source_num) override; Status merge() override; + MergedStats getMergedStats() const override { return merged_data.getMergedStats(); } + /// Stores information for aggregation of SimpleAggregateFunction columns struct SimpleAggregateDescription { diff --git a/src/Processors/Merges/Algorithms/FinishAggregatingInOrderAlgorithm.cpp b/src/Processors/Merges/Algorithms/FinishAggregatingInOrderAlgorithm.cpp index 86675bcb237..477566d8a94 100644 --- a/src/Processors/Merges/Algorithms/FinishAggregatingInOrderAlgorithm.cpp +++ b/src/Processors/Merges/Algorithms/FinishAggregatingInOrderAlgorithm.cpp @@ -126,6 +126,9 @@ IMergingAlgorithm::Status FinishAggregatingInOrderAlgorithm::merge() Chunk FinishAggregatingInOrderAlgorithm::prepareToMerge() { + total_merged_rows += accumulated_rows; + total_merged_bytes += accumulated_bytes; + accumulated_rows = 0; accumulated_bytes = 0; diff --git a/src/Processors/Merges/Algorithms/FinishAggregatingInOrderAlgorithm.h b/src/Processors/Merges/Algorithms/FinishAggregatingInOrderAlgorithm.h index cc6578e79be..39171c5a978 100644 --- a/src/Processors/Merges/Algorithms/FinishAggregatingInOrderAlgorithm.h +++ b/src/Processors/Merges/Algorithms/FinishAggregatingInOrderAlgorithm.h @@ -50,6 +50,8 @@ public: void consume(Input & input, size_t source_num) override; Status merge() override; + MergedStats getMergedStats() const override { return {.bytes = accumulated_bytes, .rows = accumulated_rows, .blocks = chunk_num}; } + private: Chunk prepareToMerge(); void addToAggregation(); @@ -92,6 +94,9 @@ private: UInt64 chunk_num = 0; size_t accumulated_rows = 0; size_t accumulated_bytes = 0; + + size_t total_merged_rows = 0; + size_t total_merged_bytes = 0; }; } diff --git a/src/Processors/Merges/Algorithms/GraphiteRollupSortedAlgorithm.h b/src/Processors/Merges/Algorithms/GraphiteRollupSortedAlgorithm.h index aaa3859efb6..cb2775c968d 100644 --- a/src/Processors/Merges/Algorithms/GraphiteRollupSortedAlgorithm.h +++ b/src/Processors/Merges/Algorithms/GraphiteRollupSortedAlgorithm.h @@ -33,6 +33,8 @@ public: const char * getName() const override { return "GraphiteRollupSortedAlgorithm"; } Status merge() override; + MergedStats getMergedStats() const override { return merged_data->getMergedStats(); } + struct ColumnsDefinition { size_t path_column_num; diff --git a/src/Processors/Merges/Algorithms/IMergingAlgorithm.h b/src/Processors/Merges/Algorithms/IMergingAlgorithm.h index 9a1c7c24270..83f11232b71 100644 --- a/src/Processors/Merges/Algorithms/IMergingAlgorithm.h +++ b/src/Processors/Merges/Algorithms/IMergingAlgorithm.h @@ -1,7 +1,7 @@ #pragma once #include -#include +#include namespace DB { @@ -65,6 +65,15 @@ public: IMergingAlgorithm() = default; virtual ~IMergingAlgorithm() = default; + + struct MergedStats + { + UInt64 bytes = 0; + UInt64 rows = 0; + UInt64 blocks = 0; + }; + + virtual MergedStats getMergedStats() const = 0; }; // TODO: use when compile with clang which could support it diff --git a/src/Processors/Merges/Algorithms/IMergingAlgorithmWithSharedChunks.h b/src/Processors/Merges/Algorithms/IMergingAlgorithmWithSharedChunks.h index bc1aafe93f7..1725108ac5d 100644 --- a/src/Processors/Merges/Algorithms/IMergingAlgorithmWithSharedChunks.h +++ b/src/Processors/Merges/Algorithms/IMergingAlgorithmWithSharedChunks.h @@ -16,6 +16,8 @@ public: void initialize(Inputs inputs) override; void consume(Input & input, size_t source_num) override; + MergedStats getMergedStats() const override { return merged_data->getMergedStats(); } + private: Block header; SortDescription description; diff --git a/src/Processors/Merges/Algorithms/MergedData.h b/src/Processors/Merges/Algorithms/MergedData.h index c5bb074bb0c..8f47f89d8ee 100644 --- a/src/Processors/Merges/Algorithms/MergedData.h +++ b/src/Processors/Merges/Algorithms/MergedData.h @@ -183,6 +183,8 @@ public: UInt64 totalAllocatedBytes() const { return total_allocated_bytes; } UInt64 maxBlockSize() const { return max_block_size; } + IMergingAlgorithm::MergedStats getMergedStats() const { return {.bytes = total_allocated_bytes, .rows = total_merged_rows, .blocks = total_chunks}; } + virtual ~MergedData() = default; protected: diff --git a/src/Processors/Merges/Algorithms/MergingSortedAlgorithm.h b/src/Processors/Merges/Algorithms/MergingSortedAlgorithm.h index bcb111baadf..c889668a38e 100644 --- a/src/Processors/Merges/Algorithms/MergingSortedAlgorithm.h +++ b/src/Processors/Merges/Algorithms/MergingSortedAlgorithm.h @@ -31,7 +31,7 @@ public: void consume(Input & input, size_t source_num) override; Status merge() override; - const MergedData & getMergedData() const { return merged_data; } + MergedStats getMergedStats() const override { return merged_data.getMergedStats(); } private: Block header; diff --git a/src/Processors/Merges/Algorithms/SummingSortedAlgorithm.h b/src/Processors/Merges/Algorithms/SummingSortedAlgorithm.h index 664b171c4b9..74b4e397831 100644 --- a/src/Processors/Merges/Algorithms/SummingSortedAlgorithm.h +++ b/src/Processors/Merges/Algorithms/SummingSortedAlgorithm.h @@ -30,6 +30,8 @@ public: void consume(Input & input, size_t source_num) override; Status merge() override; + MergedStats getMergedStats() const override { return merged_data.getMergedStats(); } + struct AggregateDescription; struct MapDescription; diff --git a/src/Processors/Merges/CollapsingSortedTransform.h b/src/Processors/Merges/CollapsingSortedTransform.h index 4479ac82f66..99fb700abf1 100644 --- a/src/Processors/Merges/CollapsingSortedTransform.h +++ b/src/Processors/Merges/CollapsingSortedTransform.h @@ -3,6 +3,11 @@ #include #include +namespace ProfileEvents +{ + extern const Event CollapsingSortedMilliseconds; +} + namespace DB { @@ -36,6 +41,11 @@ public: } String getName() const override { return "CollapsingSortedTransform"; } + + void onFinish() override + { + logMergedStats(ProfileEvents::CollapsingSortedMilliseconds, "Collapsed sorted", getLogger("CollapsingSortedTransform")); + } }; } diff --git a/src/Processors/Merges/IMergingTransform.h b/src/Processors/Merges/IMergingTransform.h index be629271736..fba5b038618 100644 --- a/src/Processors/Merges/IMergingTransform.h +++ b/src/Processors/Merges/IMergingTransform.h @@ -2,7 +2,10 @@ #include #include +#include #include +#include +#include namespace DB { @@ -110,6 +113,8 @@ public: void work() override { + Stopwatch watch; + if (!state.init_chunks.empty()) algorithm.initialize(std::move(state.init_chunks)); @@ -147,6 +152,8 @@ public: // std::cerr << "Finished" << std::endl; state.is_finished = true; } + + merging_elapsed_ns += watch.elapsedNanoseconds(); } protected: @@ -156,7 +163,33 @@ protected: Algorithm algorithm; /// Profile info. - Stopwatch total_stopwatch {CLOCK_MONOTONIC_COARSE}; + UInt64 merging_elapsed_ns = 0; + + void logMergedStats(ProfileEvents::Event elapsed_ms_event, std::string_view transform_message, LoggerPtr log) const + { + auto stats = algorithm.getMergedStats(); + + UInt64 elapsed_ms = merging_elapsed_ns / 1000000LL; + ProfileEvents::increment(elapsed_ms_event, elapsed_ms); + + /// Don't print info for small parts (< 1M rows) + if (stats.rows < 1000000) + return; + + double seconds = static_cast(merging_elapsed_ns) / 1000000000ULL; + + if (seconds == 0.0) + { + LOG_DEBUG(log, "{}: {} blocks, {} rows, {} bytes in 0 sec.", + transform_message, stats.blocks, stats.rows, stats.bytes); + } + else + { + LOG_DEBUG(log, "{}: {} blocks, {} rows, {} bytes in {} sec., {} rows/sec., {}/sec.", + transform_message, stats.blocks, stats.rows, stats.bytes, + seconds, stats.rows / seconds, ReadableSize(stats.bytes / seconds)); + } + } private: using IMergingTransformBase::state; diff --git a/src/Processors/Merges/MergingSortedTransform.cpp b/src/Processors/Merges/MergingSortedTransform.cpp index 338b1ff7935..d2895a2a2e9 100644 --- a/src/Processors/Merges/MergingSortedTransform.cpp +++ b/src/Processors/Merges/MergingSortedTransform.cpp @@ -1,9 +1,12 @@ #include #include #include - #include -#include + +namespace ProfileEvents +{ + extern const Event MergingSortedMilliseconds; +} namespace DB { @@ -18,7 +21,6 @@ MergingSortedTransform::MergingSortedTransform( UInt64 limit_, bool always_read_till_end_, WriteBuffer * out_row_sources_buf_, - bool quiet_, bool use_average_block_sizes, bool have_all_inputs_) : IMergingTransform( @@ -37,7 +39,6 @@ MergingSortedTransform::MergingSortedTransform( limit_, out_row_sources_buf_, use_average_block_sizes) - , quiet(quiet_) { } @@ -48,22 +49,7 @@ void MergingSortedTransform::onNewInput() void MergingSortedTransform::onFinish() { - if (quiet) - return; - - const auto & merged_data = algorithm.getMergedData(); - - auto log = getLogger("MergingSortedTransform"); - - double seconds = total_stopwatch.elapsedSeconds(); - - if (seconds == 0.0) - LOG_DEBUG(log, "Merge sorted {} blocks, {} rows in 0 sec.", merged_data.totalChunks(), merged_data.totalMergedRows()); - else - LOG_DEBUG(log, "Merge sorted {} blocks, {} rows in {} sec., {} rows/sec., {}/sec", - merged_data.totalChunks(), merged_data.totalMergedRows(), seconds, - merged_data.totalMergedRows() / seconds, - ReadableSize(merged_data.totalAllocatedBytes() / seconds)); + logMergedStats(ProfileEvents::MergingSortedMilliseconds, "Merged sorted", getLogger("MergingSortedTransform")); } } diff --git a/src/Processors/Merges/MergingSortedTransform.h b/src/Processors/Merges/MergingSortedTransform.h index 2b53939f309..6e52450efa7 100644 --- a/src/Processors/Merges/MergingSortedTransform.h +++ b/src/Processors/Merges/MergingSortedTransform.h @@ -21,7 +21,6 @@ public: UInt64 limit_ = 0, bool always_read_till_end_ = false, WriteBuffer * out_row_sources_buf_ = nullptr, - bool quiet_ = false, bool use_average_block_sizes = false, bool have_all_inputs_ = true); @@ -30,9 +29,6 @@ public: protected: void onNewInput() override; void onFinish() override; - -private: - bool quiet = false; }; } diff --git a/src/Processors/Merges/ReplacingSortedTransform.h b/src/Processors/Merges/ReplacingSortedTransform.h index 2657987f161..dc262aab9ee 100644 --- a/src/Processors/Merges/ReplacingSortedTransform.h +++ b/src/Processors/Merges/ReplacingSortedTransform.h @@ -3,6 +3,10 @@ #include #include +namespace ProfileEvents +{ + extern const Event ReplacingSortedMilliseconds; +} namespace DB { @@ -38,6 +42,11 @@ public: } String getName() const override { return "ReplacingSorted"; } + + void onFinish() override + { + logMergedStats(ProfileEvents::ReplacingSortedMilliseconds, "Replaced sorted", getLogger("ReplacingSortedTransform")); + } }; } diff --git a/src/Processors/Merges/SummingSortedTransform.h b/src/Processors/Merges/SummingSortedTransform.h index 70ddebfea95..d7c20223d7e 100644 --- a/src/Processors/Merges/SummingSortedTransform.h +++ b/src/Processors/Merges/SummingSortedTransform.h @@ -3,6 +3,11 @@ #include #include +namespace ProfileEvents +{ + extern const Event SummingSortedMilliseconds; +} + namespace DB { @@ -33,6 +38,11 @@ public: } String getName() const override { return "SummingSortedTransform"; } + + void onFinish() override + { + logMergedStats(ProfileEvents::SummingSortedMilliseconds, "Summed sorted", getLogger("SummingSortedTransform")); + } }; } diff --git a/src/Processors/Merges/VersionedCollapsingTransform.h b/src/Processors/Merges/VersionedCollapsingTransform.h index 18244469bd7..32b5d7bf343 100644 --- a/src/Processors/Merges/VersionedCollapsingTransform.h +++ b/src/Processors/Merges/VersionedCollapsingTransform.h @@ -3,6 +3,10 @@ #include #include +namespace ProfileEvents +{ + extern const Event VersionedCollapsingSortedMilliseconds; +} namespace DB { @@ -33,6 +37,11 @@ public: } String getName() const override { return "VersionedCollapsingTransform"; } + + void onFinish() override + { + logMergedStats(ProfileEvents::VersionedCollapsingSortedMilliseconds, "Versioned collapsed sorted", getLogger("VersionedCollapsingTransform")); + } }; } diff --git a/src/Processors/Transforms/ColumnGathererTransform.cpp b/src/Processors/Transforms/ColumnGathererTransform.cpp index 15f8355bdc7..52fa42fdb51 100644 --- a/src/Processors/Transforms/ColumnGathererTransform.cpp +++ b/src/Processors/Transforms/ColumnGathererTransform.cpp @@ -1,11 +1,15 @@ #include +#include #include #include #include #include #include -#include +namespace ProfileEvents +{ + extern const Event GatheringColumnMilliseconds; +} namespace DB { @@ -33,6 +37,13 @@ ColumnGathererStream::ColumnGathererStream( throw Exception(ErrorCodes::EMPTY_DATA_PASSED, "There are no streams to gather"); } +void ColumnGathererStream::updateStats(const IColumn & column) +{ + merged_rows += column.size(); + merged_bytes += column.allocatedBytes(); + ++merged_blocks; +} + void ColumnGathererStream::initialize(Inputs inputs) { Columns source_columns; @@ -82,7 +93,9 @@ IMergingAlgorithm::Status ColumnGathererStream::merge() { res.addColumn(source_to_fully_copy->column); } - merged_rows += source_to_fully_copy->size; + + updateStats(*source_to_fully_copy->column); + source_to_fully_copy->pos = source_to_fully_copy->size; source_to_fully_copy = nullptr; return Status(std::move(res)); @@ -96,8 +109,7 @@ IMergingAlgorithm::Status ColumnGathererStream::merge() { next_required_source = 0; Chunk res; - merged_rows += sources.front().column->size(); - merged_bytes += sources.front().column->allocatedBytes(); + updateStats(*sources.front().column); res.addColumn(std::move(sources.front().column)); sources.front().pos = sources.front().size = 0; return Status(std::move(res)); @@ -123,8 +135,8 @@ IMergingAlgorithm::Status ColumnGathererStream::merge() if (source_to_fully_copy && result_column->empty()) { Chunk res; - merged_rows += source_to_fully_copy->column->size(); - merged_bytes += source_to_fully_copy->column->allocatedBytes(); + updateStats(*source_to_fully_copy->column); + if (result_column->hasDynamicStructure()) { auto col = result_column->cloneEmpty(); @@ -140,13 +152,13 @@ IMergingAlgorithm::Status ColumnGathererStream::merge() return Status(std::move(res)); } - auto col = result_column->cloneEmpty(); - result_column.swap(col); + auto return_column = result_column->cloneEmpty(); + result_column.swap(return_column); Chunk res; - merged_rows += col->size(); - merged_bytes += col->allocatedBytes(); - res.addColumn(std::move(col)); + updateStats(*return_column); + + res.addColumn(std::move(return_column)); return Status(std::move(res), row_sources_buf.eof() && !source_to_fully_copy); } @@ -185,31 +197,10 @@ ColumnGathererTransform::ColumnGathererTransform( toString(header.columns())); } -void ColumnGathererTransform::work() -{ - Stopwatch stopwatch; - IMergingTransform::work(); - elapsed_ns += stopwatch.elapsedNanoseconds(); -} - void ColumnGathererTransform::onFinish() { - auto merged_rows = algorithm.getMergedRows(); - auto merged_bytes = algorithm.getMergedRows(); - /// Don't print info for small parts (< 10M rows) - if (merged_rows < 10000000) - return; - - double seconds = static_cast(elapsed_ns) / 1000000000ULL; const auto & column_name = getOutputPort().getHeader().getByPosition(0).name; - - if (seconds == 0.0) - LOG_DEBUG(log, "Gathered column {} ({} bytes/elem.) in 0 sec.", - column_name, static_cast(merged_bytes) / merged_rows); - else - LOG_DEBUG(log, "Gathered column {} ({} bytes/elem.) in {} sec., {} rows/sec., {}/sec.", - column_name, static_cast(merged_bytes) / merged_rows, seconds, - merged_rows / seconds, ReadableSize(merged_bytes / seconds)); + logMergedStats(ProfileEvents::GatheringColumnMilliseconds, fmt::format("Gathered column {}", column_name), log); } } diff --git a/src/Processors/Transforms/ColumnGathererTransform.h b/src/Processors/Transforms/ColumnGathererTransform.h index ec5691316ce..a535b2669d0 100644 --- a/src/Processors/Transforms/ColumnGathererTransform.h +++ b/src/Processors/Transforms/ColumnGathererTransform.h @@ -2,6 +2,7 @@ #include #include +#include "base/types.h" #include #include @@ -72,10 +73,11 @@ public: template void gather(Column & column_res); - UInt64 getMergedRows() const { return merged_rows; } - UInt64 getMergedBytes() const { return merged_bytes; } + MergedStats getMergedStats() const override { return {.bytes = merged_bytes, .rows = merged_rows, .blocks = merged_blocks}; } private: + void updateStats(const IColumn & column); + /// Cache required fields struct Source { @@ -105,6 +107,7 @@ private: ssize_t next_required_source = -1; UInt64 merged_rows = 0; UInt64 merged_bytes = 0; + UInt64 merged_blocks = 0; }; class ColumnGathererTransform final : public IMergingTransform @@ -120,12 +123,8 @@ public: String getName() const override { return "ColumnGathererTransform"; } - void work() override; - protected: void onFinish() override; - UInt64 elapsed_ns = 0; - LoggerPtr log; }; diff --git a/src/Processors/Transforms/MergeJoinTransform.cpp b/src/Processors/Transforms/MergeJoinTransform.cpp index e96a75d277b..26601207da8 100644 --- a/src/Processors/Transforms/MergeJoinTransform.cpp +++ b/src/Processors/Transforms/MergeJoinTransform.cpp @@ -511,6 +511,16 @@ void MergeJoinAlgorithm::logElapsed(double seconds) stat.max_blocks_loaded); } +IMergingAlgorithm::MergedStats MergeJoinAlgorithm::getMergedStats() const +{ + return + { + .bytes = 0, + .rows = stat.num_rows[0] + stat.num_rows[1], + .blocks = stat.num_blocks[0] + stat.num_blocks[1], + }; +} + static void prepareChunk(Chunk & chunk) { if (!chunk) @@ -1271,7 +1281,7 @@ MergeJoinTransform::MergeJoinTransform( void MergeJoinTransform::onFinish() { - algorithm.logElapsed(total_stopwatch.elapsedSeconds()); + algorithm.logElapsed(merging_elapsed_ns / 1000000000ULL); } } diff --git a/src/Processors/Transforms/MergeJoinTransform.h b/src/Processors/Transforms/MergeJoinTransform.h index d37a0b9f3ae..841a3f15a92 100644 --- a/src/Processors/Transforms/MergeJoinTransform.h +++ b/src/Processors/Transforms/MergeJoinTransform.h @@ -245,6 +245,8 @@ public: void setAsofInequality(ASOFJoinInequality asof_inequality_); void logElapsed(double seconds); + MergedStats getMergedStats() const override; + private: std::optional handleAnyJoinState(); Status anyJoin(); diff --git a/src/Processors/Transforms/MergeSortingTransform.cpp b/src/Processors/Transforms/MergeSortingTransform.cpp index ede13b29219..c45192e7118 100644 --- a/src/Processors/Transforms/MergeSortingTransform.cpp +++ b/src/Processors/Transforms/MergeSortingTransform.cpp @@ -185,7 +185,6 @@ void MergeSortingTransform::consume(Chunk chunk) if (!external_merging_sorted) { - bool quiet = false; bool have_all_inputs = false; bool use_average_block_sizes = false; @@ -199,7 +198,6 @@ void MergeSortingTransform::consume(Chunk chunk) limit, /*always_read_till_end_=*/ false, nullptr, - quiet, use_average_block_sizes, have_all_inputs); diff --git a/src/Processors/Transforms/PasteJoinTransform.cpp b/src/Processors/Transforms/PasteJoinTransform.cpp index d2fa7eed256..ad01b721726 100644 --- a/src/Processors/Transforms/PasteJoinTransform.cpp +++ b/src/Processors/Transforms/PasteJoinTransform.cpp @@ -58,6 +58,16 @@ static void prepareChunk(Chunk & chunk) chunk.setColumns(std::move(columns), num_rows); } +IMergingAlgorithm::MergedStats PasteJoinAlgorithm::getMergedStats() const +{ + return + { + .bytes = 0, + .rows = stat.num_rows[0] + stat.num_rows[1], + .blocks = stat.num_blocks[0] + stat.num_blocks[1], + }; +} + void PasteJoinAlgorithm::initialize(Inputs inputs) { if (inputs.size() != 2) diff --git a/src/Processors/Transforms/PasteJoinTransform.h b/src/Processors/Transforms/PasteJoinTransform.h index 6a7e65ee27c..fbe85f6993b 100644 --- a/src/Processors/Transforms/PasteJoinTransform.h +++ b/src/Processors/Transforms/PasteJoinTransform.h @@ -35,8 +35,7 @@ public: void initialize(Inputs inputs) override; void consume(Input & input, size_t source_num) override; Status merge() override; - - void logElapsed(double seconds); + MergedStats getMergedStats() const override; private: Chunk createBlockWithDefaults(size_t source_num); diff --git a/src/QueryPipeline/tests/gtest_blocks_size_merging_streams.cpp b/src/QueryPipeline/tests/gtest_blocks_size_merging_streams.cpp index bc22f249f97..f41a447049c 100644 --- a/src/QueryPipeline/tests/gtest_blocks_size_merging_streams.cpp +++ b/src/QueryPipeline/tests/gtest_blocks_size_merging_streams.cpp @@ -83,7 +83,7 @@ TEST(MergingSortedTest, SimpleBlockSizeTest) EXPECT_EQ(pipe.numOutputPorts(), 3); auto transform = std::make_shared(pipe.getHeader(), pipe.numOutputPorts(), sort_description, - 8192, /*max_block_size_bytes=*/0, SortingQueueStrategy::Batch, 0, false, nullptr, false, true); + 8192, /*max_block_size_bytes=*/0, SortingQueueStrategy::Batch, 0, false, nullptr, true); pipe.addTransform(std::move(transform)); @@ -125,7 +125,7 @@ TEST(MergingSortedTest, MoreInterestingBlockSizes) EXPECT_EQ(pipe.numOutputPorts(), 3); auto transform = std::make_shared(pipe.getHeader(), pipe.numOutputPorts(), sort_description, - 8192, /*max_block_size_bytes=*/0, SortingQueueStrategy::Batch, 0, false, nullptr, false, true); + 8192, /*max_block_size_bytes=*/0, SortingQueueStrategy::Batch, 0, false, nullptr, true); pipe.addTransform(std::move(transform)); diff --git a/src/Storages/MergeTree/MergeList.h b/src/Storages/MergeTree/MergeList.h index d40af6abf43..3a96ba0abae 100644 --- a/src/Storages/MergeTree/MergeList.h +++ b/src/Storages/MergeTree/MergeList.h @@ -6,6 +6,7 @@ #include #include #include +#include "base/types.h" #include #include #include diff --git a/src/Storages/MergeTree/MergeProgress.h b/src/Storages/MergeTree/MergeProgress.h index dd4922051b5..8562e81e761 100644 --- a/src/Storages/MergeTree/MergeProgress.h +++ b/src/Storages/MergeTree/MergeProgress.h @@ -8,10 +8,10 @@ namespace ProfileEvents { - extern const Event MergesTimeMilliseconds; extern const Event MergedUncompressedBytes; extern const Event MergedRows; - extern const Event Merge; + extern const Event MutatedRows; + extern const Event MutatedUncompressedBytes; } namespace DB @@ -63,18 +63,17 @@ public: void updateWatch() { UInt64 watch_curr_elapsed = merge_list_element_ptr->watch.elapsed(); - ProfileEvents::increment(ProfileEvents::MergesTimeMilliseconds, (watch_curr_elapsed - watch_prev_elapsed) / 1000000); watch_prev_elapsed = watch_curr_elapsed; } - void operator() (const Progress & value) + void operator()(const Progress & value) { - ProfileEvents::increment(ProfileEvents::MergedUncompressedBytes, value.read_bytes); - if (stage.is_first) - { - ProfileEvents::increment(ProfileEvents::MergedRows, value.read_rows); - ProfileEvents::increment(ProfileEvents::Merge); - } + if (merge_list_element_ptr->is_mutation) + updateProfileEvents(value, ProfileEvents::MutatedRows, ProfileEvents::MutatedUncompressedBytes); + else + updateProfileEvents(value, ProfileEvents::MergedRows, ProfileEvents::MergedUncompressedBytes); + + updateWatch(); merge_list_element_ptr->bytes_read_uncompressed += value.read_bytes; @@ -90,6 +89,14 @@ public: std::memory_order_relaxed); } } + +private: + void updateProfileEvents(const Progress & value, ProfileEvents::Event rows_event, ProfileEvents::Event bytes_event) const + { + ProfileEvents::increment(bytes_event, value.read_bytes); + if (stage.is_first) + ProfileEvents::increment(rows_event, value.read_rows); + } }; } diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp index ce06adf110c..5f178f08ec3 100644 --- a/src/Storages/MergeTree/MergeTask.cpp +++ b/src/Storages/MergeTree/MergeTask.cpp @@ -5,9 +5,13 @@ #include #include +#include "Common/ElapsedTimeProfileEventIncrement.h" +#include "Common/Logger.h" +#include "Common/Stopwatch.h" #include #include #include +#include #include #include #include @@ -39,6 +43,16 @@ #include #include +namespace ProfileEvents +{ + extern const Event Merge; + extern const Event MergeTotalMilliseconds; + extern const Event MergeExecuteMilliseconds; + extern const Event MergeHorizontalStageExecuteMilliseconds; + extern const Event MergeVerticalStageExecuteMilliseconds; + extern const Event MergeProjectionStageExecuteMilliseconds; +} + namespace DB { @@ -186,6 +200,8 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() if (isTTLMergeType(global_ctx->future_part->merge_type) && global_ctx->ttl_merges_blocker->isCancelled()) throw Exception(ErrorCodes::ABORTED, "Cancelled merging parts with TTL"); + ProfileEvents::increment(ProfileEvents::Merge); + LOG_DEBUG(ctx->log, "Merging {} parts: from {} to {} into {} with storage {}", global_ctx->future_part->parts.size(), global_ctx->future_part->parts.front()->name, @@ -446,6 +462,9 @@ void MergeTask::addGatheringColumn(GlobalRuntimeContextPtr global_ctx, const Str MergeTask::StageRuntimeContextPtr MergeTask::ExecuteAndFinalizeHorizontalPart::getContextForNextStage() { + ProfileEvents::increment(ProfileEvents::MergeExecuteMilliseconds, ctx->elapsed_execute_ns / 1000000UL); + ProfileEvents::increment(ProfileEvents::MergeHorizontalStageExecuteMilliseconds, ctx->elapsed_execute_ns / 1000000UL); + auto new_ctx = std::make_shared(); new_ctx->rows_sources_write_buf = std::move(ctx->rows_sources_write_buf); @@ -463,8 +482,10 @@ MergeTask::StageRuntimeContextPtr MergeTask::ExecuteAndFinalizeHorizontalPart::g MergeTask::StageRuntimeContextPtr MergeTask::VerticalMergeStage::getContextForNextStage() { - auto new_ctx = std::make_shared(); + ProfileEvents::increment(ProfileEvents::MergeExecuteMilliseconds, ctx->elapsed_execute_ns / 1000000UL); + ProfileEvents::increment(ProfileEvents::MergeVerticalStageExecuteMilliseconds, ctx->elapsed_execute_ns / 1000000UL); + auto new_ctx = std::make_shared(); new_ctx->need_sync = std::move(ctx->need_sync); ctx.reset(); @@ -474,9 +495,14 @@ MergeTask::StageRuntimeContextPtr MergeTask::VerticalMergeStage::getContextForNe bool MergeTask::ExecuteAndFinalizeHorizontalPart::execute() { - assert(subtasks_iterator != subtasks.end()); - if ((this->**subtasks_iterator)()) - return true; + chassert(subtasks_iterator != subtasks.end()); + + Stopwatch watch; + bool res = (this->**subtasks_iterator)(); + ctx->elapsed_execute_ns += watch.elapsedNanoseconds(); + + if (res) + return res; /// Move to the next subtask in an array of subtasks ++subtasks_iterator; @@ -534,7 +560,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl() bool MergeTask::VerticalMergeStage::prepareVerticalMergeForAllColumns() const { - /// No need to execute this part if it is horizontal merge. + /// No need to execute this part if it is horizontal merge. if (global_ctx->chosen_merge_algorithm != MergeAlgorithm::Vertical) return false; @@ -906,12 +932,24 @@ bool MergeTask::MergeProjectionsStage::finalizeProjectionsAndWholeMerge() const return false; } +MergeTask::StageRuntimeContextPtr MergeTask::MergeProjectionsStage::getContextForNextStage() +{ + ProfileEvents::increment(ProfileEvents::MergeExecuteMilliseconds, ctx->elapsed_execute_ns / 1000000UL); + ProfileEvents::increment(ProfileEvents::MergeProjectionStageExecuteMilliseconds, ctx->elapsed_execute_ns / 1000000UL); + + return nullptr; +} bool MergeTask::VerticalMergeStage::execute() { - assert(subtasks_iterator != subtasks.end()); - if ((this->**subtasks_iterator)()) - return true; + chassert(subtasks_iterator != subtasks.end()); + + Stopwatch watch; + bool res = (this->**subtasks_iterator)(); + ctx->elapsed_execute_ns += watch.elapsedNanoseconds(); + + if (res) + return res; /// Move to the next subtask in an array of subtasks ++subtasks_iterator; @@ -920,9 +958,14 @@ bool MergeTask::VerticalMergeStage::execute() bool MergeTask::MergeProjectionsStage::execute() { - assert(subtasks_iterator != subtasks.end()); - if ((this->**subtasks_iterator)()) - return true; + chassert(subtasks_iterator != subtasks.end()); + + Stopwatch watch; + bool res = (this->**subtasks_iterator)(); + ctx->elapsed_execute_ns += watch.elapsedNanoseconds(); + + if (res) + return res; /// Move to the next subtask in an array of subtasks ++subtasks_iterator; @@ -969,12 +1012,22 @@ bool MergeTask::VerticalMergeStage::executeVerticalMergeForAllColumns() const bool MergeTask::execute() { - assert(stages_iterator != stages.end()); - if ((*stages_iterator)->execute()) + chassert(stages_iterator != stages.end()); + const auto & current_stage = *stages_iterator; + + if (current_stage->execute()) return true; - /// Stage is finished, need initialize context for the next stage - auto next_stage_context = (*stages_iterator)->getContextForNextStage(); + /// Stage is finished, need to initialize context for the next stage and update profile events. + + UInt64 current_elapsed_ms = global_ctx->merge_list_element_ptr->watch.elapsedMilliseconds(); + UInt64 stage_elapsed_ms = current_elapsed_ms - global_ctx->prev_elapesed_ms; + global_ctx->prev_elapesed_ms = current_elapsed_ms; + + ProfileEvents::increment(current_stage->getTotalTimeProfileEvent(), stage_elapsed_ms); + ProfileEvents::increment(ProfileEvents::MergeTotalMilliseconds, stage_elapsed_ms); + + auto next_stage_context = current_stage->getContextForNextStage(); /// Move to the next stage in an array of stages ++stages_iterator; @@ -1099,7 +1152,6 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() /* limit_= */0, /* always_read_till_end_= */false, ctx->rows_sources_write_buf.get(), - true, ctx->blocks_are_granules_size); break; diff --git a/src/Storages/MergeTree/MergeTask.h b/src/Storages/MergeTree/MergeTask.h index 8b0f2130e8e..979c85482e5 100644 --- a/src/Storages/MergeTree/MergeTask.h +++ b/src/Storages/MergeTree/MergeTask.h @@ -3,6 +3,7 @@ #include #include +#include #include #include @@ -26,6 +27,12 @@ #include #include +namespace ProfileEvents +{ + extern const Event MergeHorizontalStageTotalMilliseconds; + extern const Event MergeVerticalStageTotalMilliseconds; + extern const Event MergeProjectionStageTotalMilliseconds; +} namespace DB { @@ -134,6 +141,7 @@ private: { virtual void setRuntimeContext(StageRuntimeContextPtr local, StageRuntimeContextPtr global) = 0; virtual StageRuntimeContextPtr getContextForNextStage() = 0; + virtual ProfileEvents::Event getTotalTimeProfileEvent() const = 0; virtual bool execute() = 0; virtual ~IStage() = default; }; @@ -195,6 +203,7 @@ private: bool need_prefix; scope_guard temporary_directory_lock; + UInt64 prev_elapesed_ms{0}; }; using GlobalRuntimeContextPtr = std::shared_ptr; @@ -233,6 +242,7 @@ private: /// Dependencies for next stages std::list::const_iterator it_name_and_type; bool need_sync{false}; + UInt64 elapsed_execute_ns{0}; }; using ExecuteAndFinalizeHorizontalPartRuntimeContextPtr = std::shared_ptr; @@ -256,7 +266,6 @@ private: ExecuteAndFinalizeHorizontalPartSubtasks::const_iterator subtasks_iterator = subtasks.begin(); - MergeAlgorithm chooseMergeAlgorithm() const; void createMergedStream(); void extractMergingAndGatheringColumns() const; @@ -268,6 +277,7 @@ private: } StageRuntimeContextPtr getContextForNextStage() override; + ProfileEvents::Event getTotalTimeProfileEvent() const override { return ProfileEvents::MergeHorizontalStageTotalMilliseconds; } ExecuteAndFinalizeHorizontalPartRuntimeContextPtr ctx; GlobalRuntimeContextPtr global_ctx; @@ -307,6 +317,7 @@ private: QueryPipeline column_parts_pipeline; std::unique_ptr executor; std::unique_ptr rows_sources_read_buf{nullptr}; + UInt64 elapsed_execute_ns{0}; }; using VerticalMergeRuntimeContextPtr = std::shared_ptr; @@ -321,6 +332,7 @@ private: global_ctx = static_pointer_cast(global); } StageRuntimeContextPtr getContextForNextStage() override; + ProfileEvents::Event getTotalTimeProfileEvent() const override { return ProfileEvents::MergeVerticalStageTotalMilliseconds; } bool prepareVerticalMergeForAllColumns() const; bool executeVerticalMergeForAllColumns() const; @@ -361,6 +373,7 @@ private: MergeTasks::iterator projections_iterator; LoggerPtr log{getLogger("MergeTask::MergeProjectionsStage")}; + UInt64 elapsed_execute_ns{0}; }; using MergeProjectionsRuntimeContextPtr = std::shared_ptr; @@ -368,12 +381,15 @@ private: struct MergeProjectionsStage : public IStage { bool execute() override; + void setRuntimeContext(StageRuntimeContextPtr local, StageRuntimeContextPtr global) override { ctx = static_pointer_cast(local); global_ctx = static_pointer_cast(global); } - StageRuntimeContextPtr getContextForNextStage() override { return nullptr; } + + StageRuntimeContextPtr getContextForNextStage() override; + ProfileEvents::Event getTotalTimeProfileEvent() const override { return ProfileEvents::MergeProjectionStageTotalMilliseconds; } bool mergeMinMaxIndexAndPrepareProjections() const; bool executeProjections() const; diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index 9a775db73e2..fe78964a241 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -38,7 +38,10 @@ namespace ProfileEvents { -extern const Event MutateTaskProjectionsCalculationMicroseconds; + extern const Event MutationTotalParts; + extern const Event MutationUntouchedParts; + extern const Event MutationTimeMilliseconds; + extern const Event MutateTaskProjectionsCalculationMicroseconds; } namespace CurrentMetrics @@ -2034,6 +2037,9 @@ bool MutateTask::execute() if (task->executeStep()) return true; + auto total_elapsed_ms = (*ctx->mutate_entry)->watch.elapsedMilliseconds(); + ProfileEvents::increment(ProfileEvents::MutationTimeMilliseconds, total_elapsed_ms); + // The `new_data_part` is a shared pointer and must be moved to allow // part deletion in case it is needed in `MutateFromLogEntryTask::finalize`. // @@ -2118,6 +2124,7 @@ bool MutateTask::prepare() throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to mutate {} parts, not one. " "This is a bug.", ctx->future_part->parts.size()); + ProfileEvents::increment(ProfileEvents::MutationTotalParts); ctx->num_mutations = std::make_unique(CurrentMetrics::PartMutation); auto context_for_reading = Context::createCopy(ctx->context); @@ -2174,6 +2181,7 @@ bool MutateTask::prepare() ctx->temporary_directory_lock = std::move(lock); } + ProfileEvents::increment(ProfileEvents::MutationUntouchedParts); promise.set_value(std::move(part)); return false; }