Backport #68015 to 24.8: Add profile events for merges

This commit is contained in:
robot-clickhouse 2024-08-16 21:09:24 +00:00
parent 2adfd03b48
commit 25a6c51c4a
38 changed files with 482 additions and 105 deletions

View File

@ -209,8 +209,35 @@
\ \
M(Merge, "Number of launched background merges.") \ M(Merge, "Number of launched background merges.") \
M(MergedRows, "Rows read for background merges. This is the number of rows before merge.") \ M(MergedRows, "Rows read for background merges. This is the number of rows before merge.") \
M(MergedColumns, "Number of columns merged during the horizontal stage of merges.") \
M(GatheredColumns, "Number of columns gathered during the vertical stage of merges.") \
M(MergedUncompressedBytes, "Uncompressed bytes (for columns as they stored in memory) that was read for background merges. This is the number before merge.") \ M(MergedUncompressedBytes, "Uncompressed bytes (for columns as they stored in memory) that was read for background merges. This is the number before merge.") \
M(MergesTimeMilliseconds, "Total time spent for background merges.")\ M(MergeTotalMilliseconds, "Total time spent for background merges") \
M(MergeExecuteMilliseconds, "Total busy time spent for execution of background merges") \
M(MergeHorizontalStageTotalMilliseconds, "Total time spent for horizontal stage of background merges") \
M(MergeHorizontalStageExecuteMilliseconds, "Total busy time spent for execution of horizontal stage of background merges") \
M(MergeVerticalStageTotalMilliseconds, "Total time spent for vertical stage of background merges") \
M(MergeVerticalStageExecuteMilliseconds, "Total busy time spent for execution of vertical stage of background merges") \
M(MergeProjectionStageTotalMilliseconds, "Total time spent for projection stage of background merges") \
M(MergeProjectionStageExecuteMilliseconds, "Total busy time spent for execution of projection stage of background merges") \
\
M(MergingSortedMilliseconds, "Total time spent while merging sorted columns") \
M(AggregatingSortedMilliseconds, "Total time spent while aggregating sorted columns") \
M(CollapsingSortedMilliseconds, "Total time spent while collapsing sorted columns") \
M(ReplacingSortedMilliseconds, "Total time spent while replacing sorted columns") \
M(SummingSortedMilliseconds, "Total time spent while summing sorted columns") \
M(VersionedCollapsingSortedMilliseconds, "Total time spent while version collapsing sorted columns") \
M(GatheringColumnMilliseconds, "Total time spent while gathering columns for vertical merge") \
\
M(MutationTotalParts, "Number of total parts for which mutations tried to be applied") \
M(MutationUntouchedParts, "Number of total parts for which mutations tried to be applied but which was completely skipped according to predicate") \
M(MutatedRows, "Rows read for mutations. This is the number of rows before mutation") \
M(MutatedUncompressedBytes, "Uncompressed bytes (for columns as they stored in memory) that was read for mutations. This is the number before mutation.") \
M(MutationTotalMilliseconds, "Total time spent for mutations.") \
M(MutationExecuteMilliseconds, "Total busy time spent for execution of mutations.") \
M(MutationAllPartColumns, "Number of times when task to mutate all columns in part was created") \
M(MutationSomePartColumns, "Number of times when task to mutate some columns in part was created") \
M(MutateTaskProjectionsCalculationMicroseconds, "Time spent calculating projections in mutations.") \
\ \
M(MergeTreeDataWriterRows, "Number of rows INSERTed to MergeTree tables.") \ M(MergeTreeDataWriterRows, "Number of rows INSERTed to MergeTree tables.") \
M(MergeTreeDataWriterUncompressedBytes, "Uncompressed bytes (for columns as they stored in memory) INSERTed to MergeTree tables.") \ M(MergeTreeDataWriterUncompressedBytes, "Uncompressed bytes (for columns as they stored in memory) INSERTed to MergeTree tables.") \
@ -225,7 +252,6 @@
M(MergeTreeDataWriterProjectionsCalculationMicroseconds, "Time spent calculating projections") \ M(MergeTreeDataWriterProjectionsCalculationMicroseconds, "Time spent calculating projections") \
M(MergeTreeDataProjectionWriterSortingBlocksMicroseconds, "Time spent sorting blocks (for projection it might be a key different from table's sorting key)") \ M(MergeTreeDataProjectionWriterSortingBlocksMicroseconds, "Time spent sorting blocks (for projection it might be a key different from table's sorting key)") \
M(MergeTreeDataProjectionWriterMergingBlocksMicroseconds, "Time spent merging blocks") \ M(MergeTreeDataProjectionWriterMergingBlocksMicroseconds, "Time spent merging blocks") \
M(MutateTaskProjectionsCalculationMicroseconds, "Time spent calculating projections") \
\ \
M(InsertedWideParts, "Number of parts inserted in Wide format.") \ M(InsertedWideParts, "Number of parts inserted in Wide format.") \
M(InsertedCompactParts, "Number of parts inserted in Compact format.") \ M(InsertedCompactParts, "Number of parts inserted in Compact format.") \

View File

@ -3,6 +3,11 @@
#include <Processors/Merges/IMergingTransform.h> #include <Processors/Merges/IMergingTransform.h>
#include <Processors/Merges/Algorithms/AggregatingSortedAlgorithm.h> #include <Processors/Merges/Algorithms/AggregatingSortedAlgorithm.h>
namespace ProfileEvents
{
extern const Event AggregatingSortedMilliseconds;
}
namespace DB namespace DB
{ {
@ -29,6 +34,11 @@ public:
} }
String getName() const override { return "AggregatingSortedTransform"; } String getName() const override { return "AggregatingSortedTransform"; }
void onFinish() override
{
logMergedStats(ProfileEvents::AggregatingSortedMilliseconds, "Aggregated sorted", getLogger("AggregatingSortedTransform"));
}
}; };
} }

View File

@ -30,6 +30,8 @@ public:
void consume(Input & input, size_t source_num) override; void consume(Input & input, size_t source_num) override;
Status merge() override; Status merge() override;
MergedStats getMergedStats() const override { return merged_data.getMergedStats(); }
/// Stores information for aggregation of SimpleAggregateFunction columns /// Stores information for aggregation of SimpleAggregateFunction columns
struct SimpleAggregateDescription struct SimpleAggregateDescription
{ {

View File

@ -126,6 +126,9 @@ IMergingAlgorithm::Status FinishAggregatingInOrderAlgorithm::merge()
Chunk FinishAggregatingInOrderAlgorithm::prepareToMerge() Chunk FinishAggregatingInOrderAlgorithm::prepareToMerge()
{ {
total_merged_rows += accumulated_rows;
total_merged_bytes += accumulated_bytes;
accumulated_rows = 0; accumulated_rows = 0;
accumulated_bytes = 0; accumulated_bytes = 0;

View File

@ -50,6 +50,8 @@ public:
void consume(Input & input, size_t source_num) override; void consume(Input & input, size_t source_num) override;
Status merge() override; Status merge() override;
MergedStats getMergedStats() const override { return {.bytes = accumulated_bytes, .rows = accumulated_rows, .blocks = chunk_num}; }
private: private:
Chunk prepareToMerge(); Chunk prepareToMerge();
void addToAggregation(); void addToAggregation();
@ -92,6 +94,9 @@ private:
UInt64 chunk_num = 0; UInt64 chunk_num = 0;
size_t accumulated_rows = 0; size_t accumulated_rows = 0;
size_t accumulated_bytes = 0; size_t accumulated_bytes = 0;
size_t total_merged_rows = 0;
size_t total_merged_bytes = 0;
}; };
} }

View File

@ -33,6 +33,8 @@ public:
const char * getName() const override { return "GraphiteRollupSortedAlgorithm"; } const char * getName() const override { return "GraphiteRollupSortedAlgorithm"; }
Status merge() override; Status merge() override;
MergedStats getMergedStats() const override { return merged_data->getMergedStats(); }
struct ColumnsDefinition struct ColumnsDefinition
{ {
size_t path_column_num; size_t path_column_num;

View File

@ -1,7 +1,7 @@
#pragma once #pragma once
#include <Processors/Chunk.h> #include <Processors/Chunk.h>
#include <variant> #include <Common/ProfileEvents.h>
namespace DB namespace DB
{ {
@ -65,6 +65,15 @@ public:
IMergingAlgorithm() = default; IMergingAlgorithm() = default;
virtual ~IMergingAlgorithm() = default; virtual ~IMergingAlgorithm() = default;
struct MergedStats
{
UInt64 bytes = 0;
UInt64 rows = 0;
UInt64 blocks = 0;
};
virtual MergedStats getMergedStats() const = 0;
}; };
// TODO: use when compile with clang which could support it // TODO: use when compile with clang which could support it

View File

@ -16,6 +16,8 @@ public:
void initialize(Inputs inputs) override; void initialize(Inputs inputs) override;
void consume(Input & input, size_t source_num) override; void consume(Input & input, size_t source_num) override;
MergedStats getMergedStats() const override { return merged_data->getMergedStats(); }
private: private:
Block header; Block header;
SortDescription description; SortDescription description;

View File

@ -183,6 +183,8 @@ public:
UInt64 totalAllocatedBytes() const { return total_allocated_bytes; } UInt64 totalAllocatedBytes() const { return total_allocated_bytes; }
UInt64 maxBlockSize() const { return max_block_size; } UInt64 maxBlockSize() const { return max_block_size; }
IMergingAlgorithm::MergedStats getMergedStats() const { return {.bytes = total_allocated_bytes, .rows = total_merged_rows, .blocks = total_chunks}; }
virtual ~MergedData() = default; virtual ~MergedData() = default;
protected: protected:

View File

@ -31,7 +31,7 @@ public:
void consume(Input & input, size_t source_num) override; void consume(Input & input, size_t source_num) override;
Status merge() override; Status merge() override;
const MergedData & getMergedData() const { return merged_data; } MergedStats getMergedStats() const override { return merged_data.getMergedStats(); }
private: private:
Block header; Block header;

View File

@ -30,6 +30,8 @@ public:
void consume(Input & input, size_t source_num) override; void consume(Input & input, size_t source_num) override;
Status merge() override; Status merge() override;
MergedStats getMergedStats() const override { return merged_data.getMergedStats(); }
struct AggregateDescription; struct AggregateDescription;
struct MapDescription; struct MapDescription;

View File

@ -3,6 +3,11 @@
#include <Processors/Merges/IMergingTransform.h> #include <Processors/Merges/IMergingTransform.h>
#include <Processors/Merges/Algorithms/CollapsingSortedAlgorithm.h> #include <Processors/Merges/Algorithms/CollapsingSortedAlgorithm.h>
namespace ProfileEvents
{
extern const Event CollapsingSortedMilliseconds;
}
namespace DB namespace DB
{ {
@ -36,6 +41,11 @@ public:
} }
String getName() const override { return "CollapsingSortedTransform"; } String getName() const override { return "CollapsingSortedTransform"; }
void onFinish() override
{
logMergedStats(ProfileEvents::CollapsingSortedMilliseconds, "Collapsed sorted", getLogger("CollapsingSortedTransform"));
}
}; };
} }

View File

@ -2,7 +2,10 @@
#include <Processors/Merges/Algorithms/IMergingAlgorithm.h> #include <Processors/Merges/Algorithms/IMergingAlgorithm.h>
#include <Processors/IProcessor.h> #include <Processors/IProcessor.h>
#include <Common/ProfileEvents.h>
#include <Common/Stopwatch.h> #include <Common/Stopwatch.h>
#include <Common/logger_useful.h>
#include <Common/formatReadable.h>
namespace DB namespace DB
{ {
@ -110,6 +113,8 @@ public:
void work() override void work() override
{ {
Stopwatch watch{CLOCK_MONOTONIC_COARSE};
if (!state.init_chunks.empty()) if (!state.init_chunks.empty())
algorithm.initialize(std::move(state.init_chunks)); algorithm.initialize(std::move(state.init_chunks));
@ -147,6 +152,8 @@ public:
// std::cerr << "Finished" << std::endl; // std::cerr << "Finished" << std::endl;
state.is_finished = true; state.is_finished = true;
} }
merging_elapsed_ns += watch.elapsedNanoseconds();
} }
protected: protected:
@ -156,7 +163,33 @@ protected:
Algorithm algorithm; Algorithm algorithm;
/// Profile info. /// Profile info.
Stopwatch total_stopwatch {CLOCK_MONOTONIC_COARSE}; UInt64 merging_elapsed_ns = 0;
void logMergedStats(ProfileEvents::Event elapsed_ms_event, std::string_view transform_message, LoggerPtr log) const
{
auto stats = algorithm.getMergedStats();
UInt64 elapsed_ms = merging_elapsed_ns / 1000000LL;
ProfileEvents::increment(elapsed_ms_event, elapsed_ms);
/// Don't print info for small parts (< 1M rows)
if (stats.rows < 1000000)
return;
double seconds = static_cast<double>(merging_elapsed_ns) / 1000000000ULL;
if (seconds == 0.0)
{
LOG_DEBUG(log, "{}, {} blocks, {} rows, {} bytes in 0 sec.",
transform_message, stats.blocks, stats.rows, stats.bytes);
}
else
{
LOG_DEBUG(log, "{}, {} blocks, {} rows, {} bytes in {} sec., {} rows/sec., {}/sec.",
transform_message, stats.blocks, stats.rows, stats.bytes,
seconds, stats.rows / seconds, ReadableSize(stats.bytes / seconds));
}
}
private: private:
using IMergingTransformBase::state; using IMergingTransformBase::state;

View File

@ -1,9 +1,12 @@
#include <Processors/Merges/MergingSortedTransform.h> #include <Processors/Merges/MergingSortedTransform.h>
#include <Processors/Transforms/ColumnGathererTransform.h> #include <Processors/Transforms/ColumnGathererTransform.h>
#include <IO/WriteBuffer.h> #include <IO/WriteBuffer.h>
#include <Common/logger_useful.h> #include <Common/logger_useful.h>
#include <Common/formatReadable.h>
namespace ProfileEvents
{
extern const Event MergingSortedMilliseconds;
}
namespace DB namespace DB
{ {
@ -18,7 +21,6 @@ MergingSortedTransform::MergingSortedTransform(
UInt64 limit_, UInt64 limit_,
bool always_read_till_end_, bool always_read_till_end_,
WriteBuffer * out_row_sources_buf_, WriteBuffer * out_row_sources_buf_,
bool quiet_,
bool use_average_block_sizes, bool use_average_block_sizes,
bool have_all_inputs_) bool have_all_inputs_)
: IMergingTransform( : IMergingTransform(
@ -37,7 +39,6 @@ MergingSortedTransform::MergingSortedTransform(
limit_, limit_,
out_row_sources_buf_, out_row_sources_buf_,
use_average_block_sizes) use_average_block_sizes)
, quiet(quiet_)
{ {
} }
@ -48,22 +49,7 @@ void MergingSortedTransform::onNewInput()
void MergingSortedTransform::onFinish() void MergingSortedTransform::onFinish()
{ {
if (quiet) logMergedStats(ProfileEvents::MergingSortedMilliseconds, "Merged sorted", getLogger("MergingSortedTransform"));
return;
const auto & merged_data = algorithm.getMergedData();
auto log = getLogger("MergingSortedTransform");
double seconds = total_stopwatch.elapsedSeconds();
if (seconds == 0.0)
LOG_DEBUG(log, "Merge sorted {} blocks, {} rows in 0 sec.", merged_data.totalChunks(), merged_data.totalMergedRows());
else
LOG_DEBUG(log, "Merge sorted {} blocks, {} rows in {} sec., {} rows/sec., {}/sec",
merged_data.totalChunks(), merged_data.totalMergedRows(), seconds,
merged_data.totalMergedRows() / seconds,
ReadableSize(merged_data.totalAllocatedBytes() / seconds));
} }
} }

View File

@ -21,7 +21,6 @@ public:
UInt64 limit_ = 0, UInt64 limit_ = 0,
bool always_read_till_end_ = false, bool always_read_till_end_ = false,
WriteBuffer * out_row_sources_buf_ = nullptr, WriteBuffer * out_row_sources_buf_ = nullptr,
bool quiet_ = false,
bool use_average_block_sizes = false, bool use_average_block_sizes = false,
bool have_all_inputs_ = true); bool have_all_inputs_ = true);
@ -30,9 +29,6 @@ public:
protected: protected:
void onNewInput() override; void onNewInput() override;
void onFinish() override; void onFinish() override;
private:
bool quiet = false;
}; };
} }

View File

@ -3,6 +3,10 @@
#include <Processors/Merges/IMergingTransform.h> #include <Processors/Merges/IMergingTransform.h>
#include <Processors/Merges/Algorithms/ReplacingSortedAlgorithm.h> #include <Processors/Merges/Algorithms/ReplacingSortedAlgorithm.h>
namespace ProfileEvents
{
extern const Event ReplacingSortedMilliseconds;
}
namespace DB namespace DB
{ {
@ -38,6 +42,11 @@ public:
} }
String getName() const override { return "ReplacingSorted"; } String getName() const override { return "ReplacingSorted"; }
void onFinish() override
{
logMergedStats(ProfileEvents::ReplacingSortedMilliseconds, "Replaced sorted", getLogger("ReplacingSortedTransform"));
}
}; };
} }

View File

@ -3,6 +3,11 @@
#include <Processors/Merges/IMergingTransform.h> #include <Processors/Merges/IMergingTransform.h>
#include <Processors/Merges/Algorithms/SummingSortedAlgorithm.h> #include <Processors/Merges/Algorithms/SummingSortedAlgorithm.h>
namespace ProfileEvents
{
extern const Event SummingSortedMilliseconds;
}
namespace DB namespace DB
{ {
@ -33,6 +38,11 @@ public:
} }
String getName() const override { return "SummingSortedTransform"; } String getName() const override { return "SummingSortedTransform"; }
void onFinish() override
{
logMergedStats(ProfileEvents::SummingSortedMilliseconds, "Summed sorted", getLogger("SummingSortedTransform"));
}
}; };
} }

View File

@ -3,6 +3,10 @@
#include <Processors/Merges/IMergingTransform.h> #include <Processors/Merges/IMergingTransform.h>
#include <Processors/Merges/Algorithms/VersionedCollapsingAlgorithm.h> #include <Processors/Merges/Algorithms/VersionedCollapsingAlgorithm.h>
namespace ProfileEvents
{
extern const Event VersionedCollapsingSortedMilliseconds;
}
namespace DB namespace DB
{ {
@ -33,6 +37,11 @@ public:
} }
String getName() const override { return "VersionedCollapsingTransform"; } String getName() const override { return "VersionedCollapsingTransform"; }
void onFinish() override
{
logMergedStats(ProfileEvents::VersionedCollapsingSortedMilliseconds, "Versioned collapsed sorted", getLogger("VersionedCollapsingTransform"));
}
}; };
} }

View File

@ -1,11 +1,15 @@
#include <Processors/Transforms/ColumnGathererTransform.h> #include <Processors/Transforms/ColumnGathererTransform.h>
#include <Common/ProfileEvents.h>
#include <Common/logger_useful.h> #include <Common/logger_useful.h>
#include <Common/typeid_cast.h> #include <Common/typeid_cast.h>
#include <Common/formatReadable.h> #include <Common/formatReadable.h>
#include <Columns/ColumnSparse.h> #include <Columns/ColumnSparse.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
#include <iomanip>
namespace ProfileEvents
{
extern const Event GatheringColumnMilliseconds;
}
namespace DB namespace DB
{ {
@ -33,6 +37,13 @@ ColumnGathererStream::ColumnGathererStream(
throw Exception(ErrorCodes::EMPTY_DATA_PASSED, "There are no streams to gather"); throw Exception(ErrorCodes::EMPTY_DATA_PASSED, "There are no streams to gather");
} }
void ColumnGathererStream::updateStats(const IColumn & column)
{
merged_rows += column.size();
merged_bytes += column.allocatedBytes();
++merged_blocks;
}
void ColumnGathererStream::initialize(Inputs inputs) void ColumnGathererStream::initialize(Inputs inputs)
{ {
Columns source_columns; Columns source_columns;
@ -82,7 +93,9 @@ IMergingAlgorithm::Status ColumnGathererStream::merge()
{ {
res.addColumn(source_to_fully_copy->column); res.addColumn(source_to_fully_copy->column);
} }
merged_rows += source_to_fully_copy->size;
updateStats(*source_to_fully_copy->column);
source_to_fully_copy->pos = source_to_fully_copy->size; source_to_fully_copy->pos = source_to_fully_copy->size;
source_to_fully_copy = nullptr; source_to_fully_copy = nullptr;
return Status(std::move(res)); return Status(std::move(res));
@ -96,8 +109,7 @@ IMergingAlgorithm::Status ColumnGathererStream::merge()
{ {
next_required_source = 0; next_required_source = 0;
Chunk res; Chunk res;
merged_rows += sources.front().column->size(); updateStats(*sources.front().column);
merged_bytes += sources.front().column->allocatedBytes();
res.addColumn(std::move(sources.front().column)); res.addColumn(std::move(sources.front().column));
sources.front().pos = sources.front().size = 0; sources.front().pos = sources.front().size = 0;
return Status(std::move(res)); return Status(std::move(res));
@ -123,8 +135,8 @@ IMergingAlgorithm::Status ColumnGathererStream::merge()
if (source_to_fully_copy && result_column->empty()) if (source_to_fully_copy && result_column->empty())
{ {
Chunk res; Chunk res;
merged_rows += source_to_fully_copy->column->size(); updateStats(*source_to_fully_copy->column);
merged_bytes += source_to_fully_copy->column->allocatedBytes();
if (result_column->hasDynamicStructure()) if (result_column->hasDynamicStructure())
{ {
auto col = result_column->cloneEmpty(); auto col = result_column->cloneEmpty();
@ -140,13 +152,13 @@ IMergingAlgorithm::Status ColumnGathererStream::merge()
return Status(std::move(res)); return Status(std::move(res));
} }
auto col = result_column->cloneEmpty(); auto return_column = result_column->cloneEmpty();
result_column.swap(col); result_column.swap(return_column);
Chunk res; Chunk res;
merged_rows += col->size(); updateStats(*return_column);
merged_bytes += col->allocatedBytes();
res.addColumn(std::move(col)); res.addColumn(std::move(return_column));
return Status(std::move(res), row_sources_buf.eof() && !source_to_fully_copy); return Status(std::move(res), row_sources_buf.eof() && !source_to_fully_copy);
} }
@ -185,31 +197,10 @@ ColumnGathererTransform::ColumnGathererTransform(
toString(header.columns())); toString(header.columns()));
} }
void ColumnGathererTransform::work()
{
Stopwatch stopwatch;
IMergingTransform<ColumnGathererStream>::work();
elapsed_ns += stopwatch.elapsedNanoseconds();
}
void ColumnGathererTransform::onFinish() void ColumnGathererTransform::onFinish()
{ {
auto merged_rows = algorithm.getMergedRows();
auto merged_bytes = algorithm.getMergedRows();
/// Don't print info for small parts (< 10M rows)
if (merged_rows < 10000000)
return;
double seconds = static_cast<double>(elapsed_ns) / 1000000000ULL;
const auto & column_name = getOutputPort().getHeader().getByPosition(0).name; const auto & column_name = getOutputPort().getHeader().getByPosition(0).name;
logMergedStats(ProfileEvents::GatheringColumnMilliseconds, fmt::format("Gathered column {}", column_name), log);
if (seconds == 0.0)
LOG_DEBUG(log, "Gathered column {} ({} bytes/elem.) in 0 sec.",
column_name, static_cast<double>(merged_bytes) / merged_rows);
else
LOG_DEBUG(log, "Gathered column {} ({} bytes/elem.) in {} sec., {} rows/sec., {}/sec.",
column_name, static_cast<double>(merged_bytes) / merged_rows, seconds,
merged_rows / seconds, ReadableSize(merged_bytes / seconds));
} }
} }

View File

@ -72,10 +72,11 @@ public:
template <typename Column> template <typename Column>
void gather(Column & column_res); void gather(Column & column_res);
UInt64 getMergedRows() const { return merged_rows; } MergedStats getMergedStats() const override { return {.bytes = merged_bytes, .rows = merged_rows, .blocks = merged_blocks}; }
UInt64 getMergedBytes() const { return merged_bytes; }
private: private:
void updateStats(const IColumn & column);
/// Cache required fields /// Cache required fields
struct Source struct Source
{ {
@ -105,6 +106,7 @@ private:
ssize_t next_required_source = -1; ssize_t next_required_source = -1;
UInt64 merged_rows = 0; UInt64 merged_rows = 0;
UInt64 merged_bytes = 0; UInt64 merged_bytes = 0;
UInt64 merged_blocks = 0;
}; };
class ColumnGathererTransform final : public IMergingTransform<ColumnGathererStream> class ColumnGathererTransform final : public IMergingTransform<ColumnGathererStream>
@ -120,12 +122,8 @@ public:
String getName() const override { return "ColumnGathererTransform"; } String getName() const override { return "ColumnGathererTransform"; }
void work() override;
protected: protected:
void onFinish() override; void onFinish() override;
UInt64 elapsed_ns = 0;
LoggerPtr log; LoggerPtr log;
}; };

View File

@ -511,6 +511,16 @@ void MergeJoinAlgorithm::logElapsed(double seconds)
stat.max_blocks_loaded); stat.max_blocks_loaded);
} }
IMergingAlgorithm::MergedStats MergeJoinAlgorithm::getMergedStats() const
{
return
{
.bytes = stat.num_bytes[0] + stat.num_bytes[1],
.rows = stat.num_rows[0] + stat.num_rows[1],
.blocks = stat.num_blocks[0] + stat.num_blocks[1],
};
}
static void prepareChunk(Chunk & chunk) static void prepareChunk(Chunk & chunk)
{ {
if (!chunk) if (!chunk)
@ -547,6 +557,7 @@ void MergeJoinAlgorithm::consume(Input & input, size_t source_num)
{ {
stat.num_blocks[source_num] += 1; stat.num_blocks[source_num] += 1;
stat.num_rows[source_num] += input.chunk.getNumRows(); stat.num_rows[source_num] += input.chunk.getNumRows();
stat.num_bytes[source_num] += input.chunk.allocatedBytes();
} }
prepareChunk(input.chunk); prepareChunk(input.chunk);
@ -1271,7 +1282,7 @@ MergeJoinTransform::MergeJoinTransform(
void MergeJoinTransform::onFinish() void MergeJoinTransform::onFinish()
{ {
algorithm.logElapsed(total_stopwatch.elapsedSeconds()); algorithm.logElapsed(static_cast<double>(merging_elapsed_ns) / 1000000000ULL);
} }
} }

View File

@ -245,6 +245,8 @@ public:
void setAsofInequality(ASOFJoinInequality asof_inequality_); void setAsofInequality(ASOFJoinInequality asof_inequality_);
void logElapsed(double seconds); void logElapsed(double seconds);
MergedStats getMergedStats() const override;
private: private:
std::optional<Status> handleAnyJoinState(); std::optional<Status> handleAnyJoinState();
Status anyJoin(); Status anyJoin();
@ -280,6 +282,7 @@ private:
{ {
size_t num_blocks[2] = {0, 0}; size_t num_blocks[2] = {0, 0};
size_t num_rows[2] = {0, 0}; size_t num_rows[2] = {0, 0};
size_t num_bytes[2] = {0, 0};
size_t max_blocks_loaded = 0; size_t max_blocks_loaded = 0;
}; };

View File

@ -185,7 +185,6 @@ void MergeSortingTransform::consume(Chunk chunk)
if (!external_merging_sorted) if (!external_merging_sorted)
{ {
bool quiet = false;
bool have_all_inputs = false; bool have_all_inputs = false;
bool use_average_block_sizes = false; bool use_average_block_sizes = false;
@ -199,7 +198,6 @@ void MergeSortingTransform::consume(Chunk chunk)
limit, limit,
/*always_read_till_end_=*/ false, /*always_read_till_end_=*/ false,
nullptr, nullptr,
quiet,
use_average_block_sizes, use_average_block_sizes,
have_all_inputs); have_all_inputs);

View File

@ -58,6 +58,16 @@ static void prepareChunk(Chunk & chunk)
chunk.setColumns(std::move(columns), num_rows); chunk.setColumns(std::move(columns), num_rows);
} }
IMergingAlgorithm::MergedStats PasteJoinAlgorithm::getMergedStats() const
{
return
{
.bytes = stat.num_bytes[0] + stat.num_bytes[1],
.rows = stat.num_rows[0] + stat.num_rows[1],
.blocks = stat.num_blocks[0] + stat.num_blocks[1],
};
}
void PasteJoinAlgorithm::initialize(Inputs inputs) void PasteJoinAlgorithm::initialize(Inputs inputs)
{ {
if (inputs.size() != 2) if (inputs.size() != 2)

View File

@ -35,8 +35,7 @@ public:
void initialize(Inputs inputs) override; void initialize(Inputs inputs) override;
void consume(Input & input, size_t source_num) override; void consume(Input & input, size_t source_num) override;
Status merge() override; Status merge() override;
MergedStats getMergedStats() const override;
void logElapsed(double seconds);
private: private:
Chunk createBlockWithDefaults(size_t source_num); Chunk createBlockWithDefaults(size_t source_num);
@ -55,6 +54,7 @@ private:
{ {
size_t num_blocks[2] = {0, 0}; size_t num_blocks[2] = {0, 0};
size_t num_rows[2] = {0, 0}; size_t num_rows[2] = {0, 0};
size_t num_bytes[2] = {0, 0};
size_t max_blocks_loaded = 0; size_t max_blocks_loaded = 0;
}; };

View File

@ -83,7 +83,7 @@ TEST(MergingSortedTest, SimpleBlockSizeTest)
EXPECT_EQ(pipe.numOutputPorts(), 3); EXPECT_EQ(pipe.numOutputPorts(), 3);
auto transform = std::make_shared<MergingSortedTransform>(pipe.getHeader(), pipe.numOutputPorts(), sort_description, auto transform = std::make_shared<MergingSortedTransform>(pipe.getHeader(), pipe.numOutputPorts(), sort_description,
8192, /*max_block_size_bytes=*/0, SortingQueueStrategy::Batch, 0, false, nullptr, false, true); 8192, /*max_block_size_bytes=*/0, SortingQueueStrategy::Batch, 0, false, nullptr, true);
pipe.addTransform(std::move(transform)); pipe.addTransform(std::move(transform));
@ -125,7 +125,7 @@ TEST(MergingSortedTest, MoreInterestingBlockSizes)
EXPECT_EQ(pipe.numOutputPorts(), 3); EXPECT_EQ(pipe.numOutputPorts(), 3);
auto transform = std::make_shared<MergingSortedTransform>(pipe.getHeader(), pipe.numOutputPorts(), sort_description, auto transform = std::make_shared<MergingSortedTransform>(pipe.getHeader(), pipe.numOutputPorts(), sort_description,
8192, /*max_block_size_bytes=*/0, SortingQueueStrategy::Batch, 0, false, nullptr, false, true); 8192, /*max_block_size_bytes=*/0, SortingQueueStrategy::Batch, 0, false, nullptr, true);
pipe.addTransform(std::move(transform)); pipe.addTransform(std::move(transform));

View File

@ -8,10 +8,10 @@
namespace ProfileEvents namespace ProfileEvents
{ {
extern const Event MergesTimeMilliseconds;
extern const Event MergedUncompressedBytes; extern const Event MergedUncompressedBytes;
extern const Event MergedRows; extern const Event MergedRows;
extern const Event Merge; extern const Event MutatedRows;
extern const Event MutatedUncompressedBytes;
} }
namespace DB namespace DB
@ -63,18 +63,17 @@ public:
void updateWatch() void updateWatch()
{ {
UInt64 watch_curr_elapsed = merge_list_element_ptr->watch.elapsed(); UInt64 watch_curr_elapsed = merge_list_element_ptr->watch.elapsed();
ProfileEvents::increment(ProfileEvents::MergesTimeMilliseconds, (watch_curr_elapsed - watch_prev_elapsed) / 1000000);
watch_prev_elapsed = watch_curr_elapsed; watch_prev_elapsed = watch_curr_elapsed;
} }
void operator() (const Progress & value) void operator()(const Progress & value)
{ {
ProfileEvents::increment(ProfileEvents::MergedUncompressedBytes, value.read_bytes); if (merge_list_element_ptr->is_mutation)
if (stage.is_first) updateProfileEvents(value, ProfileEvents::MutatedRows, ProfileEvents::MutatedUncompressedBytes);
{ else
ProfileEvents::increment(ProfileEvents::MergedRows, value.read_rows); updateProfileEvents(value, ProfileEvents::MergedRows, ProfileEvents::MergedUncompressedBytes);
ProfileEvents::increment(ProfileEvents::Merge);
}
updateWatch(); updateWatch();
merge_list_element_ptr->bytes_read_uncompressed += value.read_bytes; merge_list_element_ptr->bytes_read_uncompressed += value.read_bytes;
@ -90,6 +89,14 @@ public:
std::memory_order_relaxed); std::memory_order_relaxed);
} }
} }
private:
void updateProfileEvents(const Progress & value, ProfileEvents::Event rows_event, ProfileEvents::Event bytes_event) const
{
ProfileEvents::increment(bytes_event, value.read_bytes);
if (stage.is_first)
ProfileEvents::increment(rows_event, value.read_rows);
}
}; };
} }

View File

@ -8,6 +8,7 @@
#include <Common/logger_useful.h> #include <Common/logger_useful.h>
#include <Common/ActionBlocker.h> #include <Common/ActionBlocker.h>
#include <Core/Settings.h> #include <Core/Settings.h>
#include <Common/ProfileEvents.h>
#include <Processors/Transforms/CheckSortedTransform.h> #include <Processors/Transforms/CheckSortedTransform.h>
#include <Storages/MergeTree/DataPartStorageOnDiskFull.h> #include <Storages/MergeTree/DataPartStorageOnDiskFull.h>
#include <Compression/CompressedWriteBuffer.h> #include <Compression/CompressedWriteBuffer.h>
@ -39,6 +40,18 @@
#include <Interpreters/MergeTreeTransaction.h> #include <Interpreters/MergeTreeTransaction.h>
#include <QueryPipeline/QueryPipelineBuilder.h> #include <QueryPipeline/QueryPipelineBuilder.h>
namespace ProfileEvents
{
extern const Event Merge;
extern const Event MergedColumns;
extern const Event GatheredColumns;
extern const Event MergeTotalMilliseconds;
extern const Event MergeExecuteMilliseconds;
extern const Event MergeHorizontalStageExecuteMilliseconds;
extern const Event MergeVerticalStageExecuteMilliseconds;
extern const Event MergeProjectionStageExecuteMilliseconds;
}
namespace DB namespace DB
{ {
@ -169,6 +182,8 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::extractMergingAndGatheringColu
bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
{ {
ProfileEvents::increment(ProfileEvents::Merge);
String local_tmp_prefix; String local_tmp_prefix;
if (global_ctx->need_prefix) if (global_ctx->need_prefix)
{ {
@ -446,6 +461,13 @@ void MergeTask::addGatheringColumn(GlobalRuntimeContextPtr global_ctx, const Str
MergeTask::StageRuntimeContextPtr MergeTask::ExecuteAndFinalizeHorizontalPart::getContextForNextStage() MergeTask::StageRuntimeContextPtr MergeTask::ExecuteAndFinalizeHorizontalPart::getContextForNextStage()
{ {
/// Do not increment for projection stage because time is already accounted in main task.
if (global_ctx->parent_part == nullptr)
{
ProfileEvents::increment(ProfileEvents::MergeExecuteMilliseconds, ctx->elapsed_execute_ns / 1000000UL);
ProfileEvents::increment(ProfileEvents::MergeHorizontalStageExecuteMilliseconds, ctx->elapsed_execute_ns / 1000000UL);
}
auto new_ctx = std::make_shared<VerticalMergeRuntimeContext>(); auto new_ctx = std::make_shared<VerticalMergeRuntimeContext>();
new_ctx->rows_sources_write_buf = std::move(ctx->rows_sources_write_buf); new_ctx->rows_sources_write_buf = std::move(ctx->rows_sources_write_buf);
@ -463,8 +485,14 @@ MergeTask::StageRuntimeContextPtr MergeTask::ExecuteAndFinalizeHorizontalPart::g
MergeTask::StageRuntimeContextPtr MergeTask::VerticalMergeStage::getContextForNextStage() MergeTask::StageRuntimeContextPtr MergeTask::VerticalMergeStage::getContextForNextStage()
{ {
auto new_ctx = std::make_shared<MergeProjectionsRuntimeContext>(); /// Do not increment for projection stage because time is already accounted in main task.
if (global_ctx->parent_part == nullptr)
{
ProfileEvents::increment(ProfileEvents::MergeExecuteMilliseconds, ctx->elapsed_execute_ns / 1000000UL);
ProfileEvents::increment(ProfileEvents::MergeVerticalStageExecuteMilliseconds, ctx->elapsed_execute_ns / 1000000UL);
}
auto new_ctx = std::make_shared<MergeProjectionsRuntimeContext>();
new_ctx->need_sync = std::move(ctx->need_sync); new_ctx->need_sync = std::move(ctx->need_sync);
ctx.reset(); ctx.reset();
@ -474,9 +502,14 @@ MergeTask::StageRuntimeContextPtr MergeTask::VerticalMergeStage::getContextForNe
bool MergeTask::ExecuteAndFinalizeHorizontalPart::execute() bool MergeTask::ExecuteAndFinalizeHorizontalPart::execute()
{ {
assert(subtasks_iterator != subtasks.end()); chassert(subtasks_iterator != subtasks.end());
if ((this->**subtasks_iterator)())
return true; Stopwatch watch;
bool res = (this->**subtasks_iterator)();
ctx->elapsed_execute_ns += watch.elapsedNanoseconds();
if (res)
return res;
/// Move to the next subtask in an array of subtasks /// Move to the next subtask in an array of subtasks
++subtasks_iterator; ++subtasks_iterator;
@ -534,7 +567,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl()
bool MergeTask::VerticalMergeStage::prepareVerticalMergeForAllColumns() const bool MergeTask::VerticalMergeStage::prepareVerticalMergeForAllColumns() const
{ {
/// No need to execute this part if it is horizontal merge. /// No need to execute this part if it is horizontal merge.
if (global_ctx->chosen_merge_algorithm != MergeAlgorithm::Vertical) if (global_ctx->chosen_merge_algorithm != MergeAlgorithm::Vertical)
return false; return false;
@ -784,6 +817,9 @@ bool MergeTask::MergeProjectionsStage::mergeMinMaxIndexAndPrepareProjections() c
/// Print overall profiling info. NOTE: it may duplicates previous messages /// Print overall profiling info. NOTE: it may duplicates previous messages
{ {
ProfileEvents::increment(ProfileEvents::MergedColumns, global_ctx->merging_columns.size());
ProfileEvents::increment(ProfileEvents::GatheredColumns, global_ctx->gathering_columns.size());
double elapsed_seconds = global_ctx->merge_list_element_ptr->watch.elapsedSeconds(); double elapsed_seconds = global_ctx->merge_list_element_ptr->watch.elapsedSeconds();
LOG_DEBUG(ctx->log, LOG_DEBUG(ctx->log,
"Merge sorted {} rows, containing {} columns ({} merged, {} gathered) in {} sec., {} rows/sec., {}/sec.", "Merge sorted {} rows, containing {} columns ({} merged, {} gathered) in {} sec., {} rows/sec., {}/sec.",
@ -906,12 +942,29 @@ bool MergeTask::MergeProjectionsStage::finalizeProjectionsAndWholeMerge() const
return false; return false;
} }
MergeTask::StageRuntimeContextPtr MergeTask::MergeProjectionsStage::getContextForNextStage()
{
/// Do not increment for projection stage because time is already accounted in main task.
/// The projection stage has its own empty projection stage which may add a drift of several milliseconds.
if (global_ctx->parent_part == nullptr)
{
ProfileEvents::increment(ProfileEvents::MergeExecuteMilliseconds, ctx->elapsed_execute_ns / 1000000UL);
ProfileEvents::increment(ProfileEvents::MergeProjectionStageExecuteMilliseconds, ctx->elapsed_execute_ns / 1000000UL);
}
return nullptr;
}
bool MergeTask::VerticalMergeStage::execute() bool MergeTask::VerticalMergeStage::execute()
{ {
assert(subtasks_iterator != subtasks.end()); chassert(subtasks_iterator != subtasks.end());
if ((this->**subtasks_iterator)())
return true; Stopwatch watch;
bool res = (this->**subtasks_iterator)();
ctx->elapsed_execute_ns += watch.elapsedNanoseconds();
if (res)
return res;
/// Move to the next subtask in an array of subtasks /// Move to the next subtask in an array of subtasks
++subtasks_iterator; ++subtasks_iterator;
@ -920,9 +973,14 @@ bool MergeTask::VerticalMergeStage::execute()
bool MergeTask::MergeProjectionsStage::execute() bool MergeTask::MergeProjectionsStage::execute()
{ {
assert(subtasks_iterator != subtasks.end()); chassert(subtasks_iterator != subtasks.end());
if ((this->**subtasks_iterator)())
return true; Stopwatch watch;
bool res = (this->**subtasks_iterator)();
ctx->elapsed_execute_ns += watch.elapsedNanoseconds();
if (res)
return res;
/// Move to the next subtask in an array of subtasks /// Move to the next subtask in an array of subtasks
++subtasks_iterator; ++subtasks_iterator;
@ -969,12 +1027,26 @@ bool MergeTask::VerticalMergeStage::executeVerticalMergeForAllColumns() const
bool MergeTask::execute() bool MergeTask::execute()
{ {
assert(stages_iterator != stages.end()); chassert(stages_iterator != stages.end());
if ((*stages_iterator)->execute()) const auto & current_stage = *stages_iterator;
if (current_stage->execute())
return true; return true;
/// Stage is finished, need initialize context for the next stage /// Stage is finished, need to initialize context for the next stage and update profile events.
auto next_stage_context = (*stages_iterator)->getContextForNextStage();
UInt64 current_elapsed_ms = global_ctx->merge_list_element_ptr->watch.elapsedMilliseconds();
UInt64 stage_elapsed_ms = current_elapsed_ms - global_ctx->prev_elapsed_ms;
global_ctx->prev_elapsed_ms = current_elapsed_ms;
auto next_stage_context = current_stage->getContextForNextStage();
/// Do not increment for projection stage because time is already accounted in main task.
if (global_ctx->parent_part == nullptr)
{
ProfileEvents::increment(current_stage->getTotalTimeProfileEvent(), stage_elapsed_ms);
ProfileEvents::increment(ProfileEvents::MergeTotalMilliseconds, stage_elapsed_ms);
}
/// Move to the next stage in an array of stages /// Move to the next stage in an array of stages
++stages_iterator; ++stages_iterator;
@ -1099,7 +1171,6 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
/* limit_= */0, /* limit_= */0,
/* always_read_till_end_= */false, /* always_read_till_end_= */false,
ctx->rows_sources_write_buf.get(), ctx->rows_sources_write_buf.get(),
true,
ctx->blocks_are_granules_size); ctx->blocks_are_granules_size);
break; break;

View File

@ -3,6 +3,7 @@
#include <list> #include <list>
#include <memory> #include <memory>
#include <Common/ProfileEvents.h>
#include <Common/filesystemHelpers.h> #include <Common/filesystemHelpers.h>
#include <Compression/CompressedReadBuffer.h> #include <Compression/CompressedReadBuffer.h>
@ -26,6 +27,12 @@
#include <Storages/MergeTree/MergeTreeData.h> #include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeIndices.h> #include <Storages/MergeTree/MergeTreeIndices.h>
namespace ProfileEvents
{
extern const Event MergeHorizontalStageTotalMilliseconds;
extern const Event MergeVerticalStageTotalMilliseconds;
extern const Event MergeProjectionStageTotalMilliseconds;
}
namespace DB namespace DB
{ {
@ -134,6 +141,7 @@ private:
{ {
virtual void setRuntimeContext(StageRuntimeContextPtr local, StageRuntimeContextPtr global) = 0; virtual void setRuntimeContext(StageRuntimeContextPtr local, StageRuntimeContextPtr global) = 0;
virtual StageRuntimeContextPtr getContextForNextStage() = 0; virtual StageRuntimeContextPtr getContextForNextStage() = 0;
virtual ProfileEvents::Event getTotalTimeProfileEvent() const = 0;
virtual bool execute() = 0; virtual bool execute() = 0;
virtual ~IStage() = default; virtual ~IStage() = default;
}; };
@ -195,6 +203,7 @@ private:
bool need_prefix; bool need_prefix;
scope_guard temporary_directory_lock; scope_guard temporary_directory_lock;
UInt64 prev_elapsed_ms{0};
}; };
using GlobalRuntimeContextPtr = std::shared_ptr<GlobalRuntimeContext>; using GlobalRuntimeContextPtr = std::shared_ptr<GlobalRuntimeContext>;
@ -233,6 +242,7 @@ private:
/// Dependencies for next stages /// Dependencies for next stages
std::list<DB::NameAndTypePair>::const_iterator it_name_and_type; std::list<DB::NameAndTypePair>::const_iterator it_name_and_type;
bool need_sync{false}; bool need_sync{false};
UInt64 elapsed_execute_ns{0};
}; };
using ExecuteAndFinalizeHorizontalPartRuntimeContextPtr = std::shared_ptr<ExecuteAndFinalizeHorizontalPartRuntimeContext>; using ExecuteAndFinalizeHorizontalPartRuntimeContextPtr = std::shared_ptr<ExecuteAndFinalizeHorizontalPartRuntimeContext>;
@ -256,7 +266,6 @@ private:
ExecuteAndFinalizeHorizontalPartSubtasks::const_iterator subtasks_iterator = subtasks.begin(); ExecuteAndFinalizeHorizontalPartSubtasks::const_iterator subtasks_iterator = subtasks.begin();
MergeAlgorithm chooseMergeAlgorithm() const; MergeAlgorithm chooseMergeAlgorithm() const;
void createMergedStream(); void createMergedStream();
void extractMergingAndGatheringColumns() const; void extractMergingAndGatheringColumns() const;
@ -268,6 +277,7 @@ private:
} }
StageRuntimeContextPtr getContextForNextStage() override; StageRuntimeContextPtr getContextForNextStage() override;
ProfileEvents::Event getTotalTimeProfileEvent() const override { return ProfileEvents::MergeHorizontalStageTotalMilliseconds; }
ExecuteAndFinalizeHorizontalPartRuntimeContextPtr ctx; ExecuteAndFinalizeHorizontalPartRuntimeContextPtr ctx;
GlobalRuntimeContextPtr global_ctx; GlobalRuntimeContextPtr global_ctx;
@ -307,6 +317,7 @@ private:
QueryPipeline column_parts_pipeline; QueryPipeline column_parts_pipeline;
std::unique_ptr<PullingPipelineExecutor> executor; std::unique_ptr<PullingPipelineExecutor> executor;
std::unique_ptr<CompressedReadBufferFromFile> rows_sources_read_buf{nullptr}; std::unique_ptr<CompressedReadBufferFromFile> rows_sources_read_buf{nullptr};
UInt64 elapsed_execute_ns{0};
}; };
using VerticalMergeRuntimeContextPtr = std::shared_ptr<VerticalMergeRuntimeContext>; using VerticalMergeRuntimeContextPtr = std::shared_ptr<VerticalMergeRuntimeContext>;
@ -321,6 +332,7 @@ private:
global_ctx = static_pointer_cast<GlobalRuntimeContext>(global); global_ctx = static_pointer_cast<GlobalRuntimeContext>(global);
} }
StageRuntimeContextPtr getContextForNextStage() override; StageRuntimeContextPtr getContextForNextStage() override;
ProfileEvents::Event getTotalTimeProfileEvent() const override { return ProfileEvents::MergeVerticalStageTotalMilliseconds; }
bool prepareVerticalMergeForAllColumns() const; bool prepareVerticalMergeForAllColumns() const;
bool executeVerticalMergeForAllColumns() const; bool executeVerticalMergeForAllColumns() const;
@ -361,6 +373,7 @@ private:
MergeTasks::iterator projections_iterator; MergeTasks::iterator projections_iterator;
LoggerPtr log{getLogger("MergeTask::MergeProjectionsStage")}; LoggerPtr log{getLogger("MergeTask::MergeProjectionsStage")};
UInt64 elapsed_execute_ns{0};
}; };
using MergeProjectionsRuntimeContextPtr = std::shared_ptr<MergeProjectionsRuntimeContext>; using MergeProjectionsRuntimeContextPtr = std::shared_ptr<MergeProjectionsRuntimeContext>;
@ -368,12 +381,15 @@ private:
struct MergeProjectionsStage : public IStage struct MergeProjectionsStage : public IStage
{ {
bool execute() override; bool execute() override;
void setRuntimeContext(StageRuntimeContextPtr local, StageRuntimeContextPtr global) override void setRuntimeContext(StageRuntimeContextPtr local, StageRuntimeContextPtr global) override
{ {
ctx = static_pointer_cast<MergeProjectionsRuntimeContext>(local); ctx = static_pointer_cast<MergeProjectionsRuntimeContext>(local);
global_ctx = static_pointer_cast<GlobalRuntimeContext>(global); global_ctx = static_pointer_cast<GlobalRuntimeContext>(global);
} }
StageRuntimeContextPtr getContextForNextStage() override { return nullptr; }
StageRuntimeContextPtr getContextForNextStage() override;
ProfileEvents::Event getTotalTimeProfileEvent() const override { return ProfileEvents::MergeProjectionStageTotalMilliseconds; }
bool mergeMinMaxIndexAndPrepareProjections() const; bool mergeMinMaxIndexAndPrepareProjections() const;
bool executeProjections() const; bool executeProjections() const;

View File

@ -254,6 +254,7 @@ bool MutateFromLogEntryTask::finalize(ReplicatedMergeMutateTaskBase::PartLogWrit
LOG_ERROR(log, "{}. Data after mutation is not byte-identical to data on another replicas. " LOG_ERROR(log, "{}. Data after mutation is not byte-identical to data on another replicas. "
"We will download merged part from replica to force byte-identical result.", getCurrentExceptionMessage(false)); "We will download merged part from replica to force byte-identical result.", getCurrentExceptionMessage(false));
mutate_task->updateProfileEvents();
write_part_log(ExecutionStatus::fromCurrentException("", true)); write_part_log(ExecutionStatus::fromCurrentException("", true));
if (storage.getSettings()->detach_not_byte_identical_parts) if (storage.getSettings()->detach_not_byte_identical_parts)
@ -281,6 +282,7 @@ bool MutateFromLogEntryTask::finalize(ReplicatedMergeMutateTaskBase::PartLogWrit
*/ */
finish_callback = [storage_ptr = &storage]() { storage_ptr->merge_selecting_task->schedule(); }; finish_callback = [storage_ptr = &storage]() { storage_ptr->merge_selecting_task->schedule(); };
ProfileEvents::increment(ProfileEvents::ReplicatedPartMutations); ProfileEvents::increment(ProfileEvents::ReplicatedPartMutations);
mutate_task->updateProfileEvents();
write_part_log({}); write_part_log({});
return true; return true;

View File

@ -102,6 +102,7 @@ bool MutatePlainMergeTreeTask::executeStep()
transaction.commit(); transaction.commit();
storage.updateMutationEntriesErrors(future_part, true, ""); storage.updateMutationEntriesErrors(future_part, true, "");
mutate_task->updateProfileEvents();
write_part_log({}); write_part_log({});
state = State::NEED_FINISH; state = State::NEED_FINISH;
@ -114,6 +115,7 @@ bool MutatePlainMergeTreeTask::executeStep()
PreformattedMessage exception_message = getCurrentExceptionMessageAndPattern(/* with_stacktrace */ false); PreformattedMessage exception_message = getCurrentExceptionMessageAndPattern(/* with_stacktrace */ false);
LOG_ERROR(getLogger("MutatePlainMergeTreeTask"), exception_message); LOG_ERROR(getLogger("MutatePlainMergeTreeTask"), exception_message);
storage.updateMutationEntriesErrors(future_part, false, exception_message.text); storage.updateMutationEntriesErrors(future_part, false, exception_message.text);
mutate_task->updateProfileEvents();
write_part_log(ExecutionStatus::fromCurrentException("", true)); write_part_log(ExecutionStatus::fromCurrentException("", true));
tryLogCurrentException(__PRETTY_FUNCTION__); tryLogCurrentException(__PRETTY_FUNCTION__);
return false; return false;

View File

@ -38,7 +38,13 @@
namespace ProfileEvents namespace ProfileEvents
{ {
extern const Event MutateTaskProjectionsCalculationMicroseconds; extern const Event MutationTotalParts;
extern const Event MutationUntouchedParts;
extern const Event MutationTotalMilliseconds;
extern const Event MutationExecuteMilliseconds;
extern const Event MutationAllPartColumns;
extern const Event MutationSomePartColumns;
extern const Event MutateTaskProjectionsCalculationMicroseconds;
} }
namespace CurrentMetrics namespace CurrentMetrics
@ -1046,6 +1052,7 @@ struct MutationContext
/// Whether we need to count lightweight delete rows in this mutation /// Whether we need to count lightweight delete rows in this mutation
bool count_lightweight_deleted_rows; bool count_lightweight_deleted_rows;
UInt64 execute_elapsed_ns = 0;
}; };
using MutationContextPtr = std::shared_ptr<MutationContext>; using MutationContextPtr = std::shared_ptr<MutationContext>;
@ -2017,6 +2024,9 @@ MutateTask::MutateTask(
bool MutateTask::execute() bool MutateTask::execute()
{ {
Stopwatch watch;
SCOPE_EXIT({ ctx->execute_elapsed_ns += watch.elapsedNanoseconds(); });
switch (state) switch (state)
{ {
case State::NEED_PREPARE: case State::NEED_PREPARE:
@ -2050,6 +2060,15 @@ bool MutateTask::execute()
return false; return false;
} }
void MutateTask::updateProfileEvents() const
{
UInt64 total_elapsed_ms = (*ctx->mutate_entry)->watch.elapsedMilliseconds();
UInt64 execute_elapsed_ms = ctx->execute_elapsed_ns / 1000000UL;
ProfileEvents::increment(ProfileEvents::MutationTotalMilliseconds, total_elapsed_ms);
ProfileEvents::increment(ProfileEvents::MutationExecuteMilliseconds, execute_elapsed_ms);
}
static bool canSkipConversionToNullable(const MergeTreeDataPartPtr & part, const MutationCommand & command) static bool canSkipConversionToNullable(const MergeTreeDataPartPtr & part, const MutationCommand & command)
{ {
if (command.type != MutationCommand::READ_COLUMN) if (command.type != MutationCommand::READ_COLUMN)
@ -2112,6 +2131,7 @@ static bool canSkipMutationCommandForPart(const MergeTreeDataPartPtr & part, con
bool MutateTask::prepare() bool MutateTask::prepare()
{ {
ProfileEvents::increment(ProfileEvents::MutationTotalParts);
MutationHelpers::checkOperationIsNotCanceled(*ctx->merges_blocker, ctx->mutate_entry); MutationHelpers::checkOperationIsNotCanceled(*ctx->merges_blocker, ctx->mutate_entry);
if (ctx->future_part->parts.size() != 1) if (ctx->future_part->parts.size() != 1)
@ -2174,6 +2194,7 @@ bool MutateTask::prepare()
ctx->temporary_directory_lock = std::move(lock); ctx->temporary_directory_lock = std::move(lock);
} }
ProfileEvents::increment(ProfileEvents::MutationUntouchedParts);
promise.set_value(std::move(part)); promise.set_value(std::move(part));
return false; return false;
} }
@ -2283,6 +2304,7 @@ bool MutateTask::prepare()
ctx->new_data_part->remove_tmp_policy = IMergeTreeDataPart::BlobsRemovalPolicyForTemporaryParts::REMOVE_BLOBS; ctx->new_data_part->remove_tmp_policy = IMergeTreeDataPart::BlobsRemovalPolicyForTemporaryParts::REMOVE_BLOBS;
task = std::make_unique<MutateAllPartColumnsTask>(ctx); task = std::make_unique<MutateAllPartColumnsTask>(ctx);
ProfileEvents::increment(ProfileEvents::MutationAllPartColumns);
} }
else /// TODO: check that we modify only non-key columns in this case. else /// TODO: check that we modify only non-key columns in this case.
{ {
@ -2322,6 +2344,7 @@ bool MutateTask::prepare()
ctx->new_data_part->remove_tmp_policy = IMergeTreeDataPart::BlobsRemovalPolicyForTemporaryParts::ASK_KEEPER; ctx->new_data_part->remove_tmp_policy = IMergeTreeDataPart::BlobsRemovalPolicyForTemporaryParts::ASK_KEEPER;
task = std::make_unique<MutateSomePartColumnsTask>(ctx); task = std::make_unique<MutateSomePartColumnsTask>(ctx);
ProfileEvents::increment(ProfileEvents::MutationSomePartColumns);
} }
return true; return true;

View File

@ -39,6 +39,7 @@ public:
bool need_prefix_); bool need_prefix_);
bool execute(); bool execute();
void updateProfileEvents() const;
std::future<MergeTreeData::MutableDataPartPtr> getFuture() std::future<MergeTreeData::MutableDataPartPtr> getFuture()
{ {

View File

@ -39,7 +39,7 @@ SYSTEM FLUSH LOGS;
SELECT SELECT
if(count() == 2, 'Ok', 'Error: ' || toString(count())), if(count() == 2, 'Ok', 'Error: ' || toString(count())),
if(SUM(ProfileEvents['MergedRows']) == 512, 'Ok', 'Error: ' || toString(SUM(ProfileEvents['MergedRows']))), if(SUM(ProfileEvents['MutatedRows']) == 512, 'Ok', 'Error: ' || toString(SUM(ProfileEvents['MutatedRows']))),
if(SUM(ProfileEvents['FileOpen']) > 1, 'Ok', 'Error: ' || toString(SUM(ProfileEvents['FileOpen']))) if(SUM(ProfileEvents['FileOpen']) > 1, 'Ok', 'Error: ' || toString(SUM(ProfileEvents['FileOpen'])))
FROM system.part_log FROM system.part_log
WHERE event_time > now() - INTERVAL 10 MINUTE WHERE event_time > now() - INTERVAL 10 MINUTE

View File

@ -0,0 +1,3 @@
Horizontal 1 20000 3 0 480000 1 1 1 1
Vertical 1 20000 1 2 480000 1 1 1 1 1 1
Vertical 2 400000 2 6 12800000 1 1 1 1 1 1 1 1 1 1

View File

@ -0,0 +1,90 @@
-- Tags: no-random-settings, no-random-merge-tree-settings
DROP TABLE IF EXISTS t_merge_profile_events_1;
CREATE TABLE t_merge_profile_events_1 (id UInt64, v1 UInt64, v2 UInt64)
ENGINE = MergeTree ORDER BY id
SETTINGS min_bytes_for_wide_part = 0;
INSERT INTO t_merge_profile_events_1 SELECT number, number, number FROM numbers(10000);
INSERT INTO t_merge_profile_events_1 SELECT number, number, number FROM numbers(10000);
OPTIMIZE TABLE t_merge_profile_events_1 FINAL;
SYSTEM FLUSH LOGS;
SELECT
merge_algorithm,
ProfileEvents['Merge'],
ProfileEvents['MergedRows'],
ProfileEvents['MergedColumns'],
ProfileEvents['GatheredColumns'],
ProfileEvents['MergedUncompressedBytes'],
ProfileEvents['MergeTotalMilliseconds'] > 0,
ProfileEvents['MergeExecuteMilliseconds'] > 0,
ProfileEvents['MergeHorizontalStageTotalMilliseconds'] > 0,
ProfileEvents['MergeHorizontalStageExecuteMilliseconds'] > 0
FROM system.part_log WHERE database = currentDatabase() AND table = 't_merge_profile_events_1' AND event_type = 'MergeParts' AND part_name = 'all_1_2_1';
DROP TABLE IF EXISTS t_merge_profile_events_1;
DROP TABLE IF EXISTS t_merge_profile_events_2;
CREATE TABLE t_merge_profile_events_2 (id UInt64, v1 UInt64, v2 UInt64)
ENGINE = MergeTree ORDER BY id
SETTINGS min_bytes_for_wide_part = 0, vertical_merge_algorithm_min_rows_to_activate = 1, vertical_merge_algorithm_min_columns_to_activate = 1;
INSERT INTO t_merge_profile_events_2 SELECT number, number, number FROM numbers(10000);
INSERT INTO t_merge_profile_events_2 SELECT number, number, number FROM numbers(10000);
OPTIMIZE TABLE t_merge_profile_events_2 FINAL;
SYSTEM FLUSH LOGS;
SELECT
merge_algorithm,
ProfileEvents['Merge'],
ProfileEvents['MergedRows'],
ProfileEvents['MergedColumns'],
ProfileEvents['GatheredColumns'],
ProfileEvents['MergedUncompressedBytes'],
ProfileEvents['MergeTotalMilliseconds'] > 0,
ProfileEvents['MergeExecuteMilliseconds'] > 0,
ProfileEvents['MergeHorizontalStageTotalMilliseconds'] > 0,
ProfileEvents['MergeHorizontalStageExecuteMilliseconds'] > 0,
ProfileEvents['MergeVerticalStageTotalMilliseconds'] > 0,
ProfileEvents['MergeVerticalStageExecuteMilliseconds'] > 0,
FROM system.part_log WHERE database = currentDatabase() AND table = 't_merge_profile_events_2' AND event_type = 'MergeParts' AND part_name = 'all_1_2_1';
DROP TABLE IF EXISTS t_merge_profile_events_2;
DROP TABLE IF EXISTS t_merge_profile_events_3;
CREATE TABLE t_merge_profile_events_3 (id UInt64, v1 UInt64, v2 UInt64, PROJECTION p (SELECT v2, v2 * v2, v2 * 2, v2 * 10, v1 ORDER BY v1))
ENGINE = MergeTree ORDER BY id
SETTINGS min_bytes_for_wide_part = 0, vertical_merge_algorithm_min_rows_to_activate = 1, vertical_merge_algorithm_min_columns_to_activate = 1;
INSERT INTO t_merge_profile_events_3 SELECT number, number, number FROM numbers(100000);
INSERT INTO t_merge_profile_events_3 SELECT number, number, number FROM numbers(100000);
OPTIMIZE TABLE t_merge_profile_events_3 FINAL;
SYSTEM FLUSH LOGS;
SELECT
merge_algorithm,
ProfileEvents['Merge'],
ProfileEvents['MergedRows'],
ProfileEvents['MergedColumns'],
ProfileEvents['GatheredColumns'],
ProfileEvents['MergedUncompressedBytes'],
ProfileEvents['MergeTotalMilliseconds'] > 0,
ProfileEvents['MergeExecuteMilliseconds'] > 0,
ProfileEvents['MergeHorizontalStageTotalMilliseconds'] > 0,
ProfileEvents['MergeHorizontalStageExecuteMilliseconds'] > 0,
ProfileEvents['MergeVerticalStageTotalMilliseconds'] > 0,
ProfileEvents['MergeVerticalStageExecuteMilliseconds'] > 0,
ProfileEvents['MergeProjectionStageTotalMilliseconds'] > 0,
ProfileEvents['MergeProjectionStageExecuteMilliseconds'] > 0,
ProfileEvents['MergeExecuteMilliseconds'] <= duration_ms,
ProfileEvents['MergeTotalMilliseconds'] <= duration_ms
FROM system.part_log WHERE database = currentDatabase() AND table = 't_merge_profile_events_3' AND event_type = 'MergeParts' AND part_name = 'all_1_2_1';
DROP TABLE IF EXISTS t_merge_profile_events_3;

View File

@ -0,0 +1,2 @@
3 2 1 10000 160000 0 1 1 1
4 2 1 10000 320000 1 0 1 1

View File

@ -0,0 +1,33 @@
-- Tags: no-random-settings, no-random-merge-tree-settings
DROP TABLE IF EXISTS t_mutate_profile_events;
CREATE TABLE t_mutate_profile_events (key UInt64, id UInt64, v1 UInt64, v2 UInt64)
ENGINE = MergeTree ORDER BY id PARTITION BY key
SETTINGS min_bytes_for_wide_part = 0;
INSERT INTO t_mutate_profile_events SELECT 1, number, number, number FROM numbers(10000);
INSERT INTO t_mutate_profile_events SELECT 2, number, number, number FROM numbers(10000);
SET mutations_sync = 2;
ALTER TABLE t_mutate_profile_events UPDATE v1 = 1000 WHERE key = 1;
ALTER TABLE t_mutate_profile_events DELETE WHERE key = 2 AND v2 % 10 = 0;
SYSTEM FLUSH LOGS;
SELECT
splitByChar('_', part_name)[-1] AS version,
sum(ProfileEvents['MutationTotalParts']),
sum(ProfileEvents['MutationUntouchedParts']),
sum(ProfileEvents['MutatedRows']),
sum(ProfileEvents['MutatedUncompressedBytes']),
sum(ProfileEvents['MutationAllPartColumns']),
sum(ProfileEvents['MutationSomePartColumns']),
sum(ProfileEvents['MutationTotalMilliseconds']) > 0,
sum(ProfileEvents['MutationExecuteMilliseconds']) > 0,
FROM system.part_log
WHERE database = currentDatabase() AND table = 't_mutate_profile_events' AND event_type = 'MutatePart'
GROUP BY version ORDER BY version;
DROP TABLE IF EXISTS t_mutate_profile_events