From 93161f70d15e4a3587ddde494532a874c6b432d0 Mon Sep 17 00:00:00 2001 From: Igor Nikonov Date: Wed, 10 May 2023 10:45:43 +0000 Subject: [PATCH] Correct suffix handling for ranges --- .../Transforms/FillingTransform.cpp | 84 +++++++++++-------- src/Processors/Transforms/FillingTransform.h | 9 +- ...2730_with_fill_by_sorting_prefix.reference | 51 +++++++++++ .../02730_with_fill_by_sorting_prefix.sql | 19 +++++ 4 files changed, 126 insertions(+), 37 deletions(-) diff --git a/src/Processors/Transforms/FillingTransform.cpp b/src/Processors/Transforms/FillingTransform.cpp index b9c3341948a..5fa89e89a4b 100644 --- a/src/Processors/Transforms/FillingTransform.cpp +++ b/src/Processors/Transforms/FillingTransform.cpp @@ -246,6 +246,7 @@ FillingTransform::FillingTransform( sort_prefix.push_back(desc); } logDebug("sort prefix", dumpSortDescription(sort_prefix)); + last_range_sort_prefix.reserve(sort_prefix.size()); /// TODO: check conflict in positions between interpolate and sorting prefix columns @@ -334,6 +335,8 @@ using MutableColumnRawPtrs = std::vector; static void insertFromFillingRow(const MutableColumnRawPtrs & filling_columns, const MutableColumnRawPtrs & interpolate_columns, const MutableColumnRawPtrs & other_columns, const FillingRow & filling_row, const Block & interpolate_block) { + logDebug("insertFromFillingRow", filling_row); + for (size_t i = 0, size = filling_columns.size(); i < size; ++i) { if (filling_row[i].isNull()) @@ -410,27 +413,6 @@ void FillingTransform::initColumns( bool FillingTransform::generateSuffixIfNeeded(const Columns & input_columns, MutableColumns & result_columns) { - /// true means we'll generate rows for empty result set - const bool no_data_processed = last_row.empty(); - - logDebug("generateSuffixIfNeeded() filling_row", filling_row); - logDebug("generateSuffixIfNeeded() next_row", next_row); - logDebug("generateSuffixIfNeeded() no_data_processed", no_data_processed); - - /// Determines should we insert filling row before start generating next rows. - bool should_insert_first = next_row < filling_row || no_data_processed; - - for (size_t i = 0, size = filling_row.size(); i < size; ++i) - next_row[i] = filling_row.getFillDescription(i).fill_to; - - logDebug("generateSuffixIfNeeded() next_row updated", next_row); - - if (!no_data_processed && filling_row >= next_row) - { - logDebug("generateSuffixIfNeeded()", "no need to generate suffix"); - return false; - } - Columns input_fill_columns; Columns input_interpolate_columns; Columns input_sort_prefix_columns; @@ -452,16 +434,32 @@ bool FillingTransform::generateSuffixIfNeeded(const Columns & input_columns, Mut res_sort_prefix_columns, res_other_columns); - if (no_data_processed) - filling_row.initFromDefaults(); + return generateSuffixIfNeeded(result_columns, res_fill_columns, res_interpolate_columns, res_sort_prefix_columns, res_other_columns); +} - /// if any rows was processed and there is sort prefix, get last row sort prefix - Columns last_row_sort_prefix; - if (!last_row.empty()) +bool FillingTransform::generateSuffixIfNeeded( + const MutableColumns & result_columns, + MutableColumnRawPtrs res_fill_columns, + MutableColumnRawPtrs res_interpolate_columns, + MutableColumnRawPtrs res_sort_prefix_columns, + MutableColumnRawPtrs res_other_columns) +{ + logDebug("generateSuffixIfNeeded() filling_row", filling_row); + logDebug("generateSuffixIfNeeded() next_row", next_row); + + /// Determines should we insert filling row before start generating next rows. + bool should_insert_first = next_row < filling_row; + logDebug("should_insert_first", should_insert_first); + + for (size_t i = 0, size = filling_row.size(); i < size; ++i) + next_row[i] = filling_row.getFillDescription(i).fill_to; + + logDebug("generateSuffixIfNeeded() next_row updated", next_row); + + if (filling_row >= next_row) { - last_row_sort_prefix.reserve(sort_prefix_positions.size()); - for (const size_t pos : sort_prefix_positions) - last_row_sort_prefix.push_back(last_row[pos]); + logDebug("generateSuffixIfNeeded()", "no need to generate suffix"); + return false; } Block interpolate_block; @@ -470,8 +468,8 @@ bool FillingTransform::generateSuffixIfNeeded(const Columns & input_columns, Mut interpolate(result_columns, interpolate_block); insertFromFillingRow(res_fill_columns, res_interpolate_columns, res_other_columns, filling_row, interpolate_block); /// fulfill sort prefix columns with last row values or defaults - if (!last_row_sort_prefix.empty()) - copyRowFromColumns(res_sort_prefix_columns, last_row_sort_prefix, 0); + if (!last_range_sort_prefix.empty()) + copyRowFromColumns(res_sort_prefix_columns, last_range_sort_prefix, 0); else for (auto * sort_prefix_column : res_sort_prefix_columns) sort_prefix_column->insertDefault(); @@ -482,8 +480,8 @@ bool FillingTransform::generateSuffixIfNeeded(const Columns & input_columns, Mut interpolate(result_columns, interpolate_block); insertFromFillingRow(res_fill_columns, res_interpolate_columns, res_other_columns, filling_row, interpolate_block); /// fulfill sort prefix columns with last row values or defaults - if (!last_row_sort_prefix.empty()) - copyRowFromColumns(res_sort_prefix_columns, last_row_sort_prefix, 0); + if (!last_range_sort_prefix.empty()) + copyRowFromColumns(res_sort_prefix_columns, last_range_sort_prefix, 0); else for (auto * sort_prefix_column : res_sort_prefix_columns) sort_prefix_column->insertDefault(); @@ -591,7 +589,6 @@ void FillingTransform::transformRange( interpolate(result_columns, interpolate_block); insertFromFillingRow(res_fill_columns, res_interpolate_columns, res_other_columns, filling_row, interpolate_block); copyRowFromColumns(res_sort_prefix_columns, input_sort_prefix_columns, row_ind); - logDebug("filling_row should_insert_first", filling_row); } while (filling_row.next(next_row)) @@ -599,7 +596,6 @@ void FillingTransform::transformRange( interpolate(result_columns, interpolate_block); insertFromFillingRow(res_fill_columns, res_interpolate_columns, res_other_columns, filling_row, interpolate_block); copyRowFromColumns(res_sort_prefix_columns, input_sort_prefix_columns, row_ind); - logDebug("filling_row", filling_row); } copyRowFromColumns(res_fill_columns, input_fill_columns, row_ind); @@ -607,6 +603,15 @@ void FillingTransform::transformRange( copyRowFromColumns(res_sort_prefix_columns, input_sort_prefix_columns, row_ind); copyRowFromColumns(res_other_columns, input_other_columns, row_ind); } + + /// save sort prefix of last row in the range, it's used to generate suffix + last_range_sort_prefix.clear(); + for (const auto & sort_prefix_column : input_sort_prefix_columns) + { + auto column = sort_prefix_column->cloneEmpty(); + column->insertFrom(*sort_prefix_column, range_end - 1); + last_range_sort_prefix.push_back(std::move(column)); + } } void FillingTransform::transform(Chunk & chunk) @@ -638,6 +643,11 @@ void FillingTransform::transform(Chunk & chunk) /// if all chunks are processed, then we may need to generate suffix for the following cases: /// (1) when all data are processed and WITH FILL .. TO is provided /// (2) for empty result set when WITH FILL FROM .. TO is provided (see PR #30888) + + /// if no data was processed, then need to initialize filling_row + if (last_row.empty()) + filling_row.initFromDefaults(); + if (generateSuffixIfNeeded(input.getHeader().getColumns(), result_columns)) { size_t num_output_rows = result_columns[0]->size(); @@ -724,6 +734,10 @@ void FillingTransform::transform(Chunk & chunk) return true; }); + /// generate suffix for the previous range + if (!last_range_sort_prefix.empty()) + generateSuffixIfNeeded(result_columns, res_fill_columns, res_interpolate_columns, res_sort_prefix_columns, res_other_columns); + transformRange( input_fill_columns, input_interpolate_columns, diff --git a/src/Processors/Transforms/FillingTransform.h b/src/Processors/Transforms/FillingTransform.h index 0506bb92efb..def4e87c7ed 100644 --- a/src/Processors/Transforms/FillingTransform.h +++ b/src/Processors/Transforms/FillingTransform.h @@ -62,8 +62,12 @@ private: MutableColumnRawPtrs & output_other_columns); bool generateSuffixIfNeeded( - const Columns & input_columns, - MutableColumns & result_columns); + const MutableColumns & result_columns, + MutableColumnRawPtrs res_fill_columns, + MutableColumnRawPtrs res_interpolate_columns, + MutableColumnRawPtrs res_sort_prefix_columns, + MutableColumnRawPtrs res_other_columns); + bool generateSuffixIfNeeded(const Columns & input_columns, MutableColumns & result_columns); const SortDescription sort_description; const SortDescription fill_description; /// Contains only columns with WITH FILL. @@ -81,6 +85,7 @@ private: std::vector> input_positions; /// positions in result columns required for actions ExpressionActionsPtr interpolate_actions; Columns last_row; + Columns last_range_sort_prefix; bool all_chunks_processed = false; /// flag to determine if we have already processed all chunks }; diff --git a/tests/queries/0_stateless/02730_with_fill_by_sorting_prefix.reference b/tests/queries/0_stateless/02730_with_fill_by_sorting_prefix.reference index 121835a80e8..54a4faf46f2 100644 --- a/tests/queries/0_stateless/02730_with_fill_by_sorting_prefix.reference +++ b/tests/queries/0_stateless/02730_with_fill_by_sorting_prefix.reference @@ -21,3 +21,54 @@ select * from ts order by sensor_id, timestamp with fill step 1; 5 1 1 5 2 0 5 3 1 +drop table if exists ts; +create table ts (sensor_id UInt64, timestamp UInt64, value Float64) ENGINE=MergeTree() ORDER BY (sensor_id, timestamp); +system stop merges ts; +-- FillingTransform: 6 rows will be processed in 2 chunks with 3 rows each +insert into ts VALUES (1, 10, 1), (1, 12, 1), (3, 5, 1); +insert into ts VALUES (3, 7, 1), (5, 1, 1), (5, 3, 1); +select * from ts order by sensor_id, timestamp with fill step 1 settings max_block_size=3; +1 10 1 +1 11 0 +1 12 1 +3 5 1 +3 6 0 +3 7 1 +5 1 1 +5 2 0 +5 3 1 +drop table if exists ts; +create table ts (sensor_id UInt64, timestamp UInt64, value Float64) ENGINE=MergeTree() ORDER BY (sensor_id, timestamp); +system stop merges ts; +-- FillingTransform: 6 rows will be processed in 3 chunks with 2 rows each +insert into ts VALUES (1, 10, 1), (1, 12, 1); +insert into ts VALUES (3, 5, 1), (3, 7, 1); +insert into ts VALUES (5, 1, 1), (5, 3, 1); +select * from ts order by sensor_id, timestamp with fill step 1 settings max_block_size=2; +1 10 1 +1 11 0 +1 12 1 +3 5 1 +3 6 0 +3 7 1 +5 1 1 +5 2 0 +5 3 1 +select * from ts order by sensor_id, timestamp with fill from 6 to 10 step 1 interpolate (value as 9999); +1 6 9999 +1 7 9999 +1 8 9999 +1 9 9999 +1 10 1 +1 12 1 +3 5 1 +3 6 9999 +3 7 1 +3 8 9999 +3 9 9999 +5 1 1 +5 3 1 +5 6 9999 +5 7 9999 +5 8 9999 +5 9 9999 diff --git a/tests/queries/0_stateless/02730_with_fill_by_sorting_prefix.sql b/tests/queries/0_stateless/02730_with_fill_by_sorting_prefix.sql index 828572eb620..1b2288da323 100644 --- a/tests/queries/0_stateless/02730_with_fill_by_sorting_prefix.sql +++ b/tests/queries/0_stateless/02730_with_fill_by_sorting_prefix.sql @@ -11,3 +11,22 @@ create table ts (sensor_id UInt64, timestamp UInt64, value Float64) ENGINE=Merge insert into ts VALUES (1, 10, 1), (1, 12, 2), (3, 5, 1), (3, 7, 3), (5, 1, 1), (5, 3, 1); -- FillingTransform: 6 rows will be processed in 1 chunks select * from ts order by sensor_id, timestamp with fill step 1; + +drop table if exists ts; +create table ts (sensor_id UInt64, timestamp UInt64, value Float64) ENGINE=MergeTree() ORDER BY (sensor_id, timestamp); +system stop merges ts; +-- FillingTransform: 6 rows will be processed in 2 chunks with 3 rows each +insert into ts VALUES (1, 10, 1), (1, 12, 1), (3, 5, 1); +insert into ts VALUES (3, 7, 1), (5, 1, 1), (5, 3, 1); +select * from ts order by sensor_id, timestamp with fill step 1 settings max_block_size=3; + +drop table if exists ts; +create table ts (sensor_id UInt64, timestamp UInt64, value Float64) ENGINE=MergeTree() ORDER BY (sensor_id, timestamp); +system stop merges ts; +-- FillingTransform: 6 rows will be processed in 3 chunks with 2 rows each +insert into ts VALUES (1, 10, 1), (1, 12, 1); +insert into ts VALUES (3, 5, 1), (3, 7, 1); +insert into ts VALUES (5, 1, 1), (5, 3, 1); +select * from ts order by sensor_id, timestamp with fill step 1 settings max_block_size=2; + +select * from ts order by sensor_id, timestamp with fill from 6 to 10 step 1 interpolate (value as 9999);