This commit is contained in:
alesapin 2020-02-12 13:00:24 +03:00
parent 9e0d4b0bd4
commit 526428ffe0
2 changed files with 24 additions and 18 deletions

View File

@ -288,7 +288,7 @@ void ReplicatedMergeTreeQueue::removePartFromMutations(const String & part_name)
{
MutationStatus & status = *it->second;
LOG_DEBUG(log, "Removing part name:" << part_name << " from mutation:" << status.entry->znode_name);
LOG_DEBUG(log, "Removing part name:" << part_name << " from mutation:" << status.entry->znode_name << " block number :" << status.entry->block_numbers.begin()->second);
status.parts_to_do.removePartAndCoveredParts(part_name);
if (status.parts_to_do.size() == 0)
some_mutations_are_probably_done = true;
@ -995,19 +995,17 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
MergeTreeData & data,
std::lock_guard<std::mutex> & state_lock) const
{
if (entry.type == LogEntry::GET_PART)
{
if (!entry.actual_new_part_name.empty() && !alter_sequence.canExecuteGetEntry(entry.actual_new_part_name, format_version, state_lock))
return false;
if (!entry.new_part_name.empty() && !alter_sequence.canExecuteGetEntry(entry.new_part_name, format_version, state_lock))
return false;
}
if (entry.type == LogEntry::MERGE_PARTS
|| entry.type == LogEntry::GET_PART
|| entry.type == LogEntry::MUTATE_PART)
{
if (!entry.actual_new_part_name.empty()
&& !alter_sequence.canExecuteGetEntry(entry.actual_new_part_name, format_version, state_lock))
return false;
if (!entry.new_part_name.empty() && !alter_sequence.canExecuteGetEntry(entry.new_part_name, format_version, state_lock))
return false;
for (const String & new_part_name : entry.getBlockingPartNames())
{
if (!isNotCoveredByFuturePartsImpl(new_part_name, out_postpone_reason, state_lock))

View File

@ -55,7 +55,7 @@ public:
void addMetadataAlter(int alter_version, std::lock_guard<std::mutex> & /*state_lock*/)
{
LOG_DEBUG(log, "Adding meta with alter version:" << alter_version);
//LOG_DEBUG(log, "Adding meta with alter version:" << alter_version);
if (!queue_state.count(alter_version))
queue_state.emplace(alter_version, AlterInQueue({}, false));
else
@ -73,14 +73,17 @@ public:
for (const auto & [block_number, state] : queue_state)
{
LOG_DEBUG(log, "Looking at block:" << block_number << " with part name:" << part_name);
if (state.block_numbers.count(info.partition_id))
LOG_DEBUG(log, "Looking at alter:" << block_number << " with part name:" << part_name);
if (!state.block_numbers.empty())
{
LOG_DEBUG(log, "Block number:" << block_number << " has part name " << part_name << " version " << state.block_numbers.at(info.partition_id));
return info.getDataVersion() < state.block_numbers.at(info.partition_id);
LOG_DEBUG(log, "Block number:" << block_number << " has part name " << part_name << " version " << state.block_numbers.at(info.partition_id) << " metadata is done:" << state.metadata_finished);
if (!state.metadata_finished)
return info.getDataVersion() < state.block_numbers.at(info.partition_id);
else
return info.getDataVersion() <= state.block_numbers.at(info.partition_id);
}
}
LOG_DEBUG(log, "Nobody has block number for part " << part_name);
//LOG_DEBUG(log, "Nobody has block number for part " << part_name);
return true;
}
@ -95,10 +98,11 @@ public:
if (queue_state.begin()->first != alter_version)
{
LOG_DEBUG(log, "Finished metadata alter with version " + std::to_string(alter_version) + " but current alter in queue is " + std::to_string(queue_state.begin()->first));
//LOG_DEBUG(log, "Finished metadata alter with version " + std::to_string(alter_version) + " but current alter in queue is " + std::to_string(queue_state.begin()->first));
throw Exception("Finished metadata alter with version " + std::to_string(alter_version) + " but current alter in queue is " + std::to_string(queue_state.begin()->first), ErrorCodes::LOGICAL_ERROR);
}
LOG_DEBUG(log, "FINISH METADATA ALTER: " << alter_version);
if (!have_data_alter)
{
queue_state.erase(alter_version);
@ -115,12 +119,14 @@ public:
/// queue can be empty after load of finished mutation without move of mutation pointer
if (queue_state.empty())
{
LOG_DEBUG(log, "FINISHING DATA ALTER WITH VERSION:" << alter_version << " BUT QUEUE EMPTY");
//LOG_DEBUG(log, "FINISHING DATA ALTER WITH VERSION:" << alter_version << " BUT QUEUE EMPTY");
return;
}
LOG_DEBUG(log, "FINISH DATA ALTER: " << alter_version);
if (!queue_state.count(alter_version))
std::terminate();
queue_state.erase(alter_version);
}
@ -133,6 +139,8 @@ public:
}
if (alter_version < queue_state.begin()->first)
return true;
if (!queue_state.count(alter_version))
std::terminate();
return queue_state.at(alter_version).metadata_finished;
}
bool canExecuteMetaAlter(int alter_version, std::lock_guard<std::mutex> & /*state_lock*/) const