Merge pull request #21976 from ClickHouse/try-fix-excessive-rows-for-collapsing-merge

CollapsingSortedAlgorithm should not return more than index_granularity rows
alesapin authored 2021-04-01 16:20:28 +03:00, committed by GitHub
commit 85fbb7eccf
6 changed files with 97 additions and 12 deletions
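
For context: insertRows() flushes the accumulated rows of one sorting key, and per CollapsingMergeTree semantics it may emit 0, 1 or 2 rows (2 when an equal number of positive and negative rows ends with a positive one, so both first_negative_row and last_positive_row are kept). Before this patch it never checked merged_data's capacity, so a two-row flush could exceed max_block_size, and therefore index_granularity, by one row. The sketch below models the pattern the patch introduces; Chunk, MergedDataSketch and insertRowsSketch are toy stand-ins, not ClickHouse's actual classes.

#include <cstddef>
#include <optional>

/// Toy stand-ins for ClickHouse's Chunk and MergedData (illustrative only).
struct Chunk { size_t num_rows = 0; };

struct MergedDataSketch
{
    size_t max_block_size = 8192;   /// models index_granularity
    size_t num_rows = 0;

    bool hasEnoughRows() const { return num_rows >= max_block_size; }
    Chunk pull() { Chunk chunk{num_rows}; num_rows = 0; return chunk; }
    void insertRow() { ++num_rows; }
};

/// The pattern of the fixed insertRows(): flush before the *second*
/// insertion for a key, so no chunk ever exceeds max_block_size rows.
std::optional<Chunk> insertRowsSketch(MergedDataSketch & merged_data,
                                      bool keep_first_negative,
                                      bool keep_last_positive)
{
    std::optional<Chunk> res;

    if (keep_first_negative)
        merged_data.insertRow();

    if (keep_last_positive)
    {
        if (merged_data.hasEnoughRows())
            res = merged_data.pull();   /// flush before the second row
        merged_data.insertRow();
    }

    return res;   /// merge() hands this chunk to the caller as a Status
}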

src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.cpp

@@ -66,14 +66,16 @@ void CollapsingSortedAlgorithm::insertRow(RowRef & row)
     merged_data.insertRow(*row.all_columns, row.row_num, row.owned_chunk->getNumRows());
 }
 
-void CollapsingSortedAlgorithm::insertRows()
+std::optional<Chunk> CollapsingSortedAlgorithm::insertRows()
 {
     if (count_positive == 0 && count_negative == 0)
     {
         /// No input rows have been read.
-        return;
+        return {};
     }
 
+    std::optional<Chunk> res;
+
     if (last_is_positive || count_positive != count_negative)
     {
         if (count_positive <= count_negative && !only_positive_sign)
@@ -86,6 +88,9 @@ void CollapsingSortedAlgorithm::insertRows()
 
         if (count_positive >= count_negative)
         {
+            if (merged_data.hasEnoughRows())
+                res = merged_data.pull();
+
             insertRow(last_positive_row);
 
             if (out_row_sources_buf)
@@ -107,10 +112,16 @@ void CollapsingSortedAlgorithm::insertRows()
         out_row_sources_buf->write(
             reinterpret_cast<const char *>(current_row_sources.data()),
             current_row_sources.size() * sizeof(RowSourcePart));
+
+    return res;
 }
 
 IMergingAlgorithm::Status CollapsingSortedAlgorithm::merge()
 {
+    /// Rare case that may happen when index_granularity is 1 and we needed to insert 2 rows inside insertRows().
+    if (merged_data.hasEnoughRows())
+        return Status(merged_data.pull());
+
     /// Take rows in required order and put them into `merged_data`, while the rows are no more than `max_block_size`
     while (queue.isValid())
     {
@@ -132,15 +143,14 @@ IMergingAlgorithm::Status CollapsingSortedAlgorithm::merge()
         setRowRef(last_row, current);
 
         bool key_differs = !last_row.hasEqualSortColumnsWith(current_row);
 
-        /// if there are enough rows and the last one is calculated completely
-        if (key_differs && merged_data.hasEnoughRows())
-            return Status(merged_data.pull());
-
         if (key_differs)
         {
+            /// if there are enough rows and the last one is calculated completely
+            if (merged_data.hasEnoughRows())
+                return Status(merged_data.pull());
+
             /// We write data for the previous primary key.
-            insertRows();
+            auto res = insertRows();
 
             current_row.swap(last_row);
@@ -151,6 +161,12 @@ IMergingAlgorithm::Status CollapsingSortedAlgorithm::merge()
             first_negative_pos = 0;
             last_positive_pos = 0;
             current_row_sources.resize(0);
+
+            /// Here we can return the ready chunk.
+            /// On the next iteration, last_row == current_row and all the counters are zeroed,
+            /// so current_row will be processed correctly.
+            if (res)
+                return Status(std::move(*res));
         }
 
         /// Initially, skip all rows. On insert, unskip "corner" rows.
@@ -194,7 +210,15 @@ IMergingAlgorithm::Status CollapsingSortedAlgorithm::merge()
         }
     }
 
-    insertRows();
+    if (auto res = insertRows())
+    {
+        /// The queue is empty, and we have inserted all the rows.
+        /// Set the counters to zero so that insertRows() will return immediately next time.
+        count_positive = 0;
+        count_negative = 0;
+        return Status(std::move(*res));
+    }
 
     return Status(merged_data.pull(), true);
 }
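
To see why merge() now starts with a hasEnoughRows() check, consider max_block_size == 1 (index_granularity = 1) and a kept (-1, +1) pair: insertRows() fills the block with the first row, pulls it, then fills it again with the second row, so merge() is re-entered with merged_data already full. A self-contained toy walkthrough, again with stand-in types rather than ClickHouse's API:

#include <cstdio>
#include <optional>

struct Chunk { int num_rows = 0; };

struct MergedData
{
    int max_block_size = 1;   /// models index_granularity = 1
    int num_rows = 0;

    bool hasEnoughRows() const { return num_rows >= max_block_size; }
    Chunk pull() { Chunk chunk{num_rows}; num_rows = 0; return chunk; }
    void insertRow() { ++num_rows; }
};

int main()
{
    MergedData merged_data;
    std::optional<Chunk> early;

    merged_data.insertRow();            /// first_negative_row: block is now full
    if (merged_data.hasEnoughRows())
        early = merged_data.pull();     /// flush before the second insertion
    merged_data.insertRow();            /// last_positive_row: full again

    std::printf("chunk returned via insertRows(): %d row(s)\n",
                early ? early->num_rows : 0);

    /// merge() is re-entered with merged_data still full;
    /// the new guard at its top flushes the second one-row chunk.
    if (merged_data.hasEnoughRows())
        std::printf("chunk from the next merge() call: %d row(s)\n",
                    merged_data.pull().num_rows);
    return 0;
}

This prints one row for each chunk: the pair now leaves as two one-row chunks, whereas before the patch it left as a single two-row chunk, exceeding index_granularity.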

src/Processors/Merges/Algorithms/CollapsingSortedAlgorithm.h

@@ -66,7 +66,11 @@ private:
 
     void reportIncorrectData();
     void insertRow(RowRef & row);
-    void insertRows();
+
+    /// Insert ready rows into merged_data. We may want to insert 0, 1 or 2 rows.
+    /// It may happen that 2 rows are going to be inserted, but merged_data has free space for only 1 row.
+    /// In this case, a Chunk with the ready rows is pulled from merged_data before the second insertion.
+    std::optional<Chunk> insertRows();
 };
 
 }
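
For reference, the 0, 1 or 2 rows mentioned in the new comment follow from the existing branches of insertRows(): 0 rows when nothing was read or when the group fully collapses (equal positive and negative counts with a negative last row); 1 row (first_negative_row or last_positive_row) when the counts differ; 2 rows (both of them) when the counts are equal and the last row is positive, with the only_positive_sign mode additionally suppressing the negative row. The two-row case is the one that can overflow merged_data.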

tests/queries/0_stateless/00926_adaptive_index_granularity_collapsing_merge_tree.sql

@@ -58,7 +58,7 @@ OPTIMIZE TABLE four_rows_per_granule FINAL;
 
 SELECT COUNT(*) FROM four_rows_per_granule;
 
-SELECT distinct(marks) from system.parts WHERE table = 'four_rows_per_granule' and database=currentDatabase() and active=1;
+SELECT sum(marks) from system.parts WHERE table = 'four_rows_per_granule' and database=currentDatabase() and active=1;
 
 INSERT INTO four_rows_per_granule (p, k, v1, v2, Sign) VALUES ('2018-05-15', 1, 1000, 2000, 1), ('2018-05-16', 2, 3000, 4000, 1), ('2018-05-17', 3, 5000, 6000, 1), ('2018-05-18', 4, 7000, 8000, 1);

New test .reference file (path not shown in this view)

@@ -0,0 +1,4 @@
+-8191	8193
+-8191	8193
+0	2
+0	2
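
These expected values follow from the .sql test below: numbers(8193) yields keys 0..8191, all with Sign = -1 except the extra number == 8192 row, which shares key 8191 and carries Sign = +1, so sum(Sign) = -8192 + 1 = -8191 and count() = 8193. Because that lone (-1, +1) pair ends with a positive row, collapsing keeps both of its rows, and OPTIMIZE FINAL leaves the totals unchanged. The same reasoning gives 0 and 2 for the single kept pair in the second table.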

New test .sql file (path not shown in this view)

@@ -0,0 +1,53 @@
+DROP TABLE IF EXISTS collapsing_table;
+SET optimize_on_insert = 0;
+
+CREATE TABLE collapsing_table
+(
+    key UInt64,
+    value UInt64,
+    Sign Int8
+)
+ENGINE = CollapsingMergeTree(Sign)
+ORDER BY key
+SETTINGS
+    vertical_merge_algorithm_min_rows_to_activate=0,
+    vertical_merge_algorithm_min_columns_to_activate=0,
+    min_bytes_for_wide_part = 0;
+
+INSERT INTO collapsing_table SELECT if(number == 8192, 8191, number), 1, if(number == 8192, +1, -1) FROM numbers(8193);
+
+SELECT sum(Sign), count() from collapsing_table;
+
+OPTIMIZE TABLE collapsing_table FINAL;
+
+SELECT sum(Sign), count() from collapsing_table;
+
+DROP TABLE IF EXISTS collapsing_table;
+
+DROP TABLE IF EXISTS collapsing_suspicious_granularity;
+
+CREATE TABLE collapsing_suspicious_granularity
+(
+    key UInt64,
+    value UInt64,
+    Sign Int8
+)
+ENGINE = CollapsingMergeTree(Sign)
+ORDER BY key
+SETTINGS
+    vertical_merge_algorithm_min_rows_to_activate=0,
+    vertical_merge_algorithm_min_columns_to_activate=0,
+    min_bytes_for_wide_part = 0,
+    index_granularity = 1;
+
+INSERT INTO collapsing_suspicious_granularity VALUES (1, 1, -1) (1, 1, 1);
+
+SELECT sum(Sign), count() from collapsing_suspicious_granularity;
+
+OPTIMIZE TABLE collapsing_suspicious_granularity FINAL;
+
+SELECT sum(Sign), count() from collapsing_suspicious_granularity;
+
+DROP TABLE IF EXISTS collapsing_suspicious_granularity;
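
The second table is the regression case proper. SET optimize_on_insert = 0 keeps the (-1, +1) pair from being collapsed at insert time, and since the pair ends with the positive row, both of its rows must survive the merge. With index_granularity = 1 that forces insertRows() to emit 2 rows into 1-row blocks, which before this fix could produce a block of more than index_granularity rows; the vertical_merge_algorithm_* and min_bytes_for_wide_part = 0 settings appear to be there to force the vertical merge of a Wide part, where this mattered. The expected result is unchanged data: sum(Sign) = 0 and count() = 2 both before and after OPTIMIZE FINAL.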