Try fix something

This commit is contained in:
Nikolai Kochetov 2024-11-20 18:46:33 +00:00
parent 587774b8ab
commit 92a5bc8d23
4 changed files with 18 additions and 7 deletions

View File

@ -128,7 +128,7 @@ void DynamicJoinFilters::filterDynamicPartsByFilledJoin(const IJoin & join)
{ {
prev_marks += range.getNumberOfMarks(); prev_marks += range.getNumberOfMarks();
// std::cerr << "Range " << range.begin << ' ' << range.end << std::endl; // std::cerr << "Range " << range.begin << ' ' << range.end << " has final mark " << part_range.data_part->index_granularity->hasFinalMark() << std::endl;
auto new_ranges = MergeTreeDataSelectExecutor::markRangesFromPKRange( auto new_ranges = MergeTreeDataSelectExecutor::markRangesFromPKRange(
part_range.data_part, part_range.data_part,
range.begin, range.begin,

View File

@ -143,6 +143,9 @@ void optimizeFilterByJoinSet(QueryPlan::Node & node)
if (!reading) if (!reading)
return; return;
if (reading->splitsRangesIntoIntersectionAndNonIntersecting() || reading->isQueryWithFinal())
return;
// if (reading->getContext()->getSettingsRef()[Setting::allow_experimental_parallel_reading_from_replicas]) // if (reading->getContext()->getSettingsRef()[Setting::allow_experimental_parallel_reading_from_replicas])
// return; // return;

View File

@ -386,6 +386,16 @@ ReadFromMergeTree::ReadFromMergeTree(
setStepDescription(data.getStorageID().getFullNameNotQuoted()); setStepDescription(data.getStorageID().getFullNameNotQuoted());
enable_vertical_final = query_info.isFinal() && context->getSettingsRef()[Setting::enable_vertical_final] enable_vertical_final = query_info.isFinal() && context->getSettingsRef()[Setting::enable_vertical_final]
&& data.merging_params.mode == MergeTreeData::MergingParams::Replacing; && data.merging_params.mode == MergeTreeData::MergingParams::Replacing;
double read_split_ranges_into_intersecting_and_non_intersecting_injection_probability
= settings[Setting::merge_tree_read_split_ranges_into_intersecting_and_non_intersecting_injection_probability];
if (read_split_ranges_into_intersecting_and_non_intersecting_injection_probability > 0.0)
{
std::bernoulli_distribution fault(read_split_ranges_into_intersecting_and_non_intersecting_injection_probability);
if (fault(thread_local_rng))
read_split_ranges_into_intersecting_and_non_intersecting_injection = true;
}
} }
std::unique_ptr<ReadFromMergeTree> ReadFromMergeTree::createLocalParallelReplicasReadingStep( std::unique_ptr<ReadFromMergeTree> ReadFromMergeTree::createLocalParallelReplicasReadingStep(
@ -885,14 +895,9 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreams(RangesInDataParts && parts_
auto read_type = is_parallel_reading_from_replicas ? ReadType::ParallelReplicas : ReadType::Default; auto read_type = is_parallel_reading_from_replicas ? ReadType::ParallelReplicas : ReadType::Default;
double read_split_ranges_into_intersecting_and_non_intersecting_injection_probability
= settings[Setting::merge_tree_read_split_ranges_into_intersecting_and_non_intersecting_injection_probability];
std::bernoulli_distribution fault(read_split_ranges_into_intersecting_and_non_intersecting_injection_probability);
if (read_type != ReadType::ParallelReplicas && if (read_type != ReadType::ParallelReplicas &&
num_streams > 1 && num_streams > 1 &&
read_split_ranges_into_intersecting_and_non_intersecting_injection_probability > 0.0 && read_split_ranges_into_intersecting_and_non_intersecting_injection &&
fault(thread_local_rng) &&
!isQueryWithFinal() && !isQueryWithFinal() &&
data.merging_params.is_deleted_column.empty() && data.merging_params.is_deleted_column.empty() &&
!prewhere_info) !prewhere_info)

View File

@ -220,6 +220,7 @@ public:
void applyFilters(ActionDAGNodes added_filter_nodes) override; void applyFilters(ActionDAGNodes added_filter_nodes) override;
DynamiclyFilteredPartsRangesPtr useDynamiclyFilteredParts(); DynamiclyFilteredPartsRangesPtr useDynamiclyFilteredParts();
bool splitsRangesIntoIntersectionAndNonIntersecting() const { return read_split_ranges_into_intersecting_and_non_intersecting_injection; }
private: private:
MergeTreeReaderSettings reader_settings; MergeTreeReaderSettings reader_settings;
@ -296,6 +297,8 @@ private:
ExpressionActionsPtr virtual_row_conversion; ExpressionActionsPtr virtual_row_conversion;
std::optional<size_t> number_of_current_replica; std::optional<size_t> number_of_current_replica;
bool read_split_ranges_into_intersecting_and_non_intersecting_injection = false;
}; };
} }