From 66864938d86737a9707f26de878aaf41428ccfbb Mon Sep 17 00:00:00 2001 From: robot-clickhouse Date: Thu, 28 Nov 2024 15:07:26 +0000 Subject: [PATCH] Backport #72109 to 24.10: PR: do index analysis only on coordinator --- src/Core/Settings.cpp | 3 + src/Core/SettingsChangesHistory.cpp | 1 + .../MergeTree/MergeTreeDataSelectExecutor.cpp | 21 +++++- .../test.py | 1 + .../test_parallel_replicas_protocol/test.py | 65 ++++++++++--------- 5 files changed, 59 insertions(+), 32 deletions(-) diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 60adda6247a..89954ec798f 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -5747,6 +5747,9 @@ File/S3 engines/table function will parse paths with '::' as '\\ :: \ )", 0) \ DECLARE(Bool, parallel_replicas_local_plan, true, R"( Build local plan for local replica +)", 0) \ + DECLARE(Bool, parallel_replicas_index_analysis_only_on_coordinator, true, R"( +Index analysis done only on replica-coordinator and skipped on other replicas. Effective only with enabled parallel_replicas_local_plan )", 0) \ \ DECLARE(Bool, allow_experimental_inverted_index, false, R"( diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 3b597d1448f..4b630600d40 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -113,6 +113,7 @@ static std::initializer_listgetSettingsRef(); + if (context->canUseParallelReplicasOnFollower() && settings[Setting::parallel_replicas_local_plan] + && settings[Setting::parallel_replicas_index_analysis_only_on_coordinator]) + { + // Skip index analysis and return parts with all marks + // The coordinator will choose ranges to read for workers based on index analysis on its side + RangesInDataParts parts_with_ranges; + parts_with_ranges.reserve(parts.size()); + for (size_t part_index = 0; part_index < parts.size(); ++part_index) + { + const auto & part = parts[part_index]; + parts_with_ranges.emplace_back(part, part_index, 
MarkRanges{{0, part->getMarksCount()}}); + } + return parts_with_ranges; + } + if (use_skip_indexes && settings[Setting::force_data_skipping_indices].changed) { const auto & indices = settings[Setting::force_data_skipping_indices].toString(); @@ -685,6 +700,8 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd std::atomic sum_marks_pk = 0; std::atomic sum_parts_pk = 0; + RangesInDataParts parts_with_ranges(parts.size()); + /// Let's find what range to read from each part. { auto mark_cache = context->getIndexMarkCache(); diff --git a/tests/integration/test_parallel_replicas_all_marks_read/test.py b/tests/integration/test_parallel_replicas_all_marks_read/test.py index 593b98126df..c93e8c7c09b 100644 --- a/tests/integration/test_parallel_replicas_all_marks_read/test.py +++ b/tests/integration/test_parallel_replicas_all_marks_read/test.py @@ -71,6 +71,7 @@ def _get_result_with_parallel_replicas( "cluster_for_parallel_replicas": f"{cluster_name}", "parallel_replicas_mark_segment_size": parallel_replicas_mark_segment_size, "query_id": query_id, + "parallel_replicas_index_analysis_only_on_coordinator": False, }, ) diff --git a/tests/integration/test_parallel_replicas_protocol/test.py b/tests/integration/test_parallel_replicas_protocol/test.py index 2ed39a3273f..c9a8ffa287b 100644 --- a/tests/integration/test_parallel_replicas_protocol/test.py +++ b/tests/integration/test_parallel_replicas_protocol/test.py @@ -1,4 +1,5 @@ import re +import uuid from random import randint import pytest @@ -18,15 +19,6 @@ nodes = [ ] -@pytest.fixture(scope="module", autouse=True) -def start_cluster(): - try: - cluster.start() - yield cluster - finally: - cluster.shutdown() - - def _create_tables(table_name): nodes[0].query( f"DROP TABLE IF EXISTS {table_name} ON CLUSTER 'parallel_replicas'", @@ -48,28 +40,41 @@ def _create_tables(table_name): nodes[0].query(f"SYSTEM SYNC REPLICA ON CLUSTER 'parallel_replicas' {table_name}") +table_name = "t" + + 
+@pytest.fixture(scope="module", autouse=True) +def start_cluster(): + try: + cluster.start() + _create_tables(table_name) + yield cluster + finally: + cluster.shutdown() + + # now mark_segment_size is part of the protocol and is communicated to the initiator. # let's check that the correct value is actually used by the coordinator -def test_mark_segment_size_communicated_correctly(start_cluster): - table_name = "t" - _create_tables(table_name) +@pytest.mark.parametrize("local_plan", [0, 1]) +@pytest.mark.parametrize("index_analysis_only_on_coordinator", [0, 1]) +def test_mark_segment_size_communicated_correctly( + start_cluster, local_plan, index_analysis_only_on_coordinator +): - for local_plan in [0, 1]: - query_id = f"query_id_{randint(0, 1000000)}" - nodes[0].query( - f"SELECT sum(value) FROM {table_name}", - settings={ - "allow_experimental_parallel_reading_from_replicas": 2, - "max_parallel_replicas": 100, - "cluster_for_parallel_replicas": "parallel_replicas", - "parallel_replicas_mark_segment_size": 0, - "parallel_replicas_local_plan": local_plan, - "query_id": query_id, - }, - ) + query_id = f"query_id_{str(uuid.uuid4())}" + nodes[0].query( + f"SELECT sum(value) FROM {table_name}", + settings={ + "allow_experimental_parallel_reading_from_replicas": 2, + "max_parallel_replicas": 100, + "cluster_for_parallel_replicas": "parallel_replicas", + "parallel_replicas_mark_segment_size": 0, + "parallel_replicas_local_plan": local_plan, + "query_id": query_id, + "parallel_replicas_index_analysis_only_on_coordinator": index_analysis_only_on_coordinator, + }, + ) - nodes[0].query("SYSTEM FLUSH LOGS") - log_line = nodes[0].grep_in_log( - f"{query_id}.*Reading state is fully initialized" - ) - assert re.search(r"mark_segment_size: (\d+)", log_line).group(1) == "16384" + nodes[0].query("SYSTEM FLUSH LOGS") + log_line = nodes[0].grep_in_log(f"{query_id}.*Reading state is fully initialized") + assert re.search(r"mark_segment_size: (\d+)", log_line).group(1) == "16384"