This commit is contained in:
Aleksei Filatov 2024-11-20 21:03:57 +03:00 committed by GitHub
commit df258e89ba
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 39 additions and 22 deletions

View File

@ -11,13 +11,24 @@ namespace DB
BackgroundJobsAssignee::BackgroundJobsAssignee(MergeTreeData & data_, BackgroundJobsAssignee::Type type_, ContextPtr global_context_)
: WithContext(global_context_)
, data(data_)
, sleep_settings(global_context_->getBackgroundMoveTaskSchedulingSettings())
, rng(randomSeed())
, type(type_)
, data(data_)
, rng(randomSeed())
, sleep_settings(getSettings())
{
}
BackgroundTaskSchedulingSettings BackgroundJobsAssignee::getSettings() const
{
switch (type)
{
case Type::DataProcessing:
return getContext()->getBackgroundProcessingTaskSchedulingSettings();
case Type::Moving:
return getContext()->getBackgroundMoveTaskSchedulingSettings();
}
}
void BackgroundJobsAssignee::trigger()
{
std::lock_guard lock(holder_mutex);

View File

@ -32,23 +32,6 @@ class MergeTreeData;
class BackgroundJobsAssignee : public WithContext
{
private:
MergeTreeData & data;
/// Settings for execution control of background scheduling task
BackgroundTaskSchedulingSettings sleep_settings;
/// Useful for random backoff timeouts generation
pcg64 rng;
/// How many times execution of background job failed or we have
/// no new jobs.
size_t no_work_done_count = 0;
/// Scheduling task which assign jobs in background pool
BackgroundSchedulePool::TaskHolder holder;
/// Mutex for thread safety
std::mutex holder_mutex;
public:
/// In case of ReplicatedMergeTree the first assignee will be responsible for
/// polling the replication queue and schedule operations according to the LogEntry type
@ -81,11 +64,28 @@ public:
ContextPtr global_context_);
private:
MergeTreeData & data;
/// Useful for random backoff timeouts generation
pcg64 rng;
/// How many times execution of background job failed or we have
/// no new jobs.
size_t no_work_done_count = 0;
/// Scheduling task which assign jobs in background pool
BackgroundSchedulePool::TaskHolder holder;
/// Mutex for thread safety
std::mutex holder_mutex;
/// Settings for execution control of background scheduling task
BackgroundTaskSchedulingSettings sleep_settings;
static String toString(Type type);
/// Function that executes in background scheduling pool
void threadFunc();
BackgroundTaskSchedulingSettings getSettings() const;
};
}

View File

@ -8179,6 +8179,9 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
++idx;
}
/// Force execution of inserted log entries, because it could be delayed at BackgroundPool.
background_operations_assignee.trigger();
for (const auto & entry : entries)
waitForLogEntryToBeProcessedIfNecessary(*entry, query_context);
}
@ -8719,6 +8722,9 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
parts_holder.clear();
cleanup_thread.wakeup();
/// Force execution of inserted log entries, because it could be delayed at BackgroundPool.
background_operations_assignee.trigger();
waitForLogEntryToBeProcessedIfNecessary(entry_delete, query_context);
/// Cleaning possibly stored information about parts from /quorum/last_part node in ZooKeeper.