diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp
index ff0b77f641b..ac81c72c656 100644
--- a/src/Storages/MergeTree/MergeTreeData.cpp
+++ b/src/Storages/MergeTree/MergeTreeData.cpp
@@ -1916,7 +1916,10 @@ try
++num_loaded_parts;
if (res.is_broken)
+ {
+ forcefullyRemoveBrokenOutdatedPartFromZooKeeperBeforeDetaching(res.part->name);
res.part->renameToDetached("broken-on-start"); /// detached parts must not have '_' in prefixes
+ }
else if (res.part->is_duplicate)
res.part->remove();
else
diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h
index 9aa07998745..29c4ccbeffa 100644
--- a/src/Storages/MergeTree/MergeTreeData.h
+++ b/src/Storages/MergeTree/MergeTreeData.h
@@ -646,6 +646,9 @@ public:
/// For active parts it's unsafe because this method modifies fields of part (rename) while some other thread can try to read it.
void forcefullyMovePartToDetachedAndRemoveFromMemory(const DataPartPtr & part, const String & prefix = "", bool restore_covered = false);
+ /// This method logically belongs to StorageReplicatedMergeTree, but it has to be declared here because async loading of Outdated parts is implemented in MergeTreeData
+ virtual void forcefullyRemoveBrokenOutdatedPartFromZooKeeperBeforeDetaching(const String & /*part_name*/) {}
+
/// Outdate broken part, set remove time to zero (remove as fast as possible) and make clone in detached directory.
void outdateBrokenPartAndCloneToDetached(const DataPartPtr & part, const String & prefix);
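Note on the design: the base class exposes the new method as a no-op virtual hook so that the generic part-loading code in MergeTreeData can trigger replication-specific ZooKeeper cleanup without depending on StorageReplicatedMergeTree. A minimal sketch of that shape, using invented stand-in names (`MergeTreeDataLike` and `ReplicatedTableLike` are illustrations, not the real classes):

```cpp
#include <iostream>
#include <string>

struct MergeTreeDataLike
{
    virtual ~MergeTreeDataLike() = default;

    /// Default is a no-op: non-replicated tables keep no part metadata in ZooKeeper.
    virtual void removeBrokenOutdatedPartFromKeeper(const std::string & /*part_name*/) {}

    void loadOutdatedPart(const std::string & part_name, bool is_broken)
    {
        if (is_broken)
        {
            removeBrokenOutdatedPartFromKeeper(part_name); // hook fires before detaching
            std::cout << "renaming " << part_name << " to detached/\n";
        }
    }
};

struct ReplicatedTableLike : MergeTreeDataLike
{
    void removeBrokenOutdatedPartFromKeeper(const std::string & part_name) override
    {
        std::cout << "removing " << part_name << " from ZooKeeper\n";
    }
};

int main()
{
    ReplicatedTableLike table;
    table.loadOutdatedPart("all_1_1_0", /*is_broken=*/ true); // prints both lines
}
```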
diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp
index c5e61f8a244..a2953230d98 100644
--- a/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/src/Storages/StorageReplicatedMergeTree.cpp
@@ -1226,12 +1226,53 @@ static time_t tryGetPartCreateTime(zkutil::ZooKeeperPtr & zookeeper, const Strin
return res;
}
+static void paranoidCheckForCoveredPartsInZooKeeperOnStart(const StorageReplicatedMergeTree * storage, const Strings & parts_in_zk,
+ MergeTreeDataFormatVersion format_version, Poco::Logger * log)
+{
+#ifdef ABORT_ON_LOGICAL_ERROR
+ constexpr bool paranoid_check_for_covered_parts_default = true;
+#else
+ constexpr bool paranoid_check_for_covered_parts_default = false;
+#endif
+
+ bool paranoid_check_for_covered_parts = Context::getGlobalContextInstance()->getConfigRef().getBool(
+ "replicated_merge_tree_paranoid_check_on_startup", paranoid_check_for_covered_parts_default);
+ if (!paranoid_check_for_covered_parts)
+ return;
+
+ ActiveDataPartSet active_set(format_version);
+ for (const auto & part_name : parts_in_zk)
+ active_set.add(part_name);
+
+ const auto disks = storage->getStoragePolicy()->getDisks();
+ auto path = storage->getRelativeDataPath();
+
+ for (const auto & part_name : parts_in_zk)
+ {
+ String covering_part = active_set.getContainingPart(part_name);
+ if (part_name == covering_part)
+ continue;
+
+ bool found = false;
+ for (const DiskPtr & disk : disks)
+ if (disk->exists(fs::path(path) / part_name))
+ found = true;
+
+ if (!found)
+ {
+ LOG_WARNING(log, "Part {} exists in ZooKeeper and is covered by another part in ZooKeeper ({}), but it doesn't exist on any disk. "
+ "It may cause false-positive 'part is lost forever' messages", part_name, covering_part);
+ chassert(false);
+ }
+ }
+}
void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
{
auto zookeeper = getZooKeeper();
Strings expected_parts_vec = zookeeper->getChildren(fs::path(replica_path) / "parts");
+ paranoidCheckForCoveredPartsInZooKeeperOnStart(this, expected_parts_vec, format_version, log);
/// Parts in ZK.
NameSet expected_parts(expected_parts_vec.begin(), expected_parts_vec.end());
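The paranoid check added above relies on `ActiveDataPartSet::getContainingPart` to detect parts that are covered (already merged into a bigger part) yet still registered in ZooKeeper. A self-contained sketch of the containment rule for names of the form `<partition>_<min_block>_<max_block>_<level>` (e.g. `all_0_1_1`); this is an illustration only, not the real ActiveDataPartSet, and it ignores levels and mutation versions that the real class also tracks:

```cpp
#include <cassert>
#include <cstdint>
#include <string>

struct PartInfo
{
    std::string partition;
    int64_t min_block = 0;
    int64_t max_block = 0;
};

// Crude parse of "all_0_1_1"-style names, good enough for the sketch.
static PartInfo parse(const std::string & name)
{
    auto p1 = name.find('_');
    auto p2 = name.find('_', p1 + 1);
    auto p3 = name.find('_', p2 + 1);
    PartInfo info;
    info.partition = name.substr(0, p1);
    info.min_block = std::stoll(name.substr(p1 + 1, p2 - p1 - 1));
    info.max_block = std::stoll(name.substr(p2 + 1, p3 - p2 - 1));
    return info;
}

// A part covers another when its block range includes the other's range
// within the same partition.
static bool covers(const std::string & a, const std::string & b)
{
    PartInfo pa = parse(a), pb = parse(b);
    return pa.partition == pb.partition
        && pa.min_block <= pb.min_block && pb.max_block <= pa.max_block;
}

int main()
{
    assert(covers("all_0_1_1", "all_0_0_0"));  // merged part covers its sources
    assert(covers("all_0_1_1", "all_1_1_0"));
    assert(!covers("all_0_0_0", "all_1_1_0")); // siblings don't cover each other
}
```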
@@ -6805,6 +6846,31 @@ void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZKImpl(zkutil::ZooKee
}
+void StorageReplicatedMergeTree::forcefullyRemoveBrokenOutdatedPartFromZooKeeperBeforeDetaching(const String & part_name)
+{
+ /// An outdated part is broken and we are going to move it to detached/.
+ /// But we need to remove it from ZooKeeper as well. Otherwise it will be considered "lost forever".
+
+ /// Since the part is Outdated, it should be safe to remove it, but we still have to be careful:
+ /// it could have become Outdated because it was merged/mutated (so we have a covering part) or because it was dropped.
+ /// But DROP PART[ITION] waits for all Outdated parts to be loaded, so the latter is not the case.
+
+ auto zookeeper = getZooKeeper();
+ String part_path = replica_path + "/parts/" + part_name;
+ if (!zookeeper->exists(part_path))
+ return;
+
+ auto part = getActiveContainingPart(part_name);
+ if (!part)
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "Outdated part {} is broken and going to be detached, "
+ "but there's no active covering part, so we are not sure that it can be safely removed from ZooKeeper "
+ "(path: {})", part_name, part_path);
+
+ LOG_WARNING(log, "Outdated part {} is broken and going to be detached, removing it from ZooKeeper. The part is covered by {}",
+ part_name, part->name);
+ removePartsFromZooKeeperWithRetries({part_name}, /* infinite retries */ 0);
+}
+
void StorageReplicatedMergeTree::removePartsFromZooKeeperWithRetries(PartsToRemoveFromZooKeeper & parts, size_t max_retries)
{
Strings part_names_to_remove;
@@ -7166,6 +7232,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
}
Coordination::Responses op_results;
+ DataPartsVector parts_holder;
try
{
@@ -7215,7 +7282,10 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
auto data_parts_lock = lockParts();
transaction.commit(&data_parts_lock);
if (replace)
+ {
+ parts_holder = getDataPartsVectorInPartitionForInternalUsage(MergeTreeDataPartState::Active, drop_range.partition_id, &data_parts_lock);
removePartsInRangeFromWorkingSet(NO_TRANSACTION_RAW, drop_range, data_parts_lock);
+ }
}
PartLog::addNewParts(getContext(), PartLog::createPartLogEntries(dst_parts, watch.elapsed(), profile_events_scope.getSnapshot()));
@@ -7235,11 +7305,15 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
for (auto & lock : ephemeral_locks)
lock.assumeUnlocked();
- cleanup_thread.wakeup();
-
lock2.reset();
lock1.reset();
+ /// We need to pull the DROP_RANGE before cleaning the replaced parts (otherwise CheckThread may decide that parts are lost)
+ queue.pullLogsToQueue(getZooKeeperAndAssertNotReadonly(), {}, ReplicatedMergeTreeQueue::SYNC);
+ parts_holder.clear();
+ cleanup_thread.wakeup();
+
waitForLogEntryToBeProcessedIfNecessary(entry, query_context);
return;
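Why `parts_holder` helps here: the replaced parts are removed from the working set, but keeping shared_ptrs to them postpones their destruction (and any destructor-driven cleanup) until after the DROP_RANGE has been pulled into the queue, so the check thread cannot observe the parts as missing in the meantime. A minimal sketch of that lifetime trick (`Part` is an invented stand-in, not the real IMergeTreeDataPart):

```cpp
#include <iostream>
#include <memory>
#include <vector>

struct Part
{
    explicit Part(int id_) : id(id_) {}
    ~Part() { std::cout << "part " << id << " destroyed (cleanup may run now)\n"; }
    int id;
};

int main()
{
    std::vector<std::shared_ptr<Part>> parts_holder;

    {
        auto p = std::make_shared<Part>(1);
        parts_holder.push_back(p);   // holder keeps the part alive
    }                                // 'p' goes out of scope: nothing printed yet

    std::cout << "pulling DROP_RANGE into the queue...\n";
    parts_holder.clear();            // only now is the part destroyed
}
```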
@@ -7405,6 +7479,8 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
Coordination::Responses op_results;
+ /// We should hold replaced parts until we actually create DROP_RANGE in ZooKeeper
+ DataPartsVector parts_holder;
try
{
Coordination::Requests ops;
@@ -7439,6 +7515,7 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
else
zkutil::KeeperMultiException::check(code, ops, op_results);
+ parts_holder = getDataPartsVectorInPartitionForInternalUsage(MergeTreeDataPartState::Active, drop_range.partition_id, &src_data_parts_lock);
removePartsInRangeFromWorkingSet(NO_TRANSACTION_RAW, drop_range, src_data_parts_lock);
transaction.commit(&src_data_parts_lock);
}
@@ -7461,7 +7538,6 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
for (auto & lock : ephemeral_locks)
lock.assumeUnlocked();
- cleanup_thread.wakeup();
lock2.reset();
dest_table_storage->waitForLogEntryToBeProcessedIfNecessary(entry, query_context);
@@ -7480,6 +7556,12 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
entry_delete.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
lock1.reset();
+
+ /// We need to pull the DROP_RANGE before cleaning the replaced parts (otherwise CheckThread may decide that parts are lost)
+ queue.pullLogsToQueue(getZooKeeperAndAssertNotReadonly(), {}, ReplicatedMergeTreeQueue::SYNC);
+ parts_holder.clear();
+ cleanup_thread.wakeup();
+
waitForLogEntryToBeProcessedIfNecessary(entry_delete, query_context);
/// Cleaning possibly stored information about parts from /quorum/last_part node in ZooKeeper.
diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h
index c0d9e36a8a7..2d736a4e015 100644
--- a/src/Storages/StorageReplicatedMergeTree.h
+++ b/src/Storages/StorageReplicatedMergeTree.h
@@ -579,6 +579,8 @@ private:
void removePartsFromZooKeeperWithRetries(const Strings & part_names, size_t max_retries = 5);
void removePartsFromZooKeeperWithRetries(PartsToRemoveFromZooKeeper & parts, size_t max_retries = 5);
+ void forcefullyRemoveBrokenOutdatedPartFromZooKeeperBeforeDetaching(const String & part_name) override;
+
/// Removes a part from ZooKeeper and adds a task to the queue to download it. It is supposed to do this with broken parts.
void removePartAndEnqueueFetch(const String & part_name, bool storage_init);
diff --git a/tests/integration/test_lost_part_during_startup/configs/compat.xml b/tests/integration/test_lost_part_during_startup/configs/compat.xml
new file mode 100644
index 00000000000..c919eaf6146
--- /dev/null
+++ b/tests/integration/test_lost_part_during_startup/configs/compat.xml
@@ -0,0 +1,4 @@
+
+ 0
+ 0
+
\ No newline at end of file
diff --git a/tests/integration/test_lost_part_during_startup/test.py b/tests/integration/test_lost_part_during_startup/test.py
index de21d64c8aa..a013ec5d48d 100644
--- a/tests/integration/test_lost_part_during_startup/test.py
+++ b/tests/integration/test_lost_part_during_startup/test.py
@@ -6,7 +6,14 @@ from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance("node1", with_zookeeper=True, stay_alive=True)
-node2 = cluster.add_instance("node2", with_zookeeper=True, stay_alive=True)
+node2 = cluster.add_instance(
+ "node2",
+ with_zookeeper=True,
+ stay_alive=True,
+ main_configs=[
+ "configs/compat.xml",
+ ],
+)
@pytest.fixture(scope="module")
diff --git a/tests/integration/test_merge_tree_load_parts/configs/compat.xml b/tests/integration/test_merge_tree_load_parts/configs/compat.xml
new file mode 100644
index 00000000000..c919eaf6146
--- /dev/null
+++ b/tests/integration/test_merge_tree_load_parts/configs/compat.xml
@@ -0,0 +1,4 @@
+
+ 0
+ 0
+
\ No newline at end of file
diff --git a/tests/integration/test_merge_tree_load_parts/test.py b/tests/integration/test_merge_tree_load_parts/test.py
index 118c31ea864..968225887ad 100644
--- a/tests/integration/test_merge_tree_load_parts/test.py
+++ b/tests/integration/test_merge_tree_load_parts/test.py
@@ -9,7 +9,7 @@ cluster = helpers.cluster.ClickHouseCluster(__file__)
node1 = cluster.add_instance(
"node1",
- main_configs=["configs/fast_background_pool.xml"],
+ main_configs=["configs/fast_background_pool.xml", "configs/compat.xml"],
with_zookeeper=True,
stay_alive=True,
)
diff --git a/tests/integration/test_ttl_replicated/configs/compat.xml b/tests/integration/test_ttl_replicated/configs/compat.xml
new file mode 100644
index 00000000000..c919eaf6146
--- /dev/null
+++ b/tests/integration/test_ttl_replicated/configs/compat.xml
@@ -0,0 +1,4 @@
+
+ 0
+ 0
+
\ No newline at end of file
diff --git a/tests/integration/test_ttl_replicated/test.py b/tests/integration/test_ttl_replicated/test.py
index 39d66d857ff..05eab7a952f 100644
--- a/tests/integration/test_ttl_replicated/test.py
+++ b/tests/integration/test_ttl_replicated/test.py
@@ -19,6 +19,9 @@ node4 = cluster.add_instance(
tag="20.12.4.5",
stay_alive=True,
with_installed_binary=True,
+ main_configs=[
+ "configs/compat.xml",
+ ],
)
node5 = cluster.add_instance(
@@ -28,6 +31,9 @@ node5 = cluster.add_instance(
tag="20.12.4.5",
stay_alive=True,
with_installed_binary=True,
+ main_configs=[
+ "configs/compat.xml",
+ ],
)
node6 = cluster.add_instance(
"node6",
@@ -36,6 +42,9 @@ node6 = cluster.add_instance(
tag="20.12.4.5",
stay_alive=True,
with_installed_binary=True,
+ main_configs=[
+ "configs/compat.xml",
+ ],
)
@@ -517,7 +526,7 @@ def test_ttl_compatibility(started_cluster, node_left, node_right, num_run):
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl_delete_{suff}', '{replica}')
ORDER BY id PARTITION BY toDayOfMonth(date)
TTL date + INTERVAL 3 SECOND
- SETTINGS max_number_of_merges_with_ttl_in_pool=100, max_replicated_merges_with_ttl_in_queue=100, remove_empty_parts=0
+ SETTINGS max_number_of_merges_with_ttl_in_pool=100, max_replicated_merges_with_ttl_in_queue=100
""".format(
suff=num_run, replica=node.name
)
@@ -529,7 +538,7 @@ def test_ttl_compatibility(started_cluster, node_left, node_right, num_run):
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl_group_by_{suff}', '{replica}')
ORDER BY id PARTITION BY toDayOfMonth(date)
TTL date + INTERVAL 3 SECOND GROUP BY id SET val = sum(val)
- SETTINGS max_number_of_merges_with_ttl_in_pool=100, max_replicated_merges_with_ttl_in_queue=100, remove_empty_parts=0
+ SETTINGS max_number_of_merges_with_ttl_in_pool=100, max_replicated_merges_with_ttl_in_queue=100
""".format(
suff=num_run, replica=node.name
)
@@ -541,7 +550,7 @@ def test_ttl_compatibility(started_cluster, node_left, node_right, num_run):
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl_where_{suff}', '{replica}')
ORDER BY id PARTITION BY toDayOfMonth(date)
TTL date + INTERVAL 3 SECOND DELETE WHERE id % 2 = 1
- SETTINGS max_number_of_merges_with_ttl_in_pool=100, max_replicated_merges_with_ttl_in_queue=100, remove_empty_parts=0
+ SETTINGS max_number_of_merges_with_ttl_in_pool=100, max_replicated_merges_with_ttl_in_queue=100
""".format(
suff=num_run, replica=node.name
)
diff --git a/tests/integration/test_version_update_after_mutation/configs/compat.xml b/tests/integration/test_version_update_after_mutation/configs/compat.xml
new file mode 100644
index 00000000000..c919eaf6146
--- /dev/null
+++ b/tests/integration/test_version_update_after_mutation/configs/compat.xml
@@ -0,0 +1,4 @@
+
+ 0
+ 0
+
\ No newline at end of file
diff --git a/tests/integration/test_version_update_after_mutation/test.py b/tests/integration/test_version_update_after_mutation/test.py
index 17b70ba7c84..c80205d48c1 100644
--- a/tests/integration/test_version_update_after_mutation/test.py
+++ b/tests/integration/test_version_update_after_mutation/test.py
@@ -13,6 +13,9 @@ node1 = cluster.add_instance(
tag="20.4.9.110",
with_installed_binary=True,
stay_alive=True,
+ main_configs=[
+ "configs/compat.xml",
+ ],
)
node2 = cluster.add_instance(
"node2",
@@ -21,6 +24,9 @@ node2 = cluster.add_instance(
tag="20.4.9.110",
with_installed_binary=True,
stay_alive=True,
+ main_configs=[
+ "configs/compat.xml",
+ ],
)
node3 = cluster.add_instance(
"node3",
@@ -29,6 +35,9 @@ node3 = cluster.add_instance(
tag="20.4.9.110",
with_installed_binary=True,
stay_alive=True,
+ main_configs=[
+ "configs/compat.xml",
+ ],
)
diff --git a/tests/queries/0_stateless/02444_async_broken_outdated_part_loading.reference b/tests/queries/0_stateless/02444_async_broken_outdated_part_loading.reference
new file mode 100644
index 00000000000..389b1b4ec25
--- /dev/null
+++ b/tests/queries/0_stateless/02444_async_broken_outdated_part_loading.reference
@@ -0,0 +1,5 @@
+1 1 all_0_1_1
+1 2 all_0_1_1
+0
+2 1 all_0_1_1
+2 2 all_0_1_1
diff --git a/tests/queries/0_stateless/02444_async_broken_outdated_part_loading.sh b/tests/queries/0_stateless/02444_async_broken_outdated_part_loading.sh
new file mode 100755
index 00000000000..d24c6afcef3
--- /dev/null
+++ b/tests/queries/0_stateless/02444_async_broken_outdated_part_loading.sh
@@ -0,0 +1,35 @@
+#!/usr/bin/env bash
+# Tags: long, zookeeper
+
+CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+# shellcheck source=../shell_config.sh
+. "$CURDIR"/../shell_config.sh
+
+$CLICKHOUSE_CLIENT -q "drop table if exists rmt sync;"
+$CLICKHOUSE_CLIENT -q "create table rmt (n int) engine=ReplicatedMergeTree('/test/02444/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/rmt', '1') order by n"
+
+$CLICKHOUSE_CLIENT --insert_keeper_fault_injection_probability=0 -q "insert into rmt values (1);"
+$CLICKHOUSE_CLIENT --insert_keeper_fault_injection_probability=0 -q "insert into rmt values (2);"
+
+$CLICKHOUSE_CLIENT -q "system sync replica rmt pull;"
+$CLICKHOUSE_CLIENT --optimize_throw_if_noop=1 -q "optimize table rmt final"
+$CLICKHOUSE_CLIENT -q "system sync replica rmt;"
+$CLICKHOUSE_CLIENT -q "select 1, *, _part from rmt order by n;"
+
+path=$($CLICKHOUSE_CLIENT -q "select path from system.parts where database='$CLICKHOUSE_DATABASE' and table='rmt' and name='all_1_1_0'")
+# ensure that path is absolute before removing
+$CLICKHOUSE_CLIENT -q "select throwIf(substring('$path', 1, 1) != '/', 'Path is relative: $path')" || exit
+rm -f "$path"/*.bin
+
+$CLICKHOUSE_CLIENT -q "detach table rmt sync;"
+$CLICKHOUSE_CLIENT -q "attach table rmt;"
+$CLICKHOUSE_CLIENT -q "select 2, *, _part from rmt order by n;"
+
+$CLICKHOUSE_CLIENT -q "truncate table rmt;"
+
+$CLICKHOUSE_CLIENT -q "detach table rmt sync;"
+$CLICKHOUSE_CLIENT -q "attach table rmt;"
+
+$CLICKHOUSE_CLIENT -q "SELECT table, lost_part_count FROM system.replicas WHERE database=currentDatabase() AND lost_part_count!=0";
+
+$CLICKHOUSE_CLIENT -q "drop table rmt sync;"