From 42670a46d49719efc5caae1ca23b6d95360fd02d Mon Sep 17 00:00:00 2001 From: Nikita Taranov Date: Fri, 13 Sep 2024 17:00:46 +0100 Subject: [PATCH 1/4] impl --- .../QueryPlan/ReadFromMergeTree.cpp | 27 ------- .../__init__.py | 0 .../configs/remote_servers.xml | 33 +++++++++ .../test.py | 74 +++++++++++++++++++ 4 files changed, 107 insertions(+), 27 deletions(-) create mode 100644 tests/integration/test_parallel_replicas_snapshot_from_initiator/__init__.py create mode 100644 tests/integration/test_parallel_replicas_snapshot_from_initiator/configs/remote_servers.xml create mode 100644 tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index 218f0a61a48..0b96cc57274 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -2009,33 +2009,6 @@ void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, cons { auto result = getAnalysisResult(); - if (is_parallel_reading_from_replicas && context->canUseParallelReplicasOnInitiator() - && context->getSettingsRef().parallel_replicas_local_plan) - { - CoordinationMode mode = CoordinationMode::Default; - switch (result.read_type) - { - case ReadFromMergeTree::ReadType::Default: - mode = CoordinationMode::Default; - break; - case ReadFromMergeTree::ReadType::InOrder: - mode = CoordinationMode::WithOrder; - break; - case ReadFromMergeTree::ReadType::InReverseOrder: - mode = CoordinationMode::ReverseOrder; - break; - case ReadFromMergeTree::ReadType::ParallelReplicas: - throw Exception(ErrorCodes::LOGICAL_ERROR, "Read type can't be ParallelReplicas on initiator"); - } - - chassert(number_of_current_replica.has_value()); - chassert(all_ranges_callback.has_value()); - - /// initialize working set from local replica - all_ranges_callback.value()( - InitialAllRangesAnnouncement(mode, result.parts_with_ranges.getDescriptions(), number_of_current_replica.value())); - } - if (enable_remove_parts_from_snapshot_optimization) { /// Do not keep data parts in snapshot. diff --git a/tests/integration/test_parallel_replicas_snapshot_from_initiator/__init__.py b/tests/integration/test_parallel_replicas_snapshot_from_initiator/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/integration/test_parallel_replicas_snapshot_from_initiator/configs/remote_servers.xml b/tests/integration/test_parallel_replicas_snapshot_from_initiator/configs/remote_servers.xml new file mode 100644 index 00000000000..734acf5f363 --- /dev/null +++ b/tests/integration/test_parallel_replicas_snapshot_from_initiator/configs/remote_servers.xml @@ -0,0 +1,33 @@ + + + + + false + + node0 + 9000 + + + node1 + 9000 + + + node2 + 9000 + + + node3 + 9000 + + + node4 + 9000 + + + node5 + 9000 + + + + + diff --git a/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py b/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py new file mode 100644 index 00000000000..2b3b94f95dc --- /dev/null +++ b/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py @@ -0,0 +1,74 @@ +import pytest + +from helpers.cluster import ClickHouseCluster + +cluster = ClickHouseCluster(__file__) + +nodes = [ + cluster.add_instance( + f"node{num}", main_configs=["configs/remote_servers.xml"], with_zookeeper=True + ) + for num in range(6) +] + + +@pytest.fixture(scope="module", autouse=True) +def start_cluster(): + try: + cluster.start() + yield cluster + finally: + cluster.shutdown() + + +def _create_tables(table_name): + for idx, node in enumerate(nodes): + node.query( + f"DROP TABLE IF EXISTS {table_name}", + settings={"database_atomic_wait_for_drop_and_detach_synchronously": True}, + ) + + node.query( + f""" + CREATE TABLE {table_name} (value Int64) + Engine=ReplicatedMergeTree('/test_parallel_replicas/shard/{table_name}', '{idx}') + ORDER BY () + """ + ) + + nodes[0].query( + f"INSERT INTO {table_name} SELECT * FROM numbers(1000)", + settings={"insert_deduplicate": 0}, + ) + nodes[0].query(f"SYSTEM SYNC REPLICA ON CLUSTER 'parallel_replicas' {table_name}") + + for idx, node in enumerate(nodes): + node.query("SYSTEM STOP REPLICATED SENDS") + # the same data on all nodes except for a single value + node.query( + f"INSERT INTO {table_name} VALUES ({idx})", + settings={"insert_deduplicate": 0}, + ) + + +def test_number_of_marks_read(start_cluster): + if nodes[0].is_built_with_sanitizer(): + pytest.skip("Disabled for sanitizers (too slow)") + + table_name = "t" + _create_tables(table_name) + + for idx, node in enumerate(nodes): + expected = 499500 + idx # sum of all integers 0..999 + idx + assert ( + node.query( + "SELECT sum(value) FROM t", + settings={ + "allow_experimental_parallel_reading_from_replicas": 2, + "max_parallel_replicas": 100, + "cluster_for_parallel_replicas": "parallel_replicas", + "parallel_replicas_local_plan": True, + }, + ) + == f"{expected}\n" + ) From de78992966ac73825dd597e0f0d168b83f71360d Mon Sep 17 00:00:00 2001 From: Nikita Taranov Date: Fri, 13 Sep 2024 18:25:51 +0200 Subject: [PATCH 2/4] Update tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py Co-authored-by: Igor Nikonov <954088+devcrafter@users.noreply.github.com> --- .../test_parallel_replicas_snapshot_from_initiator/test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py b/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py index 2b3b94f95dc..5da0a57ee7e 100644 --- a/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py +++ b/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py @@ -62,7 +62,7 @@ def test_number_of_marks_read(start_cluster): expected = 499500 + idx # sum of all integers 0..999 + idx assert ( node.query( - "SELECT sum(value) FROM t", + "SELECT sum(value) FROM {table_name}", settings={ "allow_experimental_parallel_reading_from_replicas": 2, "max_parallel_replicas": 100, From 57a6a64d8c622fc42b2e3cb16a0f9f23783bab46 Mon Sep 17 00:00:00 2001 From: Nikita Taranov Date: Fri, 13 Sep 2024 17:31:28 +0100 Subject: [PATCH 3/4] fix --- .../test_parallel_replicas_snapshot_from_initiator/test.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py b/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py index 5da0a57ee7e..ec962c7cb32 100644 --- a/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py +++ b/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py @@ -51,10 +51,9 @@ def _create_tables(table_name): ) -def test_number_of_marks_read(start_cluster): - if nodes[0].is_built_with_sanitizer(): - pytest.skip("Disabled for sanitizers (too slow)") - +# check that we use the state of data parts from the initiator node (for some sort of determinism of what is been read). +# currently it is implemented only when we build local plan for the initiator node (we aim to make this behavior default) +def test_initiator_snapshot_is_used_for_reading(start_cluster): table_name = "t" _create_tables(table_name) From 8fd9345d2d7fae6711b4733f658d330df2edffc2 Mon Sep 17 00:00:00 2001 From: Nikita Taranov Date: Fri, 13 Sep 2024 21:26:58 +0100 Subject: [PATCH 4/4] fix --- .../test_parallel_replicas_snapshot_from_initiator/test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py b/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py index ec962c7cb32..a7e7e99455b 100644 --- a/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py +++ b/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py @@ -61,7 +61,7 @@ def test_initiator_snapshot_is_used_for_reading(start_cluster): expected = 499500 + idx # sum of all integers 0..999 + idx assert ( node.query( - "SELECT sum(value) FROM {table_name}", + f"SELECT sum(value) FROM {table_name}", settings={ "allow_experimental_parallel_reading_from_replicas": 2, "max_parallel_replicas": 100,