Correct suffix handling for ranges

This commit is contained in:
Igor Nikonov 2023-05-10 10:45:43 +00:00
parent 0dfeb880ce
commit 93161f70d1
4 changed files with 126 additions and 37 deletions

View File

@ -246,6 +246,7 @@ FillingTransform::FillingTransform(
sort_prefix.push_back(desc);
}
logDebug("sort prefix", dumpSortDescription(sort_prefix));
last_range_sort_prefix.reserve(sort_prefix.size());
/// TODO: check conflict in positions between interpolate and sorting prefix columns
@ -334,6 +335,8 @@ using MutableColumnRawPtrs = std::vector<IColumn*>;
static void insertFromFillingRow(const MutableColumnRawPtrs & filling_columns, const MutableColumnRawPtrs & interpolate_columns, const MutableColumnRawPtrs & other_columns,
const FillingRow & filling_row, const Block & interpolate_block)
{
logDebug("insertFromFillingRow", filling_row);
for (size_t i = 0, size = filling_columns.size(); i < size; ++i)
{
if (filling_row[i].isNull())
@ -410,27 +413,6 @@ void FillingTransform::initColumns(
bool FillingTransform::generateSuffixIfNeeded(const Columns & input_columns, MutableColumns & result_columns)
{
/// true means we'll generate rows for empty result set
const bool no_data_processed = last_row.empty();
logDebug("generateSuffixIfNeeded() filling_row", filling_row);
logDebug("generateSuffixIfNeeded() next_row", next_row);
logDebug("generateSuffixIfNeeded() no_data_processed", no_data_processed);
/// Determines should we insert filling row before start generating next rows.
bool should_insert_first = next_row < filling_row || no_data_processed;
for (size_t i = 0, size = filling_row.size(); i < size; ++i)
next_row[i] = filling_row.getFillDescription(i).fill_to;
logDebug("generateSuffixIfNeeded() next_row updated", next_row);
if (!no_data_processed && filling_row >= next_row)
{
logDebug("generateSuffixIfNeeded()", "no need to generate suffix");
return false;
}
Columns input_fill_columns;
Columns input_interpolate_columns;
Columns input_sort_prefix_columns;
@ -452,16 +434,32 @@ bool FillingTransform::generateSuffixIfNeeded(const Columns & input_columns, Mut
res_sort_prefix_columns,
res_other_columns);
if (no_data_processed)
filling_row.initFromDefaults();
return generateSuffixIfNeeded(result_columns, res_fill_columns, res_interpolate_columns, res_sort_prefix_columns, res_other_columns);
}
/// if any rows was processed and there is sort prefix, get last row sort prefix
Columns last_row_sort_prefix;
if (!last_row.empty())
bool FillingTransform::generateSuffixIfNeeded(
const MutableColumns & result_columns,
MutableColumnRawPtrs res_fill_columns,
MutableColumnRawPtrs res_interpolate_columns,
MutableColumnRawPtrs res_sort_prefix_columns,
MutableColumnRawPtrs res_other_columns)
{
logDebug("generateSuffixIfNeeded() filling_row", filling_row);
logDebug("generateSuffixIfNeeded() next_row", next_row);
/// Determines should we insert filling row before start generating next rows.
bool should_insert_first = next_row < filling_row;
logDebug("should_insert_first", should_insert_first);
for (size_t i = 0, size = filling_row.size(); i < size; ++i)
next_row[i] = filling_row.getFillDescription(i).fill_to;
logDebug("generateSuffixIfNeeded() next_row updated", next_row);
if (filling_row >= next_row)
{
last_row_sort_prefix.reserve(sort_prefix_positions.size());
for (const size_t pos : sort_prefix_positions)
last_row_sort_prefix.push_back(last_row[pos]);
logDebug("generateSuffixIfNeeded()", "no need to generate suffix");
return false;
}
Block interpolate_block;
@ -470,8 +468,8 @@ bool FillingTransform::generateSuffixIfNeeded(const Columns & input_columns, Mut
interpolate(result_columns, interpolate_block);
insertFromFillingRow(res_fill_columns, res_interpolate_columns, res_other_columns, filling_row, interpolate_block);
/// fulfill sort prefix columns with last row values or defaults
if (!last_row_sort_prefix.empty())
copyRowFromColumns(res_sort_prefix_columns, last_row_sort_prefix, 0);
if (!last_range_sort_prefix.empty())
copyRowFromColumns(res_sort_prefix_columns, last_range_sort_prefix, 0);
else
for (auto * sort_prefix_column : res_sort_prefix_columns)
sort_prefix_column->insertDefault();
@ -482,8 +480,8 @@ bool FillingTransform::generateSuffixIfNeeded(const Columns & input_columns, Mut
interpolate(result_columns, interpolate_block);
insertFromFillingRow(res_fill_columns, res_interpolate_columns, res_other_columns, filling_row, interpolate_block);
/// fulfill sort prefix columns with last row values or defaults
if (!last_row_sort_prefix.empty())
copyRowFromColumns(res_sort_prefix_columns, last_row_sort_prefix, 0);
if (!last_range_sort_prefix.empty())
copyRowFromColumns(res_sort_prefix_columns, last_range_sort_prefix, 0);
else
for (auto * sort_prefix_column : res_sort_prefix_columns)
sort_prefix_column->insertDefault();
@ -591,7 +589,6 @@ void FillingTransform::transformRange(
interpolate(result_columns, interpolate_block);
insertFromFillingRow(res_fill_columns, res_interpolate_columns, res_other_columns, filling_row, interpolate_block);
copyRowFromColumns(res_sort_prefix_columns, input_sort_prefix_columns, row_ind);
logDebug("filling_row should_insert_first", filling_row);
}
while (filling_row.next(next_row))
@ -599,7 +596,6 @@ void FillingTransform::transformRange(
interpolate(result_columns, interpolate_block);
insertFromFillingRow(res_fill_columns, res_interpolate_columns, res_other_columns, filling_row, interpolate_block);
copyRowFromColumns(res_sort_prefix_columns, input_sort_prefix_columns, row_ind);
logDebug("filling_row", filling_row);
}
copyRowFromColumns(res_fill_columns, input_fill_columns, row_ind);
@ -607,6 +603,15 @@ void FillingTransform::transformRange(
copyRowFromColumns(res_sort_prefix_columns, input_sort_prefix_columns, row_ind);
copyRowFromColumns(res_other_columns, input_other_columns, row_ind);
}
/// save sort prefix of last row in the range, it's used to generate suffix
last_range_sort_prefix.clear();
for (const auto & sort_prefix_column : input_sort_prefix_columns)
{
auto column = sort_prefix_column->cloneEmpty();
column->insertFrom(*sort_prefix_column, range_end - 1);
last_range_sort_prefix.push_back(std::move(column));
}
}
void FillingTransform::transform(Chunk & chunk)
@ -638,6 +643,11 @@ void FillingTransform::transform(Chunk & chunk)
/// if all chunks are processed, then we may need to generate suffix for the following cases:
/// (1) when all data are processed and WITH FILL .. TO is provided
/// (2) for empty result set when WITH FILL FROM .. TO is provided (see PR #30888)
/// if no data was processed, then need to initialize filling_row
if (last_row.empty())
filling_row.initFromDefaults();
if (generateSuffixIfNeeded(input.getHeader().getColumns(), result_columns))
{
size_t num_output_rows = result_columns[0]->size();
@ -724,6 +734,10 @@ void FillingTransform::transform(Chunk & chunk)
return true;
});
/// generate suffix for the previous range
if (!last_range_sort_prefix.empty())
generateSuffixIfNeeded(result_columns, res_fill_columns, res_interpolate_columns, res_sort_prefix_columns, res_other_columns);
transformRange(
input_fill_columns,
input_interpolate_columns,

View File

@ -62,8 +62,12 @@ private:
MutableColumnRawPtrs & output_other_columns);
bool generateSuffixIfNeeded(
const Columns & input_columns,
MutableColumns & result_columns);
const MutableColumns & result_columns,
MutableColumnRawPtrs res_fill_columns,
MutableColumnRawPtrs res_interpolate_columns,
MutableColumnRawPtrs res_sort_prefix_columns,
MutableColumnRawPtrs res_other_columns);
bool generateSuffixIfNeeded(const Columns & input_columns, MutableColumns & result_columns);
const SortDescription sort_description;
const SortDescription fill_description; /// Contains only columns with WITH FILL.
@ -81,6 +85,7 @@ private:
std::vector<std::pair<size_t, NameAndTypePair>> input_positions; /// positions in result columns required for actions
ExpressionActionsPtr interpolate_actions;
Columns last_row;
Columns last_range_sort_prefix;
bool all_chunks_processed = false; /// flag to determine if we have already processed all chunks
};

View File

@ -21,3 +21,54 @@ select * from ts order by sensor_id, timestamp with fill step 1;
5 1 1
5 2 0
5 3 1
drop table if exists ts;
create table ts (sensor_id UInt64, timestamp UInt64, value Float64) ENGINE=MergeTree() ORDER BY (sensor_id, timestamp);
system stop merges ts;
-- FillingTransform: 6 rows will be processed in 2 chunks with 3 rows each
insert into ts VALUES (1, 10, 1), (1, 12, 1), (3, 5, 1);
insert into ts VALUES (3, 7, 1), (5, 1, 1), (5, 3, 1);
select * from ts order by sensor_id, timestamp with fill step 1 settings max_block_size=3;
1 10 1
1 11 0
1 12 1
3 5 1
3 6 0
3 7 1
5 1 1
5 2 0
5 3 1
drop table if exists ts;
create table ts (sensor_id UInt64, timestamp UInt64, value Float64) ENGINE=MergeTree() ORDER BY (sensor_id, timestamp);
system stop merges ts;
-- FillingTransform: 6 rows will be processed in 3 chunks with 2 rows each
insert into ts VALUES (1, 10, 1), (1, 12, 1);
insert into ts VALUES (3, 5, 1), (3, 7, 1);
insert into ts VALUES (5, 1, 1), (5, 3, 1);
select * from ts order by sensor_id, timestamp with fill step 1 settings max_block_size=2;
1 10 1
1 11 0
1 12 1
3 5 1
3 6 0
3 7 1
5 1 1
5 2 0
5 3 1
select * from ts order by sensor_id, timestamp with fill from 6 to 10 step 1 interpolate (value as 9999);
1 6 9999
1 7 9999
1 8 9999
1 9 9999
1 10 1
1 12 1
3 5 1
3 6 9999
3 7 1
3 8 9999
3 9 9999
5 1 1
5 3 1
5 6 9999
5 7 9999
5 8 9999
5 9 9999

View File

@ -11,3 +11,22 @@ create table ts (sensor_id UInt64, timestamp UInt64, value Float64) ENGINE=Merge
insert into ts VALUES (1, 10, 1), (1, 12, 2), (3, 5, 1), (3, 7, 3), (5, 1, 1), (5, 3, 1);
-- FillingTransform: 6 rows will be processed in 1 chunks
select * from ts order by sensor_id, timestamp with fill step 1;
drop table if exists ts;
create table ts (sensor_id UInt64, timestamp UInt64, value Float64) ENGINE=MergeTree() ORDER BY (sensor_id, timestamp);
system stop merges ts;
-- FillingTransform: 6 rows will be processed in 2 chunks with 3 rows each
insert into ts VALUES (1, 10, 1), (1, 12, 1), (3, 5, 1);
insert into ts VALUES (3, 7, 1), (5, 1, 1), (5, 3, 1);
select * from ts order by sensor_id, timestamp with fill step 1 settings max_block_size=3;
drop table if exists ts;
create table ts (sensor_id UInt64, timestamp UInt64, value Float64) ENGINE=MergeTree() ORDER BY (sensor_id, timestamp);
system stop merges ts;
-- FillingTransform: 6 rows will be processed in 3 chunks with 2 rows each
insert into ts VALUES (1, 10, 1), (1, 12, 1);
insert into ts VALUES (3, 5, 1), (3, 7, 1);
insert into ts VALUES (5, 1, 1), (5, 3, 1);
select * from ts order by sensor_id, timestamp with fill step 1 settings max_block_size=2;
select * from ts order by sensor_id, timestamp with fill from 6 to 10 step 1 interpolate (value as 9999);