From 9dd9c227e511b014ad2dad55d941d639ea756249 Mon Sep 17 00:00:00 2001 From: Aleksei Filatov Date: Mon, 18 Nov 2024 08:57:15 +0000 Subject: [PATCH 1/2] Trigger queue extry processing after ALTER TABLE REPLACE/MOVE PARTITION --- src/Storages/StorageReplicatedMergeTree.cpp | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 793fd02c656..19ae72feb97 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -8180,6 +8180,9 @@ void StorageReplicatedMergeTree::replacePartitionFrom( ++idx; } + /// Force execution of inserted log entries, because it could be delayed at BackgroundPool. + background_operations_assignee.trigger(); + for (const auto & entry : entries) waitForLogEntryToBeProcessedIfNecessary(*entry, query_context); } @@ -8720,6 +8723,9 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta parts_holder.clear(); cleanup_thread.wakeup(); + /// Force execution of inserted log entries, because it could be delayed at BackgroundPool. + background_operations_assignee.trigger(); + waitForLogEntryToBeProcessedIfNecessary(entry_delete, query_context); /// Cleaning possibly stored information about parts from /quorum/last_part node in ZooKeeper. From f7b6c6d3438e8e777f022786de5cd72ed814fb3d Mon Sep 17 00:00:00 2001 From: Aleksei Filatov Date: Mon, 18 Nov 2024 09:16:21 +0000 Subject: [PATCH 2/2] Retrieve correct settings for BackgroundJobsAssignee --- .../MergeTree/BackgroundJobsAssignee.cpp | 17 +++++++-- .../MergeTree/BackgroundJobsAssignee.h | 38 +++++++++---------- 2 files changed, 33 insertions(+), 22 deletions(-) diff --git a/src/Storages/MergeTree/BackgroundJobsAssignee.cpp b/src/Storages/MergeTree/BackgroundJobsAssignee.cpp index 0a69bf1109f..5d76afb5f71 100644 --- a/src/Storages/MergeTree/BackgroundJobsAssignee.cpp +++ b/src/Storages/MergeTree/BackgroundJobsAssignee.cpp @@ -11,13 +11,24 @@ namespace DB BackgroundJobsAssignee::BackgroundJobsAssignee(MergeTreeData & data_, BackgroundJobsAssignee::Type type_, ContextPtr global_context_) : WithContext(global_context_) - , data(data_) - , sleep_settings(global_context_->getBackgroundMoveTaskSchedulingSettings()) - , rng(randomSeed()) , type(type_) + , data(data_) + , rng(randomSeed()) + , sleep_settings(getSettings()) { } +BackgroundTaskSchedulingSettings BackgroundJobsAssignee::getSettings() const +{ + switch (type) + { + case Type::DataProcessing: + return getContext()->getBackgroundProcessingTaskSchedulingSettings(); + case Type::Moving: + return getContext()->getBackgroundMoveTaskSchedulingSettings(); + } +} + void BackgroundJobsAssignee::trigger() { std::lock_guard lock(holder_mutex); diff --git a/src/Storages/MergeTree/BackgroundJobsAssignee.h b/src/Storages/MergeTree/BackgroundJobsAssignee.h index 80ddead3e56..3d43de0097d 100644 --- a/src/Storages/MergeTree/BackgroundJobsAssignee.h +++ b/src/Storages/MergeTree/BackgroundJobsAssignee.h @@ -32,23 +32,6 @@ class MergeTreeData; class BackgroundJobsAssignee : public WithContext { -private: - MergeTreeData & data; - - /// Settings for execution control of background scheduling task - BackgroundTaskSchedulingSettings sleep_settings; - /// Useful for random backoff timeouts generation - pcg64 rng; - - /// How many times execution of background job failed or we have - /// no new jobs. - size_t no_work_done_count = 0; - - /// Scheduling task which assign jobs in background pool - BackgroundSchedulePool::TaskHolder holder; - /// Mutex for thread safety - std::mutex holder_mutex; - public: /// In case of ReplicatedMergeTree the first assignee will be responsible for /// polling the replication queue and schedule operations according to the LogEntry type @@ -81,11 +64,28 @@ public: ContextPtr global_context_); private: + MergeTreeData & data; + + /// Useful for random backoff timeouts generation + pcg64 rng; + + /// How many times execution of background job failed or we have + /// no new jobs. + size_t no_work_done_count = 0; + + /// Scheduling task which assign jobs in background pool + BackgroundSchedulePool::TaskHolder holder; + /// Mutex for thread safety + std::mutex holder_mutex; + + /// Settings for execution control of background scheduling task + BackgroundTaskSchedulingSettings sleep_settings; + static String toString(Type type); /// Function that executes in background scheduling pool void threadFunc(); + + BackgroundTaskSchedulingSettings getSettings() const; }; - - }