Fix a rare bug where some replicated operations (like mutations) cannot process some parts after data corruption

alesapin 2021-01-27 13:07:18 +03:00
parent bfbb2c5943
commit 01c8b9e1b1
7 changed files with 172 additions and 34 deletions
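
At a high level, the patch splits the old searchForMissingPart into two steps: a search that classifies the missing part as fetchable, covered, or lost forever, and a caller that acts on that classification (queue a fetch, drop the stale ZooKeeper entry, or remove the queue entry and account the data as lost). The standalone C++ sketch below only mirrors that decision flow; the enum values match the patch, but the handler, its parameters, and the messages are illustrative stand-ins rather than the real ClickHouse interfaces.

#include <iostream>
#include <string>

/// Mirrors MissingPartSearchResult from the patch.
enum class MissingPartSearchResult
{
    FoundAndNeedFetch,      /// the exact part exists on another replica
    FoundAndDontNeedFetch,  /// a covering part (or equivalent source parts) exists
    LostForever,            /// nothing usable exists anywhere
};

/// Hypothetical handler sketching searchForMissingPartAndFetchIfPossible:
/// the search result alone decides whether to fetch, clean up, or give up.
void handleMissingPart(const std::string & part_name, bool exists_in_zookeeper, MissingPartSearchResult result)
{
    if (exists_in_zookeeper)
    {
        if (result == MissingPartSearchResult::FoundAndNeedFetch)
            std::cout << part_name << ": remove from ZooKeeper and enqueue a fetch\n";
        else
            std::cout << part_name << ": remove the stale ZooKeeper entry, no fetch needed\n";
    }

    if (result == MissingPartSearchResult::LostForever)
        std::cout << part_name << ": remove queue entries and account the part as lost\n";
}

int main()
{
    handleMissingPart("all_3_3_0", /*exists_in_zookeeper=*/ true, MissingPartSearchResult::FoundAndNeedFetch);
    handleMissingPart("all_0_3_1", /*exists_in_zookeeper=*/ true, MissingPartSearchResult::LostForever);
}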

View File

@@ -74,20 +74,9 @@ size_t ReplicatedMergeTreePartCheckThread::size() const
}
void ReplicatedMergeTreePartCheckThread::searchForMissingPart(const String & part_name)
ReplicatedMergeTreePartCheckThread::MissingPartSearchResult ReplicatedMergeTreePartCheckThread::searchForMissingPartOnOtherReplicas(const String & part_name)
{
auto zookeeper = storage.getZooKeeper();
String part_path = storage.replica_path + "/parts/" + part_name;
/// If the part is in ZooKeeper, remove it from there and add the task to download it to the queue.
if (zookeeper->exists(part_path))
{
LOG_WARNING(log, "Part {} exists in ZooKeeper but not locally. Removing from ZooKeeper and queueing a fetch.", part_name);
ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed);
storage.removePartAndEnqueueFetch(part_name);
return;
}
/// If the part is not in ZooKeeper, we'll check if it's at least somewhere.
auto part_info = MergeTreePartInfo::fromPartName(part_name, storage.format_version);
@@ -115,7 +104,7 @@ void ReplicatedMergeTreePartCheckThread::searchForMissingPart(const String & par
* and don't delete the queue entry when in doubt.
*/
LOG_WARNING(log, "Checking if anyone has a part covering {}.", part_name);
LOG_WARNING(log, "Checking if anyone has a part {} or covering part.", part_name);
bool found_part_with_the_same_min_block = false;
bool found_part_with_the_same_max_block = false;
@@ -123,15 +112,27 @@ void ReplicatedMergeTreePartCheckThread::searchForMissingPart(const String & par
Strings replicas = zookeeper->getChildren(storage.zookeeper_path + "/replicas");
for (const String & replica : replicas)
{
Strings parts = zookeeper->getChildren(storage.zookeeper_path + "/replicas/" + replica + "/parts");
String replica_path = storage.zookeeper_path + "/replicas/" + replica;
Strings parts = zookeeper->getChildren(replica_path + "/parts");
for (const String & part_on_replica : parts)
{
auto part_on_replica_info = MergeTreePartInfo::fromPartName(part_on_replica, storage.format_version);
if (part_info == part_on_replica_info)
{
/// Found the missing part on our own replica. If we are here, then something is wrong with this part, so skip it.
if (replica_path == storage.replica_path)
continue;
LOG_WARNING(log, "Found the missing part {} at {} on {}", part_name, part_on_replica, replica);
return MissingPartSearchResult::FoundAndNeedFetch;
}
if (part_on_replica_info.contains(part_info))
{
LOG_WARNING(log, "Found part {} on {} that covers the missing part {}", part_on_replica, replica, part_name);
return;
return MissingPartSearchResult::FoundAndDontNeedFetch;
}
if (part_info.contains(part_on_replica_info))
@@ -144,7 +145,7 @@ void ReplicatedMergeTreePartCheckThread::searchForMissingPart(const String & par
if (found_part_with_the_same_min_block && found_part_with_the_same_max_block)
{
LOG_WARNING(log, "Found parts with the same min block and with the same max block as the missing part {}. Hoping that it will eventually appear as a result of a merge.", part_name);
return;
return MissingPartSearchResult::FoundAndDontNeedFetch;
}
}
}
@@ -160,21 +161,48 @@ void ReplicatedMergeTreePartCheckThread::searchForMissingPart(const String & par
not_found_msg = "smaller parts with either the same min block or the same max block.";
LOG_ERROR(log, "No replica has part covering {} and a merge is impossible: we didn't find {}", part_name, not_found_msg);
ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed);
return MissingPartSearchResult::LostForever;
}
/// Is it in the replication queue? If there is - delete, because the task can not be processed.
if (!storage.queue.remove(zookeeper, part_name))
void ReplicatedMergeTreePartCheckThread::searchForMissingPartAndFetchIfPossible(const String & part_name)
{
auto zookeeper = storage.getZooKeeper();
String part_path = storage.replica_path + "/parts/" + part_name;
auto missing_part_search_result = searchForMissingPartOnOtherReplicas(part_name);
/// If the part is in ZooKeeper, remove it from there and add the task to download it to the queue.
if (zookeeper->exists(part_path))
{
/// The part was not in our queue. Why did it happen?
LOG_ERROR(log, "Missing part {} is not in our queue.", part_name);
return;
/// If the part was found on some other replica
if (missing_part_search_result == MissingPartSearchResult::FoundAndNeedFetch)
{
LOG_WARNING(log, "Part {} exists in ZooKeeper but not locally and was found on another replica. Removing it from ZooKeeper and queueing a fetch.", part_name);
storage.removePartAndEnqueueFetch(part_name);
}
else /// If there is a covering part on another replica, or the part is lost forever, we don't need to fetch anything
{
LOG_WARNING(log, "Part {} exists in ZooKeeper but not locally and was not found on any other replica. Removing it from ZooKeeper.", part_name);
storage.removePartFromZooKeeper(part_name);
}
}
/** This situation is possible if on all the replicas where the part was, it deteriorated.
* For example, a replica that has just written it has power turned off and the data has not been written from cache to disk.
*/
LOG_ERROR(log, "Part {} is lost forever.", part_name);
ProfileEvents::increment(ProfileEvents::ReplicatedDataLoss);
ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed);
if (missing_part_search_result == MissingPartSearchResult::LostForever)
{
/// Is it in the replication queue? If so, delete it, because the task cannot be processed.
if (!storage.queue.remove(zookeeper, part_name))
{
/// The part was not in our queue. Why did it happen?
LOG_ERROR(log, "Missing part {} is not in our queue.", part_name);
}
/** This situation is possible if the part deteriorated on all the replicas that had it.
* For example, the replica that had just written it lost power before the data was flushed from cache to disk.
*/
LOG_ERROR(log, "Part {} is lost forever.", part_name);
ProfileEvents::increment(ProfileEvents::ReplicatedDataLoss);
}
}
@@ -193,7 +221,7 @@ CheckResult ReplicatedMergeTreePartCheckThread::checkPart(const String & part_na
/// We do not have this or a covering part.
if (!part)
{
searchForMissingPart(part_name);
searchForMissingPartAndFetchIfPossible(part_name);
return {part_name, false, "Part is missing, will search for it"};
}
/// We have this part, and it's active. We will check whether we need this part and whether it has the right data.
@@ -254,11 +282,11 @@ CheckResult ReplicatedMergeTreePartCheckThread::checkPart(const String & part_na
tryLogCurrentException(log, __PRETTY_FUNCTION__);
String message = "Part " + part_name + " looks broken. Removing it and queueing a fetch.";
String message = "Part " + part_name + " looks broken. Removing it and will try to fetch.";
LOG_ERROR(log, message);
ProfileEvents::increment(ProfileEvents::ReplicatedPartChecksFailed);
storage.removePartAndEnqueueFetch(part_name);
/// Part is broken, let's try to find it and fetch.
searchForMissingPartAndFetchIfPossible(part_name);
/// Delete part locally.
storage.forgetPartAndMoveToDetached(part, "broken");

View File

@@ -73,7 +73,24 @@ public:
private:
void run();
void searchForMissingPart(const String & part_name);
/// Search for a missing part and queue a fetch if possible. Otherwise
/// remove the part from ZooKeeper and from the queue.
void searchForMissingPartAndFetchIfPossible(const String & part_name);
enum MissingPartSearchResult
{
/// We found this part on another replica, let's fetch it.
FoundAndNeedFetch,
/// We found a covering part or source parts with the same min and max block numbers;
/// no fetch is needed because it will be handled during normal queue processing.
FoundAndDontNeedFetch,
/// Neither a covering part nor the exact part_name was found on any other replica.
LostForever,
};
/// Search for the missing part on other replicas, or for a covering part on all replicas (including our own).
MissingPartSearchResult searchForMissingPartOnOtherReplicas(const String & part_name);
StorageReplicatedMergeTree & storage;
String log_name;

View File

@@ -420,13 +420,26 @@ bool ReplicatedMergeTreeQueue::remove(zkutil::ZooKeeperPtr zookeeper, const Stri
{
std::unique_lock lock(state_mutex);
virtual_parts.remove(part_name);
bool removed = virtual_parts.remove(part_name);
for (Queue::iterator it = queue.begin(); it != queue.end();)
{
if ((*it)->new_part_name == part_name)
{
found = *it;
if (removed)
{
/// Preserve the invariant `virtual_parts` = `current_parts` + `queue`.
/// We remove new_part from virtual parts and add back all of its source parts
/// that are present in current_parts.
for (const auto & source_part : found->source_parts)
{
auto part_in_current_parts = current_parts.getContainingPart(source_part);
if (part_in_current_parts == source_part)
virtual_parts.add(source_part);
}
}
updateStateOnQueueEntryRemoval(
found, /* is_successful = */ false,
min_unprocessed_insert_time_changed, max_processed_insert_time_changed, lock);
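
The loop above preserves the invariant that virtual_parts equals current_parts plus the results of pending queue entries: when a queue entry for a merged part is dropped, its still-present source parts have to be put back, otherwise they would vanish from virtual_parts. A toy sketch of that bookkeeping using plain std::set (the real code works on ActiveDataPartSet and checks getContainingPart, so this is only an approximation with hypothetical names):

#include <iostream>
#include <set>
#include <string>
#include <vector>

/// Toy model of the invariant `virtual_parts` = `current_parts` + `queue`:
/// every part that is either physically present or will be produced by a
/// queued entry must be visible in virtual_parts.
struct ToyQueueEntry
{
    std::string new_part_name;
    std::vector<std::string> source_parts;
};

void removeQueueEntry(std::set<std::string> & virtual_parts,
                      const std::set<std::string> & current_parts,
                      const ToyQueueEntry & entry)
{
    /// The queued result will never be produced, so drop it from virtual_parts...
    bool removed = virtual_parts.erase(entry.new_part_name) > 0;

    /// ...and put back every source part that is still physically present,
    /// otherwise virtual_parts would "forget" parts that current_parts still holds
    /// (the real code checks current_parts.getContainingPart(source) == source).
    if (removed)
        for (const auto & source : entry.source_parts)
            if (current_parts.count(source))
                virtual_parts.insert(source);
}

int main()
{
    std::set<std::string> current_parts{"all_0_0_0", "all_1_1_0", "all_2_2_0", "all_3_3_0"};
    std::set<std::string> virtual_parts{"all_0_3_1"};   /// the assigned (now impossible) merge

    removeQueueEntry(virtual_parts, current_parts,
                     {"all_0_3_1", {"all_0_0_0", "all_1_1_0", "all_2_2_0", "all_3_3_0"}});

    for (const auto & part : virtual_parts)
        std::cout << part << '\n';                      /// the four source parts are restored
}
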
@@ -1010,7 +1023,7 @@ bool ReplicatedMergeTreeQueue::isNotCoveredByFuturePartsImpl(const String & log_
/// NOTE The above is redundant, but left for a more convenient message in the log.
auto result_part = MergeTreePartInfo::fromPartName(new_part_name, format_version);
/// It can slow down when the size of `future_parts` is large. But it can not be large, since `BackgroundProcessingPool` is limited.
/// It can slow down when the size of `future_parts` is large. But it cannot be large, since the background pool is limited.
for (const auto & future_part_elem : future_parts)
{
auto future_part = MergeTreePartInfo::fromPartName(future_part_elem.first, format_version);

View File

@@ -3008,6 +3008,21 @@ void StorageReplicatedMergeTree::removePartFromZooKeeper(const String & part_nam
ops.emplace_back(zkutil::makeRemoveRequest(part_path, -1));
}
void StorageReplicatedMergeTree::removePartFromZooKeeper(const String & part_name)
{
auto zookeeper = getZooKeeper();
String part_path = replica_path + "/parts/" + part_name;
Coordination::Stat stat;
/// Part doesn't exist, nothing to remove
if (!zookeeper->exists(part_path, &stat))
return;
Coordination::Requests ops;
removePartFromZooKeeper(part_name, ops, stat.numChildren > 0);
zookeeper->multi(ops);
}
void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_name)
{

View File

@@ -381,6 +381,9 @@ private:
/// Set has_children to true for "old-style" parts (those with /columns and /checksums child znodes).
void removePartFromZooKeeper(const String & part_name, Coordination::Requests & ops, bool has_children);
/// Just removes the part from ZooKeeper using the previous method
void removePartFromZooKeeper(const String & part_name);
/// Quickly removes big set of parts from ZooKeeper (using async multi queries)
void removePartsFromZooKeeper(zkutil::ZooKeeperPtr & zookeeper, const Strings & part_names,
NameSet * parts_should_be_retried = nullptr);

View File

@@ -0,0 +1 @@
#!/usr/bin/env python3

View File

@@ -0,0 +1,61 @@
import pytest
from helpers.cluster import ClickHouseCluster
from multiprocessing.dummy import Pool
from helpers.network import PartitionManager
import time
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', with_zookeeper=True)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
node1.query('''
CREATE TABLE replicated_mt(date Date, id UInt32, value Int32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/replicated_mt', '{replica}') ORDER BY id;
'''.format(replica=node1.name))
yield cluster
finally:
cluster.shutdown()
def corrupt_data_part_on_disk(node, table, part_name):
part_path = node.query(
"SELECT path FROM system.parts WHERE table = '{}' and name = '{}'".format(table, part_name)).strip()
node.exec_in_container(['bash', '-c',
'cd {p} && ls *.bin | head -n 1 | xargs -I{{}} sh -c \'echo "1" >> $1\' -- {{}}'.format(
p=part_path)], privileged=True)
def test_merge_and_part_corruption(started_cluster):
node1.query("SYSTEM STOP REPLICATION QUEUES replicated_mt")
for i in range(4):
node1.query("INSERT INTO replicated_mt SELECT toDate('2019-10-01'), number, number * number FROM numbers ({f}, 100000)".format(f=i*100000))
assert node1.query("SELECT COUNT() FROM system.parts WHERE table='replicated_mt' AND active=1") == "4\n"
# We need to corrupt a "border" part (leftmost or rightmost). If we corrupt something in the middle,
# clickhouse will not consider the merge broken, because there are parts with the same min and max
# block numbers.
corrupt_data_part_on_disk(node1, 'replicated_mt', 'all_3_3_0')
with Pool(1) as p:
def optimize_with_delay(x):
node1.query("OPTIMIZE TABLE replicated_mt FINAL", timeout=30)
# the corrupted part is checked after the merge is already assigned, but before it starts
res_opt = p.apply_async(optimize_with_delay, (1,))
node1.query("CHECK TABLE replicated_mt", settings={"check_query_single_value_result": 0})
# start merge
node1.query("SYSTEM START REPLICATION QUEUES replicated_mt")
res_opt.get()
# will hang if the bug under test is not fixed
node1.query("ALTER TABLE replicated_mt UPDATE value = 7 WHERE 1", settings={"mutations_sync": 2}, timeout=30)
assert node1.query("SELECT sum(value) FROM replicated_mt") == "2100000\n"