This commit is contained in:
vdimir 2023-01-24 11:23:17 +00:00
parent 994dcb93f9
commit 07d7478bc7
No known key found for this signature in database
GPG Key ID: 6EE4CE2BEDC51862
4 changed files with 22 additions and 19 deletions

View File

@ -2,6 +2,7 @@
#include <Common/logger_useful.h>
#include <Common/ProfileEvents.h>
#include <Common/ProfileEventsScope.h>
#include <Storages/StorageReplicatedMergeTree.h>
namespace ProfileEvents
@ -267,6 +268,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
transaction_ptr = std::make_unique<MergeTreeData::Transaction>(storage, NO_TRANSACTION_RAW);
stopwatch_ptr = std::make_unique<Stopwatch>();
ProfileEventsScope profile_events_scope(&profile_counters);
merge_task = storage.merger_mutator.mergePartsToTemporaryPart(
future_merged_part,
@ -289,18 +291,18 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare()
return {true, true, [this, stopwatch = *stopwatch_ptr] (const ExecutionStatus & execution_status)
{
auto & thread_status = CurrentThread::get();
thread_status.finalizePerformanceCounters();
auto profile_counters = std::make_shared<ProfileEvents::Counters::Snapshot>(thread_status.performance_counters.getPartiallyAtomicSnapshot());
auto profile_counters_snapshot = std::make_shared<ProfileEvents::Counters::Snapshot>(profile_counters.getPartiallyAtomicSnapshot());
storage.writePartLog(
PartLogElement::MERGE_PARTS, execution_status, stopwatch.elapsed(),
entry.new_part_name, part, parts, merge_mutate_entry.get(), std::move(profile_counters));
entry.new_part_name, part, parts, merge_mutate_entry.get(), std::move(profile_counters_snapshot));
}};
}
bool MergeFromLogEntryTask::finalize(ReplicatedMergeMutateTaskBase::PartLogWriter write_part_log)
{
ProfileEventsScope profile_events_scope(&profile_counters);
part = merge_task->getFuture().get();
storage.merger_mutator.renameMergedTemporaryPart(part, parts, NO_TRANSACTION_PTR, *transaction_ptr);

View File

@ -89,8 +89,6 @@ void MergePlainMergeTreeTask::prepare()
write_part_log = [this] (const ExecutionStatus & execution_status)
{
auto & thread_status = CurrentThread::get();
thread_status.finalizePerformanceCounters();
auto profile_counters_snapshot = std::make_shared<ProfileEvents::Counters::Snapshot>(profile_counters.getPartiallyAtomicSnapshot());
merge_task.reset();
storage.writePartLog(

View File

@ -1,3 +1,3 @@
1
1
1
Ok Ok Ok Ok Ok Ok
Ok Ok
Ok Ok Ok

View File

@ -7,13 +7,14 @@ SET max_block_size = 64, max_insert_block_size = 64, min_insert_block_size_rows
INSERT INTO test SELECT number AS key, sipHash64(number) AS val FROM numbers(512);
SYSTEM FLUSH LOGS;
SELECT
count(DISTINCT query_id) == 1
AND count() >= 512 / 64 -- 512 rows inserted, 64 rows per block
AND SUM(ProfileEvents['MergeTreeDataWriterRows']) == 512
AND SUM(ProfileEvents['MergeTreeDataWriterUncompressedBytes']) > 1024
AND SUM(ProfileEvents['MergeTreeDataWriterCompressedBytes']) > 1024
AND SUM(ProfileEvents['MergeTreeDataWriterBlocks']) >= 8
if(count(DISTINCT query_id) == 1, 'Ok', 'Error: ' || toString(count(DISTINCT query_id))),
if(count() == 512 / 64, 'Ok', 'Error: ' || toString(count())), -- 512 rows inserted, 64 rows per block
if(SUM(ProfileEvents['MergeTreeDataWriterRows']) == 512, 'Ok', 'Error: ' || toString(SUM(ProfileEvents['MergeTreeDataWriterRows']))),
if(SUM(ProfileEvents['MergeTreeDataWriterUncompressedBytes']) >= 1024, 'Ok', 'Error: ' || toString(SUM(ProfileEvents['MergeTreeDataWriterUncompressedBytes']))),
if(SUM(ProfileEvents['MergeTreeDataWriterCompressedBytes']) >= 1024, 'Ok', 'Error: ' || toString(SUM(ProfileEvents['MergeTreeDataWriterCompressedBytes']))),
if(SUM(ProfileEvents['MergeTreeDataWriterBlocks']) >= 8, 'Ok', 'Error: ' || toString(SUM(ProfileEvents['MergeTreeDataWriterBlocks'])))
FROM system.part_log
WHERE event_time > now() - INTERVAL 10 MINUTE
AND database == currentDatabase() AND table == 'test'
@ -24,7 +25,8 @@ OPTIMIZE TABLE test FINAL;
SYSTEM FLUSH LOGS;
SELECT
count() >= 2 AND SUM(ProfileEvents['MergedRows']) >= 512
if(count() > 2, 'Ok', 'Error: ' || toString(count())),
if(SUM(ProfileEvents['MergedRows']) >= 512, 'Ok', 'Error: ' || toString(SUM(ProfileEvents['MergedRows'])))
FROM system.part_log
WHERE event_time > now() - INTERVAL 10 MINUTE
AND database == currentDatabase() AND table == 'test'
@ -34,10 +36,11 @@ WHERE event_time > now() - INTERVAL 10 MINUTE
ALTER TABLE test UPDATE val = 0 WHERE key % 2 == 0 SETTINGS mutations_sync = 2;
SYSTEM FLUSH LOGS;
SELECT
count() == 2
AND SUM(ProfileEvents['SelectedRows']) == 512
AND SUM(ProfileEvents['FileOpen']) > 2
if(count() == 2, 'Ok', 'Error: ' || toString(count())),
if(SUM(ProfileEvents['SelectedRows']) == 512, 'Ok', 'Error: ' || toString(SUM(ProfileEvents['SelectedRows']))),
if(SUM(ProfileEvents['FileOpen']) > 1, 'Ok', 'Error: ' || toString(SUM(ProfileEvents['FileOpen'])))
FROM system.part_log
WHERE event_time > now() - INTERVAL 10 MINUTE
AND database == currentDatabase() AND table == 'test'