Backport #72109 to 24.10: PR: do index analysis only on coordinator

robot-clickhouse 2024-11-28 15:07:26 +00:00
parent f2dc05605c
commit 66864938d8
5 changed files with 59 additions and 32 deletions

View File

@@ -5747,6 +5747,9 @@ File/S3 engines/table function will parse paths with '::' as '\\<archive\\> :: \
 )", 0) \
 DECLARE(Bool, parallel_replicas_local_plan, true, R"(
 Build local plan for local replica
 )", 0) \
+DECLARE(Bool, parallel_replicas_index_analysis_only_on_coordinator, true, R"(
+Index analysis is done only on the replica-coordinator and skipped on other replicas. Effective only with parallel_replicas_local_plan enabled
+)", 0) \
 \
 DECLARE(Bool, allow_experimental_inverted_index, false, R"(

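The new setting is an ordinary query-level Bool setting, so it can be toggled per query. A hedged sketch in the style of this commit's integration tests; the node helper, table, cluster name, and the exact accompanying settings are assumptions, not part of the commit:

# Illustrative only: enable coordinator-only index analysis for one query.
nodes[0].query(
    "SELECT sum(value) FROM t",
    settings={
        "allow_experimental_parallel_reading_from_replicas": 2,
        "cluster_for_parallel_replicas": "parallel_replicas",
        "parallel_replicas_local_plan": 1,  # the new setting only takes effect with this on
        "parallel_replicas_index_analysis_only_on_coordinator": 1,
    },
)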
View File

@@ -113,6 +113,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
 {"input_format_parquet_bloom_filter_push_down", false, true, "When reading Parquet files, skip whole row groups based on the WHERE/PREWHERE expressions and bloom filter in the Parquet metadata."},
 {"date_time_64_output_format_cut_trailing_zeros_align_to_groups_of_thousands", false, false, "Dynamically trim the trailing zeros of datetime64 values to adjust the output scale to (0, 3, 6), corresponding to 'seconds', 'milliseconds', and 'microseconds'."},
 {"parallel_replicas_local_plan", false, true, "Use local plan for local replica in a query with parallel replicas"},
+{"parallel_replicas_index_analysis_only_on_coordinator", false, true, "Index analysis is done only on the replica-coordinator and skipped on other replicas. Effective only with parallel_replicas_local_plan enabled"},
 }
 },
 {"24.9",

View File

@@ -83,6 +83,8 @@ namespace Setting
 extern const SettingsUInt64 parallel_replica_offset;
 extern const SettingsUInt64 parallel_replicas_count;
 extern const SettingsParallelReplicasMode parallel_replicas_mode;
+extern const SettingsBool parallel_replicas_local_plan;
+extern const SettingsBool parallel_replicas_index_analysis_only_on_coordinator;
 }

 namespace MergeTreeSetting
@@ -634,10 +636,23 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
     bool use_skip_indexes,
     bool find_exact_ranges)
 {
-    RangesInDataParts parts_with_ranges;
-    parts_with_ranges.resize(parts.size());
     const Settings & settings = context->getSettingsRef();

+    if (context->canUseParallelReplicasOnFollower() && settings[Setting::parallel_replicas_local_plan]
+        && settings[Setting::parallel_replicas_index_analysis_only_on_coordinator])
+    {
+        // Skip index analysis and return parts with all marks.
+        // The coordinator will choose ranges to read for workers based on index analysis on its side.
+        RangesInDataParts parts_with_ranges;
+        parts_with_ranges.reserve(parts.size());
+        for (size_t part_index = 0; part_index < parts.size(); ++part_index)
+        {
+            const auto & part = parts[part_index];
+            parts_with_ranges.emplace_back(part, part_index, MarkRanges{{0, part->getMarksCount()}});
+        }
+        return parts_with_ranges;
+    }
+
 if (use_skip_indexes && settings[Setting::force_data_skipping_indices].changed)
 {
     const auto & indices = settings[Setting::force_data_skipping_indices].toString();
@@ -685,6 +700,8 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
     std::atomic<size_t> sum_marks_pk = 0;
     std::atomic<size_t> sum_parts_pk = 0;

+    RangesInDataParts parts_with_ranges(parts.size());
+
     /// Let's find what range to read from each part.
     {
         auto mark_cache = context->getIndexMarkCache();

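The early return above is the core of the change: on a follower replica with both settings enabled, index analysis is skipped entirely and every part is reported with its full mark range; the coordinator then chooses the actual ranges for the workers. A minimal Python model of that fast path (Part and the tuple shape are illustrative stand-ins for the C++ types, not the real interfaces):

from dataclasses import dataclass

@dataclass
class Part:
    name: str
    marks_count: int

def follower_parts_with_ranges(parts):
    # Mirrors MarkRanges{{0, part->getMarksCount()}}: one range covering all marks per part.
    return [(part, index, [(0, part.marks_count)]) for index, part in enumerate(parts)]

for part, index, ranges in follower_parts_with_ranges([Part("all_1_1_0", 128), Part("all_2_2_0", 64)]):
    print(part.name, index, ranges)  # e.g. all_1_1_0 0 [(0, 128)]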
View File

@@ -71,6 +71,7 @@ def _get_result_with_parallel_replicas(
             "cluster_for_parallel_replicas": f"{cluster_name}",
             "parallel_replicas_mark_segment_size": parallel_replicas_mark_segment_size,
             "query_id": query_id,
+            "parallel_replicas_index_analysis_only_on_coordinator": False,
         },
     )

View File

@@ -1,4 +1,5 @@
 import re
+import uuid
 from random import randint

 import pytest
@@ -18,15 +19,6 @@ nodes = [
 ]

-
-@pytest.fixture(scope="module", autouse=True)
-def start_cluster():
-    try:
-        cluster.start()
-        yield cluster
-    finally:
-        cluster.shutdown()
-

 def _create_tables(table_name):
     nodes[0].query(
         f"DROP TABLE IF EXISTS {table_name} ON CLUSTER 'parallel_replicas'",
@@ -48,14 +40,28 @@ def _create_tables(table_name):
     nodes[0].query(f"SYSTEM SYNC REPLICA ON CLUSTER 'parallel_replicas' {table_name}")


+table_name = "t"
+
+
+@pytest.fixture(scope="module", autouse=True)
+def start_cluster():
+    try:
+        cluster.start()
+        _create_tables(table_name)
+        yield cluster
+    finally:
+        cluster.shutdown()
+
+
 # now mark_segment_size is part of the protocol and is communicated to the initiator.
 # let's check that the correct value is actually used by the coordinator
-def test_mark_segment_size_communicated_correctly(start_cluster):
-    table_name = "t"
-    _create_tables(table_name)
-
-    for local_plan in [0, 1]:
-        query_id = f"query_id_{randint(0, 1000000)}"
+@pytest.mark.parametrize("local_plan", [0, 1])
+@pytest.mark.parametrize("index_analysis_only_on_coordinator", [0, 1])
+def test_mark_segment_size_communicated_correctly(
+    start_cluster, local_plan, index_analysis_only_on_coordinator
+):
+    query_id = f"query_id_{str(uuid.uuid4())}"
     nodes[0].query(
         f"SELECT sum(value) FROM {table_name}",
         settings={
@@ -65,11 +71,10 @@ def test_mark_segment_size_communicated_correctly(start_cluster):
             "parallel_replicas_mark_segment_size": 0,
             "parallel_replicas_local_plan": local_plan,
             "query_id": query_id,
+            "parallel_replicas_index_analysis_only_on_coordinator": index_analysis_only_on_coordinator,
         },
     )

     nodes[0].query("SYSTEM FLUSH LOGS")
-    log_line = nodes[0].grep_in_log(
-        f"{query_id}.*Reading state is fully initialized"
-    )
+    log_line = nodes[0].grep_in_log(f"{query_id}.*Reading state is fully initialized")
     assert re.search(r"mark_segment_size: (\d+)", log_line).group(1) == "16384"
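
The assertion relies on extracting the communicated segment size from the coordinator's log line with a regex. A standalone check of that extraction against a sample line (the line text is illustrative, not a verbatim server log):

import re

sample = "query_id_abc: Reading state is fully initialized ... mark_segment_size: 16384"
match = re.search(r"mark_segment_size: (\d+)", sample)
assert match is not None and match.group(1) == "16384"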