diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index 218f0a61a48..0b96cc57274 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -2009,33 +2009,6 @@ void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, cons { auto result = getAnalysisResult(); - if (is_parallel_reading_from_replicas && context->canUseParallelReplicasOnInitiator() - && context->getSettingsRef().parallel_replicas_local_plan) - { - CoordinationMode mode = CoordinationMode::Default; - switch (result.read_type) - { - case ReadFromMergeTree::ReadType::Default: - mode = CoordinationMode::Default; - break; - case ReadFromMergeTree::ReadType::InOrder: - mode = CoordinationMode::WithOrder; - break; - case ReadFromMergeTree::ReadType::InReverseOrder: - mode = CoordinationMode::ReverseOrder; - break; - case ReadFromMergeTree::ReadType::ParallelReplicas: - throw Exception(ErrorCodes::LOGICAL_ERROR, "Read type can't be ParallelReplicas on initiator"); - } - - chassert(number_of_current_replica.has_value()); - chassert(all_ranges_callback.has_value()); - - /// initialize working set from local replica - all_ranges_callback.value()( - InitialAllRangesAnnouncement(mode, result.parts_with_ranges.getDescriptions(), number_of_current_replica.value())); - } - if (enable_remove_parts_from_snapshot_optimization) { /// Do not keep data parts in snapshot. diff --git a/tests/integration/test_parallel_replicas_snapshot_from_initiator/__init__.py b/tests/integration/test_parallel_replicas_snapshot_from_initiator/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/integration/test_parallel_replicas_snapshot_from_initiator/configs/remote_servers.xml b/tests/integration/test_parallel_replicas_snapshot_from_initiator/configs/remote_servers.xml new file mode 100644 index 00000000000..734acf5f363 --- /dev/null +++ b/tests/integration/test_parallel_replicas_snapshot_from_initiator/configs/remote_servers.xml @@ -0,0 +1,33 @@ + + + + + false + + node0 + 9000 + + + node1 + 9000 + + + node2 + 9000 + + + node3 + 9000 + + + node4 + 9000 + + + node5 + 9000 + + + + + diff --git a/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py b/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py new file mode 100644 index 00000000000..2b3b94f95dc --- /dev/null +++ b/tests/integration/test_parallel_replicas_snapshot_from_initiator/test.py @@ -0,0 +1,74 @@ +import pytest + +from helpers.cluster import ClickHouseCluster + +cluster = ClickHouseCluster(__file__) + +nodes = [ + cluster.add_instance( + f"node{num}", main_configs=["configs/remote_servers.xml"], with_zookeeper=True + ) + for num in range(6) +] + + +@pytest.fixture(scope="module", autouse=True) +def start_cluster(): + try: + cluster.start() + yield cluster + finally: + cluster.shutdown() + + +def _create_tables(table_name): + for idx, node in enumerate(nodes): + node.query( + f"DROP TABLE IF EXISTS {table_name}", + settings={"database_atomic_wait_for_drop_and_detach_synchronously": True}, + ) + + node.query( + f""" + CREATE TABLE {table_name} (value Int64) + Engine=ReplicatedMergeTree('/test_parallel_replicas/shard/{table_name}', '{idx}') + ORDER BY () + """ + ) + + nodes[0].query( + f"INSERT INTO {table_name} SELECT * FROM numbers(1000)", + settings={"insert_deduplicate": 0}, + ) + nodes[0].query(f"SYSTEM SYNC REPLICA ON CLUSTER 'parallel_replicas' {table_name}") + + for idx, node in enumerate(nodes): + node.query("SYSTEM STOP REPLICATED SENDS") + # the same data on all nodes except for a single value + node.query( + f"INSERT INTO {table_name} VALUES ({idx})", + settings={"insert_deduplicate": 0}, + ) + + +def test_number_of_marks_read(start_cluster): + if nodes[0].is_built_with_sanitizer(): + pytest.skip("Disabled for sanitizers (too slow)") + + table_name = "t" + _create_tables(table_name) + + for idx, node in enumerate(nodes): + expected = 499500 + idx # sum of all integers 0..999 + idx + assert ( + node.query( + "SELECT sum(value) FROM t", + settings={ + "allow_experimental_parallel_reading_from_replicas": 2, + "max_parallel_replicas": 100, + "cluster_for_parallel_replicas": "parallel_replicas", + "parallel_replicas_local_plan": True, + }, + ) + == f"{expected}\n" + )