This commit is contained in:
Igor Nikonov 2024-11-20 22:10:01 +00:00 committed by GitHub
commit 1c6cdfd0f0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 25 additions and 2 deletions

View File

@ -5652,6 +5652,9 @@ Parts virtually divided into segments to be distributed between replicas for par
)", BETA) \
DECLARE(Bool, parallel_replicas_local_plan, true, R"(
Build local plan for local replica
)", BETA) \
DECLARE(Bool, parallel_replicas_skip_index_analysis_on_workers, true, R"(
Skip index analysis on workers. Effective only with enabled parallel_replicas_local_plan
)", BETA) \
\
DECLARE(Bool, allow_experimental_analyzer, true, R"(

View File

@ -80,6 +80,8 @@ namespace Setting
extern const SettingsUInt64 parallel_replica_offset;
extern const SettingsUInt64 parallel_replicas_count;
extern const SettingsParallelReplicasMode parallel_replicas_mode;
extern const SettingsBool parallel_replicas_local_plan;
extern const SettingsBool parallel_replicas_skip_index_analysis_on_workers;
}
namespace MergeTreeSetting
@ -631,10 +633,25 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
bool use_skip_indexes,
bool find_exact_ranges)
{
RangesInDataParts parts_with_ranges;
parts_with_ranges.resize(parts.size());
const Settings & settings = context->getSettingsRef();
if (context->canUseParallelReplicasOnFollower() && settings[Setting::parallel_replicas_local_plan]
&& settings[Setting::parallel_replicas_skip_index_analysis_on_workers])
{
RangesInDataParts parts_with_ranges;
parts_with_ranges.reserve(parts.size());
for (size_t part_index = 0; part_index < parts.size(); ++part_index)
{
const auto & part = parts[part_index];
// LOG_DEBUG(getLogger(__PRETTY_FUNCTION__), "part {}", part->getNameWithState());
MarkRanges ranges;
ranges.emplace_back(0, part->getMarksCount());
parts_with_ranges.emplace_back(part, part_index, std::move(ranges));
}
return parts_with_ranges;
}
if (use_skip_indexes && settings[Setting::force_data_skipping_indices].changed)
{
const auto & indices_str = settings[Setting::force_data_skipping_indices].toString();
@ -673,6 +690,8 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
std::atomic<size_t> sum_marks_pk = 0;
std::atomic<size_t> sum_parts_pk = 0;
RangesInDataParts parts_with_ranges(parts.size());
/// Let's find what range to read from each part.
{
auto mark_cache = context->getIndexMarkCache();

View File

@ -71,6 +71,7 @@ def _get_result_with_parallel_replicas(
"cluster_for_parallel_replicas": f"{cluster_name}",
"parallel_replicas_mark_segment_size": parallel_replicas_mark_segment_size,
"query_id": query_id,
"parallel_replicas_skip_index_analysis_on_workers": True,
},
)