diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp index 62d56dead62..0395f2470af 100644 --- a/src/Common/CurrentMetrics.cpp +++ b/src/Common/CurrentMetrics.cpp @@ -13,7 +13,6 @@ M(BackgroundMergesAndMutationsPoolTask, "Number of active merges and mutations in an associated background pool") \ M(BackgroundFetchesPoolTask, "Number of active fetches in an associated background pool") \ M(BackgroundCommonPoolTask, "Number of active tasks in an associated background pool") \ - M(BackgroundMaintPoolTask, "Number of active tasks in BackgroundProcessingPool (maint)") \ M(BackgroundMovePoolTask, "Number of active tasks in BackgroundProcessingPool for moves") \ M(BackgroundSchedulePoolTask, "Number of active tasks in BackgroundSchedulePool. This pool is used for periodic ReplicatedMergeTree tasks, like cleaning old data parts, altering data parts, replica re-initialization, etc.") \ M(BackgroundBufferFlushSchedulePoolTask, "Number of active tasks in BackgroundBufferFlushSchedulePool. This pool is used for periodic Buffer flushes") \ diff --git a/src/Common/CurrentThread.h b/src/Common/CurrentThread.h index cbe60365798..9548a927cfd 100644 --- a/src/Common/CurrentThread.h +++ b/src/Common/CurrentThread.h @@ -92,7 +92,7 @@ public: static void detachQueryIfNotDetached(); /// Initializes query with current thread as master thread in constructor, and detaches it in destructor - struct QueryScope + struct QueryScope : private boost::noncopyable { explicit QueryScope(ContextMutablePtr query_context); explicit QueryScope(ContextPtr query_context); @@ -102,6 +102,20 @@ public: bool log_peak_memory_usage_in_destructor = true; }; + class ScopedAttach : private boost::noncopyable + { + public: + explicit ScopedAttach(const ThreadGroupStatusPtr & thread_group) + { + CurrentThread::attachTo(thread_group); + } + + ~ScopedAttach() + { + CurrentThread::detachQuery(); + } + }; + private: static void defaultThreadDeleter(); diff --git a/src/Interpreters/PartLog.cpp b/src/Interpreters/PartLog.cpp index b39f7423bbe..b422f4f7f03 100644 --- a/src/Interpreters/PartLog.cpp +++ b/src/Interpreters/PartLog.cpp @@ -193,7 +193,7 @@ void PartLogElement::appendToBlock(MutableColumns & columns) const bool PartLog::addNewPart( ContextPtr current_context, const MutableDataPartPtr & part, UInt64 elapsed_ns, const ExecutionStatus & execution_status, std::shared_ptr profile_counters_) { - return addNewParts(current_context, {part}, elapsed_ns, execution_status, profile_counters_); + return addNewParts(current_context, {part}, elapsed_ns, execution_status, std::move(profile_counters_)); } diff --git a/src/Storages/MergeTree/IExecutableTask.h b/src/Storages/MergeTree/IExecutableTask.h index d878d57dec8..9617960c182 100644 --- a/src/Storages/MergeTree/IExecutableTask.h +++ b/src/Storages/MergeTree/IExecutableTask.h @@ -29,9 +29,7 @@ class IExecutableTask { public: using TaskResultCallback = std::function; - virtual bool onResume() = 0; virtual bool executeStep() = 0; - virtual bool onSuspend() = 0; virtual void onCompleted() = 0; virtual StorageID getStorageID() = 0; virtual UInt64 getPriority() = 0; @@ -56,11 +54,6 @@ public: , job_result_callback(std::forward(job_result_callback_)) , id(id_) {} - bool onResume() override - { - return true; - } - bool executeStep() override { res = job_to_execute(); @@ -68,11 +61,6 @@ public: return false; } - bool onSuspend() override - { - return true; - } - void onCompleted() override { job_result_callback(!res); } StorageID getStorageID() override { return id; } UInt64 getPriority() override diff --git a/src/Storages/MergeTree/MergeFromLogEntryTask.cpp b/src/Storages/MergeTree/MergeFromLogEntryTask.cpp index fab9b7c0f7e..abb871be372 100644 --- a/src/Storages/MergeTree/MergeFromLogEntryTask.cpp +++ b/src/Storages/MergeTree/MergeFromLogEntryTask.cpp @@ -294,7 +294,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MergeFromLogEntryTask::prepare() auto profile_counters = std::make_shared(thread_status.performance_counters.getPartiallyAtomicSnapshot()); storage.writePartLog( PartLogElement::MERGE_PARTS, execution_status, stopwatch.elapsed(), - entry.new_part_name, part, parts, merge_mutate_entry.get(), profile_counters); + entry.new_part_name, part, parts, merge_mutate_entry.get(), std::move(profile_counters)); }}; } diff --git a/src/Storages/MergeTree/MergePlainMergeTreeTask.cpp b/src/Storages/MergeTree/MergePlainMergeTreeTask.cpp index ccd132beb4e..95bcfba937a 100644 --- a/src/Storages/MergeTree/MergePlainMergeTreeTask.cpp +++ b/src/Storages/MergeTree/MergePlainMergeTreeTask.cpp @@ -27,6 +27,9 @@ void MergePlainMergeTreeTask::onCompleted() bool MergePlainMergeTreeTask::executeStep() { + /// Metrics will be saved in the thread_group. + CurrentThread::ScopedAttach scoped_attach(thread_group); + /// Make out memory tracker a parent of current thread memory tracker MemoryTrackerThreadSwitcherPtr switcher; if (merge_list_entry) diff --git a/src/Storages/MergeTree/MergePlainMergeTreeTask.h b/src/Storages/MergeTree/MergePlainMergeTreeTask.h index 472518c2723..4d0cecf1e55 100644 --- a/src/Storages/MergeTree/MergePlainMergeTreeTask.h +++ b/src/Storages/MergeTree/MergePlainMergeTreeTask.h @@ -5,7 +5,6 @@ #include #include #include -#include namespace DB @@ -36,18 +35,7 @@ public: priority += item->getBytesOnDisk(); } - bool onResume() override - { - return observer.doResume(); - } - bool executeStep() override; - - bool onSuspend() override - { - return observer.doSuspend(); - } - void onCompleted() override; StorageID getStorageID() override; UInt64 getPriority() override { return priority; } @@ -59,7 +47,6 @@ public: } private: - void prepare(); void finish(); @@ -95,8 +82,8 @@ private: MergeTreeTransactionHolder txn_holder; MergeTreeTransactionPtr txn; - - TaskObserverMetrics observer; + + ThreadGroupStatusPtr thread_group = std::make_shared(); }; diff --git a/src/Storages/MergeTree/MergeTreeBackgroundExecutor.cpp b/src/Storages/MergeTree/MergeTreeBackgroundExecutor.cpp index 028da3a3b5f..5bc3fda88bb 100644 --- a/src/Storages/MergeTree/MergeTreeBackgroundExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeBackgroundExecutor.cpp @@ -130,15 +130,6 @@ void MergeTreeBackgroundExecutor::routine(TaskRuntimeDataPtr item) bool need_execute_again = false; - try - { - item->task->onResume(); - } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - } - try { ALLOW_ALLOCATIONS_IN_SCOPE; @@ -162,15 +153,6 @@ void MergeTreeBackgroundExecutor::routine(TaskRuntimeDataPtr item) }); } - try - { - item->task->onSuspend(); - } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - } - if (need_execute_again) { std::lock_guard guard(mutex); diff --git a/src/Storages/MergeTree/MergeTreeBackgroundExecutor.h b/src/Storages/MergeTree/MergeTreeBackgroundExecutor.h index 5c1178a1bc1..cf4d4b08c4d 100644 --- a/src/Storages/MergeTree/MergeTreeBackgroundExecutor.h +++ b/src/Storages/MergeTree/MergeTreeBackgroundExecutor.h @@ -17,6 +17,7 @@ #include #include + namespace DB { namespace ErrorCodes diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 31c797556f6..07385067ede 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -7345,7 +7345,7 @@ try if (profile_counters) { - part_log_elem.profile_counters = profile_counters; + part_log_elem.profile_counters = profile_counters; } part_log->add(part_log_elem); @@ -7493,6 +7493,7 @@ bool MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr & moving_tagge moving_part.part->name, cloned_part, {moving_part.part}, + nullptr, nullptr); }; diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index ffedb0ebfdc..10ba0045826 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -1299,7 +1299,7 @@ protected: const DataPartPtr & result_part, const DataPartsVector & source_parts, const MergeListEntry * merge_entry, - std::shared_ptr profile_counters = nullptr); + std::shared_ptr profile_counters); /// If part is assigned to merge or mutation (possibly replicated) /// Should be overridden by children, because they can have different diff --git a/src/Storages/MergeTree/MutateFromLogEntryTask.cpp b/src/Storages/MergeTree/MutateFromLogEntryTask.cpp index 322d8e78585..2e546574e42 100644 --- a/src/Storages/MergeTree/MutateFromLogEntryTask.cpp +++ b/src/Storages/MergeTree/MutateFromLogEntryTask.cpp @@ -189,7 +189,7 @@ ReplicatedMergeMutateTaskBase::PrepareResult MutateFromLogEntryTask::prepare() auto profile_counters = std::make_shared(thread_status.performance_counters.getPartiallyAtomicSnapshot()); storage.writePartLog( PartLogElement::MUTATE_PART, execution_status, stopwatch_ptr->elapsed(), - entry.new_part_name, new_part, future_mutated_part->parts, merge_mutate_entry.get(), profile_counters); + entry.new_part_name, new_part, future_mutated_part->parts, merge_mutate_entry.get(), std::move(profile_counters)); }}; } diff --git a/src/Storages/MergeTree/MutatePlainMergeTreeTask.cpp b/src/Storages/MergeTree/MutatePlainMergeTreeTask.cpp index d611773a5d5..4a29291b20d 100644 --- a/src/Storages/MergeTree/MutatePlainMergeTreeTask.cpp +++ b/src/Storages/MergeTree/MutatePlainMergeTreeTask.cpp @@ -62,8 +62,12 @@ void MutatePlainMergeTreeTask::prepare() time(nullptr), fake_query_context, merge_mutate_entry->txn, merge_mutate_entry->tagger->reserved_space, table_lock_holder); } + bool MutatePlainMergeTreeTask::executeStep() { + /// Metrics will be saved in the thread_group. + CurrentThread::ScopedAttach scoped_attach(thread_group); + /// Make out memory tracker a parent of current thread memory tracker MemoryTrackerThreadSwitcherPtr switcher; if (merge_list_entry) @@ -127,5 +131,4 @@ bool MutatePlainMergeTreeTask::executeStep() return false; } - } diff --git a/src/Storages/MergeTree/MutatePlainMergeTreeTask.h b/src/Storages/MergeTree/MutatePlainMergeTreeTask.h index 577529422de..6356595a16b 100644 --- a/src/Storages/MergeTree/MutatePlainMergeTreeTask.h +++ b/src/Storages/MergeTree/MutatePlainMergeTreeTask.h @@ -8,7 +8,7 @@ #include #include #include -#include + namespace DB { @@ -39,18 +39,7 @@ public: priority += part->getBytesOnDisk(); } - bool onSuspend() override - { - return observer.doSuspend(); - } - bool executeStep() override; - - bool onResume() override - { - return observer.doResume(); - } - void onCompleted() override; StorageID getStorageID() override; UInt64 getPriority() override { return priority; } @@ -89,7 +78,7 @@ private: ContextMutablePtr fake_query_context; MutateTaskPtr mutate_task; - TaskObserverMetrics observer; + ThreadGroupStatusPtr thread_group = std::make_shared(); }; diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index 5416e731d84..295a88d5f76 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -17,7 +17,6 @@ #include #include #include -#include #include #include #include @@ -813,13 +812,11 @@ public: StorageID getStorageID() override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); } UInt64 getPriority() override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); } - bool onSuspend() override - { - return observer.doSuspend(); - } - bool executeStep() override { + /// Metrics will be saved in the thread_group. + CurrentThread::ScopedAttach scoped_attach(thread_group); + auto & current_level_parts = level_parts[current_level]; auto & next_level_parts = level_parts[next_level]; @@ -905,11 +902,6 @@ public: return true; } - bool onResume() override - { - return observer.doResume(); - } - private: String name; MergeTreeData::MutableDataPartsVector parts; @@ -926,7 +918,7 @@ private: /// TODO(nikitamikhaylov): make this constant a setting static constexpr size_t max_parts_to_merge_in_one_level = 10; - TaskObserverMetrics observer; + ThreadGroupStatusPtr thread_group = std::make_shared(); }; @@ -937,9 +929,7 @@ private: // In short it executed a mutation for the part an original part and for its every projection -/** - * - * An overview of how the process of mutation works for projections: +/** An overview of how the process of mutation works for projections: * * The mutation for original parts is executed block by block, * but additionally we execute a SELECT query for each projection over a current block. @@ -1148,14 +1138,11 @@ public: StorageID getStorageID() override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); } UInt64 getPriority() override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); } - - bool onSuspend() override - { - return observer.doSuspend(); - } - bool executeStep() override { + /// Metrics will be saved in the thread_group. + CurrentThread::ScopedAttach scoped_attach(thread_group); + switch (state) { case State::NEED_PREPARE: @@ -1188,13 +1175,7 @@ public: return false; } - bool onResume() override - { - return observer.doResume(); - } - private: - void prepare() { if (ctx->new_data_part->isStoredOnDisk()) @@ -1277,9 +1258,10 @@ private: std::unique_ptr part_merger_writer_task; - TaskObserverMetrics observer; + ThreadGroupStatusPtr thread_group = std::make_shared(); }; + class MutateSomePartColumnsTask : public IExecutableTask { public: @@ -1289,13 +1271,11 @@ public: StorageID getStorageID() override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); } UInt64 getPriority() override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); } - bool onSuspend() override - { - return observer.doSuspend(); - } - bool executeStep() override { + /// Metrics will be saved in the thread_group. + CurrentThread::ScopedAttach scoped_attach(thread_group); + switch (state) { case State::NEED_PREPARE: @@ -1327,13 +1307,7 @@ public: return false; } - bool onResume() override - { - return observer.doResume(); - } - private: - void prepare() { if (ctx->execute_ttl_type != ExecuteTTLType::NONE) @@ -1492,7 +1466,7 @@ private: MergedColumnOnlyOutputStreamPtr out; std::unique_ptr part_merger_writer_task{nullptr}; - TaskObserverMetrics observer; + ThreadGroupStatusPtr thread_group = std::make_shared(); }; diff --git a/src/Storages/MergeTree/ReplicatedMergeMutateTaskBase.cpp b/src/Storages/MergeTree/ReplicatedMergeMutateTaskBase.cpp index a22aab8d6ce..2ef36654d95 100644 --- a/src/Storages/MergeTree/ReplicatedMergeMutateTaskBase.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeMutateTaskBase.cpp @@ -29,6 +29,9 @@ void ReplicatedMergeMutateTaskBase::onCompleted() bool ReplicatedMergeMutateTaskBase::executeStep() { + /// Metrics will be saved in the thread_group. + CurrentThread::ScopedAttach scoped_attach(thread_group); + std::exception_ptr saved_exception; bool retryable_error = false; @@ -83,7 +86,6 @@ bool ReplicatedMergeMutateTaskBase::executeStep() saved_exception = std::current_exception(); } - if (!retryable_error && saved_exception) { std::lock_guard lock(storage.queue.state_mutex); diff --git a/src/Storages/MergeTree/ReplicatedMergeMutateTaskBase.h b/src/Storages/MergeTree/ReplicatedMergeMutateTaskBase.h index 6080d488ca4..e08074cd4ff 100644 --- a/src/Storages/MergeTree/ReplicatedMergeMutateTaskBase.h +++ b/src/Storages/MergeTree/ReplicatedMergeMutateTaskBase.h @@ -4,16 +4,16 @@ #include #include -#include + namespace DB { class StorageReplicatedMergeTree; -/** - * This is used as a base of MergeFromLogEntryTask and MutateFromLogEntryTaskBase - */ + +/** This is used as a base of MergeFromLogEntryTask and MutateFromLogEntryTaskBase + */ class ReplicatedMergeMutateTaskBase : public IExecutableTask { public: @@ -33,17 +33,12 @@ public: } ~ReplicatedMergeMutateTaskBase() override = default; + void onCompleted() override; + StorageID getStorageID() override; - bool onSuspend() override - { - return observer.doSuspend(); - } + bool executeStep() override; - bool onResume() override - { - return observer.doResume(); - } protected: using PartLogWriter = std::function; @@ -70,7 +65,6 @@ protected: StorageReplicatedMergeTree & storage; private: - enum class CheckExistingPartResult { PART_EXISTS, @@ -78,7 +72,7 @@ private: }; CheckExistingPartResult checkExistingPart(); - bool executeImpl() ; + bool executeImpl(); enum class State { @@ -92,7 +86,7 @@ private: PartLogWriter part_log_writer{}; State state{State::NEED_PREPARE}; IExecutableTask::TaskResultCallback task_result_callback; - TaskObserverMetrics observer; + ThreadGroupStatusPtr thread_group = std::make_shared(); }; } diff --git a/src/Storages/MergeTree/TaskObserverMetrics.h b/src/Storages/MergeTree/TaskObserverMetrics.h deleted file mode 100644 index b518682f0c9..00000000000 --- a/src/Storages/MergeTree/TaskObserverMetrics.h +++ /dev/null @@ -1,31 +0,0 @@ -#pragma once - -#include - -namespace DB -{ - -class TaskObserverMetrics : public boost::noncopyable -{ -public: - TaskObserverMetrics() : thread_group(std::make_shared()) { } - ~TaskObserverMetrics() { } - - bool doResume() - { - CurrentThread::attachTo(thread_group); - return true; - } - - bool doSuspend() - { - CurrentThread::detachQueryIfNotDetached(); - return true; - } - - -private: - ThreadGroupStatusPtr thread_group; -}; - -} diff --git a/src/Storages/MergeTree/tests/gtest_executor.cpp b/src/Storages/MergeTree/tests/gtest_executor.cpp index 1ae2123ce5a..60149220643 100644 --- a/src/Storages/MergeTree/tests/gtest_executor.cpp +++ b/src/Storages/MergeTree/tests/gtest_executor.cpp @@ -24,11 +24,6 @@ public: { } - bool onSuspend() override - { - suspend_calls++ - } - bool executeStep() override { auto sleep_time = distribution(generator); @@ -41,11 +36,6 @@ public: return false; } - bool onSuspend() override - { - resume_calls++ - } - StorageID getStorageID() override { return {"test", name}; @@ -65,9 +55,7 @@ private: std::uniform_int_distribution<> distribution; String name; - size_t suspend_calls; std::function on_completed; - size_t resume_calls; }; @@ -87,8 +75,7 @@ TEST(Executor, RemoveTasks) for (size_t i = 0; i < batch; ++i) for (size_t j = 0; j < tasks_kinds; ++j) ASSERT_TRUE( - executor->trySchedule(std::make_shared(std::to_string(j))) - ); + executor->trySchedule(std::make_shared(std::to_string(j)))); std::vector threads(batch); @@ -105,9 +92,6 @@ TEST(Executor, RemoveTasks) thread.join(); ASSERT_EQ(CurrentMetrics::values[CurrentMetrics::BackgroundMergesAndMutationsPoolTask], 0); - /// TODO: move to a test by itself - ASSERT_EQ(batch*tasks_kinds, suspend_calls); - ASSERT_EQ(batch*tasks_kinds, resume_calls); executor->wait(); } diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 3672bd44bb2..e9958a68406 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -1606,7 +1606,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(LogEntry & entry) thread_status.finalizePerformanceCounters(); auto profile_counters = std::make_shared(thread_status.performance_counters.getPartiallyAtomicSnapshot()); writePartLog(PartLogElement::Type::NEW_PART, {}, 0 /** log entry is fake so we don't measure the time */, - part->name, part, {} /** log entry is fake so there are no initial parts */, nullptr, profile_counters); + part->name, part, {} /** log entry is fake so there are no initial parts */, nullptr, std::move(profile_counters)); return true; } @@ -4015,7 +4015,7 @@ bool StorageReplicatedMergeTree::fetchPart( auto profile_counters = std::make_shared(thread_status.performance_counters.getPartiallyAtomicSnapshot()); writePartLog( PartLogElement::DOWNLOAD_PART, execution_status, stopwatch.elapsed(), - part_name, part, replaced_parts, nullptr, profile_counters); + part_name, part, replaced_parts, nullptr, std::move(profile_counters)); }; DataPartPtr part_to_clone; @@ -4254,7 +4254,7 @@ MutableDataPartStoragePtr StorageReplicatedMergeTree::fetchExistsPart( auto profile_counters = std::make_shared(thread_status.performance_counters.getPartiallyAtomicSnapshot()); writePartLog( PartLogElement::DOWNLOAD_PART, execution_status, stopwatch.elapsed(), - part_name, part, replaced_parts, nullptr, profile_counters); + part_name, part, replaced_parts, nullptr, std::move(profile_counters)); }; std::function get_part;