Merge pull request #28817 from ClickHouse/fix_part_checker

Fix a rare case of replica divergence

The replication queue is now initialized from the set of parts recorded in ZooKeeper rather than from the local parts on disk, locally missing parts are unconditionally removed from ZooKeeper and re-queued as fetches, and the part check thread inspects our own replica last.
alesapin 2021-09-17 18:06:27 +03:00 committed by GitHub
commit 4adc4e9152
7 changed files with 162 additions and 37 deletions


@@ -111,6 +111,18 @@ ReplicatedMergeTreePartCheckThread::MissingPartSearchResult ReplicatedMergeTreeP
     bool found_part_with_the_same_max_block = false;
     Strings replicas = zookeeper->getChildren(storage.zookeeper_path + "/replicas");
+
+    /// Move our replica to the end of the list, so that it is checked last
+    for (auto it = replicas.begin(); it != replicas.end(); ++it)
+    {
+        String replica_path = storage.zookeeper_path + "/replicas/" + *it;
+        if (replica_path == storage.replica_path)
+        {
+            std::iter_swap(it, replicas.rbegin());
+            break;
+        }
+    }
+
+    /// Check all replicas; our own replica must be the last one checked
     for (const String & replica : replicas)
     {
         String replica_path = storage.zookeeper_path + "/replicas/" + replica;
@@ -146,7 +158,7 @@ ReplicatedMergeTreePartCheckThread::MissingPartSearchResult ReplicatedMergeTreeP
         if (found_part_with_the_same_min_block && found_part_with_the_same_max_block)
         {
             /// FIXME It may never appear
-            LOG_WARNING(log, "Found parts with the same min block and with the same max block as the missing part {}. Hoping that it will eventually appear as a result of a merge.", part_name);
+            LOG_WARNING(log, "Found parts with the same min block and with the same max block as the missing part {} on replica {}. Hoping that it will eventually appear as a result of a merge.", part_name, replica);
            return MissingPartSearchResult::FoundAndDontNeedFetch;
         }
     }
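
The reordering above is the heart of the part-checker change: the thread now inspects every other replica before drawing conclusions from its own state. A minimal standalone sketch of the same idiom (hypothetical function name, not ClickHouse code):

#include <algorithm>
#include <string>
#include <vector>

/// Swap the entry matching `own` with the last element, so that a
/// subsequent linear scan visits it after all other entries.
void moveOwnReplicaToEnd(std::vector<std::string> & replicas, const std::string & own)
{
    for (auto it = replicas.begin(); it != replicas.end(); ++it)
    {
        if (*it == own)
        {
            std::iter_swap(it, std::prev(replicas.end()));
            break;
        }
    }
}

A single swap is enough because only the position of our own entry matters; the relative order of the other replicas is irrelevant to the check.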


@@ -55,14 +55,31 @@ void ReplicatedMergeTreeQueue::clear()
     mutation_pointer.clear();
 }
 
-void ReplicatedMergeTreeQueue::initialize(const MergeTreeData::DataParts & parts)
+void ReplicatedMergeTreeQueue::initialize(zkutil::ZooKeeperPtr zookeeper)
 {
     std::lock_guard lock(state_mutex);
-    for (const auto & part : parts)
+
+    LOG_TRACE(log, "Initializing parts in queue");
+
+    /// Get the current parts state from zookeeper
+    Strings parts = zookeeper->getChildren(replica_path + "/parts");
+    for (const auto & part_name : parts)
     {
-        current_parts.add(part->name, nullptr);
-        virtual_parts.add(part->name, nullptr);
+        LOG_TEST(log, "Adding part {} to current and virtual parts", part_name);
+        current_parts.add(part_name, nullptr);
+        virtual_parts.add(part_name, nullptr);
     }
+
+    /// DROP PART entries can affect virtual parts, so loading the parts set
+    /// from zookeeper can break the virtual-parts invariant. To restore it,
+    /// we apply the drop ranges of the queued drop entries here.
+    for (const LogEntryPtr & entry : queue)
+    {
+        if (entry->isDropPart(format_version))
+            virtual_parts.removePartAndCoveredParts(*entry->getDropRange(format_version));
+    }
+
+    LOG_TRACE(log, "Queue initialized");
 }
 
 bool ReplicatedMergeTreeQueue::isVirtualPart(const MergeTreeData::DataPartPtr & data_part) const
@@ -163,7 +180,11 @@ void ReplicatedMergeTreeQueue::insertUnlocked(
     const LogEntryPtr & entry, std::optional<time_t> & min_unprocessed_insert_time_changed,
     std::lock_guard<std::mutex> & state_lock)
 {
-    for (const String & virtual_part_name : entry->getVirtualPartNames(format_version))
+    auto entry_virtual_parts = entry->getVirtualPartNames(format_version);
+
+    LOG_TEST(log, "Inserting entry {} into queue with type {} and virtual parts [{}]", entry->znode_name, entry->typeToString(), fmt::join(entry_virtual_parts, ", "));
+
+    for (const String & virtual_part_name : entry_virtual_parts)
     {
         virtual_parts.add(virtual_part_name, nullptr);
         /// Don't add drop range parts to mutations
@@ -227,6 +248,11 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval(
     std::optional<time_t> & max_processed_insert_time_changed,
     std::unique_lock<std::mutex> & state_lock)
 {
+    auto entry_virtual_parts = entry->getVirtualPartNames(format_version);
+    LOG_TEST(log, "Removing {} entry {} from queue with type {} and virtual parts [{}]",
+             is_successful ? "successful" : "unsuccessful",
+             entry->znode_name, entry->typeToString(), fmt::join(entry_virtual_parts, ", "));
+
     /// Update insert times.
     if (entry->type == LogEntry::GET_PART || entry->type == LogEntry::ATTACH_PART)
     {
@@ -254,6 +280,7 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval(
     {
         if (!entry->actual_new_part_name.empty())
         {
+            LOG_TEST(log, "Entry {} has actual new part name {}, removing it from mutations", entry->znode_name, entry->actual_new_part_name);
             /// We don't add the bigger fetched part to current_parts because we
             /// have an invariant `virtual_parts` = `current_parts` + `queue`.
             ///
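
The invariant mentioned in this comment drives most of the changes in this file, so it is worth spelling out. A toy model (plain std::set, not the ClickHouse part-set types) of how the invariant survives the completion of a queue entry:

#include <cassert>
#include <set>
#include <string>

int main()
{
    /// virtual_parts must always equal current_parts plus the parts that
    /// still-queued entries will eventually produce.
    std::set<std::string> current_parts{"all_0_3_1"};
    std::set<std::string> queued_results{"all_4_4_0"};

    std::set<std::string> virtual_parts = current_parts;
    virtual_parts.insert(queued_results.begin(), queued_results.end());

    /// Executing an entry moves its result from the queue to current_parts;
    /// virtual_parts does not change, so the invariant is preserved.
    current_parts.insert("all_4_4_0");
    queued_results.erase("all_4_4_0");

    std::set<std::string> expected = current_parts;
    expected.insert(queued_results.begin(), queued_results.end());
    assert(virtual_parts == expected);
    return 0;
}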
@@ -264,7 +291,9 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval(
             removeCoveredPartsFromMutations(entry->actual_new_part_name, /*remove_part = */ false, /*remove_covered_parts = */ true);
         }
 
-        for (const String & virtual_part_name : entry->getVirtualPartNames(format_version))
+        LOG_TEST(log, "Adding parts [{}] to current parts", fmt::join(entry_virtual_parts, ", "));
+
+        for (const String & virtual_part_name : entry_virtual_parts)
         {
             current_parts.add(virtual_part_name, nullptr);
@@ -275,14 +304,21 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval(
         if (auto drop_range_part_name = entry->getDropRange(format_version))
         {
             MergeTreePartInfo drop_range_info = MergeTreePartInfo::fromPartName(*drop_range_part_name, format_version);
 
             /// DROP PART doesn't have virtual parts, so remove all parts
             /// covered by it from the current parts.
             if (entry->isDropPart(format_version))
+            {
+                LOG_TEST(log, "Removing drop part {} from current and virtual parts", *drop_range_part_name);
                 current_parts.removePartAndCoveredParts(*drop_range_part_name);
+            }
             else
+            {
+                LOG_TEST(log, "Removing drop range {} from current and virtual parts", *drop_range_part_name);
                 current_parts.remove(*drop_range_part_name);
+            }
 
             virtual_parts.remove(*drop_range_part_name);
@@ -307,7 +343,9 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval(
             drop_ranges.removeDropRange(entry);
         }
 
-        for (const String & virtual_part_name : entry->getVirtualPartNames(format_version))
+        LOG_TEST(log, "Removing unsuccessful entry {}, virtual parts [{}]", entry->znode_name, fmt::join(entry_virtual_parts, ", "));
+
+        for (const String & virtual_part_name : entry_virtual_parts)
         {
             /// This part will never appear, so remove it from virtual parts
             virtual_parts.remove(virtual_part_name);
@@ -324,6 +362,9 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval(
 void ReplicatedMergeTreeQueue::removeCoveredPartsFromMutations(const String & part_name, bool remove_part, bool remove_covered_parts)
 {
     auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
+
+    LOG_TEST(log, "Removing part {} from mutations (remove_part: {}, remove_covered_parts: {})", part_name, remove_part, remove_covered_parts);
+
     auto in_partition = mutations_by_partition.find(part_info.partition_id);
     if (in_partition == mutations_by_partition.end())
         return;
@@ -361,11 +402,17 @@ void ReplicatedMergeTreeQueue::removeCoveredPartsFromMutations(const String & pa
 void ReplicatedMergeTreeQueue::addPartToMutations(const String & part_name)
 {
+    LOG_TEST(log, "Adding part {} to mutations", part_name);
+
     auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
+
     /// Do not add special virtual parts to parts_to_do
     if (part_info.isFakeDropRangePart())
+    {
+        LOG_TEST(log, "Part {} is a fake drop range part, will not add it to mutations", part_name);
         return;
+    }
 
     auto in_partition = mutations_by_partition.find(part_info.partition_id);
     if (in_partition == mutations_by_partition.end())
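
Most of the new LOG_TEST lines in this file render a list of part names with fmt::join from the {fmt} library. A minimal sketch of what that formatting produces (standalone program, not ClickHouse code):

#include <string>
#include <vector>

#include <fmt/format.h>
#include <fmt/ranges.h>

int main()
{
    std::vector<std::string> parts{"all_0_3_1", "all_4_4_0"};

    /// fmt::join expands a range with the given separator, matching the
    /// "[{}]" placeholders used in the LOG_TEST calls above.
    /// Prints: Adding parts [all_0_3_1, all_4_4_0] to current parts
    fmt::print("Adding parts [{}] to current parts\n", fmt::join(parts, ", "));
    return 0;
}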


@@ -279,8 +279,8 @@ public:
     /// Clears queue state
     void clear();
 
-    /// Put a set of (already existing) parts in virtual_parts.
-    void initialize(const MergeTreeData::DataParts & parts);
+    /// Get the set of existing parts from zookeeper
+    void initialize(zkutil::ZooKeeperPtr zookeeper);
 
     /** Inserts an action to the end of the queue.
       * To restore broken parts during operation.


@@ -174,6 +174,8 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
     try
     {
+        storage.queue.initialize(zookeeper);
+
         storage.queue.load(zookeeper);
 
         /// pullLogsToQueue() after we mark replica 'is_active' (and after we repair if it was lost);


@@ -1223,34 +1223,24 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
         Coordination::Requests ops;
 
-        String has_replica = findReplicaHavingPart(part_name, true);
-        if (!has_replica.empty())
-        {
-            LOG_ERROR(log, "Removing locally missing part from ZooKeeper and queueing a fetch: {}", part_name);
-            time_t part_create_time = 0;
-            Coordination::ExistsResponse exists_resp = exists_futures[i].get();
-            if (exists_resp.error == Coordination::Error::ZOK)
-            {
-                part_create_time = exists_resp.stat.ctime / 1000;
-                removePartFromZooKeeper(part_name, ops, exists_resp.stat.numChildren > 0);
-            }
-            LogEntry log_entry;
-            log_entry.type = LogEntry::GET_PART;
-            log_entry.source_replica = "";
-            log_entry.new_part_name = part_name;
-            log_entry.create_time = part_create_time;
-
-            /// We assume that this occurs before the queue is loaded (queue.initialize).
-            ops.emplace_back(zkutil::makeCreateRequest(
-                fs::path(replica_path) / "queue/queue-", log_entry.toString(), zkutil::CreateMode::PersistentSequential));
-            enqueue_futures.emplace_back(zookeeper->asyncMulti(ops));
-        }
-        else
-        {
-            LOG_ERROR(log, "Not found active replica having part {}", part_name);
-            enqueuePartForCheck(part_name);
-        }
+        LOG_ERROR(log, "Removing locally missing part from ZooKeeper and queueing a fetch: {}", part_name);
+
+        time_t part_create_time = 0;
+        Coordination::ExistsResponse exists_resp = exists_futures[i].get();
+        if (exists_resp.error == Coordination::Error::ZOK)
+        {
+            part_create_time = exists_resp.stat.ctime / 1000;
+            removePartFromZooKeeper(part_name, ops, exists_resp.stat.numChildren > 0);
+        }
+
+        LogEntry log_entry;
+        log_entry.type = LogEntry::GET_PART;
+        log_entry.source_replica = "";
+        log_entry.new_part_name = part_name;
+        log_entry.create_time = part_create_time;
+
+        /// We assume that this occurs before the queue is loaded (queue.initialize).
+        ops.emplace_back(zkutil::makeCreateRequest(
+            fs::path(replica_path) / "queue/queue-", log_entry.toString(), zkutil::CreateMode::PersistentSequential));
+        enqueue_futures.emplace_back(zookeeper->asyncMulti(ops));
     }
 
     for (auto & future : enqueue_futures)
@@ -3936,8 +3926,6 @@ void StorageReplicatedMergeTree::startup()
     try
     {
-        queue.initialize(getDataParts());
-
         InterserverIOEndpointPtr data_parts_exchange_ptr = std::make_shared<DataPartsExchange::Service>(*this);
         [[maybe_unused]] auto prev_ptr = std::atomic_exchange(&data_parts_exchange_endpoint, data_parts_exchange_ptr);
         assert(prev_ptr == nullptr);
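
Taken together with the checkParts() hunk above, the behavioural change is easy to state: a locally missing part is now always removed from ZooKeeper and re-queued as a GET_PART fetch, instead of only when findReplicaHavingPart() found an active replica currently holding it. A toy model of that decision (hypothetical names, not ClickHouse code):

#include <iostream>
#include <string>

enum class Action { QueueFetch, RecheckLater };

/// Before the fix: schedule a fetch only if some replica already has the
/// part, otherwise merely re-queue the part for another check.
Action onMissingPartOld(const std::string & replica_having_part)
{
    return replica_having_part.empty() ? Action::RecheckLater : Action::QueueFetch;
}

/// After the fix: always queue a GET_PART entry; the source replica is
/// chosen later, when the queued entry is actually executed.
Action onMissingPartNew(const std::string & /*replica_having_part*/)
{
    return Action::QueueFetch;
}

int main()
{
    /// With no active replica currently holding the part, the old logic
    /// stalled on rechecks while the new logic still queues a fetch.
    std::cout << (onMissingPartOld("") == Action::RecheckLater) << '\n';   /// 1
    std::cout << (onMissingPartNew("") == Action::QueueFetch) << '\n';     /// 1
    return 0;
}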


@@ -0,0 +1 @@
+#!/usr/bin/env python3


@@ -0,0 +1,75 @@
+#!/usr/bin/env python3
+import time
+
+import pytest
+from helpers.cluster import ClickHouseCluster
+from helpers.network import PartitionManager
+
+cluster = ClickHouseCluster(__file__)
+node1 = cluster.add_instance('node1', with_zookeeper=True, stay_alive=True)
+node2 = cluster.add_instance('node2', with_zookeeper=True, stay_alive=True)
+
+
+@pytest.fixture(scope="module")
+def start_cluster():
+    try:
+        cluster.start()
+        yield cluster
+    except Exception as ex:
+        print(ex)
+    finally:
+        cluster.shutdown()
+
+
+def remove_part_from_disk(node, table, part_name):
+    part_path = node.query(
+        "SELECT path FROM system.parts WHERE table = '{}' and name = '{}'".format(table, part_name)).strip()
+    if not part_path:
+        raise Exception("Part " + part_name + " doesn't exist")
+    # Delete the part's files but keep its directory, imitating a lost part.
+    node.exec_in_container(['bash', '-c', 'rm -r {p}/*'.format(p=part_path)], privileged=True)
+
+
+def test_lost_part_during_startup(start_cluster):
+    for i, node in enumerate([node1, node2]):
+        node.query(f"CREATE TABLE test_lost (value UInt64) Engine = ReplicatedMergeTree('/clickhouse/test_lost', '{i + 1}') ORDER BY tuple()")
+
+    for i in range(4):
+        node2.query(f"INSERT INTO test_lost VALUES({i})")
+
+    node2.query("OPTIMIZE TABLE test_lost FINAL")
+    node1.query("SYSTEM SYNC REPLICA test_lost")
+
+    assert node2.query("SELECT sum(value) FROM test_lost") == str(sum(i for i in range(4))) + '\n'
+    assert node1.query("SELECT sum(value) FROM test_lost") == str(sum(i for i in range(4))) + '\n'
+
+    # Lose the merged part and two of its source parts on node2, then restart
+    # it while node1 is down, so the parts cannot be fetched back immediately.
+    remove_part_from_disk(node2, "test_lost", "all_0_3_1")
+    remove_part_from_disk(node2, "test_lost", "all_1_1_0")
+    remove_part_from_disk(node2, "test_lost", "all_2_2_0")
+
+    node2.stop_clickhouse()
+    node1.stop_clickhouse()
+    node2.start_clickhouse()
+
+    # node2 may reject queries while it is still starting up, so retry.
+    for i in range(10):
+        try:
+            node2.query("INSERT INTO test_lost VALUES(7)")
+            node2.query("INSERT INTO test_lost VALUES(8)")
+            node2.query("INSERT INTO test_lost VALUES(9)")
+            node2.query("INSERT INTO test_lost VALUES(10)")
+            node2.query("INSERT INTO test_lost VALUES(11)")
+            node2.query("INSERT INTO test_lost VALUES(12)")
+            node2.query("OPTIMIZE TABLE test_lost FINAL")
+            break
+        except Exception as ex:
+            print("Exception", ex)
+            time.sleep(0.5)
+
+    node1.start_clickhouse()
+
+    node2.query("SYSTEM SYNC REPLICA test_lost")
+    node1.query("SYSTEM SYNC REPLICA test_lost")
+
+    assert node2.query("SELECT sum(value) FROM test_lost") == str(sum(i for i in range(4)) + sum(i for i in range(7, 13))) + '\n'
+    assert node1.query("SELECT sum(value) FROM test_lost") == str(sum(i for i in range(4)) + sum(i for i in range(7, 13))) + '\n'