diff --git a/src/Common/ThreadPool.h b/src/Common/ThreadPool.h index b7e59041839..f2b6ed10a39 100644 --- a/src/Common/ThreadPool.h +++ b/src/Common/ThreadPool.h @@ -13,7 +13,6 @@ #include #include -#include #include /** Very simple thread pool similar to boost::threadpool. diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 2c7bf9e6eca..a75f552ab05 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -77,7 +77,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/src/Storages/MergeTree/BackgroundJobsExecutor.cpp b/src/Storages/MergeTree/BackgroundJobsAssignee.cpp similarity index 79% rename from src/Storages/MergeTree/BackgroundJobsExecutor.cpp rename to src/Storages/MergeTree/BackgroundJobsAssignee.cpp index a8d2f52926c..598c43f2153 100644 --- a/src/Storages/MergeTree/BackgroundJobsExecutor.cpp +++ b/src/Storages/MergeTree/BackgroundJobsAssignee.cpp @@ -1,4 +1,4 @@ -#include +#include #include #include #include @@ -8,7 +8,7 @@ namespace DB { -BackgroundJobAssignee::BackgroundJobAssignee(MergeTreeData & data_, BackgroundJobAssignee::Type type_, ContextPtr global_context_) +BackgroundJobsAssignee::BackgroundJobsAssignee(MergeTreeData & data_, BackgroundJobsAssignee::Type type_, ContextPtr global_context_) : WithContext(global_context_) , data(data_) , sleep_settings(global_context_->getBackgroundMoveTaskSchedulingSettings()) @@ -17,7 +17,7 @@ BackgroundJobAssignee::BackgroundJobAssignee(MergeTreeData & data_, BackgroundJo { } -void BackgroundJobAssignee::trigger() +void BackgroundJobsAssignee::trigger() { std::lock_guard lock(holder_mutex); @@ -29,7 +29,7 @@ void BackgroundJobAssignee::trigger() holder->schedule(); } -void BackgroundJobAssignee::postpone() +void BackgroundJobsAssignee::postpone() { std::lock_guard lock(holder_mutex); @@ -48,28 +48,28 @@ void BackgroundJobAssignee::postpone() } -void BackgroundJobAssignee::scheduleMergeMutateTask(ExecutableTaskPtr merge_task) +void BackgroundJobsAssignee::scheduleMergeMutateTask(ExecutableTaskPtr merge_task) { bool res = getContext()->getMergeMutateExecutor()->trySchedule(merge_task); res ? trigger() : postpone(); } -void BackgroundJobAssignee::scheduleFetchTask(ExecutableTaskPtr fetch_task) +void BackgroundJobsAssignee::scheduleFetchTask(ExecutableTaskPtr fetch_task) { bool res = getContext()->getFetchesExecutor()->trySchedule(fetch_task); res ? trigger() : postpone(); } -void BackgroundJobAssignee::scheduleMoveTask(ExecutableTaskPtr move_task) +void BackgroundJobsAssignee::scheduleMoveTask(ExecutableTaskPtr move_task) { bool res = getContext()->getMovesExecutor()->trySchedule(move_task); res ? trigger() : postpone(); } -String BackgroundJobAssignee::toString(Type type) +String BackgroundJobsAssignee::toString(Type type) { switch (type) { @@ -80,16 +80,16 @@ String BackgroundJobAssignee::toString(Type type) } } -void BackgroundJobAssignee::start() +void BackgroundJobsAssignee::start() { std::lock_guard lock(holder_mutex); if (!holder) - holder = getContext()->getSchedulePool().createTask("BackgroundJobAssignee:" + toString(type), [this]{ main(); }); + holder = getContext()->getSchedulePool().createTask("BackgroundJobsAssignee:" + toString(type), [this]{ main(); }); holder->activateAndSchedule(); } -void BackgroundJobAssignee::finish() +void BackgroundJobsAssignee::finish() { /// No lock here, because scheduled tasks could call trigger method if (holder) @@ -105,7 +105,7 @@ void BackgroundJobAssignee::finish() } -void BackgroundJobAssignee::main() +void BackgroundJobsAssignee::main() try { bool succeed = false; @@ -128,7 +128,7 @@ catch (...) /// Catch any exception to avoid thread termination. postpone(); } -BackgroundJobAssignee::~BackgroundJobAssignee() +BackgroundJobsAssignee::~BackgroundJobsAssignee() { try { diff --git a/src/Storages/MergeTree/BackgroundJobsExecutor.h b/src/Storages/MergeTree/BackgroundJobsAssignee.h similarity index 92% rename from src/Storages/MergeTree/BackgroundJobsExecutor.h rename to src/Storages/MergeTree/BackgroundJobsAssignee.h index 82bd6882b96..6d2778adea2 100644 --- a/src/Storages/MergeTree/BackgroundJobsExecutor.h +++ b/src/Storages/MergeTree/BackgroundJobsAssignee.h @@ -1,6 +1,6 @@ #pragma once -#include +#include #include #include #include @@ -29,7 +29,7 @@ struct ExecutableTaskSchedulingSettings class MergeTreeData; -class BackgroundJobAssignee : protected WithContext +class BackgroundJobsAssignee : protected WithContext { private: MergeTreeData & data; @@ -66,9 +66,9 @@ public: void scheduleMoveTask(ExecutableTaskPtr move_task); /// Just call finish - virtual ~BackgroundJobAssignee(); + virtual ~BackgroundJobsAssignee(); - BackgroundJobAssignee( + BackgroundJobsAssignee( MergeTreeData & data_, Type type, ContextPtr global_context_); diff --git a/src/Storages/MergeTree/MergeMutateExecutor.cpp b/src/Storages/MergeTree/MergeTreeBackgroundExecutor.cpp similarity index 96% rename from src/Storages/MergeTree/MergeMutateExecutor.cpp rename to src/Storages/MergeTree/MergeTreeBackgroundExecutor.cpp index cbed2aa9f69..f43bbce2ebd 100644 --- a/src/Storages/MergeTree/MergeMutateExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeBackgroundExecutor.cpp @@ -1,7 +1,7 @@ -#include +#include #include -#include +#include namespace DB @@ -57,7 +57,7 @@ void MergeTreeBackgroundExecutor::wait() if (scheduler.joinable()) scheduler.join(); - pool.wait(); + /// ThreadPool will be finalized in destructor. } @@ -115,7 +115,7 @@ void MergeTreeBackgroundExecutor::removeTasksCorrespondingToStorage(StorageID id /// Erase storage related tasks from pending and select active tasks to wait for auto it = std::remove_if(pending.begin(), pending.end(), - [&] (auto item) -> bool { return item->task->getStorageID() == id; } ); + [&] (auto item) -> bool { return item->task->getStorageID() == id; }); pending.erase(it, pending.end()); /// Copy items to wait for their completion @@ -179,7 +179,7 @@ void MergeTreeBackgroundExecutor::routine(ItemPtr item) /// But it is rather safe, because we have try...catch block here, and another one in ThreadPool. item->task->onCompleted(); } - catch(...) + catch (...) { std::lock_guard guard(mutex); erase_from_active(); diff --git a/src/Storages/MergeTree/MergeMutateExecutor.h b/src/Storages/MergeTree/MergeTreeBackgroundExecutor.h similarity index 97% rename from src/Storages/MergeTree/MergeMutateExecutor.h rename to src/Storages/MergeTree/MergeTreeBackgroundExecutor.h index 67edf282862..bdeeecb7592 100644 --- a/src/Storages/MergeTree/MergeMutateExecutor.h +++ b/src/Storages/MergeTree/MergeTreeBackgroundExecutor.h @@ -35,7 +35,7 @@ namespace DB * * Due to all caveats I described above we use boost::circular_buffer as a container for queues. * - * Another nuisance that we faces with is than backgroud operations always interacts with an associated Storage. + * Another nuisance that we faces with is than background operations always interact with an associated Storage. * So, when a Storage want to shutdown, it must wait until all its background operaions are finished. */ class MergeTreeBackgroundExecutor : public shared_ptr_helper diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index a57c27c33ef..6592b9a043d 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -200,8 +200,8 @@ MergeTreeData::MergeTreeData( , data_parts_by_info(data_parts_indexes.get()) , data_parts_by_state_and_info(data_parts_indexes.get()) , parts_mover(this) - , background_executor(*this, BackgroundJobAssignee::Type::DataProcessing, getContext()) - , background_moves_executor(*this, BackgroundJobAssignee::Type::Moving, getContext()) + , background_executor(*this, BackgroundJobsAssignee::Type::DataProcessing, getContext()) + , background_moves_executor(*this, BackgroundJobsAssignee::Type::Moving, getContext()) { const auto settings = getSettings(); allow_nullable_key = attach || settings->allow_nullable_key; @@ -5029,7 +5029,7 @@ MergeTreeData::CurrentlyMovingPartsTagger::~CurrentlyMovingPartsTagger() } } -bool MergeTreeData::scheduleDataMovingJob(BackgroundJobAssignee & executor) +bool MergeTreeData::scheduleDataMovingJob(BackgroundJobsAssignee & executor) { if (parts_mover.moves_blocker.isCancelled()) return false; diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index bc0b12c3e46..c3b9eaf1b59 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -3,7 +3,7 @@ #include #include #include -#include +#include #include #include #include @@ -827,9 +827,9 @@ public: PinnedPartUUIDsPtr getPinnedPartUUIDs() const; /// Schedules background job to like merge/mutate/fetch an executor - virtual bool scheduleDataProcessingJob(BackgroundJobAssignee & executor) = 0; + virtual bool scheduleDataProcessingJob(BackgroundJobsAssignee & executor) = 0; /// Schedules job to move parts between disks/volumes and so on. - bool scheduleDataMovingJob(BackgroundJobAssignee & executor); + bool scheduleDataMovingJob(BackgroundJobsAssignee & executor); bool areBackgroundMovesNeeded() const; /// Lock part in zookeeper for shared data in several nodes @@ -925,8 +925,8 @@ protected: /// Executors are common for both ReplicatedMergeTree and plain MergeTree /// but they are being started and finished in derived classes, so let them be protected. - BackgroundJobAssignee background_executor; - BackgroundJobAssignee background_moves_executor; + BackgroundJobsAssignee background_executor; + BackgroundJobsAssignee background_moves_executor; /// Every task that is finished will ask to assign a new one into an executor. std::function common_assignee_trigger; diff --git a/src/Storages/MergeTree/tests/gtest_executor.cpp b/src/Storages/MergeTree/tests/gtest_executor.cpp index ece859fbb0f..1377d9add9a 100644 --- a/src/Storages/MergeTree/tests/gtest_executor.cpp +++ b/src/Storages/MergeTree/tests/gtest_executor.cpp @@ -6,7 +6,7 @@ #include #include -#include +#include using namespace DB; diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 92b8be63826..46bc319a288 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -1041,7 +1041,7 @@ bool StorageMergeTree::mutateSelectedPart(const StorageMetadataPtr & metadata_sn return true; } -bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobAssignee & executor) //-V657 +bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & executor) //-V657 { if (shutdown_called) return false; diff --git a/src/Storages/StorageMergeTree.h b/src/Storages/StorageMergeTree.h index c05e4cf44ab..09d362b6eef 100644 --- a/src/Storages/StorageMergeTree.h +++ b/src/Storages/StorageMergeTree.h @@ -95,7 +95,7 @@ public: RestoreDataTasks restoreFromBackup(const BackupPtr & backup, const String & data_path_in_backup, const ASTs & partitions, ContextMutablePtr context) override; - bool scheduleDataProcessingJob(BackgroundJobAssignee & executor) override; + bool scheduleDataProcessingJob(BackgroundJobsAssignee & executor) override; MergeTreeDeduplicationLog * getDeduplicationLog() { return deduplication_log.get(); } diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index c49868d6230..942eb0e561c 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -17,7 +17,7 @@ #include #include #include -#include +#include #include #include #include @@ -3173,7 +3173,7 @@ bool StorageReplicatedMergeTree::processQueueEntry(ReplicatedMergeTreeQueue::Sel }); } -bool StorageReplicatedMergeTree::scheduleDataProcessingJob(BackgroundJobAssignee & executor) +bool StorageReplicatedMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & executor) { /// If replication queue is stopped exit immediately as we successfully executed the task if (queue.actions_blocker.isCancelled()) diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index cf7fe5169fa..ddb14378f90 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -29,7 +29,7 @@ #include #include #include -#include +#include namespace DB @@ -218,7 +218,7 @@ public: const zkutil::EphemeralNodeHolder::Ptr & metadata_drop_lock, Poco::Logger * logger); /// Schedules job to execute in background pool (merge, mutate, drop range and so on) - bool scheduleDataProcessingJob(BackgroundJobAssignee & executor) override; + bool scheduleDataProcessingJob(BackgroundJobsAssignee & executor) override; /// Checks that fetches are not disabled with action blocker and pool for fetches /// is not overloaded diff --git a/src/Storages/ya.make b/src/Storages/ya.make index 0daea968af5..1cbb611e8d3 100644 --- a/src/Storages/ya.make +++ b/src/Storages/ya.make @@ -27,7 +27,7 @@ SRCS( MemorySettings.cpp MergeTree/ActiveDataPartSet.cpp MergeTree/AllMergeSelector.cpp - MergeTree/BackgroundJobsExecutor.cpp + MergeTree/BackgroundJobsAssignee.cpp MergeTree/BoolMask.cpp MergeTree/DataPartsExchange.cpp MergeTree/DropPartsRanges.cpp @@ -40,6 +40,7 @@ SRCS( MergeTree/LevelMergeSelector.cpp MergeTree/MergeAlgorithm.cpp MergeTree/MergeList.cpp + MergeTree/MergeTreeBackgroundExecutor.cpp MergeTree/MergeTreeBaseSelectProcessor.cpp MergeTree/MergeTreeBlockReadUtils.cpp MergeTree/MergeTreeData.cpp