From f8b3987d5292ed1e2acfc7cab2b7bfcd80f1aee1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=93=D0=B0=D1=80?= =?UTF-8?q?=D0=B1=D0=B0=D1=80?= Date: Tue, 25 Jun 2024 03:26:17 +0300 Subject: [PATCH 1/9] Delete attaching prefix for deduplicated parts --- .../MergeTree/ReplicatedMergeTreeSink.cpp | 9 ++- .../__init__.py | 0 .../test.py | 61 +++++++++++++++++++ 3 files changed, 69 insertions(+), 1 deletion(-) create mode 100644 tests/integration/test_deduplicated_attached_part_rename/__init__.py create mode 100644 tests/integration/test_deduplicated_attached_part_rename/test.py diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp index 4b4f4c33e7d..4190e3cce5e 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp @@ -561,8 +561,15 @@ bool ReplicatedMergeTreeSinkImpl::writeExistingPart(MergeTreeData::Mutabl String block_id = deduplicate ? fmt::format("{}_{}", part->info.partition_id, part->checksums.getTotalChecksumHex()) : ""; bool deduplicated = commitPart(zookeeper, part, block_id, replicas_num).second; + int error = 0; /// Set a special error code if the block is duplicate - int error = (deduplicate && deduplicated) ? ErrorCodes::INSERT_WAS_DEDUPLICATED : 0; + /// And remove attaching_ prefix + if (deduplicate && deduplicated) + { + error = ErrorCodes::INSERT_WAS_DEDUPLICATED; + fs::path new_relative_path = fs::path("detached") / part->getNewName(part->info); + part->renameTo(new_relative_path, false); + } PartLog::addNewPart(storage.getContext(), PartLog::PartLogEntry(part, watch.elapsed(), profile_events_scope.getSnapshot()), ExecutionStatus(error)); return deduplicated; } diff --git a/tests/integration/test_deduplicated_attached_part_rename/__init__.py b/tests/integration/test_deduplicated_attached_part_rename/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/integration/test_deduplicated_attached_part_rename/test.py b/tests/integration/test_deduplicated_attached_part_rename/test.py new file mode 100644 index 00000000000..362b2bad37a --- /dev/null +++ b/tests/integration/test_deduplicated_attached_part_rename/test.py @@ -0,0 +1,61 @@ +import pytest +from helpers.cluster import ClickHouseCluster + +cluster = ClickHouseCluster(__file__) +ch1 = cluster.add_instance( + "ch1", + with_zookeeper=True, + macros={"replica": "node1"}, + stay_alive=True, +) + +database_name = "dedup_attach" + +@pytest.fixture(scope="module") +def started_cluster(): + try: + cluster.start() + yield cluster + + finally: + cluster.shutdown() + + +def q(query): + return ch1.query(database=database_name, sql=query) + + +def test_deduplicated_attached_part_renamed_after_attach(started_cluster): + ch1.query(f"CREATE DATABASE {database_name}") + + q("CREATE TABLE dedup (id UInt32) ENGINE=ReplicatedMergeTree('/clickhouse/tables/dedup_attach/dedup/s1', 'r1') ORDER BY id;") + q("INSERT INTO dedup VALUES (1),(2),(3);") + + table_data_path = q("SELECT data_paths FROM system.tables WHERE database=currentDatabase() AND table='dedup'").strip("'[]\n") + + ch1.exec_in_container( + [ + "bash", + "-c", + f"cp -r {table_data_path}/all_0_0_0 {table_data_path}/detached/all_0_0_0", + ] + ) + # Part is attached as all_1_1_0 + q("ALTER TABLE dedup ATTACH PART 'all_0_0_0'") + + assert 2 == int(q(f"SELECT count() FROM system.parts WHERE database='{database_name}' AND table = 'dedup'").strip()) + + ch1.exec_in_container( + [ + "bash", + "-c", + f"cp -r {table_data_path}/all_1_1_0 {table_data_path}/detached/all_1_1_0", + ] + ) + # Part is deduplicated and not attached + q("ALTER TABLE dedup ATTACH PART 'all_1_1_0'") + + assert 2 == int(q(f"SELECT count() FROM system.parts WHERE database='{database_name}' AND table = 'dedup'").strip()) + assert 1 == int(q(f"SELECT count() FROM system.detached_parts WHERE database='{database_name}' AND table = 'dedup'").strip()) + # Check that it is not 'attaching_all_1_1_0' + assert "all_1_1_0" == q(f"SELECT name FROM system.detached_parts WHERE database='{database_name}' AND table = 'dedup'").strip() From 6601ded4a1332548ae4cfe35c7ba8f276214d153 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=93=D0=B0=D1=80?= =?UTF-8?q?=D0=B1=D0=B0=D1=80?= Date: Wed, 10 Jul 2024 23:02:11 +0300 Subject: [PATCH 2/9] Fix black --- .../test.py | 34 +++++++++++++++---- 1 file changed, 28 insertions(+), 6 deletions(-) diff --git a/tests/integration/test_deduplicated_attached_part_rename/test.py b/tests/integration/test_deduplicated_attached_part_rename/test.py index 362b2bad37a..2b7ab0934d1 100644 --- a/tests/integration/test_deduplicated_attached_part_rename/test.py +++ b/tests/integration/test_deduplicated_attached_part_rename/test.py @@ -11,6 +11,7 @@ ch1 = cluster.add_instance( database_name = "dedup_attach" + @pytest.fixture(scope="module") def started_cluster(): try: @@ -28,10 +29,14 @@ def q(query): def test_deduplicated_attached_part_renamed_after_attach(started_cluster): ch1.query(f"CREATE DATABASE {database_name}") - q("CREATE TABLE dedup (id UInt32) ENGINE=ReplicatedMergeTree('/clickhouse/tables/dedup_attach/dedup/s1', 'r1') ORDER BY id;") + q( + "CREATE TABLE dedup (id UInt32) ENGINE=ReplicatedMergeTree('/clickhouse/tables/dedup_attach/dedup/s1', 'r1') ORDER BY id;" + ) q("INSERT INTO dedup VALUES (1),(2),(3);") - table_data_path = q("SELECT data_paths FROM system.tables WHERE database=currentDatabase() AND table='dedup'").strip("'[]\n") + table_data_path = q( + "SELECT data_paths FROM system.tables WHERE database=currentDatabase() AND table='dedup'" + ).strip("'[]\n") ch1.exec_in_container( [ @@ -43,7 +48,11 @@ def test_deduplicated_attached_part_renamed_after_attach(started_cluster): # Part is attached as all_1_1_0 q("ALTER TABLE dedup ATTACH PART 'all_0_0_0'") - assert 2 == int(q(f"SELECT count() FROM system.parts WHERE database='{database_name}' AND table = 'dedup'").strip()) + assert 2 == int( + q( + f"SELECT count() FROM system.parts WHERE database='{database_name}' AND table = 'dedup'" + ).strip() + ) ch1.exec_in_container( [ @@ -55,7 +64,20 @@ def test_deduplicated_attached_part_renamed_after_attach(started_cluster): # Part is deduplicated and not attached q("ALTER TABLE dedup ATTACH PART 'all_1_1_0'") - assert 2 == int(q(f"SELECT count() FROM system.parts WHERE database='{database_name}' AND table = 'dedup'").strip()) - assert 1 == int(q(f"SELECT count() FROM system.detached_parts WHERE database='{database_name}' AND table = 'dedup'").strip()) + assert 2 == int( + q( + f"SELECT count() FROM system.parts WHERE database='{database_name}' AND table = 'dedup'" + ).strip() + ) + assert 1 == int( + q( + f"SELECT count() FROM system.detached_parts WHERE database='{database_name}' AND table = 'dedup'" + ).strip() + ) # Check that it is not 'attaching_all_1_1_0' - assert "all_1_1_0" == q(f"SELECT name FROM system.detached_parts WHERE database='{database_name}' AND table = 'dedup'").strip() + assert ( + "all_1_1_0" + == q( + f"SELECT name FROM system.detached_parts WHERE database='{database_name}' AND table = 'dedup'" + ).strip() + ) From 384617cfdf26539d5478120caa25f7e57a28d6b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=93=D0=B0=D1=80?= =?UTF-8?q?=D0=B1=D0=B0=D1=80?= Date: Fri, 6 Sep 2024 18:12:16 +0300 Subject: [PATCH 3/9] Check for unexpected relative path --- src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp index cf5537452f3..68aa370959c 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp @@ -582,6 +582,8 @@ bool ReplicatedMergeTreeSinkImpl::writeExistingPart(MergeTreeData::Mutabl if (deduplicate && deduplicated) { error = ErrorCodes::INSERT_WAS_DEDUPLICATED; + if (!startsWith(part->getDataPartStorage().getRelativePath(), "detached/attaching_")) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected relative path for a part: {}", part->getDataPartStorage().getRelativePath()); fs::path new_relative_path = fs::path("detached") / part->getNewName(part->info); part->renameTo(new_relative_path, false); } From 35e263a4205afa405a5f819fdf91e102ad0cd088 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=93=D0=B0=D1=80?= =?UTF-8?q?=D0=B1=D0=B0=D1=80?= Date: Fri, 6 Sep 2024 18:12:44 +0300 Subject: [PATCH 4/9] Cleanup for flaky tests --- .../test_deduplicated_attached_part_rename/test.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/integration/test_deduplicated_attached_part_rename/test.py b/tests/integration/test_deduplicated_attached_part_rename/test.py index 2b7ab0934d1..7afd85c62dc 100644 --- a/tests/integration/test_deduplicated_attached_part_rename/test.py +++ b/tests/integration/test_deduplicated_attached_part_rename/test.py @@ -81,3 +81,7 @@ def test_deduplicated_attached_part_renamed_after_attach(started_cluster): f"SELECT name FROM system.detached_parts WHERE database='{database_name}' AND table = 'dedup'" ).strip() ) + + q("DROP TABLE dedup") + q("SYSTEM DROP REPLICA 'r1' FROM ZKPATH '/clickhouse/tables/dedup_attach/dedup/s1'") + ch1.query(f"DROP DATABASE {database_name}") From b232205b4407e185b3a17bc261c9fd977d0c0e11 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=93=D0=B0=D1=80?= =?UTF-8?q?=D0=B1=D0=B0=D1=80?= Date: Sun, 8 Sep 2024 22:22:06 +0300 Subject: [PATCH 5/9] Fix unexpected part path check --- src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp index 68aa370959c..fb2bc2fada7 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp @@ -582,7 +582,7 @@ bool ReplicatedMergeTreeSinkImpl::writeExistingPart(MergeTreeData::Mutabl if (deduplicate && deduplicated) { error = ErrorCodes::INSERT_WAS_DEDUPLICATED; - if (!startsWith(part->getDataPartStorage().getRelativePath(), "detached/attaching_")) + if (!endsWith(part->getDataPartStorage().getRelativePath(), "detached/attaching_" + part->name + "/")) throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected relative path for a part: {}", part->getDataPartStorage().getRelativePath()); fs::path new_relative_path = fs::path("detached") / part->getNewName(part->info); part->renameTo(new_relative_path, false); From 926e28e35cb1d17d0bb66c06b613671d3eeeeac2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=93=D0=B0=D1=80?= =?UTF-8?q?=D0=B1=D0=B0=D1=80?= Date: Thu, 19 Sep 2024 02:52:23 +0300 Subject: [PATCH 6/9] Rollback part rename if it was deduplicated --- .../MergeTree/ReplicatedMergeTreeSink.cpp | 26 +++++++++++-------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp index fb2bc2fada7..98c46edda25 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp @@ -583,7 +583,7 @@ bool ReplicatedMergeTreeSinkImpl::writeExistingPart(MergeTreeData::Mutabl { error = ErrorCodes::INSERT_WAS_DEDUPLICATED; if (!endsWith(part->getDataPartStorage().getRelativePath(), "detached/attaching_" + part->name + "/")) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected relative path for a part: {}", part->getDataPartStorage().getRelativePath()); + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected relative path for a deduplicated part: {}", part->getDataPartStorage().getRelativePath()); fs::path new_relative_path = fs::path("detached") / part->getNewName(part->info); part->renameTo(new_relative_path, false); } @@ -1013,16 +1013,6 @@ std::pair, bool> ReplicatedMergeTreeSinkImpl:: } } - transaction.rollback(); - - if (!Coordination::isUserError(multi_code)) - throw Exception( - ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR, - "Unexpected ZooKeeper error while adding block {} with ID '{}': {}", - block_number, - toString(block_id), - multi_code); - auto failed_op_idx = zkutil::getFailedOpIndex(multi_code, responses); String failed_op_path = ops[failed_op_idx]->getPath(); @@ -1032,6 +1022,10 @@ std::pair, bool> ReplicatedMergeTreeSinkImpl:: LOG_INFO(log, "Block with ID {} already exists (it was just appeared) for part {}. Ignore it.", toString(block_id), part->name); + transaction.rollbackPartsToTemporaryState(); + part->is_temp = true; + part->renameTo(temporary_part_relative_path, false); + if constexpr (async_insert) { retry_context.conflict_block_ids = std::vector({failed_op_path}); @@ -1043,6 +1037,16 @@ std::pair, bool> ReplicatedMergeTreeSinkImpl:: return CommitRetryContext::DUPLICATED_PART; } + transaction.rollback(); // Not in working set (data_parts) + + if (!Coordination::isUserError(multi_code)) + throw Exception( + ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR, + "Unexpected ZooKeeper error while adding block {} with ID '{}': {}", + block_number, + toString(block_id), + multi_code); + if (multi_code == Coordination::Error::ZNONODE && failed_op_idx == block_unlock_op_idx) throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Insert query (for block {}) was canceled by concurrent ALTER PARTITION or TRUNCATE", From f570e8e2c0715001ac0f1633c898699700068edb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=93=D0=B0=D1=80?= =?UTF-8?q?=D0=B1=D0=B0=D1=80?= Date: Thu, 19 Sep 2024 13:34:51 +0300 Subject: [PATCH 7/9] Remove debug comment --- src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp index 98c46edda25..3f5c70adb64 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp @@ -1037,7 +1037,7 @@ std::pair, bool> ReplicatedMergeTreeSinkImpl:: return CommitRetryContext::DUPLICATED_PART; } - transaction.rollback(); // Not in working set (data_parts) + transaction.rollback(); if (!Coordination::isUserError(multi_code)) throw Exception( From 93620886f689d3b6ed59a9a36539c2902158e6b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=93=D0=B0=D1=80?= =?UTF-8?q?=D0=B1=D0=B0=D1=80?= Date: Sun, 6 Oct 2024 22:16:06 +0300 Subject: [PATCH 8/9] Revert part actual name to pass the check --- src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp index 3f5c70adb64..4a994bc38e2 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp @@ -844,8 +844,9 @@ std::pair, bool> ReplicatedMergeTreeSinkImpl:: } } - /// Save the current temporary path in case we need to revert the change to retry (ZK connection loss) + /// Save the current temporary path and name in case we need to revert the change to retry (ZK connection loss) or in case part is deduplicated. const String temporary_part_relative_path = part->getDataPartStorage().getPartDirectory(); + const String initial_part_name = part->name; /// Obtain incremental block number and lock it. The lock holds our intention to add the block to the filesystem. /// We remove the lock just after renaming the part. In case of exception, block number will be marked as abandoned. @@ -1024,6 +1025,7 @@ std::pair, bool> ReplicatedMergeTreeSinkImpl:: transaction.rollbackPartsToTemporaryState(); part->is_temp = true; + part->setName(initial_part_name); part->renameTo(temporary_part_relative_path, false); if constexpr (async_insert) From 91931b5b3cccabdc94231a8a07ad4d2e8de8d8b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9A=D0=B8=D1=80=D0=B8=D0=BB=D0=BB=20=D0=93=D0=B0=D1=80?= =?UTF-8?q?=D0=B1=D0=B0=D1=80?= Date: Sun, 6 Oct 2024 22:56:48 +0300 Subject: [PATCH 9/9] Fix style --- tests/integration/test_deduplicated_attached_part_rename/test.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/integration/test_deduplicated_attached_part_rename/test.py b/tests/integration/test_deduplicated_attached_part_rename/test.py index 7afd85c62dc..02fa2c9d4a3 100644 --- a/tests/integration/test_deduplicated_attached_part_rename/test.py +++ b/tests/integration/test_deduplicated_attached_part_rename/test.py @@ -1,4 +1,5 @@ import pytest + from helpers.cluster import ClickHouseCluster cluster = ClickHouseCluster(__file__)