More review fixes.

This commit is contained in:
Nikolai Kochetov 2024-09-17 13:08:52 +00:00
parent ca4b366cc0
commit d31c9dd3ef
4 changed files with 12 additions and 16 deletions

View File

@ -76,14 +76,14 @@ void enableMemoryBoundMerging(QueryPlan::Node & node)
for (auto & reading : reading_steps)
{
reading->enforceSorting(sort_description);
reading->enableMemoryBoundMerging();
if (enforce_aggregation_in_order)
reading->enforceAggregationInOrder();
}
for (auto & reading : async_reading_steps)
{
reading->enforceSorting(sort_description);
reading->enableMemoryBoundMerging();
if (enforce_aggregation_in_order)
reading->enforceAggregationInOrder();
}

View File

@ -781,7 +781,7 @@ InputOrder buildInputOrderFromUnorderedKeys(
InputOrderInfoPtr buildInputOrderInfo(SortingStep & sorting, QueryPlan::Node & node)
{
QueryPlan::Node * reading_node = findReadingStep(node, false);
QueryPlan::Node * reading_node = findReadingStep(node, /*allow_existing_order=*/ false);
if (!reading_node)
return nullptr;
@ -835,7 +835,7 @@ InputOrderInfoPtr buildInputOrderInfo(SortingStep & sorting, QueryPlan::Node & n
InputOrder buildInputOrderInfo(AggregatingStep & aggregating, QueryPlan::Node & node)
{
QueryPlan::Node * reading_node = findReadingStep(node, false);
QueryPlan::Node * reading_node = findReadingStep(node, /*allow_existing_order=*/ false);
if (!reading_node)
return {};
@ -923,7 +923,7 @@ InputOrder buildInputOrderInfo(DistinctStep & distinct, QueryPlan::Node & node)
/// Example: SELECT DISTINCT a, b FROM t ORDER BY a; -- sorting key: a, b
/// If read in order for ORDER BY is already applied, then output sort description will contain only column `a`,
/// but we need columns `a, b`, applying read in order for distinct will still benefit `order by`
QueryPlan::Node * reading_node = findReadingStep(node, true);
QueryPlan::Node * reading_node = findReadingStep(node, /*allow_existing_order=*/ true);
if (!reading_node)
return {};
@ -1083,7 +1083,6 @@ void optimizeReadInOrder(QueryPlan::Node & node, QueryPlan::Nodes & nodes)
/// Use buffering only if have filter or don't have limit.
bool use_buffering = order_info->limit == 0;
sorting->convertToFinishSorting(order_info->sort_description_for_merging, use_buffering);
// updateStepsDataStreams(steps_to_update);
}
}

View File

@ -62,7 +62,7 @@ static void addConvertingActions(Pipe & pipe, const Block & header, bool use_pos
});
}
static void enforceSorting(QueryProcessingStage::Enum stage, Context & context)
static void enableMemoryBoundMerging(QueryProcessingStage::Enum stage, Context & context)
{
if (stage != QueryProcessingStage::WithMergeableState)
throw Exception(
@ -71,9 +71,6 @@ static void enforceSorting(QueryProcessingStage::Enum stage, Context & context)
QueryProcessingStage::toString(stage));
context.setSetting("enable_memory_bound_merging_of_aggregation_results", true);
// output_stream.sort_description = std::move(output_sort_description);
// output_stream.sort_scope = DataStream::SortScope::Stream;
}
static void enforceAggregationInOrder(QueryProcessingStage::Enum stage, Context & context)
@ -129,9 +126,9 @@ ReadFromRemote::ReadFromRemote(
{
}
void ReadFromRemote::enforceSorting([[maybe_unused]] SortDescription output_sort_description)
void ReadFromRemote::enableMemoryBoundMerging()
{
DB::enforceSorting(stage, *context); //, *output_stream, *context, output_sort_description);
DB::enableMemoryBoundMerging(stage, *context);
}
void ReadFromRemote::enforceAggregationInOrder()
@ -405,9 +402,9 @@ ReadFromParallelRemoteReplicasStep::ReadFromParallelRemoteReplicasStep(
setStepDescription(std::move(description));
}
void ReadFromParallelRemoteReplicasStep::enforceSorting([[maybe_unused]] SortDescription output_sort_description)
void ReadFromParallelRemoteReplicasStep::enableMemoryBoundMerging()
{
DB::enforceSorting(stage, *context);
DB::enableMemoryBoundMerging(stage, *context);
}
void ReadFromParallelRemoteReplicasStep::enforceAggregationInOrder()

View File

@ -40,7 +40,7 @@ public:
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
void enforceSorting(SortDescription output_sort_description);
void enableMemoryBoundMerging();
void enforceAggregationInOrder();
private:
@ -86,7 +86,7 @@ public:
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
void enforceSorting(SortDescription output_sort_description);
void enableMemoryBoundMerging();
void enforceAggregationInOrder();
private: