Better adjustment of the last mark

alesapin 2020-12-21 11:24:52 +03:00
parent 23eebb3f90
commit 07b5c03c54
3 changed files with 44 additions and 30 deletions


@@ -171,8 +171,18 @@ void MergeTreeDataPartWriterWide::write(const Block & block, const IColumn::Perm
     if (rows_written_in_last_mark > 0)
     {
         size_t rows_left_in_last_mark = index_granularity.getMarkRows(getCurrentMark()) - rows_written_in_last_mark;
+        /// Previous granularity was much bigger than our new block's
+        /// granularity; let's adjust it, because we want to add new
+        /// heavy-weight blocks into the small old granule.
         if (rows_left_in_last_mark > index_granularity_for_block)
-            adjustLastMarkIfNeedAndFlushToDisk();
+        {
+            /// We have already written more rows than the granularity of our block:
+            /// adjust the last mark's rows and flush to disk.
+            if (rows_written_in_last_mark >= index_granularity_for_block)
+                adjustLastMarkIfNeedAndFlushToDisk(rows_written_in_last_mark);
+            else /// We still can write some rows from the new block into the previous granule.
+                adjustLastMarkIfNeedAndFlushToDisk(index_granularity_for_block - rows_written_in_last_mark);
+        }
     }
 
     fillIndexGranularity(index_granularity_for_block, block.rows());
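
To make the new branching concrete, here is a minimal standalone sketch (illustrative only: the helper newRowsInLastMark and the main() driver are assumptions, not part of MergeTreeDataPartWriterWide). It computes the value that the branch above passes to adjustLastMarkIfNeedAndFlushToDisk() in each case.

#include <cassert>
#include <cstddef>

/// Hypothetical helper mirroring the branch added in write() above.
static size_t newRowsInLastMark(size_t rows_written_in_last_mark, size_t index_granularity_for_block)
{
    /// The old granule already holds more rows than the incoming block's granularity:
    /// close it at exactly the rows that were written.
    if (rows_written_in_last_mark >= index_granularity_for_block)
        return rows_written_in_last_mark;
    /// Otherwise leave room so rows of the new block can finish the old granule.
    return index_granularity_for_block - rows_written_in_last_mark;
}

int main()
{
    /// 1000 rows already written, the new block's granularity is 128:
    /// the last mark is adjusted to 1000 rows and flushed.
    assert(newRowsInLastMark(1000, 128) == 1000);

    /// Only 40 rows written so far: the mark is resized to 128 - 40 = 88 rows
    /// and stays open for rows of the new block (the else branch above).
    assert(newRowsInLastMark(40, 128) == 88);
    return 0;
}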
@@ -476,7 +486,7 @@ void MergeTreeDataPartWriterWide::finishDataSerialization(IMergeTreeDataPart::Ch
     serialize_settings.low_cardinality_use_single_dictionary_for_part = global_settings.low_cardinality_use_single_dictionary_for_part != 0;
     WrittenOffsetColumns offset_columns;
     if (rows_written_in_last_mark > 0)
-        adjustLastMarkIfNeedAndFlushToDisk();
+        adjustLastMarkIfNeedAndFlushToDisk(rows_written_in_last_mark);
 
     bool write_final_mark = (with_final_mark && data_written);
@@ -572,7 +582,7 @@ void MergeTreeDataPartWriterWide::fillIndexGranularity(size_t index_granularity_
 }
 
-void MergeTreeDataPartWriterWide::adjustLastMarkIfNeedAndFlushToDisk()
+void MergeTreeDataPartWriterWide::adjustLastMarkIfNeedAndFlushToDisk(size_t new_rows_in_last_mark)
 {
     /// We can adjust marks only if we computed granularity for blocks.
     /// Otherwise we cannot change granularity because it will differ from
@@ -584,7 +594,7 @@ void MergeTreeDataPartWriterWide::adjustLastMarkIfNeedAndFlushToDisk()
             getCurrentMark(), index_granularity.getMarkRows(getCurrentMark()), rows_written_in_last_mark, index_granularity.getMarksCount());
 
         index_granularity.popMark();
-        index_granularity.appendMark(rows_written_in_last_mark);
+        index_granularity.appendMark(new_rows_in_last_mark);
     }
 
     /// Last mark should be filled, otherwise it's a bug
@@ -592,25 +602,28 @@ void MergeTreeDataPartWriterWide::adjustLastMarkIfNeedAndFlushToDisk()
         throw Exception(ErrorCodes::LOGICAL_ERROR, "No saved marks for last mark {} having rows offset {}, total marks {}",
             getCurrentMark(), rows_written_in_last_mark, index_granularity.getMarksCount());
 
-    for (const auto & [name, marks] : last_non_written_marks)
+    if (rows_written_in_last_mark == new_rows_in_last_mark)
     {
-        for (const auto & mark : marks)
-            flushMarkToFile(mark, index_granularity.getMarkRows(getCurrentMark()));
-    }
+        for (const auto & [name, marks] : last_non_written_marks)
+        {
+            for (const auto & mark : marks)
+                flushMarkToFile(mark, index_granularity.getMarkRows(getCurrentMark()));
+        }
 
-    last_non_written_marks.clear();
+        last_non_written_marks.clear();
 
-    if (compute_granularity && settings.can_use_adaptive_granularity)
-    {
-        /// Also we add mark to each skip index because all of them
-        /// already accumulated all rows from current adjusting mark
-        for (size_t i = 0; i < skip_indices.size(); ++i)
-            ++skip_index_accumulated_marks[i];
+        if (compute_granularity && settings.can_use_adaptive_granularity)
+        {
+            /// Also we add mark to each skip index because all of them
+            /// already accumulated all rows from current adjusting mark
+            for (size_t i = 0; i < skip_indices.size(); ++i)
+                ++skip_index_accumulated_marks[i];
 
-        /// This mark completed, go further
-        setCurrentMark(getCurrentMark() + 1);
-        /// Without offset
-        rows_written_in_last_mark = 0;
+            /// This mark completed, go further
+            setCurrentMark(getCurrentMark() + 1);
+            /// Without offset
+            rows_written_in_last_mark = 0;
+        }
     }
 }
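
Taken together, the rewritten adjustLastMarkIfNeedAndFlushToDisk() now distinguishes two outcomes: when the last mark is exactly as full as requested, the pending marks are flushed and the writer advances; otherwise only the granularity entry is resized and the mark stays open for the next block. Below is a simplified, self-contained model of that control flow; a sketch only, where ToyWideWriter is not the real class, only the state visible in this diff is tracked, and the compute_granularity / adaptive-granularity guard is omitted.

#include <cstddef>
#include <iostream>

/// Toy model of adjustLastMarkIfNeedAndFlushToDisk() after this commit.
struct ToyWideWriter
{
    size_t current_mark = 0;
    size_t rows_written_in_last_mark = 0;
    size_t last_mark_rows = 0;   /// stands in for the last entry of index_granularity

    void adjustLastMark(size_t new_rows_in_last_mark)
    {
        /// index_granularity.popMark() + appendMark(new_rows_in_last_mark)
        last_mark_rows = new_rows_in_last_mark;

        /// Flush marks and move on only when the mark is really complete.
        if (rows_written_in_last_mark == new_rows_in_last_mark)
        {
            /// Here the real code flushes last_non_written_marks and bumps
            /// skip_index_accumulated_marks for every skip index.
            ++current_mark;                  /// setCurrentMark(getCurrentMark() + 1)
            rows_written_in_last_mark = 0;   /// without offset
        }
        /// Otherwise the mark stays open and rows of the next block will fill it.
    }
};

int main()
{
    ToyWideWriter w;
    w.rows_written_in_last_mark = 1000;
    w.last_mark_rows = 8192;
    w.adjustLastMark(1000);   /// mark closed: current_mark advances, offset resets
    std::cout << w.current_mark << ' ' << w.rows_written_in_last_mark << '\n';   /// prints "1 0"

    w.rows_written_in_last_mark = 40;
    w.last_mark_rows = 8192;
    w.adjustLastMark(88);     /// mark only resized to 88 rows; the writer stays on it
    std::cout << w.current_mark << ' ' << w.rows_written_in_last_mark << '\n';   /// prints "1 40"
    return 0;
}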


@@ -99,12 +99,13 @@ private:
     /// in our index_granularity array.
     void shiftCurrentMark(const Granules & granules_written);
 
-    /// Change rows in the last mark in index_granularity to rows_written_in_last_mark.
-    /// Flush all marks from last_non_written_marks to disk and increment current mark.
+    /// Change rows in the last mark in index_granularity to new_rows_in_last_mark.
+    /// Flush all marks from last_non_written_marks to disk and increment the current mark if the rows
+    /// already written (rows_written_in_last_mark) equal new_rows_in_last_mark.
     ///
     /// This function used when blocks change granularity drastically and we have unfinished mark.
     /// Also useful to have exact amount of rows in last (non-final) mark.
-    void adjustLastMarkIfNeedAndFlushToDisk();
+    void adjustLastMarkIfNeedAndFlushToDisk(size_t new_rows_in_last_mark);
 
     IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenOffsetColumns & offset_columns) const;


@@ -18,7 +18,7 @@ INSERT INTO mt_with_pk (d, x, y, z, `n.Age`, `n.Name`) VALUES (toDate('2018-10-0
 SELECT COUNT(*) FROM mt_with_pk WHERE x > toDateTime('2018-10-01 23:57:57');
-SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_pk' AND database = currentDatabase() AND active=1;
+SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_pk' AND database = currentDatabase() AND active=1 AND database = currentDatabase();
 SELECT '===test merge===';
 INSERT INTO mt_with_pk (d, x, y, z, `n.Age`, `n.Name`) VALUES (toDate('2018-10-01'), toDateTime('2018-10-01 07:57:57'), [4, 4, 4], 14, [111, 222], ['Lui', 'Dave']), (toDate('2018-10-01'), toDateTime('2018-10-01 08:57:57'), [5, 5, 5], 15, [333, 444], ['John', 'Mike']), (toDate('2018-10-01'), toDateTime('2018-10-01 09:57:57'), [6, 6, 6], 16, [555, 666, 777], ['Alex', 'Jim', 'Tom']);
@@ -27,7 +27,7 @@ OPTIMIZE TABLE mt_with_pk FINAL;
 SELECT COUNT(*) FROM mt_with_pk WHERE x > toDateTime('2018-10-01 23:57:57');
-SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_pk' AND database = currentDatabase() AND active=1;
+SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_pk' AND database = currentDatabase() AND active=1 AND database = currentDatabase();
 SELECT '===test alter===';
 ALTER TABLE mt_with_pk MODIFY COLUMN y Array(String);
@@ -38,7 +38,7 @@ OPTIMIZE TABLE mt_with_pk FINAL;
 SELECT COUNT(*) FROM mt_with_pk WHERE x > toDateTime('2018-10-01 23:57:57');
-SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_pk' AND database = currentDatabase() AND active=1;
+SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_pk' AND database = currentDatabase() AND active=1 AND database = currentDatabase();
 SELECT '===test mutation===';
 ALTER TABLE mt_with_pk UPDATE w = 0 WHERE 1 SETTINGS mutations_sync = 2;
@@ -58,7 +58,7 @@ OPTIMIZE TABLE mt_with_pk FINAL;
 SELECT COUNT(*) FROM mt_with_pk WHERE z + w > 5000;
-SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_pk' AND database = currentDatabase() AND active=1;
+SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_pk' AND database = currentDatabase() AND active=1 AND database = currentDatabase();
 DROP TABLE IF EXISTS mt_with_pk;
@@ -119,7 +119,7 @@ INSERT INTO mt_without_pk (d, x, y, z, `n.Age`, `n.Name`) VALUES (toDate('2018-1
 SELECT COUNT(*) FROM mt_without_pk WHERE x > toDateTime('2018-10-01 23:57:57');
-SELECT sum(marks) FROM system.parts WHERE table = 'mt_without_pk' AND active=1;
+SELECT sum(marks) FROM system.parts WHERE table = 'mt_without_pk' AND active=1 AND database = currentDatabase();
 INSERT INTO mt_without_pk (d, x, y, z, `n.Age`, `n.Name`) VALUES (toDate('2018-10-01'), toDateTime('2018-10-01 07:57:57'), [4, 4, 4], 14, [111, 222], ['Lui', 'Dave']), (toDate('2018-10-01'), toDateTime('2018-10-01 08:57:57'), [5, 5, 5], 15, [333, 444], ['John', 'Mike']), (toDate('2018-10-01'), toDateTime('2018-10-01 09:57:57'), [6, 6, 6], 16, [555, 666, 777], ['Alex', 'Jim', 'Tom']);
@@ -127,7 +127,7 @@ OPTIMIZE TABLE mt_without_pk FINAL;
 SELECT COUNT(*) FROM mt_without_pk WHERE x > toDateTime('2018-10-01 23:57:57');
-SELECT sum(marks) FROM system.parts WHERE table = 'mt_without_pk' AND active=1;
+SELECT sum(marks) FROM system.parts WHERE table = 'mt_without_pk' AND active=1 AND database = currentDatabase();
 DROP TABLE IF EXISTS mt_without_pk;
@@ -149,7 +149,7 @@ INSERT INTO mt_with_small_granularity (d, x, y, z, `n.Age`, `n.Name`) VALUES (to
 SELECT COUNT(*) FROM mt_with_small_granularity WHERE x > toDateTime('2018-10-01 23:57:57');
-SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_small_granularity' AND active=1;
+SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_small_granularity' AND active=1 AND database = currentDatabase();
 INSERT INTO mt_with_small_granularity (d, x, y, z, `n.Age`, `n.Name`) VALUES (toDate('2018-10-01'), toDateTime('2018-10-01 07:57:57'), [4, 4, 4], 14, [111, 222], ['Lui', 'Dave']), (toDate('2018-10-01'), toDateTime('2018-10-01 08:57:57'), [5, 5, 5], 15, [333, 444], ['John', 'Mike']), (toDate('2018-10-01'), toDateTime('2018-10-01 09:57:57'), [6, 6, 6], 16, [555, 666, 777], ['Alex', 'Jim', 'Tom']);
@@ -157,6 +157,6 @@ OPTIMIZE TABLE mt_with_small_granularity FINAL;
 SELECT COUNT(*) FROM mt_with_small_granularity WHERE x > toDateTime('2018-10-01 23:57:57');
-SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_small_granularity' AND active=1;
+SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_small_granularity' AND active=1 AND database = currentDatabase();
 DROP TABLE IF EXISTS mt_with_small_granularity;