allow parallel execution of disjoint drop ranges

This commit is contained in:
Alexander Tokmakov 2022-08-15 20:32:03 +02:00
parent a0693c3a84
commit b3f3b60ac3
4 changed files with 29 additions and 15 deletions

View File

@ -1024,7 +1024,7 @@ void ReplicatedMergeTreeQueue::removePartProducingOpsInRange(
[[maybe_unused]] bool called_from_alter_query_directly = covering_entry && covering_entry->replace_range_entry
&& covering_entry->replace_range_entry->columns_version < 0;
[[maybe_unused]] bool called_for_broken_part = !covering_entry;
assert(currently_executing_drop_or_replace_range || called_from_alter_query_directly || called_for_broken_part);
assert(currently_executing_drop_replace_ranges.contains(part_info) || called_from_alter_query_directly || called_for_broken_part);
for (Queue::iterator it = queue.begin(); it != queue.end();)
{
@ -1368,14 +1368,24 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
/// If such part producing operations are currently executing, then DROP/REPLACE RANGE wait them to finish.
/// Deadlock is possible if multiple DROP/REPLACE RANGE entries are executing in parallel and wait each other.
/// See also removePartProducingOpsInRange(...) and ReplicatedMergeTreeQueue::CurrentlyExecuting.
if (currently_executing_drop_or_replace_range)
if (auto drop_range = entry.getDropRange(format_version))
{
out_postpone_reason = fmt::format(
"Not executing log entry {} of type {} for part {} "
"because another DROP_RANGE or REPLACE_RANGE entry are currently executing.",
entry.znode_name, entry.typeToString(), entry.new_part_name);
LOG_TRACE(log, fmt::runtime(out_postpone_reason));
return false;
auto drop_range_info = MergeTreePartInfo::fromPartName(*drop_range, format_version);
for (const auto & info : currently_executing_drop_replace_ranges)
{
if (drop_range_info.isDisjoint(info))
continue;
out_postpone_reason = fmt::format(
"Not executing log entry {} of type {} for part {} "
"because another DROP_RANGE or REPLACE_RANGE entry with not disjoint range {} is currently executing.",
entry.znode_name,
entry.typeToString(),
entry.new_part_name,
info.getPartName());
LOG_TRACE(log, fmt::runtime(out_postpone_reason));
return false;
}
}
if (entry.isDropPart(format_version))
@ -1442,10 +1452,11 @@ ReplicatedMergeTreeQueue::CurrentlyExecuting::CurrentlyExecuting(
const ReplicatedMergeTreeQueue::LogEntryPtr & entry_, ReplicatedMergeTreeQueue & queue_, std::unique_lock<std::mutex> & /* state_lock */)
: entry(entry_), queue(queue_)
{
if (entry->type == ReplicatedMergeTreeLogEntry::DROP_RANGE || entry->type == ReplicatedMergeTreeLogEntry::REPLACE_RANGE)
if (auto drop_range = entry->getDropRange(queue.format_version))
{
assert(!queue.currently_executing_drop_or_replace_range);
queue.currently_executing_drop_or_replace_range = true;
auto drop_range_info = MergeTreePartInfo::fromPartName(*drop_range, queue.format_version);
[[maybe_unused]] bool inserted = queue.currently_executing_drop_replace_ranges.emplace(drop_range_info).second;
assert(inserted);
}
entry->currently_executing = true;
++entry->num_tries;
@ -1497,10 +1508,11 @@ ReplicatedMergeTreeQueue::CurrentlyExecuting::~CurrentlyExecuting()
{
std::lock_guard lock(queue.state_mutex);
if (entry->type == ReplicatedMergeTreeLogEntry::DROP_RANGE || entry->type == ReplicatedMergeTreeLogEntry::REPLACE_RANGE)
if (auto drop_range = entry->getDropRange(queue.format_version))
{
assert(queue.currently_executing_drop_or_replace_range);
queue.currently_executing_drop_or_replace_range = false;
auto drop_range_info = MergeTreePartInfo::fromPartName(*drop_range, queue.format_version);
[[maybe_unused]] bool removed = queue.currently_executing_drop_replace_ranges.erase(drop_range_info);
assert(removed);
}
entry->currently_executing = false;
entry->execution_complete.notify_all();

View File

@ -96,7 +96,7 @@ private:
FuturePartsSet future_parts;
/// Avoid parallel execution of queue enties, which may remove other entries from the queue.
bool currently_executing_drop_or_replace_range = false;
std::set<MergeTreePartInfo> currently_executing_drop_replace_ranges;
/** What will be the set of active parts after executing all log entries up to log_pointer.
* Used to determine which merges can be assigned (see ReplicatedMergeTreeMergePredicate)

View File

@ -61,6 +61,7 @@ function thread6()
done
}
# https://stackoverflow.com/questions/9954794/execute-a-shell-function-with-timeout
export -f thread1;
export -f thread2;

View File

@ -9,6 +9,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=./replication.lib
. "$CURDIR"/replication.lib
declare -A engines
engines[0]="MergeTree"
engines[1]="ReplicatedMergeTree('/test/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/{shard}/src', '{replica}_' || toString(randConstant()))"