add profile events for merges

This commit is contained in:
Anton Popov 2024-08-08 00:38:05 +00:00
parent e809dbed60
commit 42aa967311
31 changed files with 308 additions and 104 deletions

View File

@ -210,7 +210,29 @@
M(Merge, "Number of launched background merges.") \
M(MergedRows, "Rows read for background merges. This is the number of rows before merge.") \
M(MergedUncompressedBytes, "Uncompressed bytes (for columns as they stored in memory) that was read for background merges. This is the number before merge.") \
M(MergesTimeMilliseconds, "Total time spent for background merges.")\
M(MergeTotalMilliseconds, "Total time spent for background merges") \
M(MergeExecuteMilliseconds, "Total busy time spent for execution of background merges") \
M(MergeHorizontalStageTotalMilliseconds, "Total time spent for horizontal stage of background merges") \
M(MergeHorizontalStageExecuteMilliseconds, "Total busy time spent for execution of horizontal stage of background merges") \
M(MergeVerticalStageTotalMilliseconds, "Total time spent for vertical stage of background merges") \
M(MergeVerticalStageExecuteMilliseconds, "Total busy time spent for execution of vertical stage of background merges") \
M(MergeProjectionStageTotalMilliseconds, "Total time spent for projection stage of background merges") \
M(MergeProjectionStageExecuteMilliseconds, "Total busy time spent for execution of projection stage of background merges") \
\
M(MergingSortedMilliseconds, "Total time spent while merging sorted columns") \
M(AggregatingSortedMilliseconds, "Total time spent while aggregating sorted columns") \
M(CollapsingSortedMilliseconds, "Total time spent while collapsing sorted columns") \
M(ReplacingSortedMilliseconds, "Total time spent while replacing sorted columns") \
M(SummingSortedMilliseconds, "Total time spent while summing sorted columns") \
M(VersionedCollapsingSortedMilliseconds, "Total time spent while version collapsing sorted columns") \
M(GatheringColumnMilliseconds, "Total time spent while gathering columns for vertical merge") \
\
M(MutationTotalParts, "Number of total parts for which mutations tried to be applied") \
M(MutationUntouchedParts, "Number of total parts for which mutations tried to be applied but which was completely skipped according to predicate") \
M(MutatedRows, "Rows read for mutations. This is the number of rows before mutation") \
M(MutatedUncompressedBytes, "Uncompressed bytes (for columns as they stored in memory) that was read for mutations. This is the number before mutation.") \
M(MutationTimeMilliseconds, "Total time spent for mutations.") \
M(MutateTaskProjectionsCalculationMicroseconds, "Time spent calculating projections") \
\
M(MergeTreeDataWriterRows, "Number of rows INSERTed to MergeTree tables.") \
M(MergeTreeDataWriterUncompressedBytes, "Uncompressed bytes (for columns as they stored in memory) INSERTed to MergeTree tables.") \
@ -225,7 +247,6 @@
M(MergeTreeDataWriterProjectionsCalculationMicroseconds, "Time spent calculating projections") \
M(MergeTreeDataProjectionWriterSortingBlocksMicroseconds, "Time spent sorting blocks (for projection it might be a key different from table's sorting key)") \
M(MergeTreeDataProjectionWriterMergingBlocksMicroseconds, "Time spent merging blocks") \
M(MutateTaskProjectionsCalculationMicroseconds, "Time spent calculating projections") \
\
M(InsertedWideParts, "Number of parts inserted in Wide format.") \
M(InsertedCompactParts, "Number of parts inserted in Compact format.") \

View File

@ -3,6 +3,11 @@
#include <Processors/Merges/IMergingTransform.h>
#include <Processors/Merges/Algorithms/AggregatingSortedAlgorithm.h>
namespace ProfileEvents
{
extern const Event AggregatingSortedMilliseconds;
}
namespace DB
{
@ -29,6 +34,11 @@ public:
}
String getName() const override { return "AggregatingSortedTransform"; }
/// Finish hook: hands the AggregatingSortedMilliseconds event and this transform's logger
/// to logMergedStats, which increments the event and may write a debug message.
void onFinish() override
{
    auto transform_log = getLogger("AggregatingSortedTransform");
    logMergedStats(ProfileEvents::AggregatingSortedMilliseconds, "Aggregated sorted", transform_log);
}
};
}

View File

@ -30,6 +30,8 @@ public:
void consume(Input & input, size_t source_num) override;
Status merge() override;
/// Merge statistics are taken from the merged data owned by this algorithm.
MergedStats getMergedStats() const override
{
    return merged_data.getMergedStats();
}
/// Stores information for aggregation of SimpleAggregateFunction columns
struct SimpleAggregateDescription
{

View File

@ -126,6 +126,9 @@ IMergingAlgorithm::Status FinishAggregatingInOrderAlgorithm::merge()
Chunk FinishAggregatingInOrderAlgorithm::prepareToMerge()
{
total_merged_rows += accumulated_rows;
total_merged_bytes += accumulated_bytes;
accumulated_rows = 0;
accumulated_bytes = 0;

View File

@ -50,6 +50,8 @@ public:
void consume(Input & input, size_t source_num) override;
Status merge() override;
/// Returns statistics over everything merged so far.
/// prepareToMerge() moves accumulated_rows / accumulated_bytes into total_merged_rows / total_merged_bytes
/// and then zeroes the accumulated counters, so the totals (not the accumulated values) must be reported here.
MergedStats getMergedStats() const override
{
    return {.bytes = total_merged_bytes, .rows = total_merged_rows, .blocks = chunk_num};
}
private:
Chunk prepareToMerge();
void addToAggregation();
@ -92,6 +94,9 @@ private:
UInt64 chunk_num = 0;
size_t accumulated_rows = 0;
size_t accumulated_bytes = 0;
size_t total_merged_rows = 0;
size_t total_merged_bytes = 0;
};
}

View File

@ -33,6 +33,8 @@ public:
const char * getName() const override { return "GraphiteRollupSortedAlgorithm"; }
Status merge() override;
/// Merge statistics are taken from the merged data this algorithm points to.
MergedStats getMergedStats() const override
{
    return merged_data->getMergedStats();
}
struct ColumnsDefinition
{
size_t path_column_num;

View File

@ -1,7 +1,7 @@
#pragma once
#include <Processors/Chunk.h>
#include <variant>
#include <Common/ProfileEvents.h>
namespace DB
{
@ -65,6 +65,15 @@ public:
IMergingAlgorithm() = default;
virtual ~IMergingAlgorithm() = default;
/// Summary counters describing what a merging algorithm has processed.
/// Callers in this change build it with designated initializers (.bytes, .rows, .blocks),
/// so the order of the fields below matters.
struct MergedStats
{
/// Byte counter; implementations in this change fill it from allocated bytes of the merged data, or with 0.
UInt64 bytes = 0;
/// Number of rows.
UInt64 rows = 0;
/// Number of blocks (chunks).
UInt64 blocks = 0;
};
/// Every merging algorithm must report its statistics.
virtual MergedStats getMergedStats() const = 0;
};
// TODO: use when compile with clang which could support it

View File

@ -16,6 +16,8 @@ public:
void initialize(Inputs inputs) override;
void consume(Input & input, size_t source_num) override;
/// Merge statistics are taken from the merged data this algorithm points to.
MergedStats getMergedStats() const override
{
    return merged_data->getMergedStats();
}
private:
Block header;
SortDescription description;

View File

@ -183,6 +183,8 @@ public:
UInt64 totalAllocatedBytes() const { return total_allocated_bytes; }
UInt64 maxBlockSize() const { return max_block_size; }
/// Packs the counters kept by this object into the statistics structure used by merging algorithms.
IMergingAlgorithm::MergedStats getMergedStats() const
{
    IMergingAlgorithm::MergedStats collected;
    collected.bytes = total_allocated_bytes;
    collected.rows = total_merged_rows;
    collected.blocks = total_chunks;
    return collected;
}
virtual ~MergedData() = default;
protected:

View File

@ -31,7 +31,7 @@ public:
void consume(Input & input, size_t source_num) override;
Status merge() override;
const MergedData & getMergedData() const { return merged_data; }
/// Merge statistics are taken from the merged data owned by this algorithm.
MergedStats getMergedStats() const override
{
    return merged_data.getMergedStats();
}
private:
Block header;

View File

@ -30,6 +30,8 @@ public:
void consume(Input & input, size_t source_num) override;
Status merge() override;
/// Merge statistics are taken from the merged data owned by this algorithm.
MergedStats getMergedStats() const override
{
    return merged_data.getMergedStats();
}
struct AggregateDescription;
struct MapDescription;

View File

@ -3,6 +3,11 @@
#include <Processors/Merges/IMergingTransform.h>
#include <Processors/Merges/Algorithms/CollapsingSortedAlgorithm.h>
namespace ProfileEvents
{
extern const Event CollapsingSortedMilliseconds;
}
namespace DB
{
@ -36,6 +41,11 @@ public:
}
String getName() const override { return "CollapsingSortedTransform"; }
/// Finish hook: hands the CollapsingSortedMilliseconds event and this transform's logger
/// to logMergedStats, which increments the event and may write a debug message.
void onFinish() override
{
    auto transform_log = getLogger("CollapsingSortedTransform");
    logMergedStats(ProfileEvents::CollapsingSortedMilliseconds, "Collapsed sorted", transform_log);
}
};
}

View File

@ -2,7 +2,10 @@
#include <Processors/Merges/Algorithms/IMergingAlgorithm.h>
#include <Processors/IProcessor.h>
#include <Common/ProfileEvents.h>
#include <Common/Stopwatch.h>
#include <Common/logger_useful.h>
#include <Common/formatReadable.h>
namespace DB
{
@ -110,6 +113,8 @@ public:
void work() override
{
Stopwatch watch;
if (!state.init_chunks.empty())
algorithm.initialize(std::move(state.init_chunks));
@ -147,6 +152,8 @@ public:
// std::cerr << "Finished" << std::endl;
state.is_finished = true;
}
merging_elapsed_ns += watch.elapsedNanoseconds();
}
protected:
@ -156,7 +163,33 @@ protected:
Algorithm algorithm;
/// Profile info.
Stopwatch total_stopwatch {CLOCK_MONOTONIC_COARSE};
UInt64 merging_elapsed_ns = 0;
/// Reports statistics of the merge performed by this transform.
/// The time accumulated by work() in merging_elapsed_ns is always added (as whole milliseconds)
/// to `elapsed_ms_event`. When at least 1M rows were merged, a debug message that starts with
/// `transform_message` is additionally written to `log`.
void logMergedStats(ProfileEvents::Event elapsed_ms_event, std::string_view transform_message, LoggerPtr log) const
{
    static constexpr UInt64 ns_per_ms = 1000000;
    static constexpr UInt64 ns_per_sec = 1000000000;
    static constexpr UInt64 min_rows_to_log = 1000000;

    const auto merged = algorithm.getMergedStats();

    ProfileEvents::increment(elapsed_ms_event, merging_elapsed_ns / ns_per_ms);

    /// Don't print info for small parts (< 1M rows)
    if (merged.rows >= min_rows_to_log)
    {
        const double elapsed_sec = static_cast<double>(merging_elapsed_ns) / ns_per_sec;

        if (elapsed_sec != 0.0)
        {
            /// Rates are only meaningful for a non-zero duration.
            LOG_DEBUG(log, "{}: {} blocks, {} rows, {} bytes in {} sec., {} rows/sec., {}/sec.",
                transform_message, merged.blocks, merged.rows, merged.bytes,
                elapsed_sec, merged.rows / elapsed_sec, ReadableSize(merged.bytes / elapsed_sec));
        }
        else
        {
            LOG_DEBUG(log, "{}: {} blocks, {} rows, {} bytes in 0 sec.",
                transform_message, merged.blocks, merged.rows, merged.bytes);
        }
    }
}
private:
using IMergingTransformBase::state;

View File

@ -1,9 +1,12 @@
#include <Processors/Merges/MergingSortedTransform.h>
#include <Processors/Transforms/ColumnGathererTransform.h>
#include <IO/WriteBuffer.h>
#include <Common/logger_useful.h>
#include <Common/formatReadable.h>
namespace ProfileEvents
{
extern const Event MergingSortedMilliseconds;
}
namespace DB
{
@ -18,7 +21,6 @@ MergingSortedTransform::MergingSortedTransform(
UInt64 limit_,
bool always_read_till_end_,
WriteBuffer * out_row_sources_buf_,
bool quiet_,
bool use_average_block_sizes,
bool have_all_inputs_)
: IMergingTransform(
@ -37,7 +39,6 @@ MergingSortedTransform::MergingSortedTransform(
limit_,
out_row_sources_buf_,
use_average_block_sizes)
, quiet(quiet_)
{
}
@ -48,22 +49,7 @@ void MergingSortedTransform::onNewInput()
void MergingSortedTransform::onFinish()
{
if (quiet)
return;
const auto & merged_data = algorithm.getMergedData();
auto log = getLogger("MergingSortedTransform");
double seconds = total_stopwatch.elapsedSeconds();
if (seconds == 0.0)
LOG_DEBUG(log, "Merge sorted {} blocks, {} rows in 0 sec.", merged_data.totalChunks(), merged_data.totalMergedRows());
else
LOG_DEBUG(log, "Merge sorted {} blocks, {} rows in {} sec., {} rows/sec., {}/sec",
merged_data.totalChunks(), merged_data.totalMergedRows(), seconds,
merged_data.totalMergedRows() / seconds,
ReadableSize(merged_data.totalAllocatedBytes() / seconds));
logMergedStats(ProfileEvents::MergingSortedMilliseconds, "Merged sorted", getLogger("MergingSortedTransform"));
}
}

View File

@ -21,7 +21,6 @@ public:
UInt64 limit_ = 0,
bool always_read_till_end_ = false,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool quiet_ = false,
bool use_average_block_sizes = false,
bool have_all_inputs_ = true);
@ -30,9 +29,6 @@ public:
protected:
void onNewInput() override;
void onFinish() override;
private:
bool quiet = false;
};
}

View File

@ -3,6 +3,10 @@
#include <Processors/Merges/IMergingTransform.h>
#include <Processors/Merges/Algorithms/ReplacingSortedAlgorithm.h>
namespace ProfileEvents
{
extern const Event ReplacingSortedMilliseconds;
}
namespace DB
{
@ -38,6 +42,11 @@ public:
}
String getName() const override { return "ReplacingSorted"; }
/// Finish hook: hands the ReplacingSortedMilliseconds event and this transform's logger
/// to logMergedStats, which increments the event and may write a debug message.
void onFinish() override
{
    auto transform_log = getLogger("ReplacingSortedTransform");
    logMergedStats(ProfileEvents::ReplacingSortedMilliseconds, "Replaced sorted", transform_log);
}
};
}

View File

@ -3,6 +3,11 @@
#include <Processors/Merges/IMergingTransform.h>
#include <Processors/Merges/Algorithms/SummingSortedAlgorithm.h>
namespace ProfileEvents
{
extern const Event SummingSortedMilliseconds;
}
namespace DB
{
@ -33,6 +38,11 @@ public:
}
String getName() const override { return "SummingSortedTransform"; }
/// Finish hook: hands the SummingSortedMilliseconds event and this transform's logger
/// to logMergedStats, which increments the event and may write a debug message.
void onFinish() override
{
    auto transform_log = getLogger("SummingSortedTransform");
    logMergedStats(ProfileEvents::SummingSortedMilliseconds, "Summed sorted", transform_log);
}
};
}

View File

@ -3,6 +3,10 @@
#include <Processors/Merges/IMergingTransform.h>
#include <Processors/Merges/Algorithms/VersionedCollapsingAlgorithm.h>
namespace ProfileEvents
{
extern const Event VersionedCollapsingSortedMilliseconds;
}
namespace DB
{
@ -33,6 +37,11 @@ public:
}
String getName() const override { return "VersionedCollapsingTransform"; }
/// Finish hook: hands the VersionedCollapsingSortedMilliseconds event and this transform's logger
/// to logMergedStats, which increments the event and may write a debug message.
void onFinish() override
{
    auto transform_log = getLogger("VersionedCollapsingTransform");
    logMergedStats(ProfileEvents::VersionedCollapsingSortedMilliseconds, "Versioned collapsed sorted", transform_log);
}
};
}

View File

@ -1,11 +1,15 @@
#include <Processors/Transforms/ColumnGathererTransform.h>
#include <Common/ProfileEvents.h>
#include <Common/logger_useful.h>
#include <Common/typeid_cast.h>
#include <Common/formatReadable.h>
#include <Columns/ColumnSparse.h>
#include <IO/WriteHelpers.h>
#include <iomanip>
namespace ProfileEvents
{
extern const Event GatheringColumnMilliseconds;
}
namespace DB
{
@ -33,6 +37,13 @@ ColumnGathererStream::ColumnGathererStream(
throw Exception(ErrorCodes::EMPTY_DATA_PASSED, "There are no streams to gather");
}
/// Accounts one more block in the merge statistics: adds the size of `column` to the row counter
/// and its allocated bytes to the byte counter.
void ColumnGathererStream::updateStats(const IColumn & column)
{
    ++merged_blocks;
    merged_bytes += column.allocatedBytes();
    merged_rows += column.size();
}
void ColumnGathererStream::initialize(Inputs inputs)
{
Columns source_columns;
@ -82,7 +93,9 @@ IMergingAlgorithm::Status ColumnGathererStream::merge()
{
res.addColumn(source_to_fully_copy->column);
}
merged_rows += source_to_fully_copy->size;
updateStats(*source_to_fully_copy->column);
source_to_fully_copy->pos = source_to_fully_copy->size;
source_to_fully_copy = nullptr;
return Status(std::move(res));
@ -96,8 +109,7 @@ IMergingAlgorithm::Status ColumnGathererStream::merge()
{
next_required_source = 0;
Chunk res;
merged_rows += sources.front().column->size();
merged_bytes += sources.front().column->allocatedBytes();
updateStats(*sources.front().column);
res.addColumn(std::move(sources.front().column));
sources.front().pos = sources.front().size = 0;
return Status(std::move(res));
@ -123,8 +135,8 @@ IMergingAlgorithm::Status ColumnGathererStream::merge()
if (source_to_fully_copy && result_column->empty())
{
Chunk res;
merged_rows += source_to_fully_copy->column->size();
merged_bytes += source_to_fully_copy->column->allocatedBytes();
updateStats(*source_to_fully_copy->column);
if (result_column->hasDynamicStructure())
{
auto col = result_column->cloneEmpty();
@ -140,13 +152,13 @@ IMergingAlgorithm::Status ColumnGathererStream::merge()
return Status(std::move(res));
}
auto col = result_column->cloneEmpty();
result_column.swap(col);
auto return_column = result_column->cloneEmpty();
result_column.swap(return_column);
Chunk res;
merged_rows += col->size();
merged_bytes += col->allocatedBytes();
res.addColumn(std::move(col));
updateStats(*return_column);
res.addColumn(std::move(return_column));
return Status(std::move(res), row_sources_buf.eof() && !source_to_fully_copy);
}
@ -185,31 +197,10 @@ ColumnGathererTransform::ColumnGathererTransform(
toString(header.columns()));
}
void ColumnGathererTransform::work()
{
Stopwatch stopwatch;
IMergingTransform<ColumnGathererStream>::work();
elapsed_ns += stopwatch.elapsedNanoseconds();
}
void ColumnGathererTransform::onFinish()
{
auto merged_rows = algorithm.getMergedRows();
auto merged_bytes = algorithm.getMergedRows();
/// Don't print info for small parts (< 10M rows)
if (merged_rows < 10000000)
return;
double seconds = static_cast<double>(elapsed_ns) / 1000000000ULL;
const auto & column_name = getOutputPort().getHeader().getByPosition(0).name;
if (seconds == 0.0)
LOG_DEBUG(log, "Gathered column {} ({} bytes/elem.) in 0 sec.",
column_name, static_cast<double>(merged_bytes) / merged_rows);
else
LOG_DEBUG(log, "Gathered column {} ({} bytes/elem.) in {} sec., {} rows/sec., {}/sec.",
column_name, static_cast<double>(merged_bytes) / merged_rows, seconds,
merged_rows / seconds, ReadableSize(merged_bytes / seconds));
logMergedStats(ProfileEvents::GatheringColumnMilliseconds, fmt::format("Gathered column {}", column_name), log);
}
}

View File

@ -2,6 +2,7 @@
#include <IO/ReadBuffer.h>
#include <Common/PODArray.h>
#include "base/types.h"
#include <Processors/Merges/Algorithms/IMergingAlgorithm.h>
#include <Processors/Merges/IMergingTransform.h>
@ -72,10 +73,11 @@ public:
template <typename Column>
void gather(Column & column_res);
UInt64 getMergedRows() const { return merged_rows; }
UInt64 getMergedBytes() const { return merged_bytes; }
/// Packs the counters maintained by updateStats() into the common statistics structure.
MergedStats getMergedStats() const override
{
    MergedStats collected;
    collected.bytes = merged_bytes;
    collected.rows = merged_rows;
    collected.blocks = merged_blocks;
    return collected;
}
private:
void updateStats(const IColumn & column);
/// Cache required fields
struct Source
{
@ -105,6 +107,7 @@ private:
ssize_t next_required_source = -1;
UInt64 merged_rows = 0;
UInt64 merged_bytes = 0;
UInt64 merged_blocks = 0;
};
class ColumnGathererTransform final : public IMergingTransform<ColumnGathererStream>
@ -120,12 +123,8 @@ public:
String getName() const override { return "ColumnGathererTransform"; }
void work() override;
protected:
void onFinish() override;
UInt64 elapsed_ns = 0;
LoggerPtr log;
};

View File

@ -511,6 +511,16 @@ void MergeJoinAlgorithm::logElapsed(double seconds)
stat.max_blocks_loaded);
}
/// Builds merge statistics from the join counters: rows and blocks are summed over
/// both inputs (indices 0 and 1), bytes are reported as 0.
IMergingAlgorithm::MergedStats MergeJoinAlgorithm::getMergedStats() const
{
    IMergingAlgorithm::MergedStats collected;
    collected.bytes = 0;
    collected.rows = stat.num_rows[0] + stat.num_rows[1];
    collected.blocks = stat.num_blocks[0] + stat.num_blocks[1];
    return collected;
}
static void prepareChunk(Chunk & chunk)
{
if (!chunk)
@ -1271,7 +1281,7 @@ MergeJoinTransform::MergeJoinTransform(
void MergeJoinTransform::onFinish()
{
algorithm.logElapsed(total_stopwatch.elapsedSeconds());
/// Convert to double before dividing: integer division would truncate the elapsed time to
/// whole seconds, so any merge shorter than a second would be logged as 0 sec.
algorithm.logElapsed(static_cast<double>(merging_elapsed_ns) / 1000000000ULL);
}
}

View File

@ -245,6 +245,8 @@ public:
void setAsofInequality(ASOFJoinInequality asof_inequality_);
void logElapsed(double seconds);
MergedStats getMergedStats() const override;
private:
std::optional<Status> handleAnyJoinState();
Status anyJoin();

View File

@ -185,7 +185,6 @@ void MergeSortingTransform::consume(Chunk chunk)
if (!external_merging_sorted)
{
bool quiet = false;
bool have_all_inputs = false;
bool use_average_block_sizes = false;
@ -199,7 +198,6 @@ void MergeSortingTransform::consume(Chunk chunk)
limit,
/*always_read_till_end_=*/ false,
nullptr,
quiet,
use_average_block_sizes,
have_all_inputs);

View File

@ -58,6 +58,16 @@ static void prepareChunk(Chunk & chunk)
chunk.setColumns(std::move(columns), num_rows);
}
/// Builds merge statistics from the join counters: rows and blocks are summed over
/// both inputs (indices 0 and 1), bytes are reported as 0.
IMergingAlgorithm::MergedStats PasteJoinAlgorithm::getMergedStats() const
{
    IMergingAlgorithm::MergedStats collected;
    collected.bytes = 0;
    collected.rows = stat.num_rows[0] + stat.num_rows[1];
    collected.blocks = stat.num_blocks[0] + stat.num_blocks[1];
    return collected;
}
void PasteJoinAlgorithm::initialize(Inputs inputs)
{
if (inputs.size() != 2)

View File

@ -35,8 +35,7 @@ public:
void initialize(Inputs inputs) override;
void consume(Input & input, size_t source_num) override;
Status merge() override;
void logElapsed(double seconds);
MergedStats getMergedStats() const override;
private:
Chunk createBlockWithDefaults(size_t source_num);

View File

@ -83,7 +83,7 @@ TEST(MergingSortedTest, SimpleBlockSizeTest)
EXPECT_EQ(pipe.numOutputPorts(), 3);
auto transform = std::make_shared<MergingSortedTransform>(pipe.getHeader(), pipe.numOutputPorts(), sort_description,
8192, /*max_block_size_bytes=*/0, SortingQueueStrategy::Batch, 0, false, nullptr, false, true);
8192, /*max_block_size_bytes=*/0, SortingQueueStrategy::Batch, 0, false, nullptr, true);
pipe.addTransform(std::move(transform));
@ -125,7 +125,7 @@ TEST(MergingSortedTest, MoreInterestingBlockSizes)
EXPECT_EQ(pipe.numOutputPorts(), 3);
auto transform = std::make_shared<MergingSortedTransform>(pipe.getHeader(), pipe.numOutputPorts(), sort_description,
8192, /*max_block_size_bytes=*/0, SortingQueueStrategy::Batch, 0, false, nullptr, false, true);
8192, /*max_block_size_bytes=*/0, SortingQueueStrategy::Batch, 0, false, nullptr, true);
pipe.addTransform(std::move(transform));

View File

@ -6,6 +6,7 @@
#include <Common/CurrentMetrics.h>
#include <Common/MemoryTracker.h>
#include <Common/ThreadStatus.h>
#include "base/types.h"
#include <Storages/MergeTree/MergeType.h>
#include <Storages/MergeTree/MergeAlgorithm.h>
#include <Storages/MergeTree/MergeTreePartInfo.h>

View File

@ -8,10 +8,10 @@
namespace ProfileEvents
{
extern const Event MergesTimeMilliseconds;
extern const Event MergedUncompressedBytes;
extern const Event MergedRows;
extern const Event Merge;
extern const Event MutatedRows;
extern const Event MutatedUncompressedBytes;
}
namespace DB
@ -63,18 +63,17 @@ public:
void updateWatch()
{
UInt64 watch_curr_elapsed = merge_list_element_ptr->watch.elapsed();
ProfileEvents::increment(ProfileEvents::MergesTimeMilliseconds, (watch_curr_elapsed - watch_prev_elapsed) / 1000000);
watch_prev_elapsed = watch_curr_elapsed;
}
void operator() (const Progress & value)
void operator()(const Progress & value)
{
ProfileEvents::increment(ProfileEvents::MergedUncompressedBytes, value.read_bytes);
if (stage.is_first)
{
ProfileEvents::increment(ProfileEvents::MergedRows, value.read_rows);
ProfileEvents::increment(ProfileEvents::Merge);
}
if (merge_list_element_ptr->is_mutation)
updateProfileEvents(value, ProfileEvents::MutatedRows, ProfileEvents::MutatedUncompressedBytes);
else
updateProfileEvents(value, ProfileEvents::MergedRows, ProfileEvents::MergedUncompressedBytes);
updateWatch();
merge_list_element_ptr->bytes_read_uncompressed += value.read_bytes;
@ -90,6 +89,14 @@ public:
std::memory_order_relaxed);
}
}
private:
/// Adds the progress delta `value` to the given pair of profile events.
/// `bytes_event` receives value.read_bytes on every call; `rows_event` receives
/// value.read_rows only when stage.is_first is set.
void updateProfileEvents(const Progress & value, ProfileEvents::Event rows_event, ProfileEvents::Event bytes_event) const
{
    ProfileEvents::increment(bytes_event, value.read_bytes);

    if (!stage.is_first)
        return;

    ProfileEvents::increment(rows_event, value.read_rows);
}
};
}

View File

@ -5,9 +5,13 @@
#include <memory>
#include <fmt/format.h>
#include "Common/ElapsedTimeProfileEventIncrement.h"
#include "Common/Logger.h"
#include "Common/Stopwatch.h"
#include <Common/logger_useful.h>
#include <Common/ActionBlocker.h>
#include <Core/Settings.h>
#include <Common/ProfileEvents.h>
#include <Processors/Transforms/CheckSortedTransform.h>
#include <Storages/MergeTree/DataPartStorageOnDiskFull.h>
#include <Compression/CompressedWriteBuffer.h>
@ -39,6 +43,16 @@
#include <Interpreters/MergeTreeTransaction.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
namespace ProfileEvents
{
extern const Event Merge;
extern const Event MergeTotalMilliseconds;
extern const Event MergeExecuteMilliseconds;
extern const Event MergeHorizontalStageExecuteMilliseconds;
extern const Event MergeVerticalStageExecuteMilliseconds;
extern const Event MergeProjectionStageExecuteMilliseconds;
}
namespace DB
{
@ -186,6 +200,8 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
if (isTTLMergeType(global_ctx->future_part->merge_type) && global_ctx->ttl_merges_blocker->isCancelled())
throw Exception(ErrorCodes::ABORTED, "Cancelled merging parts with TTL");
ProfileEvents::increment(ProfileEvents::Merge);
LOG_DEBUG(ctx->log, "Merging {} parts: from {} to {} into {} with storage {}",
global_ctx->future_part->parts.size(),
global_ctx->future_part->parts.front()->name,
@ -446,6 +462,9 @@ void MergeTask::addGatheringColumn(GlobalRuntimeContextPtr global_ctx, const Str
MergeTask::StageRuntimeContextPtr MergeTask::ExecuteAndFinalizeHorizontalPart::getContextForNextStage()
{
ProfileEvents::increment(ProfileEvents::MergeExecuteMilliseconds, ctx->elapsed_execute_ns / 1000000UL);
ProfileEvents::increment(ProfileEvents::MergeHorizontalStageExecuteMilliseconds, ctx->elapsed_execute_ns / 1000000UL);
auto new_ctx = std::make_shared<VerticalMergeRuntimeContext>();
new_ctx->rows_sources_write_buf = std::move(ctx->rows_sources_write_buf);
@ -463,8 +482,10 @@ MergeTask::StageRuntimeContextPtr MergeTask::ExecuteAndFinalizeHorizontalPart::g
MergeTask::StageRuntimeContextPtr MergeTask::VerticalMergeStage::getContextForNextStage()
{
auto new_ctx = std::make_shared<MergeProjectionsRuntimeContext>();
ProfileEvents::increment(ProfileEvents::MergeExecuteMilliseconds, ctx->elapsed_execute_ns / 1000000UL);
ProfileEvents::increment(ProfileEvents::MergeVerticalStageExecuteMilliseconds, ctx->elapsed_execute_ns / 1000000UL);
auto new_ctx = std::make_shared<MergeProjectionsRuntimeContext>();
new_ctx->need_sync = std::move(ctx->need_sync);
ctx.reset();
@ -474,9 +495,14 @@ MergeTask::StageRuntimeContextPtr MergeTask::VerticalMergeStage::getContextForNe
bool MergeTask::ExecuteAndFinalizeHorizontalPart::execute()
{
assert(subtasks_iterator != subtasks.end());
if ((this->**subtasks_iterator)())
return true;
chassert(subtasks_iterator != subtasks.end());
Stopwatch watch;
bool res = (this->**subtasks_iterator)();
ctx->elapsed_execute_ns += watch.elapsedNanoseconds();
if (res)
return res;
/// Move to the next subtask in an array of subtasks
++subtasks_iterator;
@ -534,7 +560,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::executeImpl()
bool MergeTask::VerticalMergeStage::prepareVerticalMergeForAllColumns() const
{
/// No need to execute this part if it is horizontal merge.
/// No need to execute this part if it is horizontal merge.
if (global_ctx->chosen_merge_algorithm != MergeAlgorithm::Vertical)
return false;
@ -906,12 +932,24 @@ bool MergeTask::MergeProjectionsStage::finalizeProjectionsAndWholeMerge() const
return false;
}
/// Publishes the execution time accumulated by this stage (ctx->elapsed_execute_ns, as whole milliseconds)
/// to both the overall and the projection-stage profile events.
/// Returns nullptr: this stage does not produce a context for a following stage.
MergeTask::StageRuntimeContextPtr MergeTask::MergeProjectionsStage::getContextForNextStage()
{
    const UInt64 stage_execute_ms = ctx->elapsed_execute_ns / 1000000UL;

    ProfileEvents::increment(ProfileEvents::MergeExecuteMilliseconds, stage_execute_ms);
    ProfileEvents::increment(ProfileEvents::MergeProjectionStageExecuteMilliseconds, stage_execute_ms);

    return nullptr;
}
bool MergeTask::VerticalMergeStage::execute()
{
assert(subtasks_iterator != subtasks.end());
if ((this->**subtasks_iterator)())
return true;
chassert(subtasks_iterator != subtasks.end());
Stopwatch watch;
bool res = (this->**subtasks_iterator)();
ctx->elapsed_execute_ns += watch.elapsedNanoseconds();
if (res)
return res;
/// Move to the next subtask in an array of subtasks
++subtasks_iterator;
@ -920,9 +958,14 @@ bool MergeTask::VerticalMergeStage::execute()
bool MergeTask::MergeProjectionsStage::execute()
{
assert(subtasks_iterator != subtasks.end());
if ((this->**subtasks_iterator)())
return true;
chassert(subtasks_iterator != subtasks.end());
Stopwatch watch;
bool res = (this->**subtasks_iterator)();
ctx->elapsed_execute_ns += watch.elapsedNanoseconds();
if (res)
return res;
/// Move to the next subtask in an array of subtasks
++subtasks_iterator;
@ -969,12 +1012,22 @@ bool MergeTask::VerticalMergeStage::executeVerticalMergeForAllColumns() const
bool MergeTask::execute()
{
assert(stages_iterator != stages.end());
if ((*stages_iterator)->execute())
chassert(stages_iterator != stages.end());
const auto & current_stage = *stages_iterator;
if (current_stage->execute())
return true;
/// Stage is finished, need initialize context for the next stage
auto next_stage_context = (*stages_iterator)->getContextForNextStage();
/// Stage is finished, need to initialize context for the next stage and update profile events.
UInt64 current_elapsed_ms = global_ctx->merge_list_element_ptr->watch.elapsedMilliseconds();
UInt64 stage_elapsed_ms = current_elapsed_ms - global_ctx->prev_elapesed_ms;
global_ctx->prev_elapesed_ms = current_elapsed_ms;
ProfileEvents::increment(current_stage->getTotalTimeProfileEvent(), stage_elapsed_ms);
ProfileEvents::increment(ProfileEvents::MergeTotalMilliseconds, stage_elapsed_ms);
auto next_stage_context = current_stage->getContextForNextStage();
/// Move to the next stage in an array of stages
++stages_iterator;
@ -1099,7 +1152,6 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
/* limit_= */0,
/* always_read_till_end_= */false,
ctx->rows_sources_write_buf.get(),
true,
ctx->blocks_are_granules_size);
break;

View File

@ -3,6 +3,7 @@
#include <list>
#include <memory>
#include <Common/ProfileEvents.h>
#include <Common/filesystemHelpers.h>
#include <Compression/CompressedReadBuffer.h>
@ -26,6 +27,12 @@
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeIndices.h>
namespace ProfileEvents
{
extern const Event MergeHorizontalStageTotalMilliseconds;
extern const Event MergeVerticalStageTotalMilliseconds;
extern const Event MergeProjectionStageTotalMilliseconds;
}
namespace DB
{
@ -134,6 +141,7 @@ private:
{
virtual void setRuntimeContext(StageRuntimeContextPtr local, StageRuntimeContextPtr global) = 0;
virtual StageRuntimeContextPtr getContextForNextStage() = 0;
virtual ProfileEvents::Event getTotalTimeProfileEvent() const = 0;
virtual bool execute() = 0;
virtual ~IStage() = default;
};
@ -195,6 +203,7 @@ private:
bool need_prefix;
scope_guard temporary_directory_lock;
UInt64 prev_elapesed_ms{0};
};
using GlobalRuntimeContextPtr = std::shared_ptr<GlobalRuntimeContext>;
@ -233,6 +242,7 @@ private:
/// Dependencies for next stages
std::list<DB::NameAndTypePair>::const_iterator it_name_and_type;
bool need_sync{false};
UInt64 elapsed_execute_ns{0};
};
using ExecuteAndFinalizeHorizontalPartRuntimeContextPtr = std::shared_ptr<ExecuteAndFinalizeHorizontalPartRuntimeContext>;
@ -256,7 +266,6 @@ private:
ExecuteAndFinalizeHorizontalPartSubtasks::const_iterator subtasks_iterator = subtasks.begin();
MergeAlgorithm chooseMergeAlgorithm() const;
void createMergedStream();
void extractMergingAndGatheringColumns() const;
@ -268,6 +277,7 @@ private:
}
StageRuntimeContextPtr getContextForNextStage() override;
ProfileEvents::Event getTotalTimeProfileEvent() const override { return ProfileEvents::MergeHorizontalStageTotalMilliseconds; }
ExecuteAndFinalizeHorizontalPartRuntimeContextPtr ctx;
GlobalRuntimeContextPtr global_ctx;
@ -307,6 +317,7 @@ private:
QueryPipeline column_parts_pipeline;
std::unique_ptr<PullingPipelineExecutor> executor;
std::unique_ptr<CompressedReadBufferFromFile> rows_sources_read_buf{nullptr};
UInt64 elapsed_execute_ns{0};
};
using VerticalMergeRuntimeContextPtr = std::shared_ptr<VerticalMergeRuntimeContext>;
@ -321,6 +332,7 @@ private:
global_ctx = static_pointer_cast<GlobalRuntimeContext>(global);
}
StageRuntimeContextPtr getContextForNextStage() override;
ProfileEvents::Event getTotalTimeProfileEvent() const override { return ProfileEvents::MergeVerticalStageTotalMilliseconds; }
bool prepareVerticalMergeForAllColumns() const;
bool executeVerticalMergeForAllColumns() const;
@ -361,6 +373,7 @@ private:
MergeTasks::iterator projections_iterator;
LoggerPtr log{getLogger("MergeTask::MergeProjectionsStage")};
UInt64 elapsed_execute_ns{0};
};
using MergeProjectionsRuntimeContextPtr = std::shared_ptr<MergeProjectionsRuntimeContext>;
@ -368,12 +381,15 @@ private:
struct MergeProjectionsStage : public IStage
{
bool execute() override;
void setRuntimeContext(StageRuntimeContextPtr local, StageRuntimeContextPtr global) override
{
ctx = static_pointer_cast<MergeProjectionsRuntimeContext>(local);
global_ctx = static_pointer_cast<GlobalRuntimeContext>(global);
}
StageRuntimeContextPtr getContextForNextStage() override { return nullptr; }
StageRuntimeContextPtr getContextForNextStage() override;
ProfileEvents::Event getTotalTimeProfileEvent() const override { return ProfileEvents::MergeProjectionStageTotalMilliseconds; }
bool mergeMinMaxIndexAndPrepareProjections() const;
bool executeProjections() const;

View File

@ -38,7 +38,10 @@
namespace ProfileEvents
{
extern const Event MutateTaskProjectionsCalculationMicroseconds;
extern const Event MutationTotalParts;
extern const Event MutationUntouchedParts;
extern const Event MutationTimeMilliseconds;
extern const Event MutateTaskProjectionsCalculationMicroseconds;
}
namespace CurrentMetrics
@ -2034,6 +2037,9 @@ bool MutateTask::execute()
if (task->executeStep())
return true;
auto total_elapsed_ms = (*ctx->mutate_entry)->watch.elapsedMilliseconds();
ProfileEvents::increment(ProfileEvents::MutationTimeMilliseconds, total_elapsed_ms);
// The `new_data_part` is a shared pointer and must be moved to allow
// part deletion in case it is needed in `MutateFromLogEntryTask::finalize`.
//
@ -2118,6 +2124,7 @@ bool MutateTask::prepare()
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to mutate {} parts, not one. "
"This is a bug.", ctx->future_part->parts.size());
ProfileEvents::increment(ProfileEvents::MutationTotalParts);
ctx->num_mutations = std::make_unique<CurrentMetrics::Increment>(CurrentMetrics::PartMutation);
auto context_for_reading = Context::createCopy(ctx->context);
@ -2174,6 +2181,7 @@ bool MutateTask::prepare()
ctx->temporary_directory_lock = std::move(lock);
}
ProfileEvents::increment(ProfileEvents::MutationUntouchedParts);
promise.set_value(std::move(part));
return false;
}