diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index e7614a66761..e98a8d59155 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -18,7 +18,6 @@ #include #include #include -#include #include #include #include @@ -331,8 +330,6 @@ struct ContextShared ConfigurationPtr users_config; /// Config with the users, profiles and quotas sections. InterserverIOHandler interserver_io_handler; /// Handler for interserver communication. std::optional buffer_flush_schedule_pool; /// A thread pool that can do background flush for Buffer tables. - std::optional background_pool; /// The thread pool for the background work performed by the tables. - std::optional background_move_pool; /// The thread pool for the background moves performed by the tables. std::optional schedule_pool; /// A thread pool that can run different jobs in background (used in replicated tables) std::optional distributed_schedule_pool; /// A thread pool that can run different jobs in background (used for distributed sends) std::optional message_broker_schedule_pool; /// A thread pool that can run different jobs in background (used in kafka streaming) @@ -433,8 +430,6 @@ struct ContextShared external_dictionaries_loader.reset(); external_models_loader.reset(); buffer_flush_schedule_pool.reset(); - background_pool.reset(); - background_move_pool.reset(); schedule_pool.reset(); distributed_schedule_pool.reset(); ddl_worker.reset(); @@ -1369,45 +1364,6 @@ void Context::dropCaches() const shared->mark_cache->reset(); } -BackgroundProcessingPool & Context::getBackgroundPool() -{ - auto lock = getLock(); - if (!shared->background_pool) - { - BackgroundProcessingPool::PoolSettings pool_settings; - const auto & config = getConfigRef(); - pool_settings.thread_sleep_seconds = config.getDouble("background_processing_pool_thread_sleep_seconds", 10); - pool_settings.thread_sleep_seconds_random_part = config.getDouble("background_processing_pool_thread_sleep_seconds_random_part", 1.0); - pool_settings.thread_sleep_seconds_if_nothing_to_do = config.getDouble("background_processing_pool_thread_sleep_seconds_if_nothing_to_do", 0.1); - pool_settings.task_sleep_seconds_when_no_work_min = config.getDouble("background_processing_pool_task_sleep_seconds_when_no_work_min", 10); - pool_settings.task_sleep_seconds_when_no_work_max = config.getDouble("background_processing_pool_task_sleep_seconds_when_no_work_max", 600); - pool_settings.task_sleep_seconds_when_no_work_multiplier = config.getDouble("background_processing_pool_task_sleep_seconds_when_no_work_multiplier", 1.1); - pool_settings.task_sleep_seconds_when_no_work_random_part = config.getDouble("background_processing_pool_task_sleep_seconds_when_no_work_random_part", 1.0); - shared->background_pool.emplace(settings.background_pool_size, pool_settings); - } - return *shared->background_pool; -} - -BackgroundProcessingPool & Context::getBackgroundMovePool() -{ - auto lock = getLock(); - if (!shared->background_move_pool) - { - BackgroundProcessingPool::PoolSettings pool_settings; - const auto & config = getConfigRef(); - pool_settings.thread_sleep_seconds = config.getDouble("background_move_processing_pool_thread_sleep_seconds", 10); - pool_settings.thread_sleep_seconds_random_part = config.getDouble("background_move_processing_pool_thread_sleep_seconds_random_part", 1.0); - pool_settings.thread_sleep_seconds_if_nothing_to_do = config.getDouble("background_move_processing_pool_thread_sleep_seconds_if_nothing_to_do", 0.1); - pool_settings.task_sleep_seconds_when_no_work_min = config.getDouble("background_move_processing_pool_task_sleep_seconds_when_no_work_min", 10); - pool_settings.task_sleep_seconds_when_no_work_max = config.getDouble("background_move_processing_pool_task_sleep_seconds_when_no_work_max", 600); - pool_settings.task_sleep_seconds_when_no_work_multiplier = config.getDouble("background_move_processing_pool_task_sleep_seconds_when_no_work_multiplier", 1.1); - pool_settings.task_sleep_seconds_when_no_work_random_part = config.getDouble("background_move_processing_pool_task_sleep_seconds_when_no_work_random_part", 1.0); - pool_settings.tasks_metric = CurrentMetrics::BackgroundMovePoolTask; - shared->background_move_pool.emplace(settings.background_move_pool_size, pool_settings, "BackgroundMovePool", "BgMoveProcPool"); - } - return *shared->background_move_pool; -} - BackgroundSchedulePool & Context::getBufferFlushSchedulePool() { auto lock = getLock(); diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index bd5e17fe2e4..155d8fbcd73 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -62,7 +62,6 @@ class EmbeddedDictionaries; class ExternalDictionariesLoader; class ExternalModelsLoader; class InterserverIOHandler; -class BackgroundProcessingPool; class BackgroundSchedulePool; class MergeList; class Cluster; @@ -508,8 +507,6 @@ public: void dropCaches() const; BackgroundSchedulePool & getBufferFlushSchedulePool(); - BackgroundProcessingPool & getBackgroundPool(); - BackgroundProcessingPool & getBackgroundMovePool(); BackgroundSchedulePool & getSchedulePool(); BackgroundSchedulePool & getMessageBrokerSchedulePool(); BackgroundSchedulePool & getDistributedSchedulePool(); diff --git a/src/Storages/MergeTree/BackgroundJobsExecutor.h b/src/Storages/MergeTree/BackgroundJobsExecutor.h index bf1fd0c5a03..b7bd63f7169 100644 --- a/src/Storages/MergeTree/BackgroundJobsExecutor.h +++ b/src/Storages/MergeTree/BackgroundJobsExecutor.h @@ -3,11 +3,38 @@ #include #include #include -#include #include +namespace CurrentMetrics +{ + extern const Metric BackgroundPoolTask; +} + namespace DB { +enum PoolType +{ + MERGE_MUTATING, + MOVING, + +} + +struct PoolSettings +{ + double thread_sleep_seconds = 10; + double thread_sleep_seconds_random_part = 1.0; + double thread_sleep_seconds_if_nothing_to_do = 0.1; + + /// For exponential backoff. + double task_sleep_seconds_when_no_work_min = 10; + double task_sleep_seconds_when_no_work_max = 600; + double task_sleep_seconds_when_no_work_multiplier = 1.1; + double task_sleep_seconds_when_no_work_random_part = 1.0; + + CurrentMetrics::Metric tasks_metric = CurrentMetrics::BackgroundPoolTask; + + PoolSettings() noexcept {} +}; class BackgroundJobsExecutor { @@ -19,7 +46,7 @@ private: ThreadPool move_pool; std::atomic errors_count{0}; pcg64 rng; - BackgroundProcessingPool::PoolSettings settings; + PoolSettings settings; BackgroundSchedulePool::TaskHolder data_processing_task; BackgroundSchedulePool::TaskHolder data_moving_task; diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 5a1ce42d831..87dfbd4d879 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -762,23 +762,6 @@ bool StorageMergeTree::partIsAssignedToBackgroundOperation(const DataPartPtr & p return currently_merging_mutating_parts.count(part); } - -BackgroundProcessingPoolTaskResult StorageMergeTree::movePartsTask() -{ - try - { - if (!selectPartsAndMove()) - return BackgroundProcessingPoolTaskResult::NOTHING_TO_DO; - - return BackgroundProcessingPoolTaskResult::SUCCESS; - } - catch (...) - { - tryLogCurrentException(log); - return BackgroundProcessingPoolTaskResult::ERROR; - } -} - std::optional StorageMergeTree::selectPartsToMutate(const StorageMetadataPtr & metadata_snapshot, String */* disable_reason */) { auto table_lock_holder = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations); diff --git a/src/Storages/StorageMergeTree.h b/src/Storages/StorageMergeTree.h index 0c2b633fa70..b1946e48d4f 100644 --- a/src/Storages/StorageMergeTree.h +++ b/src/Storages/StorageMergeTree.h @@ -13,7 +13,6 @@ #include #include #include -#include #include #include #include @@ -131,8 +130,6 @@ private: ActionLock stopMergesAndWait(); - BackgroundProcessingPoolTaskResult movePartsTask(); - /// Allocate block number for new mutation, write mutation to disk /// and into in-memory structures. Wake up merge-mutation task. Int64 startMutation(const MutationCommands & commands, String & mutation_file_name); diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 1d1b72018e6..453956ffa8b 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -2626,23 +2626,6 @@ bool StorageReplicatedMergeTree::partIsAssignedToBackgroundOperation(const DataP return queue.isVirtualPart(part); } -BackgroundProcessingPoolTaskResult StorageReplicatedMergeTree::movePartsTask() -{ - try - { - if (!selectPartsAndMove()) - return BackgroundProcessingPoolTaskResult::NOTHING_TO_DO; - - return BackgroundProcessingPoolTaskResult::SUCCESS; - } - catch (...) - { - tryLogCurrentException(log); - return BackgroundProcessingPoolTaskResult::ERROR; - } -} - - void StorageReplicatedMergeTree::mergeSelectingTask() { if (!is_leader) diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 6c30d966afd..4ce3c0ad3c1 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -16,7 +16,6 @@ #include #include #include -#include #include #include #include @@ -284,10 +283,6 @@ private: BackgroundSchedulePool::TaskHolder mutations_updating_task; - /// A task which move parts to another disks/volumes - /// Transparent for replication. - BackgroundProcessingPool::TaskHandle move_parts_task_handle; - /// A task that selects parts to merge. BackgroundSchedulePool::TaskHolder merge_selecting_task; /// It is acquired for each iteration of the selection of parts to merge or each OPTIMIZE query. @@ -423,10 +418,6 @@ private: bool processQueueEntry(ReplicatedMergeTreeQueue::SelectedEntry & entry); - /// Perform moves of parts to another disks. - /// Local operation, doesn't interact with replicationg queue. - BackgroundProcessingPoolTaskResult movePartsTask(); - /// Postcondition: /// either leader_election is fully initialized (node in ZK is created and the watching thread is launched) /// or an exception is thrown and leader_election is destroyed.