This commit is contained in:
Aleksei Filatov 2024-11-20 19:50:34 -08:00 committed by GitHub
commit 55b5c082bb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 39 additions and 22 deletions

View File

@ -11,13 +11,24 @@ namespace DB
BackgroundJobsAssignee::BackgroundJobsAssignee(MergeTreeData & data_, BackgroundJobsAssignee::Type type_, ContextPtr global_context_) BackgroundJobsAssignee::BackgroundJobsAssignee(MergeTreeData & data_, BackgroundJobsAssignee::Type type_, ContextPtr global_context_)
: WithContext(global_context_) : WithContext(global_context_)
, data(data_)
, sleep_settings(global_context_->getBackgroundMoveTaskSchedulingSettings())
, rng(randomSeed())
, type(type_) , type(type_)
, data(data_)
, rng(randomSeed())
, sleep_settings(getSettings())
{ {
} }
BackgroundTaskSchedulingSettings BackgroundJobsAssignee::getSettings() const
{
switch (type)
{
case Type::DataProcessing:
return getContext()->getBackgroundProcessingTaskSchedulingSettings();
case Type::Moving:
return getContext()->getBackgroundMoveTaskSchedulingSettings();
}
}
void BackgroundJobsAssignee::trigger() void BackgroundJobsAssignee::trigger()
{ {
std::lock_guard lock(holder_mutex); std::lock_guard lock(holder_mutex);

View File

@ -32,23 +32,6 @@ class MergeTreeData;
class BackgroundJobsAssignee : public WithContext class BackgroundJobsAssignee : public WithContext
{ {
private:
MergeTreeData & data;
/// Settings for execution control of background scheduling task
BackgroundTaskSchedulingSettings sleep_settings;
/// Useful for random backoff timeouts generation
pcg64 rng;
/// How many times execution of background job failed or we have
/// no new jobs.
size_t no_work_done_count = 0;
/// Scheduling task which assign jobs in background pool
BackgroundSchedulePool::TaskHolder holder;
/// Mutex for thread safety
std::mutex holder_mutex;
public: public:
/// In case of ReplicatedMergeTree the first assignee will be responsible for /// In case of ReplicatedMergeTree the first assignee will be responsible for
/// polling the replication queue and schedule operations according to the LogEntry type /// polling the replication queue and schedule operations according to the LogEntry type
@ -81,11 +64,28 @@ public:
ContextPtr global_context_); ContextPtr global_context_);
private: private:
MergeTreeData & data;
/// Useful for random backoff timeouts generation
pcg64 rng;
/// How many times execution of background job failed or we have
/// no new jobs.
size_t no_work_done_count = 0;
/// Scheduling task which assign jobs in background pool
BackgroundSchedulePool::TaskHolder holder;
/// Mutex for thread safety
std::mutex holder_mutex;
/// Settings for execution control of background scheduling task
BackgroundTaskSchedulingSettings sleep_settings;
static String toString(Type type); static String toString(Type type);
/// Function that executes in background scheduling pool /// Function that executes in background scheduling pool
void threadFunc(); void threadFunc();
BackgroundTaskSchedulingSettings getSettings() const;
}; };
} }

View File

@ -8179,6 +8179,9 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
++idx; ++idx;
} }
/// Force execution of inserted log entries, because it could be delayed at BackgroundPool.
background_operations_assignee.trigger();
for (const auto & entry : entries) for (const auto & entry : entries)
waitForLogEntryToBeProcessedIfNecessary(*entry, query_context); waitForLogEntryToBeProcessedIfNecessary(*entry, query_context);
} }
@ -8719,6 +8722,9 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
parts_holder.clear(); parts_holder.clear();
cleanup_thread.wakeup(); cleanup_thread.wakeup();
/// Force execution of inserted log entries, because it could be delayed at BackgroundPool.
background_operations_assignee.trigger();
waitForLogEntryToBeProcessedIfNecessary(entry_delete, query_context); waitForLogEntryToBeProcessedIfNecessary(entry_delete, query_context);
/// Cleaning possibly stored information about parts from /quorum/last_part node in ZooKeeper. /// Cleaning possibly stored information about parts from /quorum/last_part node in ZooKeeper.