diff --git a/src/Common/CurrentThread.cpp b/src/Common/CurrentThread.cpp index e54b2c8abe4..14c9f4418e0 100644 --- a/src/Common/CurrentThread.cpp +++ b/src/Common/CurrentThread.cpp @@ -40,6 +40,8 @@ ThreadStatus & CurrentThread::get() ProfileEvents::Counters & CurrentThread::getProfileEvents() { + if (unlikely(subthread_profile_events)) + return *subthread_profile_events; return current_thread ? current_thread->performance_counters : ProfileEvents::global_counters; } diff --git a/src/Common/CurrentThread.h b/src/Common/CurrentThread.h index 24c9dac844a..c07b34acae3 100644 --- a/src/Common/CurrentThread.h +++ b/src/Common/CurrentThread.h @@ -102,29 +102,6 @@ public: bool log_peak_memory_usage_in_destructor = true; }; - class ScopedAttach : private boost::noncopyable - { - private: - bool attached = false; - public: - explicit ScopedAttach(const ThreadGroupStatusPtr & thread_group) - { - if (!CurrentThread::getGroup()) - { - CurrentThread::attachToIfDetached(thread_group); - attached = true; - } - } - - ~ScopedAttach() - { - if (attached) - { - CurrentThread::detachQuery(); - } - } - }; - private: static void defaultThreadDeleter(); diff --git a/src/Common/ProfileEventsScope.cpp b/src/Common/ProfileEventsScope.cpp new file mode 100644 index 00000000000..6dcb5f8fcf3 --- /dev/null +++ b/src/Common/ProfileEventsScope.cpp @@ -0,0 +1,32 @@ +#include + +namespace DB +{ +extern thread_local constinit ProfileEvents::Counters * subthread_profile_events; + + +ScopedProfileEvents::ScopedProfileEvents() + : performance_counters_holder(std::make_unique()) + , performance_counters_scope(performance_counters_holder.get()) +{ + CurrentThread::get().attachProfileCountersScope(performance_counters_scope); +} + +ScopedProfileEvents::ScopedProfileEvents(ProfileEvents::Counters * performance_counters_scope_) + : performance_counters_scope(performance_counters_scope_) +{ + CurrentThread::get().attachProfileCountersScope(performance_counters_scope); +} + +std::shared_ptr 
ScopedProfileEvents::getSnapshot() +{ + return std::make_shared(performance_counters_scope->getPartiallyAtomicSnapshot()); +} + +ScopedProfileEvents::~ScopedProfileEvents() +{ + subthread_profile_events = nullptr; +} + + +} diff --git a/src/Common/ProfileEventsScope.h b/src/Common/ProfileEventsScope.h new file mode 100644 index 00000000000..59ba4f616d9 --- /dev/null +++ b/src/Common/ProfileEventsScope.h @@ -0,0 +1,35 @@ +#pragma once + +#include +#include + +namespace DB +{ + + +/// Use specific performance counters for current thread in the current scope. +class ScopedProfileEvents : private boost::noncopyable +{ +public: + /// Counters are owned by this object. + ScopedProfileEvents(); + + /// Shared counters are stored outside. + /// Useful when we calculate metrics entering into some scope several times. + explicit ScopedProfileEvents(ProfileEvents::Counters * performance_counters_scope_); + + std::shared_ptr getSnapshot(); + + ~ScopedProfileEvents(); + +private: + /// If set, then performance_counters_scope is owned by this object. + /// Otherwise, counters are passed to the constructor from outside. 
+ std::unique_ptr performance_counters_holder; + + ProfileEvents::Counters * performance_counters_scope; +}; + + +} + diff --git a/src/Common/ThreadStatus.cpp b/src/Common/ThreadStatus.cpp index 9f9a78c4036..6949942f745 100644 --- a/src/Common/ThreadStatus.cpp +++ b/src/Common/ThreadStatus.cpp @@ -25,6 +25,7 @@ namespace ErrorCodes } thread_local ThreadStatus constinit * current_thread = nullptr; +thread_local ProfileEvents::Counters constinit * subthread_profile_events = nullptr; #if !defined(SANITIZER) namespace diff --git a/src/Common/ThreadStatus.h b/src/Common/ThreadStatus.h index 69c5732ddb6..d44cd57b812 100644 --- a/src/Common/ThreadStatus.h +++ b/src/Common/ThreadStatus.h @@ -107,6 +107,7 @@ using ThreadGroupStatusPtr = std::shared_ptr; * - https://github.com/ClickHouse/ClickHouse/pull/40078 */ extern thread_local constinit ThreadStatus * current_thread; +extern thread_local constinit ProfileEvents::Counters * subthread_profile_events; /** Encapsulates all per-thread info (ProfileEvents, MemoryTracker, query_id, query context, etc.). * The object must be created in thread function and destroyed in the same thread before the exit. @@ -139,6 +140,7 @@ public: Deleter deleter; protected: + /// Group of threads to which this thread is attached ThreadGroupStatusPtr thread_group; std::atomic thread_state{ThreadState::DetachedFromQuery}; @@ -244,6 +246,8 @@ public: /// Attaches slave thread to existing thread group void attachQuery(const ThreadGroupStatusPtr & thread_group_, bool check_detached = true); + void attachProfileCountersScope(ProfileEvents::Counters * performance_counters_scope); + InternalTextLogsQueuePtr getInternalTextLogsQueue() const { return thread_state == Died ? 
nullptr : logs_queue_ptr.lock(); diff --git a/src/Interpreters/PartLog.cpp b/src/Interpreters/PartLog.cpp index b422f4f7f03..b736fa43e0c 100644 --- a/src/Interpreters/PartLog.cpp +++ b/src/Interpreters/PartLog.cpp @@ -189,16 +189,8 @@ void PartLogElement::appendToBlock(MutableColumns & columns) const } } - -bool PartLog::addNewPart( - ContextPtr current_context, const MutableDataPartPtr & part, UInt64 elapsed_ns, const ExecutionStatus & execution_status, std::shared_ptr profile_counters_) -{ - return addNewParts(current_context, {part}, elapsed_ns, execution_status, std::move(profile_counters_)); -} - - bool PartLog::addNewParts( - ContextPtr current_context, const PartLog::MutableDataPartsVector & parts, UInt64 elapsed_ns, const ExecutionStatus & execution_status, std::shared_ptr profile_counters_) + ContextPtr current_context, const PartLog::PartLogEntries & parts, const ExecutionStatus & execution_status) { if (parts.empty()) return true; @@ -207,15 +199,17 @@ bool PartLog::addNewParts( try { - auto table_id = parts.front()->storage.getStorageID(); + auto table_id = parts.front().part->storage.getStorageID(); part_log = current_context->getPartLog(table_id.database_name); // assume parts belong to the same table if (!part_log) return false; auto query_id = CurrentThread::getQueryId(); - for (const auto & part : parts) + for (const auto & part_log_entry : parts) { + const auto & part = part_log_entry.part; + PartLogElement elem; if (!query_id.empty()) @@ -228,7 +222,7 @@ bool PartLog::addNewParts( const auto time_now = std::chrono::system_clock::now(); elem.event_time = timeInSeconds(time_now); elem.event_time_microseconds = timeInMicroseconds(time_now); - elem.duration_ms = elapsed_ns / 1000000; + elem.duration_ms = part_log_entry.elapsed_ns / 1000000; elem.database_name = table_id.database_name; elem.table_name = table_id.table_name; @@ -245,7 +239,7 @@ bool PartLog::addNewParts( elem.error = static_cast(execution_status.code); elem.exception = 
execution_status.message; - elem.profile_counters = profile_counters_; + elem.profile_counters = part_log_entry.profile_counters; part_log->add(elem); } @@ -259,4 +253,28 @@ bool PartLog::addNewParts( return true; } +bool PartLog::addNewPart(ContextPtr context, const PartLog::PartLogEntry & part, const ExecutionStatus & execution_status) +{ + return addNewParts(context, {part}, execution_status); +} + +bool PartLog::addNewParts(ContextPtr context, const PartLog::MutableDataPartsVector & parts, UInt64 elapsed_ns, + const ExecutionStatus & execution_status) +{ + PartLog::PartLogEntries part_log_entries; + part_log_entries.reserve(parts.size()); + + for (const auto & part : parts) + part_log_entries.emplace_back(part, elapsed_ns); + + return addNewParts(context, part_log_entries, execution_status); +} + +bool PartLog::addNewPart(ContextPtr context, const PartLog::MutableDataPartPtr & part, UInt64 elapsed_ns, const ExecutionStatus & execution_status) +{ + PartLog::PartLogEntries part_log_entries; + part_log_entries.emplace_back(part, elapsed_ns); + return addNewParts(context, part_log_entries, execution_status); +} + } diff --git a/src/Interpreters/PartLog.h b/src/Interpreters/PartLog.h index 75b2539bda9..57044cba844 100644 --- a/src/Interpreters/PartLog.h +++ b/src/Interpreters/PartLog.h @@ -109,14 +109,42 @@ class PartLog : public SystemLog using MutableDataPartPtr = std::shared_ptr; using MutableDataPartsVector = std::vector; + using ProfileCountersSnapshotPtr = std::shared_ptr; + public: + struct PartLogEntry + { + std::shared_ptr part; + ProfileCountersSnapshotPtr profile_counters; + UInt64 elapsed_ns; + + PartLogEntry(std::shared_ptr part_, UInt64 elapsed_ns_) + : part(std::move(part_)), elapsed_ns(elapsed_ns_) + { + } + + PartLogEntry(std::shared_ptr part_, UInt64 elapsed_ns_, ProfileCountersSnapshotPtr profile_counters_) + : part(std::move(part_)) + , profile_counters(std::move(profile_counters_)) + , elapsed_ns(elapsed_ns_) + { + } + }; + + using 
PartLogEntries = std::vector; + + /// Add a record about creation of new part. + static bool addNewPart(ContextPtr context, const PartLogEntry & part, + const ExecutionStatus & execution_status = {}); + + static bool addNewParts(ContextPtr context, const PartLogEntries & parts, + const ExecutionStatus & execution_status = {}); + /// Add a record about creation of new part. static bool addNewPart(ContextPtr context, const MutableDataPartPtr & part, UInt64 elapsed_ns, - const ExecutionStatus & execution_status = {}, - std::shared_ptr profile_counters_ = {}); + const ExecutionStatus & execution_status = {}); static bool addNewParts(ContextPtr context, const MutableDataPartsVector & parts, UInt64 elapsed_ns, - const ExecutionStatus & execution_status = {}, - std::shared_ptr profile_counters_ = {}); + const ExecutionStatus & execution_status = {}); }; } diff --git a/src/Interpreters/ProfileEventsExt.cpp b/src/Interpreters/ProfileEventsExt.cpp index 0f6b52b2611..b5a49c8f381 100644 --- a/src/Interpreters/ProfileEventsExt.cpp +++ b/src/Interpreters/ProfileEventsExt.cpp @@ -19,6 +19,22 @@ std::shared_ptr TypeEnum = std::make_shared(GAUGE)}, }); +String dumpToString(const Counters::Snapshot & counters, bool nonzero_only) +{ + std::vector ss; + for (Event event = 0; event < Counters::num_counters; ++event) + { + UInt64 value = counters[event]; + + if (nonzero_only && 0 == value) + continue; + + const char * desc = getName(event); + ss.push_back(fmt::format("{}: {}", desc, value)); + } + return fmt::format("[{}]", fmt::join(ss, ", ")); +} + /// Put implementation here to avoid extra linking dependencies for clickhouse_common_io void dumpToMapColumn(const Counters::Snapshot & counters, DB::IColumn * column, bool nonzero_only) { diff --git a/src/Interpreters/ProfileEventsExt.h b/src/Interpreters/ProfileEventsExt.h index 7d9fc512d15..be88ca02593 100644 --- a/src/Interpreters/ProfileEventsExt.h +++ b/src/Interpreters/ProfileEventsExt.h @@ -21,6 +21,8 @@ struct 
ProfileEventsSnapshot using ThreadIdToCountersSnapshot = std::unordered_map; +String dumpToString(const Counters::Snapshot & counters, bool nonzero_only = true); + /// Dumps profile events to columns Map(String, UInt64) void dumpToMapColumn(const Counters::Snapshot & counters, DB::IColumn * column, bool nonzero_only = true); diff --git a/src/Interpreters/ThreadStatusExt.cpp b/src/Interpreters/ThreadStatusExt.cpp index 816b03f3a0e..b4f82b2a215 100644 --- a/src/Interpreters/ThreadStatusExt.cpp +++ b/src/Interpreters/ThreadStatusExt.cpp @@ -161,6 +161,12 @@ void ThreadStatus::attachQuery(const ThreadGroupStatusPtr & thread_group_, bool setupState(thread_group_); } +void ThreadStatus::attachProfileCountersScope(ProfileEvents::Counters * performance_counters_scope) +{ + subthread_profile_events = performance_counters_scope; + performance_counters_scope->setParent(&performance_counters); +} + void ThreadStatus::initPerformanceCounters() { performance_counters_finalized = false; diff --git a/src/Storages/MergeTree/MergePlainMergeTreeTask.cpp b/src/Storages/MergeTree/MergePlainMergeTreeTask.cpp index 95bcfba937a..5346a932fba 100644 --- a/src/Storages/MergeTree/MergePlainMergeTreeTask.cpp +++ b/src/Storages/MergeTree/MergePlainMergeTreeTask.cpp @@ -3,6 +3,7 @@ #include #include #include +#include namespace DB { @@ -28,7 +29,7 @@ void MergePlainMergeTreeTask::onCompleted() bool MergePlainMergeTreeTask::executeStep() { /// Metrics will be saved in the thread_group. 
- CurrentThread::ScopedAttach scoped_attach(thread_group); + ScopedProfileEvents profile_events_scope(&profile_counters); /// Make out memory tracker a parent of current thread memory tracker MemoryTrackerThreadSwitcherPtr switcher; @@ -90,7 +91,7 @@ void MergePlainMergeTreeTask::prepare() { auto & thread_status = CurrentThread::get(); thread_status.finalizePerformanceCounters(); - auto profile_counters = std::make_shared(thread_status.performance_counters.getPartiallyAtomicSnapshot()); + auto profile_counters_snapshot = std::make_shared(profile_counters.getPartiallyAtomicSnapshot()); merge_task.reset(); storage.writePartLog( PartLogElement::MERGE_PARTS, @@ -100,7 +101,7 @@ void MergePlainMergeTreeTask::prepare() new_part, future_part->parts, merge_list_entry.get(), - profile_counters); + std::move(profile_counters_snapshot)); }; merge_task = storage.merger_mutator.mergePartsToTemporaryPart( diff --git a/src/Storages/MergeTree/MergePlainMergeTreeTask.h b/src/Storages/MergeTree/MergePlainMergeTreeTask.h index 4d0cecf1e55..eb659bf38ec 100644 --- a/src/Storages/MergeTree/MergePlainMergeTreeTask.h +++ b/src/Storages/MergeTree/MergePlainMergeTreeTask.h @@ -83,7 +83,7 @@ private: MergeTreeTransactionHolder txn_holder; MergeTreeTransactionPtr txn; - ThreadGroupStatusPtr thread_group = std::make_shared(); + ProfileEvents::Counters profile_counters; }; diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 07385067ede..c5160a27d48 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -10,6 +10,7 @@ #include #include #include +#include #include #include #include @@ -7347,6 +7348,10 @@ try { part_log_elem.profile_counters = profile_counters; } + else + { + LOG_WARNING(log, "Profile counters are not set"); + } part_log->add(part_log_elem); } @@ -7483,6 +7488,7 @@ bool MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagge { Stopwatch stopwatch; 
MutableDataPartPtr cloned_part; + ScopedProfileEvents profile_events_scope; auto write_part_log = [&](const ExecutionStatus & execution_status) { @@ -7494,7 +7500,7 @@ bool MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagge cloned_part, {moving_part.part}, nullptr, - nullptr); + profile_events_scope.getSnapshot()); }; // Register in global moves list (StorageSystemMoves) diff --git a/src/Storages/MergeTree/MergeTreeSink.cpp b/src/Storages/MergeTree/MergeTreeSink.cpp index 99f6b1855e4..be645cf27c0 100644 --- a/src/Storages/MergeTree/MergeTreeSink.cpp +++ b/src/Storages/MergeTree/MergeTreeSink.cpp @@ -3,6 +3,7 @@ #include #include #include +#include namespace ProfileEvents { @@ -47,6 +48,7 @@ struct MergeTreeSink::DelayedChunk MergeTreeDataWriter::TemporaryPart temp_part; UInt64 elapsed_ns; String block_dedup_token; + ProfileEvents::Counters part_counters; }; std::vector partitions; @@ -70,12 +72,18 @@ void MergeTreeSink::consume(Chunk chunk) for (auto & current_block : part_blocks) { - Stopwatch watch; - String block_dedup_token; + ProfileEvents::Counters part_counters; - auto temp_part = storage.writer.writeTempPart(current_block, metadata_snapshot, context); + UInt64 elapsed_ns = 0; + MergeTreeDataWriter::TemporaryPart temp_part; - UInt64 elapsed_ns = watch.elapsed(); + { + ScopedProfileEvents scoped_attach(&part_counters); + + Stopwatch watch; + temp_part = storage.writer.writeTempPart(current_block, metadata_snapshot, context); + elapsed_ns = watch.elapsed(); + } /// If optimize_on_insert setting is true, current_block could become empty after merge /// and we didn't create part. 
@@ -85,6 +93,7 @@ void MergeTreeSink::consume(Chunk chunk) if (!support_parallel_write && temp_part.part->getDataPartStorage().supportParallelWrite()) support_parallel_write = true; + String block_dedup_token; if (storage.getDeduplicationLog()) { const String & dedup_token = settings.insert_deduplication_token; @@ -119,7 +128,8 @@ void MergeTreeSink::consume(Chunk chunk) { .temp_part = std::move(temp_part), .elapsed_ns = elapsed_ns, - .block_dedup_token = std::move(block_dedup_token) + .block_dedup_token = std::move(block_dedup_token), + .part_counters = std::move(part_counters), }); } @@ -135,6 +145,8 @@ void MergeTreeSink::finishDelayedChunk() for (auto & partition : delayed_chunk->partitions) { + ScopedProfileEvents scoped_attach(&partition.part_counters); + partition.temp_part.finalize(); auto & part = partition.temp_part.part; @@ -168,7 +180,8 @@ void MergeTreeSink::finishDelayedChunk() /// Part can be deduplicated, so increment counters and add to part log only if it's really added if (added) { - PartLog::addNewPart(storage.getContext(), part, partition.elapsed_ns); + auto counters_snapshot = std::make_shared(partition.part_counters.getPartiallyAtomicSnapshot()); + PartLog::addNewPart(storage.getContext(), PartLog::PartLogEntry(part, partition.elapsed_ns, counters_snapshot)); storage.incrementInsertedPartsProfileEvent(part->getType()); /// Initiate async merge - it will be done if it's good time for merge and if there are space in 'background_pool'. 
diff --git a/src/Storages/MergeTree/MutateFromLogEntryTask.cpp b/src/Storages/MergeTree/MutateFromLogEntryTask.cpp index 2e546574e42..4428f6c2bce 100644 --- a/src/Storages/MergeTree/MutateFromLogEntryTask.cpp +++ b/src/Storages/MergeTree/MutateFromLogEntryTask.cpp @@ -184,12 +184,10 @@ ReplicatedMergeMutateTaskBase::PrepareResult MutateFromLogEntryTask::prepare() return {true, true, [this] (const ExecutionStatus & execution_status) { - auto & thread_status = CurrentThread::get(); - thread_status.finalizePerformanceCounters(); - auto profile_counters = std::make_shared(thread_status.performance_counters.getPartiallyAtomicSnapshot()); + auto profile_counters_snapshot = std::make_shared(profile_counters.getPartiallyAtomicSnapshot()); storage.writePartLog( PartLogElement::MUTATE_PART, execution_status, stopwatch_ptr->elapsed(), - entry.new_part_name, new_part, future_mutated_part->parts, merge_mutate_entry.get(), std::move(profile_counters)); + entry.new_part_name, new_part, future_mutated_part->parts, merge_mutate_entry.get(), std::move(profile_counters_snapshot)); }}; } diff --git a/src/Storages/MergeTree/MutatePlainMergeTreeTask.cpp b/src/Storages/MergeTree/MutatePlainMergeTreeTask.cpp index 4a29291b20d..944a2d1070b 100644 --- a/src/Storages/MergeTree/MutatePlainMergeTreeTask.cpp +++ b/src/Storages/MergeTree/MutatePlainMergeTreeTask.cpp @@ -2,6 +2,7 @@ #include #include +#include namespace DB { @@ -38,9 +39,7 @@ void MutatePlainMergeTreeTask::prepare() write_part_log = [this] (const ExecutionStatus & execution_status) { - auto & thread_status = CurrentThread::get(); - thread_status.finalizePerformanceCounters(); - auto profile_counters = std::make_shared(thread_status.performance_counters.getPartiallyAtomicSnapshot()); + auto profile_counters_snapshot = std::make_shared(profile_counters.getPartiallyAtomicSnapshot()); mutate_task.reset(); storage.writePartLog( PartLogElement::MUTATE_PART, @@ -50,7 +49,7 @@ void MutatePlainMergeTreeTask::prepare() new_part, 
future_part->parts, merge_list_entry.get(), - profile_counters); + std::move(profile_counters_snapshot)); }; fake_query_context = Context::createCopy(storage.getContext()); @@ -65,8 +64,8 @@ void MutatePlainMergeTreeTask::prepare() bool MutatePlainMergeTreeTask::executeStep() { - /// Metrics will be saved in the thread_group. - CurrentThread::ScopedAttach scoped_attach(thread_group); + /// Metrics will be saved in the local profile_counters. + ScopedProfileEvents profile_events_scope(&profile_counters); /// Make out memory tracker a parent of current thread memory tracker MemoryTrackerThreadSwitcherPtr switcher; diff --git a/src/Storages/MergeTree/MutatePlainMergeTreeTask.h b/src/Storages/MergeTree/MutatePlainMergeTreeTask.h index 6356595a16b..ae2ac039543 100644 --- a/src/Storages/MergeTree/MutatePlainMergeTreeTask.h +++ b/src/Storages/MergeTree/MutatePlainMergeTreeTask.h @@ -78,7 +78,7 @@ private: ContextMutablePtr fake_query_context; MutateTaskPtr mutate_task; - ThreadGroupStatusPtr thread_group = std::make_shared(); + ProfileEvents::Counters profile_counters; }; diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index 295a88d5f76..a5ea5c90b95 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -20,6 +20,7 @@ #include #include #include +#include namespace CurrentMetrics @@ -814,8 +815,8 @@ public: bool executeStep() override { - /// Metrics will be saved in the thread_group. - CurrentThread::ScopedAttach scoped_attach(thread_group); + /// Metrics will be saved in the local profile_counters. 
+ ScopedProfileEvents profile_events_scope(&profile_counters); auto & current_level_parts = level_parts[current_level]; auto & next_level_parts = level_parts[next_level]; @@ -918,7 +919,7 @@ private: /// TODO(nikitamikhaylov): make this constant a setting static constexpr size_t max_parts_to_merge_in_one_level = 10; - ThreadGroupStatusPtr thread_group = std::make_shared(); + ProfileEvents::Counters profile_counters; }; @@ -929,7 +930,9 @@ private: // In short it executed a mutation for the part an original part and for its every projection -/** An overview of how the process of mutation works for projections: +/** + * + * An overview of how the process of mutation works for projections: * * The mutation for original parts is executed block by block, * but additionally we execute a SELECT query for each projection over a current block. @@ -1140,8 +1143,8 @@ public: bool executeStep() override { - /// Metrics will be saved in the thread_group. - CurrentThread::ScopedAttach scoped_attach(thread_group); + /// Metrics will be saved in the local profile_counters. + ScopedProfileEvents profile_events_scope(&profile_counters); switch (state) { @@ -1176,6 +1179,7 @@ public: } private: + void prepare() { if (ctx->new_data_part->isStoredOnDisk()) @@ -1258,7 +1262,7 @@ private: std::unique_ptr part_merger_writer_task; - ThreadGroupStatusPtr thread_group = std::make_shared(); + ProfileEvents::Counters profile_counters; }; @@ -1273,8 +1277,8 @@ public: bool executeStep() override { - /// Metrics will be saved in the thread_group. - CurrentThread::ScopedAttach scoped_attach(thread_group); + /// Metrics will be saved in the local profile_counters. 
+ ScopedProfileEvents profile_events_scope(&profile_counters); switch (state) { @@ -1308,6 +1312,7 @@ public: } private: + void prepare() { if (ctx->execute_ttl_type != ExecuteTTLType::NONE) @@ -1466,7 +1471,7 @@ private: MergedColumnOnlyOutputStreamPtr out; std::unique_ptr part_merger_writer_task{nullptr}; - ThreadGroupStatusPtr thread_group = std::make_shared(); + ProfileEvents::Counters profile_counters; }; diff --git a/src/Storages/MergeTree/ReplicatedMergeMutateTaskBase.cpp b/src/Storages/MergeTree/ReplicatedMergeMutateTaskBase.cpp index 2ef36654d95..d81bbd67441 100644 --- a/src/Storages/MergeTree/ReplicatedMergeMutateTaskBase.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeMutateTaskBase.cpp @@ -2,6 +2,7 @@ #include #include +#include namespace DB @@ -29,8 +30,8 @@ void ReplicatedMergeMutateTaskBase::onCompleted() bool ReplicatedMergeMutateTaskBase::executeStep() { - /// Metrics will be saved in the thread_group. - CurrentThread::ScopedAttach scoped_attach(thread_group); + /// Metrics will be saved in the local profile_counters. 
+ ScopedProfileEvents profile_events_scope(&profile_counters); std::exception_ptr saved_exception; diff --git a/src/Storages/MergeTree/ReplicatedMergeMutateTaskBase.h b/src/Storages/MergeTree/ReplicatedMergeMutateTaskBase.h index e08074cd4ff..d9a1cbff166 100644 --- a/src/Storages/MergeTree/ReplicatedMergeMutateTaskBase.h +++ b/src/Storages/MergeTree/ReplicatedMergeMutateTaskBase.h @@ -11,9 +11,9 @@ namespace DB class StorageReplicatedMergeTree; - -/** This is used as a base of MergeFromLogEntryTask and MutateFromLogEntryTaskBase - */ +/** + * This is used as a base of MergeFromLogEntryTask and MutateFromLogEntryTaskBase + */ class ReplicatedMergeMutateTaskBase : public IExecutableTask { public: @@ -33,11 +33,8 @@ public: } ~ReplicatedMergeMutateTaskBase() override = default; - void onCompleted() override; - StorageID getStorageID() override; - bool executeStep() override; protected: @@ -63,6 +60,8 @@ protected: MergeList::EntryPtr merge_mutate_entry{nullptr}; Poco::Logger * log; StorageReplicatedMergeTree & storage; + /// ProfileEvents for current part will be stored here + ProfileEvents::Counters profile_counters; private: enum class CheckExistingPartResult @@ -86,7 +85,6 @@ private: PartLogWriter part_log_writer{}; State state{State::NEED_PREPARE}; IExecutableTask::TaskResultCallback task_result_callback; - ThreadGroupStatusPtr thread_group = std::make_shared(); }; } diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index e9958a68406..b9c178cab5c 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -2,8 +2,10 @@ #include #include -#include "Common/hex.h" + +#include #include +#include #include #include #include @@ -1592,6 +1594,8 @@ bool StorageReplicatedMergeTree::executeLogEntry(LogEntry & entry) if (entry.type == LogEntry::ATTACH_PART) { + ScopedProfileEvents profile_events_scope; + if (MutableDataPartPtr part = attachPartHelperFoundValidPart(entry)) { 
LOG_TRACE(log, "Found valid local part for {}, preparing the transaction", part->name); @@ -1602,11 +1606,9 @@ bool StorageReplicatedMergeTree::executeLogEntry(LogEntry & entry) renameTempPartAndReplace(part, transaction); checkPartChecksumsAndCommit(transaction, part); - auto & thread_status = CurrentThread::get(); - thread_status.finalizePerformanceCounters(); - auto profile_counters = std::make_shared(thread_status.performance_counters.getPartiallyAtomicSnapshot()); writePartLog(PartLogElement::Type::NEW_PART, {}, 0 /** log entry is fake so we don't measure the time */, - part->name, part, {} /** log entry is fake so there are no initial parts */, nullptr, std::move(profile_counters)); + part->name, part, {} /** log entry is fake so there are no initial parts */, nullptr, + profile_events_scope.getSnapshot()); return true; } @@ -4007,15 +4009,14 @@ bool StorageReplicatedMergeTree::fetchPart( Stopwatch stopwatch; MutableDataPartPtr part; DataPartsVector replaced_parts; + ScopedProfileEvents profile_events_scope; auto write_part_log = [&] (const ExecutionStatus & execution_status) { - auto & thread_status = CurrentThread::get(); - thread_status.finalizePerformanceCounters(); - auto profile_counters = std::make_shared(thread_status.performance_counters.getPartiallyAtomicSnapshot()); writePartLog( PartLogElement::DOWNLOAD_PART, execution_status, stopwatch.elapsed(), - part_name, part, replaced_parts, nullptr, std::move(profile_counters)); + part_name, part, replaced_parts, nullptr, + profile_events_scope.getSnapshot()); }; DataPartPtr part_to_clone; @@ -4246,15 +4247,14 @@ MutableDataPartStoragePtr StorageReplicatedMergeTree::fetchExistsPart( Stopwatch stopwatch; MutableDataPartPtr part; DataPartsVector replaced_parts; + ScopedProfileEvents profile_events_scope; auto write_part_log = [&] (const ExecutionStatus & execution_status) { - auto & thread_status = CurrentThread::get(); - thread_status.finalizePerformanceCounters(); - auto profile_counters = 
std::make_shared(thread_status.performance_counters.getPartiallyAtomicSnapshot()); writePartLog( PartLogElement::DOWNLOAD_PART, execution_status, stopwatch.elapsed(), - part_name, part, replaced_parts, nullptr, std::move(profile_counters)); + part_name, part, replaced_parts, nullptr, + profile_events_scope.getSnapshot()); }; std::function get_part; diff --git a/tests/queries/0_stateless/02378_part_log_profile_events.reference b/tests/queries/0_stateless/02378_part_log_profile_events.reference index d00491fd7e5..e8183f05f5d 100644 --- a/tests/queries/0_stateless/02378_part_log_profile_events.reference +++ b/tests/queries/0_stateless/02378_part_log_profile_events.reference @@ -1 +1,3 @@ 1 +1 +1 diff --git a/tests/queries/0_stateless/02378_part_log_profile_events.sql b/tests/queries/0_stateless/02378_part_log_profile_events.sql index 9ffd6aa4135..af8fe8a2669 100644 --- a/tests/queries/0_stateless/02378_part_log_profile_events.sql +++ b/tests/queries/0_stateless/02378_part_log_profile_events.sql @@ -1,13 +1,47 @@ DROP TABLE IF EXISTS test; -CREATE TABLE test (x UInt64) ENGINE = MergeTree ORDER BY x; -SET max_block_size = 1; -INSERT INTO test SELECT * FROM system.numbers LIMIT 1000; +CREATE TABLE test (key UInt64, val UInt64) engine = MergeTree Order by key PARTITION BY key >= 128; + +SET max_block_size = 64, max_insert_block_size = 64, min_insert_block_size_rows = 64; + +INSERT INTO test SELECT number AS key, sipHash64(number) AS val FROM numbers(512); + +SYSTEM FLUSH LOGS; +SELECT + count(DISTINCT query_id) == 1 + AND count() >= 512 / 64 -- 512 rows inserted, 64 rows per block + AND SUM(ProfileEvents['MergeTreeDataWriterRows']) == 512 + AND SUM(ProfileEvents['MergeTreeDataWriterUncompressedBytes']) > 1024 + AND SUM(ProfileEvents['MergeTreeDataWriterCompressedBytes']) > 1024 + AND SUM(ProfileEvents['MergeTreeDataWriterBlocks']) >= 8 +FROM system.part_log +WHERE event_time > now() - INTERVAL 10 MINUTE + AND database == currentDatabase() AND table == 'test' + AND 
event_type == 'NewPart' +; + OPTIMIZE TABLE test FINAL; SYSTEM FLUSH LOGS; +SELECT + count() >= 2 AND SUM(ProfileEvents['MergedRows']) >= 512 +FROM system.part_log +WHERE event_time > now() - INTERVAL 10 MINUTE + AND database == currentDatabase() AND table == 'test' + AND event_type == 'MergeParts' +; --- ProfileEvents field exist and contains something plausible: -SELECT count() > 0 FROM system.part_log WHERE ProfileEvents['MergedRows'] = 1000 AND table = 'test' AND database = currentDatabase() AND event_time >= now() - 600; +ALTER TABLE test UPDATE val = 0 WHERE key % 2 == 0 SETTINGS mutations_sync = 2; + +SYSTEM FLUSH LOGS; +SELECT + count() == 2 + AND SUM(ProfileEvents['SelectedRows']) == 512 + AND SUM(ProfileEvents['FileOpen']) > 2 +FROM system.part_log +WHERE event_time > now() - INTERVAL 10 MINUTE + AND database == currentDatabase() AND table == 'test' + AND event_type == 'MutatePart' +; DROP TABLE test;