mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 08:32:02 +00:00
try fix intersecting virtual parts
This commit is contained in:
parent
16647fe8ce
commit
2571ad7d43
@ -152,6 +152,10 @@ void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const
|
||||
out << "sync_pinned_part_uuids\n";
|
||||
break;
|
||||
|
||||
case PART_IS_LOST:
|
||||
out << "lost\n" << new_part_name;
|
||||
break;
|
||||
|
||||
default:
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown log entry type: {}", static_cast<int>(type));
|
||||
}
|
||||
@ -326,6 +330,11 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in)
|
||||
in >> new_part_name;
|
||||
in >> "\nsource_shard: " >> source_shard;
|
||||
}
|
||||
else if (type_str == "lost")
|
||||
{
|
||||
type = PART_IS_LOST;
|
||||
in >> new_part_name;
|
||||
}
|
||||
|
||||
if (!trailing_newline_found)
|
||||
in >> "\n";
|
||||
@ -389,7 +398,6 @@ void ReplicatedMergeTreeLogEntryData::ReplaceRangeEntry::readText(ReadBuffer & i
|
||||
|
||||
bool ReplicatedMergeTreeLogEntryData::ReplaceRangeEntry::isMovePartitionOrAttachFrom(const MergeTreePartInfo & drop_range_info)
|
||||
{
|
||||
assert(drop_range_info.getBlocksCount() != 0);
|
||||
return drop_range_info.getBlocksCount() == 1;
|
||||
}
|
||||
|
||||
@ -413,6 +421,24 @@ ReplicatedMergeTreeLogEntry::Ptr ReplicatedMergeTreeLogEntry::parse(const String
|
||||
return res;
|
||||
}
|
||||
|
||||
std::optional<String> ReplicatedMergeTreeLogEntryData::getDropRange(MergeTreeDataFormatVersion format_version) const
|
||||
{
|
||||
if (type == DROP_RANGE)
|
||||
return new_part_name;
|
||||
|
||||
if (type == REPLACE_RANGE)
|
||||
{
|
||||
auto drop_range_info = MergeTreePartInfo::fromPartName(replace_range_entry->drop_range_part_name, format_version);
|
||||
if (!ReplaceRangeEntry::isMovePartitionOrAttachFrom(drop_range_info))
|
||||
{
|
||||
/// It's REPLACE, not MOVE or ATTACH, so drop range is real
|
||||
return replace_range_entry->drop_range_part_name;
|
||||
}
|
||||
}
|
||||
|
||||
return {};
|
||||
}
|
||||
|
||||
Strings ReplicatedMergeTreeLogEntryData::getVirtualPartNames(MergeTreeDataFormatVersion format_version) const
|
||||
{
|
||||
/// Doesn't produce any part
|
||||
@ -430,12 +456,8 @@ Strings ReplicatedMergeTreeLogEntryData::getVirtualPartNames(MergeTreeDataFormat
|
||||
if (type == REPLACE_RANGE)
|
||||
{
|
||||
Strings res = replace_range_entry->new_part_names;
|
||||
auto drop_range_info = MergeTreePartInfo::fromPartName(replace_range_entry->drop_range_part_name, format_version);
|
||||
if (!ReplaceRangeEntry::isMovePartitionOrAttachFrom(drop_range_info))
|
||||
{
|
||||
/// It's REPLACE, not MOVE or ATTACH, so drop range is real
|
||||
res.emplace_back(replace_range_entry->drop_range_part_name);
|
||||
}
|
||||
if (auto drop_range = getDropRange(format_version))
|
||||
res.emplace_back(*drop_range);
|
||||
return res;
|
||||
}
|
||||
|
||||
@ -447,6 +469,10 @@ Strings ReplicatedMergeTreeLogEntryData::getVirtualPartNames(MergeTreeDataFormat
|
||||
if (type == CLONE_PART_FROM_SHARD)
|
||||
return {};
|
||||
|
||||
/// Doesn't produce any part.
|
||||
if (type == PART_IS_LOST)
|
||||
return {};
|
||||
|
||||
return {new_part_name};
|
||||
}
|
||||
|
||||
|
@ -45,6 +45,7 @@ struct ReplicatedMergeTreeLogEntryData
|
||||
ALTER_METADATA, /// Apply alter modification according to global /metadata and /columns paths
|
||||
SYNC_PINNED_PART_UUIDS, /// Synchronization point for ensuring that all replicas have up to date in-memory state.
|
||||
CLONE_PART_FROM_SHARD, /// Clone part from another shard.
|
||||
PART_IS_LOST, /// Cancels previous operations with lost data part. Kinda "anti-merge".
|
||||
};
|
||||
|
||||
static String typeToString(Type type)
|
||||
@ -62,6 +63,7 @@ struct ReplicatedMergeTreeLogEntryData
|
||||
case ReplicatedMergeTreeLogEntryData::ALTER_METADATA: return "ALTER_METADATA";
|
||||
case ReplicatedMergeTreeLogEntryData::SYNC_PINNED_PART_UUIDS: return "SYNC_PINNED_PART_UUIDS";
|
||||
case ReplicatedMergeTreeLogEntryData::CLONE_PART_FROM_SHARD: return "CLONE_PART_FROM_SHARD";
|
||||
case ReplicatedMergeTreeLogEntryData::PART_IS_LOST: return "PART_IS_LOST";
|
||||
default:
|
||||
throw Exception("Unknown log entry type: " + DB::toString<int>(type), ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
@ -140,6 +142,18 @@ struct ReplicatedMergeTreeLogEntryData
|
||||
/// selection of merges. These parts are added to queue.virtual_parts.
|
||||
Strings getVirtualPartNames(MergeTreeDataFormatVersion format_version) const;
|
||||
|
||||
/// Returns name of part that will never appear and should be removed from virtual parts set.
|
||||
/// It's required to correctly cancel merge which cannot be executed, because some source part is lost forever.
|
||||
/// Do not use it for other purposes, it can be dangerous.
|
||||
std::optional<String> getAntiVirtualPartName() const
|
||||
{
|
||||
if (type == PART_IS_LOST)
|
||||
return new_part_name;
|
||||
return {};
|
||||
}
|
||||
|
||||
std::optional<String> getDropRange(MergeTreeDataFormatVersion format_version) const;
|
||||
|
||||
/// Returns set of parts that denote the block number ranges that should be blocked during the entry execution.
|
||||
/// These parts are added to future_parts.
|
||||
Strings getBlockingPartNames(MergeTreeDataFormatVersion format_version) const
|
||||
|
@ -33,14 +33,20 @@ ReplicatedMergeTreeQueue::ReplicatedMergeTreeQueue(StorageReplicatedMergeTree &
|
||||
log = &Poco::Logger::get(logger_name);
|
||||
}
|
||||
|
||||
|
||||
void ReplicatedMergeTreeQueue::initialize(const MergeTreeData::DataParts & parts)
|
||||
void ReplicatedMergeTreeQueue::clear()
|
||||
{
|
||||
addVirtualParts(parts);
|
||||
auto locks = lockQueue();
|
||||
assert(future_parts.empty());
|
||||
current_parts.clear();
|
||||
virtual_parts.clear();
|
||||
queue.clear();
|
||||
inserts_by_time.clear();
|
||||
mutations_by_znode.clear();
|
||||
mutations_by_partition.clear();
|
||||
mutation_pointer.clear();
|
||||
}
|
||||
|
||||
|
||||
void ReplicatedMergeTreeQueue::addVirtualParts(const MergeTreeData::DataParts & parts)
|
||||
void ReplicatedMergeTreeQueue::initialize(const MergeTreeData::DataParts & parts)
|
||||
{
|
||||
std::lock_guard lock(state_mutex);
|
||||
|
||||
@ -74,9 +80,6 @@ bool ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper)
|
||||
/// Reset batch size on initialization to recover from possible errors of too large batch size.
|
||||
current_multi_batch_size = 1;
|
||||
|
||||
String log_pointer_str = zookeeper->get(replica_path + "/log_pointer");
|
||||
log_pointer = log_pointer_str.empty() ? 0 : parse<UInt64>(log_pointer_str);
|
||||
|
||||
std::unordered_set<String> already_loaded_paths;
|
||||
{
|
||||
std::lock_guard lock(state_mutex);
|
||||
@ -132,6 +135,45 @@ void ReplicatedMergeTreeQueue::insertUnlocked(
|
||||
const LogEntryPtr & entry, std::optional<time_t> & min_unprocessed_insert_time_changed,
|
||||
std::lock_guard<std::mutex> & state_lock)
|
||||
{
|
||||
if (auto lost_part = entry->getAntiVirtualPartName())
|
||||
{
|
||||
LOG_TRACE(log, "Reading {}: PART_IS_LOST for {}", entry->znode_name, *lost_part);
|
||||
QueueIters entries = findEntriesByNewPartName(*lost_part);
|
||||
bool removed = virtual_parts.remove(*lost_part);
|
||||
if (!removed && !entries.empty())
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Found {} entries for lost part {}, "
|
||||
"but nothing is removed from virtual parts", entries.size(), *lost_part);
|
||||
for (const auto & entry_it : entries)
|
||||
{
|
||||
for (const auto & source_part : (*entry_it)->source_parts)
|
||||
{
|
||||
String containing_current_part = current_parts.getContainingPart(source_part);
|
||||
if (containing_current_part.empty())
|
||||
{
|
||||
bool has_source_entry = std::any_of(queue.begin(), queue.end(), [&lost_part](auto e)
|
||||
{
|
||||
return e->new_part_name == *lost_part;
|
||||
});
|
||||
if (!has_source_entry)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
||||
"Source entry ({}) of lost virtual part {} does not exist",
|
||||
source_part, *lost_part);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (containing_current_part != source_part)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR,
|
||||
"Source part {} of lost part {} is covered by current part {}. Entry: {}",
|
||||
source_part, *lost_part, containing_current_part, (*entry_it)->toString());
|
||||
}
|
||||
|
||||
virtual_parts.add(source_part);
|
||||
}
|
||||
}
|
||||
|
||||
lost_forever_parts.insert(*lost_part);
|
||||
}
|
||||
|
||||
for (const String & virtual_part_name : entry->getVirtualPartNames(format_version))
|
||||
{
|
||||
virtual_parts.add(virtual_part_name);
|
||||
@ -228,16 +270,10 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval(
|
||||
removeCoveredPartsFromMutations(virtual_part_name, /*remove_part = */ false, /*remove_covered_parts = */ true);
|
||||
}
|
||||
|
||||
String drop_range_part_name;
|
||||
if (entry->type == LogEntry::DROP_RANGE)
|
||||
drop_range_part_name = entry->new_part_name;
|
||||
else if (entry->type == LogEntry::REPLACE_RANGE)
|
||||
drop_range_part_name = entry->replace_range_entry->drop_range_part_name;
|
||||
|
||||
if (!drop_range_part_name.empty())
|
||||
if (auto drop_range_part_name = entry->getDropRange(format_version))
|
||||
{
|
||||
current_parts.remove(drop_range_part_name);
|
||||
virtual_parts.remove(drop_range_part_name);
|
||||
current_parts.remove(*drop_range_part_name);
|
||||
virtual_parts.remove(*drop_range_part_name);
|
||||
}
|
||||
|
||||
if (entry->type == LogEntry::ALTER_METADATA)
|
||||
@ -301,10 +337,7 @@ void ReplicatedMergeTreeQueue::addPartToMutations(const String & part_name)
|
||||
{
|
||||
auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
|
||||
|
||||
/// Do not add special virtual parts to parts_to_do
|
||||
auto max_level = MergeTreePartInfo::MAX_LEVEL; /// DROP/DETACH PARTITION
|
||||
auto another_max_level = std::numeric_limits<decltype(part_info.level)>::max(); /// REPLACE/MOVE PARTITION
|
||||
if (part_info.level == max_level || part_info.level == another_max_level)
|
||||
if (part_info.isFakeDropRangePart())
|
||||
return;
|
||||
|
||||
auto in_partition = mutations_by_partition.find(part_info.partition_id);
|
||||
@ -392,7 +425,8 @@ void ReplicatedMergeTreeQueue::removeProcessedEntry(zkutil::ZooKeeperPtr zookeep
|
||||
}
|
||||
|
||||
if (!found && need_remove_from_zk)
|
||||
throw Exception("Can't find " + entry->znode_name + " in the memory queue. It is a bug", ErrorCodes::LOGICAL_ERROR);
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find {} in the memory queue. It is a bug. Entry info: {}",
|
||||
entry->znode_name, entry->toString());
|
||||
|
||||
notifySubscribers(queue_size);
|
||||
|
||||
@ -461,9 +495,162 @@ bool ReplicatedMergeTreeQueue::remove(zkutil::ZooKeeperPtr zookeeper, const Stri
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
bool ReplicatedMergeTreeQueue::removeFromVirtualParts(const MergeTreePartInfo & part_info)
|
||||
bool ReplicatedMergeTreeQueue::markPartAsLostForever(zkutil::ZooKeeperPtr zookeeper, const String & part_name)
|
||||
{
|
||||
LogEntry entry;
|
||||
entry.type = LogEntry::PART_IS_LOST;
|
||||
entry.source_replica = storage.replica_name;
|
||||
entry.new_part_name = part_name;
|
||||
|
||||
size_t max_iters = 100;
|
||||
while (--max_iters)
|
||||
{
|
||||
ReplicatedMergeTreeMergePredicate merge_pred = getMergePredicate(zookeeper);
|
||||
|
||||
if (!merge_pred.ensurePartIsLost(part_name, log))
|
||||
return false;
|
||||
|
||||
Coordination::Requests ops;
|
||||
ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/log", merge_pred.getVersion()));
|
||||
ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/log", "", -1));
|
||||
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential));
|
||||
Coordination::Responses responses;
|
||||
Coordination::Error rc = zookeeper->tryMulti(ops, responses);
|
||||
|
||||
if (rc == Coordination::Error::ZBADVERSION)
|
||||
{
|
||||
LOG_TRACE(log, "A new log entry appeared while trying to commit PART_IS_LOST. Retry.");
|
||||
continue;
|
||||
} else
|
||||
zkutil::KeeperMultiException::check(rc, ops, responses);
|
||||
|
||||
String path_created = dynamic_cast<const Coordination::CreateResponse &>(*responses.back()).path_created;
|
||||
LOG_TRACE(log, "Created PART_IS_LOST entry ({}) for part {}", path_created, part_name);
|
||||
return true;
|
||||
}
|
||||
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot create PART_IS_LOST entry for {}", part_name);
|
||||
}
|
||||
|
||||
ReplicatedMergeTreeQueue::QueueIters ReplicatedMergeTreeQueue::findEntriesByNewPartName(const String & part_name) const
|
||||
{
|
||||
QueueIters entries;
|
||||
size_t get_entries_num = 0;
|
||||
for (auto it = queue.begin(); it != queue.end(); ++it)
|
||||
{
|
||||
if ((*it)->new_part_name == part_name && (*it)->type != LogEntry::PART_IS_LOST)
|
||||
{
|
||||
entries.push_back(it);
|
||||
get_entries_num += (*it)->type == LogEntry::GET_PART;
|
||||
}
|
||||
}
|
||||
|
||||
if (!entries.empty())
|
||||
{
|
||||
if (entries.size() != 1)
|
||||
{
|
||||
/// Replication queue may contain multiple part producing entries with the same new_part_name
|
||||
/// if replica was recently cloned. At first cloneReplica(...) copies source replica queue,
|
||||
/// and after that it reads data parts set from source_path + "/parts"
|
||||
/// to create GET_PART entries in own queue and download missing parts.
|
||||
/// Therefore, some MERGE_PART (or MUTATE_PART, or even GET_PART) entry
|
||||
/// may be duplicated with GET_PART with the same new_part_name.
|
||||
if (1 < entries.size() - get_entries_num)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Found {} queue entries for {}, only {} are GET_PART",
|
||||
entries.size(), part_name, get_entries_num);
|
||||
|
||||
LOG_WARNING(log, "Found {} queue entries for {}, including {} GET_PART entries. "
|
||||
"It may rarely happen after replica cloning", entries.size(), part_name, get_entries_num);
|
||||
}
|
||||
}
|
||||
|
||||
return entries;
|
||||
}
|
||||
|
||||
void ReplicatedMergeTreeQueue::executePartIsLost(zkutil::ZooKeeperPtr zookeeper, LogEntry & entry_lost)
|
||||
{
|
||||
LOG_TRACE(log, "Executing {}: PART_IS_LOST for part {} ({})", entry_lost.znode_name, entry_lost.new_part_name);
|
||||
|
||||
/// There might be multiple cancelled parts if replica was cloned...
|
||||
|
||||
size_t queue_size = 0;
|
||||
std::optional<time_t> min_unprocessed_insert_time_changed;
|
||||
std::optional<time_t> max_processed_insert_time_changed;
|
||||
Coordination::Requests ops;
|
||||
{
|
||||
std::unique_lock lock(state_mutex);
|
||||
QueueIters cancelled_entries = findEntriesByNewPartName(entry_lost.new_part_name);
|
||||
|
||||
for (const auto entry_it : cancelled_entries)
|
||||
{
|
||||
LogEntryPtr canceled_entry;
|
||||
canceled_entry = *entry_it;
|
||||
LOG_TRACE(log, "Removing cancelled log entry {}: {}", canceled_entry->znode_name, canceled_entry->toString());
|
||||
ops.emplace_back(zkutil::makeRemoveRequest(replica_path + "/queue/" + canceled_entry->znode_name, -1));
|
||||
updateStateOnQueueEntryRemoval(
|
||||
canceled_entry, /* is_successful = */ false,
|
||||
min_unprocessed_insert_time_changed, max_processed_insert_time_changed, lock);
|
||||
queue.erase(entry_it);
|
||||
queue_size = queue.size();
|
||||
|
||||
LOG_TRACE(log, "Waiting for {} to finish", canceled_entry->znode_name);
|
||||
canceled_entry->execution_complete.wait(lock, [&canceled_entry]
|
||||
{ return !canceled_entry->currently_executing; });
|
||||
}
|
||||
}
|
||||
|
||||
notifySubscribers(queue_size);
|
||||
|
||||
zookeeper->multi(ops);
|
||||
|
||||
updateTimesInZooKeeper(zookeeper, min_unprocessed_insert_time_changed, max_processed_insert_time_changed);
|
||||
|
||||
{
|
||||
std::unique_lock lock(state_mutex);
|
||||
lost_forever_parts.erase(entry_lost.new_part_name);
|
||||
}
|
||||
|
||||
LOG_TRACE(log, "Executed {}: PART_IS_LOST for part {}", entry_lost.znode_name, entry_lost.new_part_name);
|
||||
}
|
||||
|
||||
bool ReplicatedMergeTreeMergePredicate::ensurePartIsLost(const String & part_name, Poco::Logger * log) const
|
||||
{
|
||||
String containing_part;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(queue.state_mutex);
|
||||
containing_part = queue.virtual_parts.getContainingPart(part_name);
|
||||
}
|
||||
|
||||
if (containing_part.empty())
|
||||
{
|
||||
LOG_WARNING(log, "Cannot find lost part {} in virtual parts set. "
|
||||
"Probably it's already marked as lost by another replica.", part_name);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (containing_part != part_name)
|
||||
{
|
||||
auto info = MergeTreePartInfo::fromPartName(containing_part, queue.format_version);
|
||||
if (info.isFakeDropRangePart())
|
||||
{
|
||||
/// It does not matter anymore if part is lost, it should be removed anyway
|
||||
LOG_INFO(log, "Lost part {} is covered by drop range {}, ignoring it", part_name, containing_part);
|
||||
return false;
|
||||
}
|
||||
|
||||
/// Lost part is covered by real part. It means that merging/mutating operation
|
||||
/// was assigned on lost part by some replica. It probably means that part is not lost.
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Lost part {} is covered by real part {}", part_name, containing_part);
|
||||
}
|
||||
|
||||
/// We can mark part as lost
|
||||
return true;
|
||||
}
|
||||
|
||||
bool ReplicatedMergeTreeQueue::removeFailedQuorumPart(const MergeTreePartInfo & part_info)
|
||||
{
|
||||
if (part_info.level != 0)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot remove part with non-zero level: {}", part_info.getPartName());
|
||||
std::lock_guard lock(state_mutex);
|
||||
return virtual_parts.remove(part_info);
|
||||
}
|
||||
@ -587,8 +774,6 @@ int32_t ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper
|
||||
{
|
||||
std::lock_guard state_lock(state_mutex);
|
||||
|
||||
log_pointer = last_entry_index + 1;
|
||||
|
||||
for (size_t copied_entry_idx = 0, num_copied_entries = copied_entries.size(); copied_entry_idx < num_copied_entries; ++copied_entry_idx)
|
||||
{
|
||||
String path_created = dynamic_cast<const Coordination::CreateResponse &>(*responses[copied_entry_idx]).path_created;
|
||||
@ -758,9 +943,7 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C
|
||||
/// Such parts do not exist and will never appear, so we should not add virtual parts to parts_to_do list.
|
||||
/// Fortunately, it's easy to distinguish virtual parts from normal parts by part level.
|
||||
/// See StorageReplicatedMergeTree::getFakePartCoveringAllPartsInPartition(...)
|
||||
auto max_level = MergeTreePartInfo::MAX_LEVEL; /// DROP/DETACH PARTITION
|
||||
auto another_max_level = std::numeric_limits<decltype(part_info.level)>::max(); /// REPLACE/MOVE PARTITION
|
||||
if (part_info.level == max_level || part_info.level == another_max_level)
|
||||
if (part_info.isFakeDropRangePart())
|
||||
continue;
|
||||
|
||||
auto it = entry->block_numbers.find(part_info.partition_id);
|
||||
@ -941,9 +1124,6 @@ void ReplicatedMergeTreeQueue::removePartProducingOpsInRange(
|
||||
if ((*it)->currently_executing)
|
||||
to_wait.push_back(*it);
|
||||
auto code = zookeeper->tryRemove(replica_path + "/queue/" + (*it)->znode_name);
|
||||
/// FIXME it's probably unsafe to remove entries non-atomically
|
||||
/// when this method called directly from alter query (not from replication queue task),
|
||||
/// because entries will be lost if ALTER fails.
|
||||
if (code != Coordination::Error::ZOK)
|
||||
LOG_INFO(log, "Couldn't remove {}: {}", replica_path + "/queue/" + (*it)->znode_name, Coordination::errorMessage(code));
|
||||
|
||||
@ -1038,11 +1218,26 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
|
||||
|| entry.type == LogEntry::ATTACH_PART
|
||||
|| entry.type == LogEntry::MUTATE_PART)
|
||||
{
|
||||
assert(entry.getBlockingPartNames(format_version).size() == 1);
|
||||
for (const String & new_part_name : entry.getBlockingPartNames(format_version))
|
||||
{
|
||||
if (!isNotCoveredByFuturePartsImpl(entry.znode_name, new_part_name, out_postpone_reason, state_lock))
|
||||
return false;
|
||||
}
|
||||
|
||||
String covering_part = virtual_parts.getContainingPart(entry.new_part_name);
|
||||
if (covering_part.empty())
|
||||
{
|
||||
if (lost_forever_parts.count(entry.new_part_name))
|
||||
return false;
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} not found in virtual parts, but it's not lost. "
|
||||
"Entry {}: {}\nVirtual: {}\nLost:{}",
|
||||
entry.new_part_name, entry.znode_name, entry.toString(),
|
||||
fmt::join(virtual_parts.getParts(), ", "), fmt::join(lost_forever_parts, ", "));
|
||||
}
|
||||
|
||||
/// NOTE: It's possible that (covering_part == entry.new_part_name), because isNotCoveredByFuturePartsImpl(...)
|
||||
/// checks for future parts only, not for virtual parts.
|
||||
}
|
||||
|
||||
/// Check that fetches pool is not overloaded
|
||||
@ -1259,7 +1454,8 @@ ReplicatedMergeTreeQueue::CurrentlyExecuting::CurrentlyExecuting(const Replicate
|
||||
for (const String & new_part_name : entry->getBlockingPartNames(queue.format_version))
|
||||
{
|
||||
if (!queue.future_parts.emplace(new_part_name, entry).second)
|
||||
throw Exception("Tagging already tagged future part " + new_part_name + ". This is a bug.", ErrorCodes::LOGICAL_ERROR);
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Tagging already tagged future part {}. This is a bug. "
|
||||
"It happened on attempt to execute {}: {}", new_part_name, entry->znode_name, entry->toString());
|
||||
}
|
||||
}
|
||||
|
||||
@ -1277,7 +1473,8 @@ void ReplicatedMergeTreeQueue::CurrentlyExecuting::setActualPartName(ReplicatedM
|
||||
return;
|
||||
|
||||
if (!queue.future_parts.emplace(entry.actual_new_part_name, entry.shared_from_this()).second)
|
||||
throw Exception("Attaching already existing future part " + entry.actual_new_part_name + ". This is a bug.", ErrorCodes::LOGICAL_ERROR);
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attaching already existing future part {}. This is a bug. "
|
||||
"It happened on attempt to execute {}: {}", entry.actual_new_part_name, entry.znode_name, entry.toString());
|
||||
}
|
||||
|
||||
|
||||
@ -1296,13 +1493,19 @@ ReplicatedMergeTreeQueue::CurrentlyExecuting::~CurrentlyExecuting()
|
||||
for (const String & new_part_name : entry->getBlockingPartNames(queue.format_version))
|
||||
{
|
||||
if (!queue.future_parts.erase(new_part_name))
|
||||
{
|
||||
LOG_ERROR(queue.log, "Untagging already untagged future part {}. This is a bug.", new_part_name);
|
||||
assert(false);
|
||||
}
|
||||
}
|
||||
|
||||
if (!entry->actual_new_part_name.empty())
|
||||
{
|
||||
if (entry->actual_new_part_name != entry->new_part_name && !queue.future_parts.erase(entry->actual_new_part_name))
|
||||
{
|
||||
LOG_ERROR(queue.log, "Untagging already untagged future part {}. This is a bug.", entry->actual_new_part_name);
|
||||
assert(false);
|
||||
}
|
||||
|
||||
entry->actual_new_part_name.clear();
|
||||
}
|
||||
|
@ -92,8 +92,8 @@ private:
|
||||
using FuturePartsSet = std::map<String, LogEntryPtr>;
|
||||
FuturePartsSet future_parts;
|
||||
|
||||
/// Index of the first log entry that we didn't see yet.
|
||||
Int64 log_pointer = 0;
|
||||
using LostPartsSet = std::set<String>;
|
||||
LostPartsSet lost_forever_parts;
|
||||
|
||||
/// Avoid parallel execution of queue enties, which may remove other entries from the queue.
|
||||
bool currently_executing_drop_or_replace_range = false;
|
||||
@ -275,6 +275,7 @@ public:
|
||||
ReplicatedMergeTreeQueue(StorageReplicatedMergeTree & storage_, ReplicatedMergeTreeMergeStrategyPicker & merge_strategy_picker_);
|
||||
~ReplicatedMergeTreeQueue();
|
||||
|
||||
void clear();
|
||||
|
||||
void initialize(const MergeTreeData::DataParts & parts);
|
||||
|
||||
@ -289,13 +290,20 @@ public:
|
||||
*/
|
||||
bool remove(zkutil::ZooKeeperPtr zookeeper, const String & part_name);
|
||||
|
||||
using QueueIters = std::vector<Queue::const_iterator>;
|
||||
QueueIters findEntriesByNewPartName(const String & part_name) const;
|
||||
|
||||
bool markPartAsLostForever(zkutil::ZooKeeperPtr zookeeper, const String & part_name);
|
||||
|
||||
void executePartIsLost(zkutil::ZooKeeperPtr zookeeper, LogEntry & entry_lost);
|
||||
|
||||
/** Load (initialize) a queue from ZooKeeper (/replicas/me/queue/).
|
||||
* If queue was not empty load() would not load duplicate records.
|
||||
* return true, if we update queue.
|
||||
*/
|
||||
bool load(zkutil::ZooKeeperPtr zookeeper);
|
||||
|
||||
bool removeFromVirtualParts(const MergeTreePartInfo & part_info);
|
||||
bool removeFailedQuorumPart(const MergeTreePartInfo & part_info);
|
||||
|
||||
/** Copy the new entries from the shared log to the queue of this replica. Set the log_pointer to the appropriate value.
|
||||
* If watch_callback is not empty, will call it when new entries appear in the log.
|
||||
@ -478,6 +486,8 @@ public:
|
||||
/// The version of "log" node that is used to check that no new merges have appeared.
|
||||
int32_t getVersion() const { return merges_version; }
|
||||
|
||||
bool ensurePartIsLost(const String & part_name, Poco::Logger * log) const;
|
||||
|
||||
private:
|
||||
const ReplicatedMergeTreeQueue & queue;
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user