From 92a5bc8d234d558932faa04ab971138484f27c29 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Wed, 20 Nov 2024 18:46:33 +0000 Subject: [PATCH] Try fix something --- src/Processors/QueryPlan/JoinStep.cpp | 2 +- .../Optimizations/optimizeFilterByJoinSet.cpp | 3 +++ src/Processors/QueryPlan/ReadFromMergeTree.cpp | 17 +++++++++++------ src/Processors/QueryPlan/ReadFromMergeTree.h | 3 +++ 4 files changed, 18 insertions(+), 7 deletions(-) diff --git a/src/Processors/QueryPlan/JoinStep.cpp b/src/Processors/QueryPlan/JoinStep.cpp index acb566e30dc..1523792fc63 100644 --- a/src/Processors/QueryPlan/JoinStep.cpp +++ b/src/Processors/QueryPlan/JoinStep.cpp @@ -128,7 +128,7 @@ void DynamicJoinFilters::filterDynamicPartsByFilledJoin(const IJoin & join) { prev_marks += range.getNumberOfMarks(); - // std::cerr << "Range " << range.begin << ' ' << range.end << std::endl; + // std::cerr << "Range " << range.begin << ' ' << range.end << " has final mark " << part_range.data_part->index_granularity->hasFinalMark() << std::endl; auto new_ranges = MergeTreeDataSelectExecutor::markRangesFromPKRange( part_range.data_part, range.begin, diff --git a/src/Processors/QueryPlan/Optimizations/optimizeFilterByJoinSet.cpp b/src/Processors/QueryPlan/Optimizations/optimizeFilterByJoinSet.cpp index 7f317f11f01..e17d0cf73f6 100644 --- a/src/Processors/QueryPlan/Optimizations/optimizeFilterByJoinSet.cpp +++ b/src/Processors/QueryPlan/Optimizations/optimizeFilterByJoinSet.cpp @@ -143,6 +143,9 @@ void optimizeFilterByJoinSet(QueryPlan::Node & node) if (!reading) return; + if (reading->splitsRangesIntoIntersectionAndNonIntersecting() || reading->isQueryWithFinal()) + return; + // if (reading->getContext()->getSettingsRef()[Setting::allow_experimental_parallel_reading_from_replicas]) // return; diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index 07becb60f56..6e6a230b00c 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -386,6 +386,16 @@ ReadFromMergeTree::ReadFromMergeTree( setStepDescription(data.getStorageID().getFullNameNotQuoted()); enable_vertical_final = query_info.isFinal() && context->getSettingsRef()[Setting::enable_vertical_final] && data.merging_params.mode == MergeTreeData::MergingParams::Replacing; + + double read_split_ranges_into_intersecting_and_non_intersecting_injection_probability + = settings[Setting::merge_tree_read_split_ranges_into_intersecting_and_non_intersecting_injection_probability]; + if (read_split_ranges_into_intersecting_and_non_intersecting_injection_probability > 0.0) + { + std::bernoulli_distribution fault(read_split_ranges_into_intersecting_and_non_intersecting_injection_probability); + + if (fault(thread_local_rng)) + read_split_ranges_into_intersecting_and_non_intersecting_injection = true; + } } std::unique_ptr ReadFromMergeTree::createLocalParallelReplicasReadingStep( @@ -885,14 +895,9 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreams(RangesInDataParts && parts_ auto read_type = is_parallel_reading_from_replicas ? ReadType::ParallelReplicas : ReadType::Default; - double read_split_ranges_into_intersecting_and_non_intersecting_injection_probability - = settings[Setting::merge_tree_read_split_ranges_into_intersecting_and_non_intersecting_injection_probability]; - std::bernoulli_distribution fault(read_split_ranges_into_intersecting_and_non_intersecting_injection_probability); - if (read_type != ReadType::ParallelReplicas && num_streams > 1 && - read_split_ranges_into_intersecting_and_non_intersecting_injection_probability > 0.0 && - fault(thread_local_rng) && + read_split_ranges_into_intersecting_and_non_intersecting_injection && !isQueryWithFinal() && data.merging_params.is_deleted_column.empty() && !prewhere_info) diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.h b/src/Processors/QueryPlan/ReadFromMergeTree.h index 5466ecad1f9..82a473dc5f0 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.h +++ b/src/Processors/QueryPlan/ReadFromMergeTree.h @@ -220,6 +220,7 @@ public: void applyFilters(ActionDAGNodes added_filter_nodes) override; DynamiclyFilteredPartsRangesPtr useDynamiclyFilteredParts(); + bool splitsRangesIntoIntersectionAndNonIntersecting() const { return read_split_ranges_into_intersecting_and_non_intersecting_injection; } private: MergeTreeReaderSettings reader_settings; @@ -296,6 +297,8 @@ private: ExpressionActionsPtr virtual_row_conversion; std::optional number_of_current_replica; + + bool read_split_ranges_into_intersecting_and_non_intersecting_injection = false; }; }