check steps before mergesort

This commit is contained in:
jsc0218 2024-09-18 20:01:11 +00:00
parent a48bd922d9
commit fd021f658d
5 changed files with 70 additions and 12 deletions

View File

@ -94,6 +94,17 @@ static QueryPlan::Node * findReadingStep(QueryPlan::Node & node, StepStack & bac
return nullptr;
}
static bool checkVirtualRowSupport(const StepStack & backward_path)
{
for (size_t i = 0; i < backward_path.size() - 1; i++)
{
IQueryPlanStep * step = backward_path[i];
if (!typeid_cast<ExpressionStep *>(step) && !typeid_cast<FilterStep *>(step))
return false;
}
return true;
}
void updateStepsDataStreams(StepStack & steps_to_update)
{
/// update data stream's sorting properties for found transforms
@ -825,8 +836,10 @@ InputOrderInfoPtr buildInputOrderInfo(SortingStep & sorting, QueryPlan::Node & n
if (!can_read)
return nullptr;
if (!order_info->first_prefix_fixed)
reading->enableVirtualRow();
if (!checkVirtualRowSupport(backward_path))
reading->setVirtualRowStatus(ReadFromMergeTree::VirtualRowStatus::No);
else if (!order_info->first_prefix_fixed)
reading->setVirtualRowStatus(ReadFromMergeTree::VirtualRowStatus::Possible);
}
return order_info;

View File

@ -1109,17 +1109,18 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
}
}
/// If enabled in the optimization stage, check whether there are more than one branch.
if (enable_virtual_row)
enable_virtual_row = splitted_parts_and_ranges.size() > 1
|| (splitted_parts_and_ranges.size() == 1 && splitted_parts_and_ranges[0].size() > 1);
/// If possible in the optimization stage, check whether there are more than one branch.
if (virtual_row_status == VirtualRowStatus::Possible)
virtual_row_status = splitted_parts_and_ranges.size() > 1
|| (splitted_parts_and_ranges.size() == 1 && splitted_parts_and_ranges[0].size() > 1)
? VirtualRowStatus::Yes : VirtualRowStatus::NoConsiderInLogicalPlan;
for (auto && item : splitted_parts_and_ranges)
{
/// If not enabled before, try to enable it when conditions meet, as in the following section of preliminary merge,
/// only ExpressionTransform is added between MergingSortedTransform and readFromMergeTree.
bool enable_current_virtual_row = enable_virtual_row;
if (!enable_virtual_row)
bool enable_current_virtual_row = false;
if (virtual_row_status == VirtualRowStatus::Yes)
enable_current_virtual_row = true;
else if (virtual_row_status == VirtualRowStatus::NoConsiderInLogicalPlan)
enable_current_virtual_row = (need_preliminary_merge || output_each_partition_through_separate_port) && item.size() > 1;
pipes.emplace_back(readInOrder(std::move(item), column_names, pool_settings, read_type, input_order_info->limit,

View File

@ -108,6 +108,14 @@ public:
using AnalysisResultPtr = std::shared_ptr<AnalysisResult>;
enum class VirtualRowStatus
{
NoConsiderInLogicalPlan,
Possible,
No,
Yes,
};
ReadFromMergeTree(
MergeTreeData::DataPartsVector parts_,
MergeTreeData::MutationsSnapshotPtr mutations_snapshot_,
@ -210,7 +218,7 @@ public:
void applyFilters(ActionDAGNodes added_filter_nodes) override;
void enableVirtualRow() { enable_virtual_row = true; }
void setVirtualRowStatus(VirtualRowStatus virtual_row_status_) { virtual_row_status = virtual_row_status_; }
private:
int getSortDirection() const
@ -284,7 +292,9 @@ private:
std::optional<MergeTreeReadTaskCallback> read_task_callback;
bool enable_vertical_final = false;
bool enable_remove_parts_from_snapshot_optimization = true;
bool enable_virtual_row = false;
VirtualRowStatus virtual_row_status = VirtualRowStatus::NoConsiderInLogicalPlan;
std::optional<size_t> number_of_current_replica;
};

View File

@ -38,3 +38,5 @@
1 3
1 2
1 1
-- test distinct ----
0

View File

@ -182,3 +182,35 @@ optimize_read_in_order = 1,
read_in_order_two_level_merge_threshold = 0; --force preliminary merge
DROP TABLE function_pk;
-- modified from 02317_distinct_in_order_optimization
SELECT '-- test distinct ----';
DROP TABLE IF EXISTS distinct_in_order SYNC;
CREATE TABLE distinct_in_order
(
`a` int,
`b` int,
`c` int
)
ENGINE = MergeTree
ORDER BY (a, b)
SETTINGS index_granularity = 8192, index_granularity_bytes = '10Mi';
SYSTEM STOP MERGES distinct_in_order;
INSERT INTO distinct_in_order SELECT
number % number,
number % 5,
number % 10
FROM numbers(1, 1000000);
SELECT DISTINCT a
FROM distinct_in_order
ORDER BY a ASC
SETTINGS read_in_order_two_level_merge_threshold = 0,
optimize_read_in_order = 1,
max_threads = 2;
DROP TABLE distinct_in_order;