Better logic on part checker

This commit is contained in:
alesapin 2021-09-09 18:19:12 +03:00
parent a5f1185197
commit 59edf6b5f6
4 changed files with 37 additions and 33 deletions

View File

@ -111,6 +111,18 @@ ReplicatedMergeTreePartCheckThread::MissingPartSearchResult ReplicatedMergeTreeP
bool found_part_with_the_same_max_block = false;
Strings replicas = zookeeper->getChildren(storage.zookeeper_path + "/replicas");
/// Move our replica to the end of replicas
for (auto it = replicas.begin(); it != replicas.end(); ++it)
{
String replica_path = storage.zookeeper_path + "/replicas/" + *it;
if (replica_path == storage.replica_path)
{
std::iter_swap(it, replicas.begin() + replicas.size() - 1);
break;
}
}
/// Check all replicas and our replica must be this last one
for (const String & replica : replicas)
{
String replica_path = storage.zookeeper_path + "/replicas/" + replica;
@ -146,7 +158,7 @@ ReplicatedMergeTreePartCheckThread::MissingPartSearchResult ReplicatedMergeTreeP
if (found_part_with_the_same_min_block && found_part_with_the_same_max_block)
{
/// FIXME It may never appear
LOG_WARNING(log, "Found parts with the same min block and with the same max block as the missing part {}. Hoping that it will eventually appear as a result of a merge.", part_name);
LOG_WARNING(log, "Found parts with the same min block and with the same max block as the missing part {} on replica {}. Hoping that it will eventually appear as a result of a merge.", part_name, replica);
return MissingPartSearchResult::FoundAndDontNeedFetch;
}
}

View File

@ -55,13 +55,15 @@ void ReplicatedMergeTreeQueue::clear()
mutation_pointer.clear();
}
void ReplicatedMergeTreeQueue::initialize(const MergeTreeData::DataParts & parts)
void ReplicatedMergeTreeQueue::initialize(zkutil::ZooKeeperPtr zookeeper)
{
std::lock_guard lock(state_mutex);
for (const auto & part : parts)
Strings parts = zookeeper->getChildren(replica_path + "/parts");
for (const auto & part_name : parts)
{
current_parts.add(part->name, nullptr);
virtual_parts.add(part->name, nullptr);
current_parts.add(part_name, nullptr);
virtual_parts.add(part_name, nullptr);
}
}

View File

@ -277,8 +277,8 @@ public:
/// Clears queue state
void clear();
/// Put a set of (already existing) parts in virtual_parts.
void initialize(const MergeTreeData::DataParts & parts);
/// Get set of parts from zookeeper
void initialize(zkutil::ZooKeeperPtr zookeeper);
/** Inserts an action to the end of the queue.
* To restore broken parts during operation.

View File

@ -1224,34 +1224,24 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
Coordination::Requests ops;
String has_replica = findReplicaHavingPart(part_name, true);
if (!has_replica.empty())
LOG_ERROR(log, "Removing locally missing part from ZooKeeper and queueing a fetch: {}", part_name);
time_t part_create_time = 0;
Coordination::ExistsResponse exists_resp = exists_futures[i].get();
if (exists_resp.error == Coordination::Error::ZOK)
{
LOG_ERROR(log, "Removing locally missing part from ZooKeeper and queueing a fetch: {}", part_name);
time_t part_create_time = 0;
Coordination::ExistsResponse exists_resp = exists_futures[i].get();
if (exists_resp.error == Coordination::Error::ZOK)
{
part_create_time = exists_resp.stat.ctime / 1000;
removePartFromZooKeeper(part_name, ops, exists_resp.stat.numChildren > 0);
}
LogEntry log_entry;
log_entry.type = LogEntry::GET_PART;
log_entry.source_replica = "";
log_entry.new_part_name = part_name;
log_entry.create_time = part_create_time;
/// We assume that this occurs before the queue is loaded (queue.initialize).
ops.emplace_back(zkutil::makeCreateRequest(
fs::path(replica_path) / "queue/queue-", log_entry.toString(), zkutil::CreateMode::PersistentSequential));
enqueue_futures.emplace_back(zookeeper->asyncMulti(ops));
}
else
{
LOG_ERROR(log, "Not found active replica having part {}", part_name);
enqueuePartForCheck(part_name);
part_create_time = exists_resp.stat.ctime / 1000;
removePartFromZooKeeper(part_name, ops, exists_resp.stat.numChildren > 0);
}
LogEntry log_entry;
log_entry.type = LogEntry::GET_PART;
log_entry.source_replica = "";
log_entry.new_part_name = part_name;
log_entry.create_time = part_create_time;
/// We assume that this occurs before the queue is loaded (queue.initialize).
ops.emplace_back(zkutil::makeCreateRequest(
fs::path(replica_path) / "queue/queue-", log_entry.toString(), zkutil::CreateMode::PersistentSequential));
enqueue_futures.emplace_back(zookeeper->asyncMulti(ops));
}
for (auto & future : enqueue_futures)
@ -4298,7 +4288,7 @@ void StorageReplicatedMergeTree::startup()
try
{
queue.initialize(getDataParts());
queue.initialize(getZooKeeper());
InterserverIOEndpointPtr data_parts_exchange_ptr = std::make_shared<DataPartsExchange::Service>(*this);
[[maybe_unused]] auto prev_ptr = std::atomic_exchange(&data_parts_exchange_endpoint, data_parts_exchange_ptr);