Merge pull request #24961 from ClickHouse/more_general_check_in_queue

Trying to fix 'Tagging already tagged part'
This commit is contained in:
alesapin 2021-06-07 11:52:05 +03:00 committed by GitHub
commit 170c49db69
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 5 additions and 27 deletions

View File

@ -441,10 +441,6 @@ Strings ReplicatedMergeTreeLogEntryData::getVirtualPartNames(MergeTreeDataFormat
if (type == DROP_RANGE) if (type == DROP_RANGE)
return {new_part_name}; return {new_part_name};
/// CLEAR_COLUMN and CLEAR_INDEX are deprecated since 20.3
if (type == CLEAR_COLUMN || type == CLEAR_INDEX)
return {};
if (type == REPLACE_RANGE) if (type == REPLACE_RANGE)
{ {
Strings res = replace_range_entry->new_part_names; Strings res = replace_range_entry->new_part_names;

View File

@ -140,18 +140,6 @@ struct ReplicatedMergeTreeLogEntryData
/// selection of merges. These parts are added to queue.virtual_parts. /// selection of merges. These parts are added to queue.virtual_parts.
Strings getVirtualPartNames(MergeTreeDataFormatVersion format_version) const; Strings getVirtualPartNames(MergeTreeDataFormatVersion format_version) const;
/// Returns set of parts that denote the block number ranges that should be blocked during the entry execution.
/// These parts are added to future_parts.
Strings getBlockingPartNames(MergeTreeDataFormatVersion format_version) const
{
Strings res = getVirtualPartNames(format_version);
if (type == CLEAR_COLUMN)
res.emplace_back(new_part_name);
return res;
}
/// Returns fake part for drop range (for DROP_RANGE and REPLACE_RANGE) /// Returns fake part for drop range (for DROP_RANGE and REPLACE_RANGE)
std::optional<String> getDropRange(MergeTreeDataFormatVersion format_version) const; std::optional<String> getDropRange(MergeTreeDataFormatVersion format_version) const;

View File

@ -1024,16 +1024,10 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
{ {
/// If our entry produce part which is already covered by /// If our entry produce part which is already covered by
/// some other entry which is currently executing, then we can postpone this entry. /// some other entry which is currently executing, then we can postpone this entry.
if (entry.type == LogEntry::MERGE_PARTS for (const String & new_part_name : entry.getVirtualPartNames(format_version))
|| entry.type == LogEntry::GET_PART
|| entry.type == LogEntry::ATTACH_PART
|| entry.type == LogEntry::MUTATE_PART)
{ {
for (const String & new_part_name : entry.getBlockingPartNames(format_version)) if (!isNotCoveredByFuturePartsImpl(entry.znode_name, new_part_name, out_postpone_reason, state_lock))
{ return false;
if (!isNotCoveredByFuturePartsImpl(entry.znode_name, new_part_name, out_postpone_reason, state_lock))
return false;
}
} }
/// Check that fetches pool is not overloaded /// Check that fetches pool is not overloaded
@ -1247,7 +1241,7 @@ ReplicatedMergeTreeQueue::CurrentlyExecuting::CurrentlyExecuting(const Replicate
++entry->num_tries; ++entry->num_tries;
entry->last_attempt_time = time(nullptr); entry->last_attempt_time = time(nullptr);
for (const String & new_part_name : entry->getBlockingPartNames(queue.format_version)) for (const String & new_part_name : entry->getVirtualPartNames(queue.format_version))
{ {
if (!queue.future_parts.emplace(new_part_name, entry).second) if (!queue.future_parts.emplace(new_part_name, entry).second)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Tagging already tagged future part {}. This is a bug. " throw Exception(ErrorCodes::LOGICAL_ERROR, "Tagging already tagged future part {}. This is a bug. "
@ -1288,7 +1282,7 @@ ReplicatedMergeTreeQueue::CurrentlyExecuting::~CurrentlyExecuting()
entry->currently_executing = false; entry->currently_executing = false;
entry->execution_complete.notify_all(); entry->execution_complete.notify_all();
for (const String & new_part_name : entry->getBlockingPartNames(queue.format_version)) for (const String & new_part_name : entry->getVirtualPartNames(queue.format_version))
{ {
if (!queue.future_parts.erase(new_part_name)) if (!queue.future_parts.erase(new_part_name))
{ {