From 07b5c03c542e835ad8c1b0b905d9532b3dab4b20 Mon Sep 17 00:00:00 2001 From: alesapin Date: Mon, 21 Dec 2020 11:24:52 +0300 Subject: [PATCH] Better adjustment of the last mark --- .../MergeTree/MergeTreeDataPartWriterWide.cpp | 51 ++++++++++++------- .../MergeTree/MergeTreeDataPartWriterWide.h | 7 +-- .../0_stateless/00955_test_final_mark.sql | 16 +++--- 3 files changed, 44 insertions(+), 30 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp index beb3646df17..be735104e99 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp @@ -171,8 +171,18 @@ void MergeTreeDataPartWriterWide::write(const Block & block, const IColumn::Perm if (rows_written_in_last_mark > 0) { size_t rows_left_in_last_mark = index_granularity.getMarkRows(getCurrentMark()) - rows_written_in_last_mark; + /// Previous granularity was much bigger than our new block's + /// granularity, so let's adjust it, because we want to add new + /// heavy-weight blocks into a small old granule. if (rows_left_in_last_mark > index_granularity_for_block) - adjustLastMarkIfNeedAndFlushToDisk(); + { + /// We have already written at least as many rows as the granularity of our block, + /// so adjust the last mark's rows and flush it to disk. + if (rows_written_in_last_mark >= index_granularity_for_block) + adjustLastMarkIfNeedAndFlushToDisk(rows_written_in_last_mark); + else /// We can still write some rows from the new block into the previous granule. 
+ adjustLastMarkIfNeedAndFlushToDisk(index_granularity_for_block - rows_written_in_last_mark); + } } fillIndexGranularity(index_granularity_for_block, block.rows()); @@ -476,7 +486,7 @@ void MergeTreeDataPartWriterWide::finishDataSerialization(IMergeTreeDataPart::Ch serialize_settings.low_cardinality_use_single_dictionary_for_part = global_settings.low_cardinality_use_single_dictionary_for_part != 0; WrittenOffsetColumns offset_columns; if (rows_written_in_last_mark > 0) - adjustLastMarkIfNeedAndFlushToDisk(); + adjustLastMarkIfNeedAndFlushToDisk(rows_written_in_last_mark); bool write_final_mark = (with_final_mark && data_written); @@ -572,7 +582,7 @@ void MergeTreeDataPartWriterWide::fillIndexGranularity(size_t index_granularity_ } -void MergeTreeDataPartWriterWide::adjustLastMarkIfNeedAndFlushToDisk() +void MergeTreeDataPartWriterWide::adjustLastMarkIfNeedAndFlushToDisk(size_t new_rows_in_last_mark) { /// We can adjust marks only if we computed granularity for blocks. /// Otherwise we cannot change granularity because it will differ from @@ -584,7 +594,7 @@ void MergeTreeDataPartWriterWide::adjustLastMarkIfNeedAndFlushToDisk() getCurrentMark(), index_granularity.getMarkRows(getCurrentMark()), rows_written_in_last_mark, index_granularity.getMarksCount()); index_granularity.popMark(); - index_granularity.appendMark(rows_written_in_last_mark); + index_granularity.appendMark(new_rows_in_last_mark); } /// Last mark should be filled, otherwise it's a bug @@ -592,25 +602,28 @@ void MergeTreeDataPartWriterWide::adjustLastMarkIfNeedAndFlushToDisk() throw Exception(ErrorCodes::LOGICAL_ERROR, "No saved marks for last mark {} having rows offset {}, total marks {}", getCurrentMark(), rows_written_in_last_mark, index_granularity.getMarksCount()); - for (const auto & [name, marks] : last_non_written_marks) + if (rows_written_in_last_mark == new_rows_in_last_mark) { - for (const auto & mark : marks) - flushMarkToFile(mark, index_granularity.getMarkRows(getCurrentMark())); - } + 
for (const auto & [name, marks] : last_non_written_marks) + { + for (const auto & mark : marks) + flushMarkToFile(mark, index_granularity.getMarkRows(getCurrentMark())); + } - last_non_written_marks.clear(); + last_non_written_marks.clear(); - if (compute_granularity && settings.can_use_adaptive_granularity) - { - /// Also we add mark to each skip index because all of them - /// already accumulated all rows from current adjusting mark - for (size_t i = 0; i < skip_indices.size(); ++i) - ++skip_index_accumulated_marks[i]; + if (compute_granularity && settings.can_use_adaptive_granularity) + { + /// Also we add mark to each skip index because all of them + /// already accumulated all rows from current adjusting mark + for (size_t i = 0; i < skip_indices.size(); ++i) + ++skip_index_accumulated_marks[i]; - /// This mark completed, go further - setCurrentMark(getCurrentMark() + 1); - /// Without offset - rows_written_in_last_mark = 0; + /// This mark completed, go further + setCurrentMark(getCurrentMark() + 1); + /// Without offset + rows_written_in_last_mark = 0; + } } } diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h index b350e590d07..8c76c10abef 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h @@ -99,12 +99,13 @@ private: /// in our index_granularity array. void shiftCurrentMark(const Granules & granules_written); - /// Change rows in the last mark in index_granularity to rows_written_in_last_mark. - /// Flush all marks from last_non_written_marks to disk and increment current mark. + /// Change rows in the last mark in index_granularity to new_rows_in_last_mark. + /// Flush all marks from last_non_written_marks to disk and increment current mark if the already written rows + /// (rows_written_in_last_mark) are equal to new_rows_in_last_mark. 
/// /// This function used when blocks change granularity drastically and we have unfinished mark. /// Also useful to have exact amount of rows in last (non-final) mark. - void adjustLastMarkIfNeedAndFlushToDisk(); + void adjustLastMarkIfNeedAndFlushToDisk(size_t new_rows_in_last_mark); IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenOffsetColumns & offset_columns) const; diff --git a/tests/queries/0_stateless/00955_test_final_mark.sql b/tests/queries/0_stateless/00955_test_final_mark.sql index 50ca3d008f9..e020f10b71a 100644 --- a/tests/queries/0_stateless/00955_test_final_mark.sql +++ b/tests/queries/0_stateless/00955_test_final_mark.sql @@ -18,7 +18,7 @@ INSERT INTO mt_with_pk (d, x, y, z, `n.Age`, `n.Name`) VALUES (toDate('2018-10-0 SELECT COUNT(*) FROM mt_with_pk WHERE x > toDateTime('2018-10-01 23:57:57'); -SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_pk' AND database = currentDatabase() AND active=1; +SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_pk' AND database = currentDatabase() AND active=1 AND database = currentDatabase(); SELECT '===test merge==='; INSERT INTO mt_with_pk (d, x, y, z, `n.Age`, `n.Name`) VALUES (toDate('2018-10-01'), toDateTime('2018-10-01 07:57:57'), [4, 4, 4], 14, [111, 222], ['Lui', 'Dave']), (toDate('2018-10-01'), toDateTime('2018-10-01 08:57:57'), [5, 5, 5], 15, [333, 444], ['John', 'Mike']), (toDate('2018-10-01'), toDateTime('2018-10-01 09:57:57'), [6, 6, 6], 16, [555, 666, 777], ['Alex', 'Jim', 'Tom']); @@ -27,7 +27,7 @@ OPTIMIZE TABLE mt_with_pk FINAL; SELECT COUNT(*) FROM mt_with_pk WHERE x > toDateTime('2018-10-01 23:57:57'); -SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_pk' AND database = currentDatabase() AND active=1; +SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_pk' AND database = currentDatabase() AND active=1 AND database = currentDatabase(); SELECT '===test alter==='; ALTER TABLE mt_with_pk MODIFY COLUMN y Array(String); @@ -38,7 +38,7 
@@ OPTIMIZE TABLE mt_with_pk FINAL; SELECT COUNT(*) FROM mt_with_pk WHERE x > toDateTime('2018-10-01 23:57:57'); -SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_pk' AND database = currentDatabase() AND active=1; +SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_pk' AND database = currentDatabase() AND active=1 AND database = currentDatabase(); SELECT '===test mutation==='; ALTER TABLE mt_with_pk UPDATE w = 0 WHERE 1 SETTINGS mutations_sync = 2; @@ -58,7 +58,7 @@ OPTIMIZE TABLE mt_with_pk FINAL; SELECT COUNT(*) FROM mt_with_pk WHERE z + w > 5000; -SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_pk' AND database = currentDatabase() AND active=1; +SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_pk' AND database = currentDatabase() AND active=1 AND database = currentDatabase(); DROP TABLE IF EXISTS mt_with_pk; @@ -119,7 +119,7 @@ INSERT INTO mt_without_pk (d, x, y, z, `n.Age`, `n.Name`) VALUES (toDate('2018-1 SELECT COUNT(*) FROM mt_without_pk WHERE x > toDateTime('2018-10-01 23:57:57'); -SELECT sum(marks) FROM system.parts WHERE table = 'mt_without_pk' AND active=1; +SELECT sum(marks) FROM system.parts WHERE table = 'mt_without_pk' AND active=1 AND database = currentDatabase(); INSERT INTO mt_without_pk (d, x, y, z, `n.Age`, `n.Name`) VALUES (toDate('2018-10-01'), toDateTime('2018-10-01 07:57:57'), [4, 4, 4], 14, [111, 222], ['Lui', 'Dave']), (toDate('2018-10-01'), toDateTime('2018-10-01 08:57:57'), [5, 5, 5], 15, [333, 444], ['John', 'Mike']), (toDate('2018-10-01'), toDateTime('2018-10-01 09:57:57'), [6, 6, 6], 16, [555, 666, 777], ['Alex', 'Jim', 'Tom']); @@ -127,7 +127,7 @@ OPTIMIZE TABLE mt_without_pk FINAL; SELECT COUNT(*) FROM mt_without_pk WHERE x > toDateTime('2018-10-01 23:57:57'); -SELECT sum(marks) FROM system.parts WHERE table = 'mt_without_pk' AND active=1; +SELECT sum(marks) FROM system.parts WHERE table = 'mt_without_pk' AND active=1 AND database = currentDatabase(); DROP TABLE IF EXISTS mt_without_pk; @@ 
-149,7 +149,7 @@ INSERT INTO mt_with_small_granularity (d, x, y, z, `n.Age`, `n.Name`) VALUES (to SELECT COUNT(*) FROM mt_with_small_granularity WHERE x > toDateTime('2018-10-01 23:57:57'); -SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_small_granularity' AND active=1; +SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_small_granularity' AND active=1 AND database = currentDatabase(); INSERT INTO mt_with_small_granularity (d, x, y, z, `n.Age`, `n.Name`) VALUES (toDate('2018-10-01'), toDateTime('2018-10-01 07:57:57'), [4, 4, 4], 14, [111, 222], ['Lui', 'Dave']), (toDate('2018-10-01'), toDateTime('2018-10-01 08:57:57'), [5, 5, 5], 15, [333, 444], ['John', 'Mike']), (toDate('2018-10-01'), toDateTime('2018-10-01 09:57:57'), [6, 6, 6], 16, [555, 666, 777], ['Alex', 'Jim', 'Tom']); @@ -157,6 +157,6 @@ OPTIMIZE TABLE mt_with_small_granularity FINAL; SELECT COUNT(*) FROM mt_with_small_granularity WHERE x > toDateTime('2018-10-01 23:57:57'); -SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_small_granularity' AND active=1; +SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_small_granularity' AND active=1 AND database = currentDatabase(); DROP TABLE IF EXISTS mt_with_small_granularity;