better accounting of time for merge of projections

This commit is contained in:
Anton Popov 2024-08-12 12:23:32 +00:00
parent 2024d6b976
commit 3172bf8d76
4 changed files with 25 additions and 11 deletions

View File

@ -1282,7 +1282,7 @@ MergeJoinTransform::MergeJoinTransform(
void MergeJoinTransform::onFinish() void MergeJoinTransform::onFinish()
{ {
algorithm.logElapsed(merging_elapsed_ns / 1000000000ULL); algorithm.logElapsed(static_cast<double>(merging_elapsed_ns) / 1000000000ULL);
} }
} }

View File

@ -461,8 +461,12 @@ void MergeTask::addGatheringColumn(GlobalRuntimeContextPtr global_ctx, const Str
MergeTask::StageRuntimeContextPtr MergeTask::ExecuteAndFinalizeHorizontalPart::getContextForNextStage() MergeTask::StageRuntimeContextPtr MergeTask::ExecuteAndFinalizeHorizontalPart::getContextForNextStage()
{ {
ProfileEvents::increment(ProfileEvents::MergeExecuteMilliseconds, ctx->elapsed_execute_ns / 1000000UL); /// Do not increment for projection stage because time is already accounted in main task.
ProfileEvents::increment(ProfileEvents::MergeHorizontalStageExecuteMilliseconds, ctx->elapsed_execute_ns / 1000000UL); if (global_ctx->parent_part == nullptr)
{
ProfileEvents::increment(ProfileEvents::MergeExecuteMilliseconds, ctx->elapsed_execute_ns / 1000000UL);
ProfileEvents::increment(ProfileEvents::MergeHorizontalStageExecuteMilliseconds, ctx->elapsed_execute_ns / 1000000UL);
}
auto new_ctx = std::make_shared<VerticalMergeRuntimeContext>(); auto new_ctx = std::make_shared<VerticalMergeRuntimeContext>();
@ -481,8 +485,12 @@ MergeTask::StageRuntimeContextPtr MergeTask::ExecuteAndFinalizeHorizontalPart::g
MergeTask::StageRuntimeContextPtr MergeTask::VerticalMergeStage::getContextForNextStage() MergeTask::StageRuntimeContextPtr MergeTask::VerticalMergeStage::getContextForNextStage()
{ {
ProfileEvents::increment(ProfileEvents::MergeExecuteMilliseconds, ctx->elapsed_execute_ns / 1000000UL); /// Do not increment for projection stage because time is already accounted in main task.
ProfileEvents::increment(ProfileEvents::MergeVerticalStageExecuteMilliseconds, ctx->elapsed_execute_ns / 1000000UL); if (global_ctx->parent_part == nullptr)
{
ProfileEvents::increment(ProfileEvents::MergeExecuteMilliseconds, ctx->elapsed_execute_ns / 1000000UL);
ProfileEvents::increment(ProfileEvents::MergeVerticalStageExecuteMilliseconds, ctx->elapsed_execute_ns / 1000000UL);
}
auto new_ctx = std::make_shared<MergeProjectionsRuntimeContext>(); auto new_ctx = std::make_shared<MergeProjectionsRuntimeContext>();
new_ctx->need_sync = std::move(ctx->need_sync); new_ctx->need_sync = std::move(ctx->need_sync);
@ -1026,8 +1034,12 @@ bool MergeTask::execute()
UInt64 stage_elapsed_ms = current_elapsed_ms - global_ctx->prev_elapsed_ms; UInt64 stage_elapsed_ms = current_elapsed_ms - global_ctx->prev_elapsed_ms;
global_ctx->prev_elapsed_ms = current_elapsed_ms; global_ctx->prev_elapsed_ms = current_elapsed_ms;
ProfileEvents::increment(current_stage->getTotalTimeProfileEvent(), stage_elapsed_ms); /// Do not increment for projection stage because time is already accounted in main task.
ProfileEvents::increment(ProfileEvents::MergeTotalMilliseconds, stage_elapsed_ms); if (global_ctx->parent_part == nullptr)
{
ProfileEvents::increment(current_stage->getTotalTimeProfileEvent(), stage_elapsed_ms);
ProfileEvents::increment(ProfileEvents::MergeTotalMilliseconds, stage_elapsed_ms);
}
auto next_stage_context = current_stage->getContextForNextStage(); auto next_stage_context = current_stage->getContextForNextStage();

View File

@ -1,3 +1,3 @@
Horizontal 1 20000 3 0 480000 1 1 1 1 Horizontal 1 20000 3 0 480000 1 1 1 1
Vertical 1 20000 1 2 480000 1 1 1 1 1 1 Vertical 1 20000 1 2 480000 1 1 1 1 1 1
Vertical 2 20020 4 2 480660 1 1 1 1 1 1 1 1 Vertical 2 400000 2 6 12800000 1 1 1 1 1 1 1 1 1 1

View File

@ -58,12 +58,12 @@ DROP TABLE IF EXISTS t_merge_profile_events_2;
DROP TABLE IF EXISTS t_merge_profile_events_3; DROP TABLE IF EXISTS t_merge_profile_events_3;
CREATE TABLE t_merge_profile_events_3 (id UInt64, v1 UInt64, v2 UInt64, PROJECTION p (SELECT sum(v1), sum(v2) GROUP BY id % 10)) CREATE TABLE t_merge_profile_events_3 (id UInt64, v1 UInt64, v2 UInt64, PROJECTION p (SELECT v2, v2 * v2, v2 * 2, v2 * 10, v1 ORDER BY v1))
ENGINE = MergeTree ORDER BY id ENGINE = MergeTree ORDER BY id
SETTINGS min_bytes_for_wide_part = 0, vertical_merge_algorithm_min_rows_to_activate = 1, vertical_merge_algorithm_min_columns_to_activate = 1; SETTINGS min_bytes_for_wide_part = 0, vertical_merge_algorithm_min_rows_to_activate = 1, vertical_merge_algorithm_min_columns_to_activate = 1;
INSERT INTO t_merge_profile_events_3 SELECT number, number, number FROM numbers(10000); INSERT INTO t_merge_profile_events_3 SELECT number, number, number FROM numbers(100000);
INSERT INTO t_merge_profile_events_3 SELECT number, number, number FROM numbers(10000); INSERT INTO t_merge_profile_events_3 SELECT number, number, number FROM numbers(100000);
OPTIMIZE TABLE t_merge_profile_events_3 FINAL; OPTIMIZE TABLE t_merge_profile_events_3 FINAL;
SYSTEM FLUSH LOGS; SYSTEM FLUSH LOGS;
@ -83,6 +83,8 @@ SELECT
ProfileEvents['MergeVerticalStageExecuteMilliseconds'] > 0, ProfileEvents['MergeVerticalStageExecuteMilliseconds'] > 0,
ProfileEvents['MergeProjectionStageTotalMilliseconds'] > 0, ProfileEvents['MergeProjectionStageTotalMilliseconds'] > 0,
ProfileEvents['MergeProjectionStageExecuteMilliseconds'] > 0, ProfileEvents['MergeProjectionStageExecuteMilliseconds'] > 0,
ProfileEvents['MergeExecuteMilliseconds'] <= duration_ms,
ProfileEvents['MergeTotalMilliseconds'] <= duration_ms
FROM system.part_log WHERE database = currentDatabase() AND table = 't_merge_profile_events_3' AND event_type = 'MergeParts' AND part_name = 'all_1_2_1'; FROM system.part_log WHERE database = currentDatabase() AND table = 't_merge_profile_events_3' AND event_type = 'MergeParts' AND part_name = 'all_1_2_1';
DROP TABLE IF EXISTS t_merge_profile_events_3; DROP TABLE IF EXISTS t_merge_profile_events_3;