fix rename

This commit is contained in:
Anton Popov 2024-07-08 13:25:51 +00:00
parent 9382349238
commit 7e790d8084
6 changed files with 69 additions and 35 deletions

View File

@ -8668,33 +8668,59 @@ void MergeTreeData::verifySortingKey(const KeyDescription & sorting_key)
}
}
static void updateAlterConversionsCounter(Int64 & num_alter_conversions, const MutationCommands & commands, Int64 increment)
static void updateMutationsCounters(
Int64 & data_mutations_to_apply,
Int64 & metadata_mutations_to_apply,
const MutationCommands & commands,
Int64 increment)
{
if (num_alter_conversions < 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "On-fly data alter conversions counter is negative ({})", num_alter_conversions);
if (data_mutations_to_apply < 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "On-fly data mutations counter is negative ({})", data_mutations_to_apply);
if (metadata_mutations_to_apply < 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "On-fly metadata mutations counter is negative ({})", metadata_mutations_to_apply);
bool has_data_mutation = false;
bool has_metadata_mutation = false;
for (const auto & command : commands)
{
if (AlterConversions::isSupportedDataMutation(command.type) || AlterConversions::isSupportedMetadataMutation(command.type))
if (!has_data_mutation && AlterConversions::isSupportedDataMutation(command.type))
{
num_alter_conversions += increment;
data_mutations_to_apply += increment;
has_data_mutation = true;
if (num_alter_conversions < 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "On-fly data mutations counter is negative ({})", num_alter_conversions);
if (data_mutations_to_apply < 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "On-fly data mutations counter is negative ({})", data_mutations_to_apply);
}
return;
if (!has_metadata_mutation && AlterConversions::isSupportedMetadataMutation(command.type))
{
metadata_mutations_to_apply += increment;
has_metadata_mutation = true;
if (metadata_mutations_to_apply < 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "On-fly metadata mutations counter is negative ({})", metadata_mutations_to_apply);
}
}
}
void incrementAlterConversionsCounter(Int64 & num_alter_conversions, const MutationCommands & commands, std::lock_guard<std::mutex> & /*lock*/)
void incrementMutationsCounters(
Int64 & data_mutations_to_apply,
Int64 & metadata_mutations_to_apply,
const MutationCommands & commands,
std::lock_guard<std::mutex> & /*lock*/)
{
updateAlterConversionsCounter(num_alter_conversions, commands, 1);
return updateMutationsCounters(data_mutations_to_apply, metadata_mutations_to_apply, commands, 1);
}
void decrementAlterConversionsCounter(Int64 & num_alter_conversions, const MutationCommands & commands, std::lock_guard<std::mutex> & /*lock*/)
void decrementMutationsCounters(
Int64 & data_mutations_to_apply,
Int64 & metadata_mutations_to_apply,
const MutationCommands & commands,
std::lock_guard<std::mutex> & /*lock*/)
{
updateAlterConversionsCounter(num_alter_conversions, commands, -1);
return updateMutationsCounters(data_mutations_to_apply, metadata_mutations_to_apply, commands, -1);
}
}

View File

@ -453,9 +453,6 @@ public:
Int64 metadata_version = -1;
Int64 min_part_metadata_version = -1;
bool need_data_mutations = false;
bool needAnyMutations() const { return need_data_mutations || needMetadataMutations(); }
bool needMetadataMutations() const { return min_part_metadata_version < metadata_version; }
};
Params params;
@ -1776,7 +1773,16 @@ struct CurrentlySubmergingEmergingTagger
/// Look at MutationCommands if it contains mutations for AlterConversions, update the counter.
/// Return true if the counter had been updated
void incrementAlterConversionsCounter(Int64 & num_alter_conversions, const MutationCommands & commands, std::lock_guard<std::mutex> & lock);
void decrementAlterConversionsCounter(Int64 & num_alter_conversions, const MutationCommands & commands, std::lock_guard<std::mutex> & lock);
void incrementMutationsCounters(
Int64 & data_mutations_to_apply,
Int64 & metadata_mutations_to_apply,
const MutationCommands & commands,
std::lock_guard<std::mutex> & lock);
void decrementMutationsCounters(
Int64 & data_mutations_to_apply,
Int64 & metadata_mutations_to_apply,
const MutationCommands & commands,
std::lock_guard<std::mutex> & lock);
}

View File

@ -951,7 +951,7 @@ int32_t ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper
{
const auto commands = entry.commands;
it = mutations_by_znode.erase(it);
decrementAlterConversionsCounter(num_alter_conversions, commands, state_lock);
decrementMutationsCounters(data_mutations_to_apply, metadata_mutations_to_apply, commands, state_lock);
}
else
it = mutations_by_znode.erase(it);
@ -1001,7 +1001,7 @@ int32_t ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper
for (const ReplicatedMergeTreeMutationEntryPtr & entry : new_mutations)
{
auto & mutation = mutations_by_znode.emplace(entry->znode_name, MutationStatus(entry, format_version)).first->second;
incrementAlterConversionsCounter(num_alter_conversions, entry->commands, lock);
incrementMutationsCounters(data_mutations_to_apply, metadata_mutations_to_apply, entry->commands, lock);
NOEXCEPT_SCOPE({
for (const auto & pair : entry->block_numbers)
@ -1076,7 +1076,7 @@ ReplicatedMergeTreeMutationEntryPtr ReplicatedMergeTreeQueue::removeMutation(
}
mutations_by_znode.erase(it);
/// decrementAlterConversionsCounter() will be called in updateMutations()
/// decrementMutationsCounters() will be called in updateMutations()
LOG_DEBUG(log, "Removed mutation {} from local state.", entry->znode_name);
}
@ -1913,7 +1913,7 @@ MutationCommands ReplicatedMergeTreeQueue::MutationsSnapshot::getAlterMutationCo
MutationCommands result;
bool seen_all_data_mutations = !params.need_data_mutations;
bool seen_all_metadata_mutations = !params.needMetadataMutations();
bool seen_all_metadata_mutations = part_metadata_version >= params.metadata_version;
if (seen_all_data_mutations && seen_all_metadata_mutations)
return {};
@ -1968,8 +1968,8 @@ MergeTreeData::MutationsSnapshotPtr ReplicatedMergeTreeQueue::getMutationsSnapsh
std::lock_guard lock(state_mutex);
bool need_data_mutations = res->params.need_data_mutations && num_alter_conversions > 0;
bool need_metatadata_mutations = res->params.needMetadataMutations();
bool need_data_mutations = params.need_data_mutations && data_mutations_to_apply > 0;
bool need_metatadata_mutations = params.min_part_metadata_version < params.metadata_version;
if (!need_data_mutations && !need_metatadata_mutations)
return res;
@ -2113,7 +2113,7 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep
mutation.parts_to_do.clear();
}
decrementAlterConversionsCounter(num_alter_conversions, mutation.entry->commands, lock);
decrementMutationsCounters(data_mutations_to_apply, metadata_mutations_to_apply, mutation.entry->commands, lock);
}
else if (mutation.parts_to_do.size() == 0)
{
@ -2170,7 +2170,7 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep
LOG_TRACE(log, "Finishing data alter with version {} for entry {}", entry->alter_version, entry->znode_name);
alter_sequence.finishDataAlter(entry->alter_version, lock);
}
decrementAlterConversionsCounter(num_alter_conversions, entry->commands, lock);
decrementMutationsCounters(data_mutations_to_apply, metadata_mutations_to_apply, entry->commands, lock);
}
}
}

View File

@ -154,7 +154,8 @@ private:
std::map<String, MutationStatus> mutations_by_znode;
/// Unfinished mutations that are required for AlterConversions.
Int64 num_alter_conversions = 0;
Int64 data_mutations_to_apply = 0;
Int64 metadata_mutations_to_apply = 0;
/// Partition -> (block_number -> MutationStatus)
std::unordered_map<String, std::map<Int64, MutationStatus *>> mutations_by_partition;

View File

@ -502,7 +502,7 @@ Int64 StorageMergeTree::startMutation(const MutationCommands & commands, Context
if (!inserted)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Mutation {} already exists, it's a bug", version);
incrementAlterConversionsCounter(num_alter_conversions, *it->second.commands, lock);
incrementMutationsCounters(data_mutations_to_apply, metadata_mutations_to_apply, *it->second.commands, lock);
LOG_INFO(log, "Added mutation: {}{}", mutation_id, additional_info);
}
background_operations_assignee.trigger();
@ -538,7 +538,7 @@ void StorageMergeTree::updateMutationEntriesErrors(FutureMergedMutatedPartPtr re
if (static_cast<UInt64>(result_part->part_info.mutation) == it->first)
mutation_backoff_policy.removePartFromFailed(failed_part->name);
decrementAlterConversionsCounter(num_alter_conversions, *entry.commands, lock);
decrementMutationsCounters(data_mutations_to_apply, metadata_mutations_to_apply, *entry.commands, lock);
}
}
else
@ -819,7 +819,7 @@ CancellationCode StorageMergeTree::killMutation(const String & mutation_id)
{
bool mutation_finished = *min_version > static_cast<Int64>(mutation_version);
if (!mutation_finished)
decrementAlterConversionsCounter(num_alter_conversions, *it->second.commands, lock);
decrementMutationsCounters(data_mutations_to_apply, metadata_mutations_to_apply, *it->second.commands, lock);
}
to_kill.emplace(std::move(it->second));
@ -904,7 +904,7 @@ void StorageMergeTree::loadMutations()
if (!inserted)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Mutation {} already exists, it's a bug", block_number);
incrementAlterConversionsCounter(num_alter_conversions, *entry_it->second.commands, lock);
incrementMutationsCounters(data_mutations_to_apply, metadata_mutations_to_apply, *entry_it->second.commands, lock);
}
else if (startsWith(it->name(), "tmp_mutation_"))
{
@ -2449,10 +2449,10 @@ MergeTreeData::MutationsSnapshotPtr StorageMergeTree::getMutationsSnapshot(const
std::lock_guard lock(currently_processing_in_background_mutex);
bool need_data_mutations = res->params.need_data_mutations && num_alter_conversions > 0;
bool need_metatadata_mutations = res->params.needMetadataMutations();
bool need_data_mutations = res->params.need_data_mutations && data_mutations_to_apply > 0;
bool need_metadata_mutations = metadata_mutations_to_apply > 0;
if (!need_data_mutations && !need_metatadata_mutations)
if (!need_data_mutations && !need_metadata_mutations)
return res;
for (const auto & [version, entry] : current_mutations_by_version)
@ -2462,7 +2462,7 @@ MergeTreeData::MutationsSnapshotPtr StorageMergeTree::getMutationsSnapshot(const
if (need_data_mutations && AlterConversions::isSupportedDataMutation(command.type))
return true;
if (need_metatadata_mutations && AlterConversions::isSupportedMetadataMutation(command.type))
if (need_metadata_mutations && AlterConversions::isSupportedMetadataMutation(command.type))
return true;
return false;

View File

@ -150,7 +150,8 @@ private:
std::map<UInt64, MergeTreeMutationEntry> current_mutations_by_version;
/// Unfinished mutations that are required for AlterConversions.
Int64 num_alter_conversions = 0;
Int64 data_mutations_to_apply = 0;
Int64 metadata_mutations_to_apply = 0;
std::atomic<bool> shutdown_called {false};
std::atomic<bool> flush_called {false};