Add alters only for new mutations

This commit is contained in:
alesapin 2020-02-14 23:13:57 +03:00
parent 86b4ce2f24
commit 88f46ac636

View File

@ -150,7 +150,10 @@ void ReplicatedMergeTreeQueue::insertUnlocked(
}
}
if (entry->type == LogEntry::ALTER_METADATA)
{
LOG_TRACE(log, "Adding alter metadata version " << entry->alter_version << " to the queue");
alter_chain.addMetadataAlter(entry->alter_version, state_lock);
}
}
@ -226,7 +229,10 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval(
}
if (entry->type == LogEntry::ALTER_METADATA)
{
LOG_TRACE(log, "Finishing metadata alter with version " << entry->alter_version);
alter_chain.finishMetadataAlter(entry->alter_version, entry->have_mutation, state_lock);
}
}
else
{
@ -243,7 +249,6 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval(
void ReplicatedMergeTreeQueue::removePartFromMutations(const String & part_name)
{
//LOG_DEBUG(log, "Removing part from mutations:" << part_name);
auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
auto in_partition = mutations_by_partition.find(part_info.partition_id);
if (in_partition == mutations_by_partition.end())
@ -256,7 +261,6 @@ void ReplicatedMergeTreeQueue::removePartFromMutations(const String & part_name)
{
MutationStatus & status = *it->second;
//LOG_DEBUG(log, "Removing part name:" << part_name << " from mutation:" << status.entry->znode_name << " block number :" << status.entry->block_numbers.begin()->second);
status.parts_to_do.removePartAndCoveredParts(part_name);
if (status.parts_to_do.size() == 0)
some_mutations_are_probably_done = true;
@ -693,10 +697,16 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C
}
if (mutation.parts_to_do.size() == 0)
{
some_mutations_are_probably_done = true;
}
if (entry->alter_version != -1)
/// otherwise it's already done
if (entry->alter_version != -1 && entry->znode_name > mutation_pointer)
{
LOG_TRACE(log, "Adding mutation " << entry->znode_name << " with alter version " << entry->alter_version << " to the queue");
alter_chain.addMutationForAlter(entry->alter_version, state_lock);
}
}
}