mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-25 00:52:02 +00:00
Trying to fix 'Tagging already tagged part'
This commit is contained in:
parent
7ca31261b0
commit
1e69128443
@ -441,10 +441,6 @@ Strings ReplicatedMergeTreeLogEntryData::getVirtualPartNames(MergeTreeDataFormat
|
|||||||
if (type == DROP_RANGE)
|
if (type == DROP_RANGE)
|
||||||
return {new_part_name};
|
return {new_part_name};
|
||||||
|
|
||||||
/// CLEAR_COLUMN and CLEAR_INDEX are deprecated since 20.3
|
|
||||||
if (type == CLEAR_COLUMN || type == CLEAR_INDEX)
|
|
||||||
return {};
|
|
||||||
|
|
||||||
if (type == REPLACE_RANGE)
|
if (type == REPLACE_RANGE)
|
||||||
{
|
{
|
||||||
Strings res = replace_range_entry->new_part_names;
|
Strings res = replace_range_entry->new_part_names;
|
||||||
|
@ -140,18 +140,6 @@ struct ReplicatedMergeTreeLogEntryData
|
|||||||
/// selection of merges. These parts are added to queue.virtual_parts.
|
/// selection of merges. These parts are added to queue.virtual_parts.
|
||||||
Strings getVirtualPartNames(MergeTreeDataFormatVersion format_version) const;
|
Strings getVirtualPartNames(MergeTreeDataFormatVersion format_version) const;
|
||||||
|
|
||||||
/// Returns set of parts that denote the block number ranges that should be blocked during the entry execution.
|
|
||||||
/// These parts are added to future_parts.
|
|
||||||
Strings getBlockingPartNames(MergeTreeDataFormatVersion format_version) const
|
|
||||||
{
|
|
||||||
Strings res = getVirtualPartNames(format_version);
|
|
||||||
|
|
||||||
if (type == CLEAR_COLUMN)
|
|
||||||
res.emplace_back(new_part_name);
|
|
||||||
|
|
||||||
return res;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns fake part for drop range (for DROP_RANGE and REPLACE_RANGE)
|
/// Returns fake part for drop range (for DROP_RANGE and REPLACE_RANGE)
|
||||||
std::optional<String> getDropRange(MergeTreeDataFormatVersion format_version) const;
|
std::optional<String> getDropRange(MergeTreeDataFormatVersion format_version) const;
|
||||||
|
|
||||||
|
@ -1024,16 +1024,10 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
|
|||||||
{
|
{
|
||||||
/// If our entry produce part which is already covered by
|
/// If our entry produce part which is already covered by
|
||||||
/// some other entry which is currently executing, then we can postpone this entry.
|
/// some other entry which is currently executing, then we can postpone this entry.
|
||||||
if (entry.type == LogEntry::MERGE_PARTS
|
for (const String & new_part_name : entry.getVirtualPartNames(format_version))
|
||||||
|| entry.type == LogEntry::GET_PART
|
|
||||||
|| entry.type == LogEntry::ATTACH_PART
|
|
||||||
|| entry.type == LogEntry::MUTATE_PART)
|
|
||||||
{
|
{
|
||||||
for (const String & new_part_name : entry.getBlockingPartNames(format_version))
|
if (!isNotCoveredByFuturePartsImpl(entry.znode_name, new_part_name, out_postpone_reason, state_lock))
|
||||||
{
|
return false;
|
||||||
if (!isNotCoveredByFuturePartsImpl(entry.znode_name, new_part_name, out_postpone_reason, state_lock))
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Check that fetches pool is not overloaded
|
/// Check that fetches pool is not overloaded
|
||||||
@ -1247,7 +1241,7 @@ ReplicatedMergeTreeQueue::CurrentlyExecuting::CurrentlyExecuting(const Replicate
|
|||||||
++entry->num_tries;
|
++entry->num_tries;
|
||||||
entry->last_attempt_time = time(nullptr);
|
entry->last_attempt_time = time(nullptr);
|
||||||
|
|
||||||
for (const String & new_part_name : entry->getBlockingPartNames(queue.format_version))
|
for (const String & new_part_name : entry->getVirtualPartNames(queue.format_version))
|
||||||
{
|
{
|
||||||
if (!queue.future_parts.emplace(new_part_name, entry).second)
|
if (!queue.future_parts.emplace(new_part_name, entry).second)
|
||||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Tagging already tagged future part {}. This is a bug. "
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Tagging already tagged future part {}. This is a bug. "
|
||||||
@ -1288,7 +1282,7 @@ ReplicatedMergeTreeQueue::CurrentlyExecuting::~CurrentlyExecuting()
|
|||||||
entry->currently_executing = false;
|
entry->currently_executing = false;
|
||||||
entry->execution_complete.notify_all();
|
entry->execution_complete.notify_all();
|
||||||
|
|
||||||
for (const String & new_part_name : entry->getBlockingPartNames(queue.format_version))
|
for (const String & new_part_name : entry->getVirtualPartNames(queue.format_version))
|
||||||
{
|
{
|
||||||
if (!queue.future_parts.erase(new_part_name))
|
if (!queue.future_parts.erase(new_part_name))
|
||||||
{
|
{
|
||||||
|
Loading…
Reference in New Issue
Block a user