diff --git a/src/Storages/MergeTree/MergeFromLogEntryTask.cpp b/src/Storages/MergeTree/MergeFromLogEntryTask.cpp index abb871be372..0d8dc7f68a6 100644 --- a/src/Storages/MergeTree/MergeFromLogEntryTask.cpp +++ b/src/Storages/MergeTree/MergeFromLogEntryTask.cpp @@ -2,6 +2,7 @@ #include #include +#include #include namespace ProfileEvents @@ -267,6 +268,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare() transaction_ptr = std::make_unique(storage, NO_TRANSACTION_RAW); stopwatch_ptr = std::make_unique(); + ProfileEventsScope profile_events_scope(&profile_counters); merge_task = storage.merger_mutator.mergePartsToTemporaryPart( future_merged_part, @@ -289,18 +291,18 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare() return {true, true, [this, stopwatch = *stopwatch_ptr] (const ExecutionStatus & execution_status) { - auto & thread_status = CurrentThread::get(); - thread_status.finalizePerformanceCounters(); - auto profile_counters = std::make_shared(thread_status.performance_counters.getPartiallyAtomicSnapshot()); + auto profile_counters_snapshot = std::make_shared(profile_counters.getPartiallyAtomicSnapshot()); storage.writePartLog( PartLogElement::MERGE_PARTS, execution_status, stopwatch.elapsed(), - entry.new_part_name, part, parts, merge_mutate_entry.get(), std::move(profile_counters)); + entry.new_part_name, part, parts, merge_mutate_entry.get(), std::move(profile_counters_snapshot)); }}; } bool MergeFromLogEntryTask::finalize(ReplicatedMergeMutateTaskBase::PartLogWriter write_part_log) { + ProfileEventsScope profile_events_scope(&profile_counters); + part = merge_task->getFuture().get(); storage.merger_mutator.renameMergedTemporaryPart(part, parts, NO_TRANSACTION_PTR, *transaction_ptr); diff --git a/src/Storages/MergeTree/MergePlainMergeTreeTask.cpp b/src/Storages/MergeTree/MergePlainMergeTreeTask.cpp index de88a1984e7..855b93dc90e 100644 --- a/src/Storages/MergeTree/MergePlainMergeTreeTask.cpp +++ b/src/Storages/MergeTree/MergePlainMergeTreeTask.cpp @@ -89,8 +89,6 @@ void MergePlainMergeTreeTask::prepare() write_part_log = [this] (const ExecutionStatus & execution_status) { - auto & thread_status = CurrentThread::get(); - thread_status.finalizePerformanceCounters(); auto profile_counters_snapshot = std::make_shared(profile_counters.getPartiallyAtomicSnapshot()); merge_task.reset(); storage.writePartLog( diff --git a/tests/queries/0_stateless/02378_part_log_profile_events.reference b/tests/queries/0_stateless/02378_part_log_profile_events.reference index e8183f05f5d..c09e6c997c5 100644 --- a/tests/queries/0_stateless/02378_part_log_profile_events.reference +++ b/tests/queries/0_stateless/02378_part_log_profile_events.reference @@ -1,3 +1,3 @@ -1 -1 -1 +Ok Ok Ok Ok Ok Ok +Ok Ok +Ok Ok Ok diff --git a/tests/queries/0_stateless/02378_part_log_profile_events.sql b/tests/queries/0_stateless/02378_part_log_profile_events.sql index af8fe8a2669..af68360dbbb 100644 --- a/tests/queries/0_stateless/02378_part_log_profile_events.sql +++ b/tests/queries/0_stateless/02378_part_log_profile_events.sql @@ -7,13 +7,14 @@ SET max_block_size = 64, max_insert_block_size = 64, min_insert_block_size_rows INSERT INTO test SELECT number AS key, sipHash64(number) AS val FROM numbers(512); SYSTEM FLUSH LOGS; + SELECT - count(DISTINCT query_id) == 1 - AND count() >= 512 / 64 -- 512 rows inserted, 64 rows per block - AND SUM(ProfileEvents['MergeTreeDataWriterRows']) == 512 - AND SUM(ProfileEvents['MergeTreeDataWriterUncompressedBytes']) > 1024 - AND SUM(ProfileEvents['MergeTreeDataWriterCompressedBytes']) > 1024 - AND SUM(ProfileEvents['MergeTreeDataWriterBlocks']) >= 8 + if(count(DISTINCT query_id) == 1, 'Ok', 'Error: ' || toString(count(DISTINCT query_id))), + if(count() == 512 / 64, 'Ok', 'Error: ' || toString(count())), -- 512 rows inserted, 64 rows per block + if(SUM(ProfileEvents['MergeTreeDataWriterRows']) == 512, 'Ok', 'Error: ' || toString(SUM(ProfileEvents['MergeTreeDataWriterRows']))), + if(SUM(ProfileEvents['MergeTreeDataWriterUncompressedBytes']) >= 1024, 'Ok', 'Error: ' || toString(SUM(ProfileEvents['MergeTreeDataWriterUncompressedBytes']))), + if(SUM(ProfileEvents['MergeTreeDataWriterCompressedBytes']) >= 1024, 'Ok', 'Error: ' || toString(SUM(ProfileEvents['MergeTreeDataWriterCompressedBytes']))), + if(SUM(ProfileEvents['MergeTreeDataWriterBlocks']) >= 8, 'Ok', 'Error: ' || toString(SUM(ProfileEvents['MergeTreeDataWriterBlocks']))) FROM system.part_log WHERE event_time > now() - INTERVAL 10 MINUTE AND database == currentDatabase() AND table == 'test' @@ -24,7 +25,8 @@ OPTIMIZE TABLE test FINAL; SYSTEM FLUSH LOGS; SELECT - count() >= 2 AND SUM(ProfileEvents['MergedRows']) >= 512 + if(count() > 2, 'Ok', 'Error: ' || toString(count())), + if(SUM(ProfileEvents['MergedRows']) >= 512, 'Ok', 'Error: ' || toString(SUM(ProfileEvents['MergedRows']))) FROM system.part_log WHERE event_time > now() - INTERVAL 10 MINUTE AND database == currentDatabase() AND table == 'test' @@ -34,10 +36,11 @@ WHERE event_time > now() - INTERVAL 10 MINUTE ALTER TABLE test UPDATE val = 0 WHERE key % 2 == 0 SETTINGS mutations_sync = 2; SYSTEM FLUSH LOGS; + SELECT - count() == 2 - AND SUM(ProfileEvents['SelectedRows']) == 512 - AND SUM(ProfileEvents['FileOpen']) > 2 + if(count() == 2, 'Ok', 'Error: ' || toString(count())), + if(SUM(ProfileEvents['SelectedRows']) == 512, 'Ok', 'Error: ' || toString(SUM(ProfileEvents['SelectedRows']))), + if(SUM(ProfileEvents['FileOpen']) > 1, 'Ok', 'Error: ' || toString(SUM(ProfileEvents['FileOpen']))) FROM system.part_log WHERE event_time > now() - INTERVAL 10 MINUTE AND database == currentDatabase() AND table == 'test'