diff --git a/src/Storages/MergeTree/BackgroundJobsAssignee.cpp b/src/Storages/MergeTree/BackgroundJobsAssignee.cpp index 0a69bf1109f..5d76afb5f71 100644 --- a/src/Storages/MergeTree/BackgroundJobsAssignee.cpp +++ b/src/Storages/MergeTree/BackgroundJobsAssignee.cpp @@ -11,13 +11,24 @@ namespace DB BackgroundJobsAssignee::BackgroundJobsAssignee(MergeTreeData & data_, BackgroundJobsAssignee::Type type_, ContextPtr global_context_) : WithContext(global_context_) - , data(data_) - , sleep_settings(global_context_->getBackgroundMoveTaskSchedulingSettings()) - , rng(randomSeed()) , type(type_) + , data(data_) + , rng(randomSeed()) + , sleep_settings(getSettings()) { } +BackgroundTaskSchedulingSettings BackgroundJobsAssignee::getSettings() const +{ + switch (type) + { + case Type::DataProcessing: + return getContext()->getBackgroundProcessingTaskSchedulingSettings(); + case Type::Moving: + return getContext()->getBackgroundMoveTaskSchedulingSettings(); + } +} + void BackgroundJobsAssignee::trigger() { std::lock_guard lock(holder_mutex); diff --git a/src/Storages/MergeTree/BackgroundJobsAssignee.h b/src/Storages/MergeTree/BackgroundJobsAssignee.h index 80ddead3e56..3d43de0097d 100644 --- a/src/Storages/MergeTree/BackgroundJobsAssignee.h +++ b/src/Storages/MergeTree/BackgroundJobsAssignee.h @@ -32,23 +32,6 @@ class MergeTreeData; class BackgroundJobsAssignee : public WithContext { -private: - MergeTreeData & data; - - /// Settings for execution control of background scheduling task - BackgroundTaskSchedulingSettings sleep_settings; - /// Useful for random backoff timeouts generation - pcg64 rng; - - /// How many times execution of background job failed or we have - /// no new jobs. - size_t no_work_done_count = 0; - - /// Scheduling task which assign jobs in background pool - BackgroundSchedulePool::TaskHolder holder; - /// Mutex for thread safety - std::mutex holder_mutex; - public: /// In case of ReplicatedMergeTree the first assignee will be responsible for /// polling the replication queue and schedule operations according to the LogEntry type @@ -81,11 +64,28 @@ public: ContextPtr global_context_); private: + MergeTreeData & data; + + /// Useful for random backoff timeouts generation + pcg64 rng; + + /// How many times execution of background job failed or we have + /// no new jobs. + size_t no_work_done_count = 0; + + /// Scheduling task which assign jobs in background pool + BackgroundSchedulePool::TaskHolder holder; + /// Mutex for thread safety + std::mutex holder_mutex; + + /// Settings for execution control of background scheduling task + BackgroundTaskSchedulingSettings sleep_settings; + static String toString(Type type); /// Function that executes in background scheduling pool void threadFunc(); + + BackgroundTaskSchedulingSettings getSettings() const; }; - - } diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 264644ffd28..8509cb7bb4a 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -8179,6 +8179,9 @@ void StorageReplicatedMergeTree::replacePartitionFrom( ++idx; } + /// Force execution of inserted log entries, because it could be delayed at BackgroundPool. + background_operations_assignee.trigger(); + for (const auto & entry : entries) waitForLogEntryToBeProcessedIfNecessary(*entry, query_context); } @@ -8719,6 +8722,9 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta parts_holder.clear(); cleanup_thread.wakeup(); + /// Force execution of inserted log entries, because it could be delayed at BackgroundPool. + background_operations_assignee.trigger(); + waitForLogEntryToBeProcessedIfNecessary(entry_delete, query_context); /// Cleaning possibly stored information about parts from /quorum/last_part node in ZooKeeper.