Merge pull request #51006 from ClickHouse/followup_50448

Follow-up to #50448
Alexander Tokmakov 2023-06-16 15:32:12 +03:00 committed by GitHub
commit 9260a1bf2e
14 changed files with 179 additions and 8 deletions


@@ -1916,7 +1916,10 @@ try
++num_loaded_parts;
if (res.is_broken)
{
forcefullyRemoveBrokenOutdatedPartFromZooKeeperBeforeDetaching(res.part->name);
res.part->renameToDetached("broken-on-start"); /// detached parts must not have '_' in prefixes
}
else if (res.part->is_duplicate)
res.part->remove();
else


@@ -646,6 +646,9 @@ public:
/// For active parts it's unsafe because this method modifies fields of part (rename) while some other thread can try to read it.
void forcefullyMovePartToDetachedAndRemoveFromMemory(const DataPartPtr & part, const String & prefix = "", bool restore_covered = false);
/// This method should not be here, but async loading of Outdated parts is implemented in MergeTreeData
virtual void forcefullyRemoveBrokenOutdatedPartFromZooKeeperBeforeDetaching(const String & /*part_name*/) {}
/// Outdate broken part, set remove time to zero (remove as fast as possible) and make clone in detached directory.
void outdateBrokenPartAndCloneToDetached(const DataPartPtr & part, const String & prefix);


@@ -1226,12 +1226,53 @@ static time_t tryGetPartCreateTime(zkutil::ZooKeeperPtr & zookeeper, const Strin
return res;
}
static void paranoidCheckForCoveredPartsInZooKeeperOnStart(const StorageReplicatedMergeTree * storage, const Strings & parts_in_zk,
MergeTreeDataFormatVersion format_version, Poco::Logger * log)
{
#ifdef ABORT_ON_LOGICAL_ERROR
constexpr bool paranoid_check_for_covered_parts_default = true;
#else
constexpr bool paranoid_check_for_covered_parts_default = false;
#endif
bool paranoid_check_for_covered_parts = Context::getGlobalContextInstance()->getConfigRef().getBool(
"replicated_merge_tree_paranoid_check_on_startup", paranoid_check_for_covered_parts_default);
if (!paranoid_check_for_covered_parts)
return;
ActiveDataPartSet active_set(format_version);
for (const auto & part_name : parts_in_zk)
active_set.add(part_name);
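/// Note: ActiveDataPartSet keeps only the covering (maximal) parts, so getContainingPart() below
/// returns either the name itself (if the part is not covered) or the name of the part covering it.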
const auto disks = storage->getStoragePolicy()->getDisks();
auto path = storage->getRelativeDataPath();
for (const auto & part_name : parts_in_zk)
{
String covering_part = active_set.getContainingPart(part_name);
if (part_name == covering_part)
continue;
bool found = false;
for (const DiskPtr & disk : disks)
if (disk->exists(fs::path(path) / part_name))
found = true;
if (!found)
{
LOG_WARNING(log, "Part {} exists in ZooKeeper and covered by another part in ZooKeeper ({}), but doesn't exist on any disk. "
"It may cause false-positive 'part is lost forever' messages", part_name, covering_part);
chassert(false);
}
}
}
void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
{
auto zookeeper = getZooKeeper();
Strings expected_parts_vec = zookeeper->getChildren(fs::path(replica_path) / "parts");
paranoidCheckForCoveredPartsInZooKeeperOnStart(this, expected_parts_vec, format_version, log);
/// Parts in ZK.
NameSet expected_parts(expected_parts_vec.begin(), expected_parts_vec.end());
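
For reference, the check above is toggled by a server-level key read from the main config (via getConfigRef()), not by a query setting. Below is a minimal, illustrative config sketch that force-enables it on a build where it would otherwise default to off; the key name comes from this diff, the value is only an example:

<clickhouse>
    <!-- Illustration only: explicitly enable the startup check; by default it is
         enabled only in builds compiled with ABORT_ON_LOGICAL_ERROR. -->
    <replicated_merge_tree_paranoid_check_on_startup>1</replicated_merge_tree_paranoid_check_on_startup>
</clickhouse>

The compat.xml files added to the integration tests later in this diff do the opposite and set both this key and replicated_merge_tree_paranoid_check_on_drop_range to 0.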
@@ -6805,6 +6846,31 @@ void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZKImpl(zkutil::ZooKee
}
void StorageReplicatedMergeTree::forcefullyRemoveBrokenOutdatedPartFromZooKeeperBeforeDetaching(const String & part_name)
{
/// An outdated part is broken and we are going to move it to detached/
/// But we need to remove it from ZooKeeper as well. Otherwise it will be considered as "lost forever".
/// Since the part is Outdated, it should be safe to remove it, but it's still dangerous.
/// It could become Outdated because it was merged/mutated (so we have a covering part) or because it was dropped.
/// But DROP [PART]ITION waits for all Outdated parts to be loaded, so it's not the case.
auto zookeeper = getZooKeeper();
String part_path = replica_path + "/parts/" + part_name;
if (!zookeeper->exists(part_path))
return;
auto part = getActiveContainingPart(part_name);
if (!part)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Outdated part {} is broken and going to be detached, "
"but there's no active covering part, so we are not sure that it can be safely removed from ZooKeeper "
"(path: {})", part_name, part_path);
LOG_WARNING(log, "Outdated part {} is broken and going to be detached, removing it from ZooKeeper. The part is covered by {}",
part_name, part->name);
removePartsFromZooKeeperWithRetries({part_name}, /* infinite retries */ 0);
}
void StorageReplicatedMergeTree::removePartsFromZooKeeperWithRetries(PartsToRemoveFromZooKeeper & parts, size_t max_retries)
{
Strings part_names_to_remove;
@@ -7166,6 +7232,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
}
Coordination::Responses op_results;
DataPartsVector parts_holder;
try
{
@@ -7215,7 +7282,10 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
auto data_parts_lock = lockParts();
transaction.commit(&data_parts_lock);
if (replace)
{
parts_holder = getDataPartsVectorInPartitionForInternalUsage(MergeTreeDataPartState::Active, drop_range.partition_id, &data_parts_lock);
removePartsInRangeFromWorkingSet(NO_TRANSACTION_RAW, drop_range, data_parts_lock);
}
}
PartLog::addNewParts(getContext(), PartLog::createPartLogEntries(dst_parts, watch.elapsed(), profile_events_scope.getSnapshot()));
@@ -7235,11 +7305,15 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
for (auto & lock : ephemeral_locks)
lock.assumeUnlocked();
cleanup_thread.wakeup();
lock2.reset();
lock1.reset();
/// We need to pull the DROP_RANGE before cleaning the replaced parts (otherwise CheckThread may decide that parts are lost)
queue.pullLogsToQueue(getZooKeeperAndAssertNotReadonly(), {}, ReplicatedMergeTreeQueue::SYNC);
parts_holder.clear();
cleanup_thread.wakeup();
waitForLogEntryToBeProcessedIfNecessary(entry, query_context);
return;
@@ -7405,6 +7479,8 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
Coordination::Responses op_results;
/// We should hold replaced parts until we actually create DROP_RANGE in ZooKeeper
DataPartsVector parts_holder;
try
{
Coordination::Requests ops;
@@ -7439,6 +7515,7 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
else
zkutil::KeeperMultiException::check(code, ops, op_results);
parts_holder = getDataPartsVectorInPartitionForInternalUsage(MergeTreeDataPartState::Active, drop_range.partition_id, &src_data_parts_lock);
removePartsInRangeFromWorkingSet(NO_TRANSACTION_RAW, drop_range, src_data_parts_lock);
transaction.commit(&src_data_parts_lock);
}
@@ -7461,7 +7538,6 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
for (auto & lock : ephemeral_locks)
lock.assumeUnlocked();
cleanup_thread.wakeup();
lock2.reset();
dest_table_storage->waitForLogEntryToBeProcessedIfNecessary(entry, query_context);
@@ -7480,6 +7556,12 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
entry_delete.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
lock1.reset();
/// We need to pull the DROP_RANGE before cleaning the replaced parts (otherwise CheckThread may decide that parts are lost)
queue.pullLogsToQueue(getZooKeeperAndAssertNotReadonly(), {}, ReplicatedMergeTreeQueue::SYNC);
parts_holder.clear();
cleanup_thread.wakeup();
waitForLogEntryToBeProcessedIfNecessary(entry_delete, query_context);
/// Cleaning possibly stored information about parts from /quorum/last_part node in ZooKeeper.


@@ -579,6 +579,8 @@ private:
void removePartsFromZooKeeperWithRetries(const Strings & part_names, size_t max_retries = 5);
void removePartsFromZooKeeperWithRetries(PartsToRemoveFromZooKeeper & parts, size_t max_retries = 5);
void forcefullyRemoveBrokenOutdatedPartFromZooKeeperBeforeDetaching(const String & part_name) override;
/// Removes a part from ZooKeeper and adds a task to the queue to download it. It is supposed to do this with broken parts.
void removePartAndEnqueueFetch(const String & part_name, bool storage_init);


@@ -0,0 +1,4 @@
<clickhouse>
<replicated_merge_tree_paranoid_check_on_drop_range>0</replicated_merge_tree_paranoid_check_on_drop_range>
<replicated_merge_tree_paranoid_check_on_startup>0</replicated_merge_tree_paranoid_check_on_startup>
</clickhouse>


@@ -6,7 +6,14 @@ from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance("node1", with_zookeeper=True, stay_alive=True)
node2 = cluster.add_instance("node2", with_zookeeper=True, stay_alive=True)
node2 = cluster.add_instance(
"node2",
with_zookeeper=True,
stay_alive=True,
main_configs=[
"configs/compat.xml",
],
)
@pytest.fixture(scope="module")


@@ -0,0 +1,4 @@
<clickhouse>
<replicated_merge_tree_paranoid_check_on_drop_range>0</replicated_merge_tree_paranoid_check_on_drop_range>
<replicated_merge_tree_paranoid_check_on_startup>0</replicated_merge_tree_paranoid_check_on_startup>
</clickhouse>


@@ -9,7 +9,7 @@ cluster = helpers.cluster.ClickHouseCluster(__file__)
node1 = cluster.add_instance(
"node1",
main_configs=["configs/fast_background_pool.xml"],
main_configs=["configs/fast_background_pool.xml", "configs/compat.xml"],
with_zookeeper=True,
stay_alive=True,
)


@@ -0,0 +1,4 @@
<clickhouse>
<replicated_merge_tree_paranoid_check_on_drop_range>0</replicated_merge_tree_paranoid_check_on_drop_range>
<replicated_merge_tree_paranoid_check_on_startup>0</replicated_merge_tree_paranoid_check_on_startup>
</clickhouse>


@@ -19,6 +19,9 @@ node4 = cluster.add_instance(
tag="20.12.4.5",
stay_alive=True,
with_installed_binary=True,
main_configs=[
"configs/compat.xml",
],
)
node5 = cluster.add_instance(
@@ -28,6 +31,9 @@ node5 = cluster.add_instance(
tag="20.12.4.5",
stay_alive=True,
with_installed_binary=True,
main_configs=[
"configs/compat.xml",
],
)
node6 = cluster.add_instance(
"node6",
@@ -36,6 +42,9 @@ node6 = cluster.add_instance(
tag="20.12.4.5",
stay_alive=True,
with_installed_binary=True,
main_configs=[
"configs/compat.xml",
],
)
@@ -517,7 +526,7 @@ def test_ttl_compatibility(started_cluster, node_left, node_right, num_run):
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl_delete_{suff}', '{replica}')
ORDER BY id PARTITION BY toDayOfMonth(date)
TTL date + INTERVAL 3 SECOND
SETTINGS max_number_of_merges_with_ttl_in_pool=100, max_replicated_merges_with_ttl_in_queue=100, remove_empty_parts=0
SETTINGS max_number_of_merges_with_ttl_in_pool=100, max_replicated_merges_with_ttl_in_queue=100
""".format(
suff=num_run, replica=node.name
)
@@ -529,7 +538,7 @@ def test_ttl_compatibility(started_cluster, node_left, node_right, num_run):
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl_group_by_{suff}', '{replica}')
ORDER BY id PARTITION BY toDayOfMonth(date)
TTL date + INTERVAL 3 SECOND GROUP BY id SET val = sum(val)
SETTINGS max_number_of_merges_with_ttl_in_pool=100, max_replicated_merges_with_ttl_in_queue=100, remove_empty_parts=0
SETTINGS max_number_of_merges_with_ttl_in_pool=100, max_replicated_merges_with_ttl_in_queue=100
""".format(
suff=num_run, replica=node.name
)
@@ -541,7 +550,7 @@ def test_ttl_compatibility(started_cluster, node_left, node_right, num_run):
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl_where_{suff}', '{replica}')
ORDER BY id PARTITION BY toDayOfMonth(date)
TTL date + INTERVAL 3 SECOND DELETE WHERE id % 2 = 1
SETTINGS max_number_of_merges_with_ttl_in_pool=100, max_replicated_merges_with_ttl_in_queue=100, remove_empty_parts=0
SETTINGS max_number_of_merges_with_ttl_in_pool=100, max_replicated_merges_with_ttl_in_queue=100
""".format(
suff=num_run, replica=node.name
)


@@ -0,0 +1,4 @@
<clickhouse>
<replicated_merge_tree_paranoid_check_on_drop_range>0</replicated_merge_tree_paranoid_check_on_drop_range>
<replicated_merge_tree_paranoid_check_on_startup>0</replicated_merge_tree_paranoid_check_on_startup>
</clickhouse>


@@ -13,6 +13,9 @@ node1 = cluster.add_instance(
tag="20.4.9.110",
with_installed_binary=True,
stay_alive=True,
main_configs=[
"configs/compat.xml",
],
)
node2 = cluster.add_instance(
"node2",
@@ -21,6 +24,9 @@ node2 = cluster.add_instance(
tag="20.4.9.110",
with_installed_binary=True,
stay_alive=True,
main_configs=[
"configs/compat.xml",
],
)
node3 = cluster.add_instance(
"node3",
@@ -29,6 +35,9 @@ node3 = cluster.add_instance(
tag="20.4.9.110",
with_installed_binary=True,
stay_alive=True,
main_configs=[
"configs/compat.xml",
],
)


@@ -0,0 +1,5 @@
1 1 all_0_1_1
1 2 all_0_1_1
0
2 1 all_0_1_1
2 2 all_0_1_1


@@ -0,0 +1,35 @@
#!/usr/bin/env bash
# Tags: long, zookeeper
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -q "drop table if exists rmt sync;"
$CLICKHOUSE_CLIENT -q "create table rmt (n int) engine=ReplicatedMergeTree('/test/02444/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/rmt', '1') order by n"
$CLICKHOUSE_CLIENT --insert_keeper_fault_injection_probability=0 -q "insert into rmt values (1);"
$CLICKHOUSE_CLIENT --insert_keeper_fault_injection_probability=0 -q "insert into rmt values (2);"
$CLICKHOUSE_CLIENT -q "system sync replica rmt pull;"
$CLICKHOUSE_CLIENT --optimize_throw_if_noop=1 -q "optimize table rmt final"
$CLICKHOUSE_CLIENT -q "system sync replica rmt;"
$CLICKHOUSE_CLIENT -q "select 1, *, _part from rmt order by n;"
path=$($CLICKHOUSE_CLIENT -q "select path from system.parts where database='$CLICKHOUSE_DATABASE' and table='rmt' and name='all_1_1_0'")
# ensure that path is absolute before removing
$CLICKHOUSE_CLIENT -q "select throwIf(substring('$path', 1, 1) != '/', 'Path is relative: $path')" || exit
rm -f "$path/*.bin"
$CLICKHOUSE_CLIENT -q "detach table rmt sync;"
$CLICKHOUSE_CLIENT -q "attach table rmt;"
$CLICKHOUSE_CLIENT -q "select 2, *, _part from rmt order by n;"
$CLICKHOUSE_CLIENT -q "truncate table rmt;"
$CLICKHOUSE_CLIENT -q "detach table rmt sync;"
$CLICKHOUSE_CLIENT -q "attach table rmt;"
$CLICKHOUSE_CLIENT -q "SELECT table, lost_part_count FROM system.replicas WHERE database=currentDatabase() AND lost_part_count!=0";
$CLICKHOUSE_CLIENT -q "drop table rmt sync;"