Merge pull request #40246 from ClickHouse/parallel_drop_ranges_execution

Allow parallel execution of disjoint drop ranges
This commit is contained in:
Alexander Tokmakov 2022-08-18 21:17:41 +03:00 committed by GitHub
commit e311d06eaa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 54 additions and 34 deletions

View File

@ -73,6 +73,11 @@ bool BackgroundSchedulePoolTaskInfo::activateAndSchedule()
return true; return true;
} }
std::unique_lock<std::mutex> BackgroundSchedulePoolTaskInfo::getExecLock()
{
return std::unique_lock{exec_mutex};
}
void BackgroundSchedulePoolTaskInfo::execute() void BackgroundSchedulePoolTaskInfo::execute()
{ {
Stopwatch watch; Stopwatch watch;

View File

@ -121,6 +121,10 @@ public:
/// get Coordination::WatchCallback needed for notifications from ZooKeeper watches. /// get Coordination::WatchCallback needed for notifications from ZooKeeper watches.
Coordination::WatchCallback getWatchCallback(); Coordination::WatchCallback getWatchCallback();
/// Returns lock that protects from concurrent task execution.
/// This lock should not be held for a long time.
std::unique_lock<std::mutex> getExecLock();
private: private:
friend class TaskNotification; friend class TaskNotification;
friend class BackgroundSchedulePool; friend class BackgroundSchedulePool;

View File

@ -70,9 +70,8 @@ void ReplicatedMergeTreePartCheckThread::enqueuePart(const String & name, time_t
void ReplicatedMergeTreePartCheckThread::cancelRemovedPartsCheck(const MergeTreePartInfo & drop_range_info) void ReplicatedMergeTreePartCheckThread::cancelRemovedPartsCheck(const MergeTreePartInfo & drop_range_info)
{ {
/// Wait for running tasks to finish and temporarily stop checking /// Wait for running tasks to finish and temporarily stop checking
stop(); auto pause_checking_parts = task->getExecLock();
SCOPE_EXIT({ start(); });
{
std::lock_guard lock(parts_mutex); std::lock_guard lock(parts_mutex);
for (auto it = parts_queue.begin(); it != parts_queue.end();) for (auto it = parts_queue.begin(); it != parts_queue.end();)
{ {
@ -88,7 +87,6 @@ void ReplicatedMergeTreePartCheckThread::cancelRemovedPartsCheck(const MergeTree
++it; ++it;
} }
} }
}
} }
size_t ReplicatedMergeTreePartCheckThread::size() const size_t ReplicatedMergeTreePartCheckThread::size() const

View File

@ -1024,7 +1024,7 @@ void ReplicatedMergeTreeQueue::removePartProducingOpsInRange(
[[maybe_unused]] bool called_from_alter_query_directly = covering_entry && covering_entry->replace_range_entry [[maybe_unused]] bool called_from_alter_query_directly = covering_entry && covering_entry->replace_range_entry
&& covering_entry->replace_range_entry->columns_version < 0; && covering_entry->replace_range_entry->columns_version < 0;
[[maybe_unused]] bool called_for_broken_part = !covering_entry; [[maybe_unused]] bool called_for_broken_part = !covering_entry;
assert(currently_executing_drop_or_replace_range || called_from_alter_query_directly || called_for_broken_part); assert(currently_executing_drop_replace_ranges.contains(part_info) || called_from_alter_query_directly || called_for_broken_part);
for (Queue::iterator it = queue.begin(); it != queue.end();) for (Queue::iterator it = queue.begin(); it != queue.end();)
{ {
@ -1367,16 +1367,27 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
/// DROP_RANGE and REPLACE_RANGE entries remove other entries, which produce parts in the range. /// DROP_RANGE and REPLACE_RANGE entries remove other entries, which produce parts in the range.
/// If such part producing operations are currently executing, then DROP/REPLACE RANGE wait them to finish. /// If such part producing operations are currently executing, then DROP/REPLACE RANGE wait them to finish.
/// Deadlock is possible if multiple DROP/REPLACE RANGE entries are executing in parallel and wait each other. /// Deadlock is possible if multiple DROP/REPLACE RANGE entries are executing in parallel and wait each other.
/// But it should not happen if ranges are disjoint.
/// See also removePartProducingOpsInRange(...) and ReplicatedMergeTreeQueue::CurrentlyExecuting. /// See also removePartProducingOpsInRange(...) and ReplicatedMergeTreeQueue::CurrentlyExecuting.
if (currently_executing_drop_or_replace_range)
if (auto drop_range = entry.getDropRange(format_version))
{ {
auto drop_range_info = MergeTreePartInfo::fromPartName(*drop_range, format_version);
for (const auto & info : currently_executing_drop_replace_ranges)
{
if (drop_range_info.isDisjoint(info))
continue;
out_postpone_reason = fmt::format( out_postpone_reason = fmt::format(
"Not executing log entry {} of type {} for part {} " "Not executing log entry {} of type {} for part {} "
"because another DROP_RANGE or REPLACE_RANGE entry are currently executing.", "because another DROP_RANGE or REPLACE_RANGE entry with not disjoint range {} is currently executing.",
entry.znode_name, entry.typeToString(), entry.new_part_name); entry.znode_name,
entry.typeToString(),
entry.new_part_name,
info.getPartName());
LOG_TRACE(log, fmt::runtime(out_postpone_reason)); LOG_TRACE(log, fmt::runtime(out_postpone_reason));
return false; return false;
} }
}
if (entry.isDropPart(format_version)) if (entry.isDropPart(format_version))
{ {
@ -1442,10 +1453,11 @@ ReplicatedMergeTreeQueue::CurrentlyExecuting::CurrentlyExecuting(
const ReplicatedMergeTreeQueue::LogEntryPtr & entry_, ReplicatedMergeTreeQueue & queue_, std::unique_lock<std::mutex> & /* state_lock */) const ReplicatedMergeTreeQueue::LogEntryPtr & entry_, ReplicatedMergeTreeQueue & queue_, std::unique_lock<std::mutex> & /* state_lock */)
: entry(entry_), queue(queue_) : entry(entry_), queue(queue_)
{ {
if (entry->type == ReplicatedMergeTreeLogEntry::DROP_RANGE || entry->type == ReplicatedMergeTreeLogEntry::REPLACE_RANGE) if (auto drop_range = entry->getDropRange(queue.format_version))
{ {
assert(!queue.currently_executing_drop_or_replace_range); auto drop_range_info = MergeTreePartInfo::fromPartName(*drop_range, queue.format_version);
queue.currently_executing_drop_or_replace_range = true; [[maybe_unused]] bool inserted = queue.currently_executing_drop_replace_ranges.emplace(drop_range_info).second;
assert(inserted);
} }
entry->currently_executing = true; entry->currently_executing = true;
++entry->num_tries; ++entry->num_tries;
@ -1497,10 +1509,11 @@ ReplicatedMergeTreeQueue::CurrentlyExecuting::~CurrentlyExecuting()
{ {
std::lock_guard lock(queue.state_mutex); std::lock_guard lock(queue.state_mutex);
if (entry->type == ReplicatedMergeTreeLogEntry::DROP_RANGE || entry->type == ReplicatedMergeTreeLogEntry::REPLACE_RANGE) if (auto drop_range = entry->getDropRange(queue.format_version))
{ {
assert(queue.currently_executing_drop_or_replace_range); auto drop_range_info = MergeTreePartInfo::fromPartName(*drop_range, queue.format_version);
queue.currently_executing_drop_or_replace_range = false; [[maybe_unused]] bool removed = queue.currently_executing_drop_replace_ranges.erase(drop_range_info);
assert(removed);
} }
entry->currently_executing = false; entry->currently_executing = false;
entry->execution_complete.notify_all(); entry->execution_complete.notify_all();

View File

@ -96,7 +96,7 @@ private:
FuturePartsSet future_parts; FuturePartsSet future_parts;
/// Avoid parallel execution of queue enties, which may remove other entries from the queue. /// Avoid parallel execution of queue enties, which may remove other entries from the queue.
bool currently_executing_drop_or_replace_range = false; std::set<MergeTreePartInfo> currently_executing_drop_replace_ranges;
/** What will be the set of active parts after executing all log entries up to log_pointer. /** What will be the set of active parts after executing all log entries up to log_pointer.
* Used to determine which merges can be assigned (see ReplicatedMergeTreeMergePredicate) * Used to determine which merges can be assigned (see ReplicatedMergeTreeMergePredicate)

View File

@ -151,13 +151,13 @@ bool ReplicatedMergeTreeRestartingThread::runImpl()
setNotReadonly(); setNotReadonly();
/// Start queue processing /// Start queue processing
storage.part_check_thread.start();
storage.background_operations_assignee.start(); storage.background_operations_assignee.start();
storage.queue_updating_task->activateAndSchedule(); storage.queue_updating_task->activateAndSchedule();
storage.mutations_updating_task->activateAndSchedule(); storage.mutations_updating_task->activateAndSchedule();
storage.mutations_finalizing_task->activateAndSchedule(); storage.mutations_finalizing_task->activateAndSchedule();
storage.merge_selecting_task->activateAndSchedule(); storage.merge_selecting_task->activateAndSchedule();
storage.cleanup_thread.start(); storage.cleanup_thread.start();
storage.part_check_thread.start();
return true; return true;
} }
@ -356,6 +356,7 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown(bool part_of_full_shut
storage.mutations_finalizing_task->deactivate(); storage.mutations_finalizing_task->deactivate();
storage.cleanup_thread.stop(); storage.cleanup_thread.stop();
storage.part_check_thread.stop();
/// Stop queue processing /// Stop queue processing
{ {
@ -365,9 +366,6 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown(bool part_of_full_shut
storage.background_operations_assignee.finish(); storage.background_operations_assignee.finish();
} }
/// Stop part_check_thread after queue processing, because some queue tasks may restart part_check_thread
storage.part_check_thread.stop();
LOG_TRACE(log, "Threads finished"); LOG_TRACE(log, "Threads finished");
} }

View File

@ -61,6 +61,7 @@ function thread6()
done done
} }
# https://stackoverflow.com/questions/9954794/execute-a-shell-function-with-timeout # https://stackoverflow.com/questions/9954794/execute-a-shell-function-with-timeout
export -f thread1; export -f thread1;
export -f thread2; export -f thread2;

View File

@ -9,6 +9,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=./replication.lib # shellcheck source=./replication.lib
. "$CURDIR"/replication.lib . "$CURDIR"/replication.lib
declare -A engines declare -A engines
engines[0]="MergeTree" engines[0]="MergeTree"
engines[1]="ReplicatedMergeTree('/test/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/{shard}/src', '{replica}_' || toString(randConstant()))" engines[1]="ReplicatedMergeTree('/test/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/{shard}/src', '{replica}_' || toString(randConstant()))"