Merge pull request #17120 from ClickHouse/fix_granularity_on_block_borders

Fix index granularity calculation on block borders
Author: alesapin
Date: 2020-11-19 18:36:07 +03:00 (committed by GitHub)
Commit: 2623d35f68
10 changed files with 135 additions and 27 deletions

View File

@ -44,6 +44,29 @@ void IMergeTreeDataPartWriter::next()
index_offset = next_index_offset;
}
bool IMergeTreeDataPartWriter::adjustLastUnfinishedMark(size_t new_block_index_granularity)
{
/// If the number of rows remaining in the last granule is greater than
/// the granularity of the new block, then finish it.
if (!index_granularity.empty() && index_offset > new_block_index_granularity)
{
size_t already_written_rows_in_last_granule = index_granularity.getLastMarkRows() - index_offset;
/// We can still write some rows to the last granule
if (already_written_rows_in_last_granule < new_block_index_granularity)
{
index_granularity.setLastMarkRows(new_block_index_granularity);
index_offset = new_block_index_granularity - already_written_rows_in_last_granule;
}
else /// Our last granule is already full, let's start the new one
{
index_granularity.setLastMarkRows(already_written_rows_in_last_granule);
index_offset = 0;
}
return true;
}
return false;
}
IMergeTreeDataPartWriter::~IMergeTreeDataPartWriter() = default;
}
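
For clarity (not part of the patch): a minimal standalone sketch that replays the two branches of adjustLastUnfinishedMark on plain integers. AdjustResult and adjustLastMark are hypothetical names used only for illustration; just the arithmetic mirrors the function above.

#include <cassert>
#include <cstddef>

/// last_mark_rows and index_offset describe the unfinished last mark of the previous block.
struct AdjustResult { size_t last_mark_rows; size_t index_offset; };

AdjustResult adjustLastMark(size_t last_mark_rows, size_t index_offset, size_t new_granularity)
{
    if (index_offset > new_granularity)
    {
        size_t already_written = last_mark_rows - index_offset;
        if (already_written < new_granularity)
            /// Shrink the last mark to the new granularity and keep writing the remainder into it.
            return {new_granularity, new_granularity - already_written};
        /// The last mark already holds at least new_granularity rows: close it as-is.
        return {already_written, 0};
    }
    return {last_mark_rows, index_offset}; /// nothing to adjust
}

int main()
{
    /// Example from the header comment: a mark of 8 rows with 3 already written (offset 5)
    /// and a new block granularity of 2 is closed at 3 rows; a fresh mark is started.
    AdjustResult a = adjustLastMark(8, 5, 2);
    assert(a.last_mark_rows == 3 && a.index_offset == 0);

    /// A mark of 8 rows with only 1 row written (offset 7) and a new granularity of 2
    /// is shrunk to 2 rows; 1 more row still has to go into it.
    AdjustResult b = adjustLastMark(8, 7, 2);
    assert(b.last_mark_rows == 2 && b.index_offset == 1);
}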

View File

@ -62,28 +62,41 @@ public:
protected:
size_t getCurrentMark() const { return current_mark; }
size_t getIndexOffset() const { return index_offset; }
/// Finishes or shrinks our current unfinished mark if the number of rows still to be
/// written for it is greater than the granularity of the new block.
/// Returns true if the last mark was actually adjusted.
/// Example:
/// __|________|___. <- previous block with granularity 8 and last unfinished mark with 3 rows
/// new_block_index_granularity = 2, so
/// __|________|___|__|__|__|
/// ^ finish last unfinished mark, new marks will have granularity 2
bool adjustLastUnfinishedMark(size_t new_block_index_granularity);
using SerializationState = IDataType::SerializeBinaryBulkStatePtr;
using SerializationStates = std::unordered_map<String, SerializationState>;
MergeTreeData::DataPartPtr data_part;
const MergeTreeData & storage;
StorageMetadataPtr metadata_snapshot;
NamesAndTypesList columns_list;
MergeTreeIndices skip_indices;
const StorageMetadataPtr metadata_snapshot;
const NamesAndTypesList columns_list;
const MergeTreeIndices skip_indices;
MergeTreeIndexGranularity index_granularity;
MergeTreeWriterSettings settings;
bool with_final_mark;
const MergeTreeWriterSettings settings;
const bool with_final_mark;
size_t next_mark = 0;
size_t next_index_offset = 0;
/// Set to true if the granularity of the last mark was adjusted when we started
/// writing the current block. See adjustLastUnfinishedMark.
bool last_granule_was_adjusted = false;
MutableColumns index_columns;
private:
/// Data is already written up to this mark.
size_t current_mark = 0;
/// The offset to the first row of the block for which the index should be written,
/// i.e. how many rows we still have to write into the last unfinished mark.
size_t index_offset = 0;
};

View File

@ -184,7 +184,8 @@ void MergeTreeDataPartWriterOnDisk::initSkipIndices()
default_codec, settings.max_compress_block_size,
0, settings.aio_threshold));
skip_indices_aggregators.push_back(index_helper->createIndexAggregator());
skip_index_filling.push_back(0);
marks_in_skip_index_aggregator.push_back(0);
rows_in_skip_index_aggregator_last_mark.push_back(0);
}
skip_indices_initialized = true;
@ -256,9 +257,11 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializeSkipIndices(const Block
skip_index_current_data_mark = skip_index_data_mark;
while (prev_pos < rows)
{
bool new_block_started = prev_pos == 0;
UInt64 limit = 0;
size_t current_index_offset = getIndexOffset();
if (prev_pos == 0 && current_index_offset != 0)
/// We are starting a new block, but still have an offset left from the previous one
if (new_block_started && current_index_offset != 0)
{
limit = current_index_offset;
}
@ -270,10 +273,15 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializeSkipIndices(const Block
else
{
limit = index_granularity.getMarkRows(skip_index_current_data_mark);
/// We have just started serialization of a new block, but the last unfinished mark was shrunk to its current size.
/// It may happen that we have already aggregated that many rows or more for the skip index but have not flushed
/// them to disk, because the previous granule size was bigger. So do it here.
if (new_block_started && last_granule_was_adjusted && rows_in_skip_index_aggregator_last_mark[i] >= limit)
accountMarkForSkipIdxAndFlushIfNeeded(i);
if (skip_indices_aggregators[i]->empty())
{
skip_indices_aggregators[i] = index_helper->createIndexAggregator();
skip_index_filling[i] = 0;
if (stream.compressed.offset() >= settings.min_compress_block_size)
stream.compressed.next();
@ -285,24 +293,19 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializeSkipIndices(const Block
if (settings.can_use_adaptive_granularity)
writeIntBinary(1UL, stream.marks);
}
/// this mark is aggregated, go to the next one
skip_index_current_data_mark++;
}
size_t pos = prev_pos;
skip_indices_aggregators[i]->update(skip_indexes_block, &pos, limit);
rows_in_skip_index_aggregator_last_mark[i] = (pos - prev_pos);
if (pos == prev_pos + limit)
{
++skip_index_filling[i];
/// write index if it is filled
if (skip_index_filling[i] == index_helper->index.granularity)
{
skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(stream.compressed);
skip_index_filling[i] = 0;
}
}
/// We have just aggregated all rows of the current mark: account it in the skip index marks counter
/// and flush to disk if we have already aggregated the required number of marks.
if (rows_in_skip_index_aggregator_last_mark[i] == limit)
accountMarkForSkipIdxAndFlushIfNeeded(i);
prev_pos = pos;
}
}
@ -360,7 +363,21 @@ void MergeTreeDataPartWriterOnDisk::finishSkipIndicesSerialization(
skip_indices_streams.clear();
skip_indices_aggregators.clear();
skip_index_filling.clear();
marks_in_skip_index_aggregator.clear();
rows_in_skip_index_aggregator_last_mark.clear();
}
void MergeTreeDataPartWriterOnDisk::accountMarkForSkipIdxAndFlushIfNeeded(size_t skip_index_pos)
{
++marks_in_skip_index_aggregator[skip_index_pos];
/// write index if it is filled
if (marks_in_skip_index_aggregator[skip_index_pos] == skip_indices[skip_index_pos]->index.granularity)
{
skip_indices_aggregators[skip_index_pos]->getGranuleAndReset()->serializeBinary(skip_indices_streams[skip_index_pos]->compressed);
marks_in_skip_index_aggregator[skip_index_pos] = 0;
rows_in_skip_index_aggregator_last_mark[skip_index_pos] = 0;
}
}
}
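
As an illustration (not part of the patch): a toy model of the new per-skip-index counters, assuming the flush threshold is the index granularity in marks. SkipIndexState and its members are hypothetical stand-ins for the vectors above.

#include <cassert>
#include <cstddef>
#include <vector>

struct SkipIndexState
{
    size_t index_granularity;             /// marks per skip index granule, from the index definition
    size_t marks_in_aggregator = 0;       /// stands in for marks_in_skip_index_aggregator[i]
    size_t rows_in_last_mark = 0;         /// stands in for rows_in_skip_index_aggregator_last_mark[i]
    std::vector<size_t> flushed_granules; /// stands in for serializeBinary() calls

    /// Counterpart of accountMarkForSkipIdxAndFlushIfNeeded().
    void accountMarkAndFlushIfNeeded()
    {
        ++marks_in_aggregator;
        if (marks_in_aggregator == index_granularity)
        {
            flushed_granules.push_back(marks_in_aggregator); /// "serialize" the granule
            marks_in_aggregator = 0;
            rows_in_last_mark = 0;
        }
    }
};

int main()
{
    SkipIndexState state{/*index_granularity=*/3};
    /// Aggregate three data marks; the granule is flushed exactly on the third one.
    for (int i = 0; i < 3; ++i)
        state.accountMarkAndFlushIfNeeded();
    assert(state.flushed_granules.size() == 1 && state.marks_in_aggregator == 0);
}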

View File

@ -97,8 +97,7 @@ protected:
const String marks_file_extension;
CompressionCodecPtr default_codec;
bool compute_granularity;
bool need_finish_last_granule;
const bool compute_granularity;
/// Number of marks in data from which skip indices have to start
/// aggregation. I.e. it's a data mark number, not a skip indices mark.
@ -106,7 +105,10 @@ protected:
std::vector<StreamPtr> skip_indices_streams;
MergeTreeIndexAggregators skip_indices_aggregators;
std::vector<size_t> skip_index_filling;
/// Number of marks currently aggregated in the skip index aggregator
std::vector<size_t> marks_in_skip_index_aggregator;
/// Number of rows currently aggregated in the skip index aggregator for the last mark
std::vector<size_t> rows_in_skip_index_aggregator_last_mark;
std::unique_ptr<WriteBufferFromFileBase> index_file_stream;
std::unique_ptr<HashingWriteBuffer> index_stream;
@ -125,6 +127,11 @@ protected:
private:
/// Index is already serialized up to this mark.
size_t index_mark = 0;
/// Increment corresponding marks_in_skip_index_aggregator[skip_index_pos]
/// value and flush skip_indices_streams[skip_index_pos] to disk if we have
/// aggregated enough marks
void accountMarkForSkipIdxAndFlushIfNeeded(size_t skip_index_pos);
};
}

View File

@ -95,6 +95,9 @@ void MergeTreeDataPartWriterWide::write(const Block & block,
if (compute_granularity)
{
size_t index_granularity_for_block = computeIndexGranularity(block);
/// Finish the last unfinished mark if required
last_granule_was_adjusted = adjustLastUnfinishedMark(index_granularity_for_block);
/// Fill index granularity with granules of new size
fillIndexGranularity(index_granularity_for_block, block.rows());
}
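
For intuition (not part of the patch): a simplified model of the two steps above across consecutive blocks, reproducing the example from the adjustLastUnfinishedMark header comment. It assumes fillIndexGranularity appends marks of the block's granularity starting from the current index offset; GranularityModel and writeBlock are illustrative names only.

#include <cassert>
#include <cstddef>
#include <vector>

struct GranularityModel
{
    std::vector<size_t> mark_rows;  /// rows per mark
    size_t index_offset = 0;        /// rows still owed to the last mark

    void writeBlock(size_t rows, size_t granularity)
    {
        /// adjustLastUnfinishedMark, same arithmetic as in IMergeTreeDataPartWriter.cpp
        if (!mark_rows.empty() && index_offset > granularity)
        {
            size_t already_written = mark_rows.back() - index_offset;
            if (already_written < granularity)
            {
                mark_rows.back() = granularity;
                index_offset = granularity - already_written;
            }
            else
            {
                mark_rows.back() = already_written;
                index_offset = 0;
            }
        }
        /// fillIndexGranularity (assumed behaviour): append marks of the new size until the block is covered.
        size_t capacity = index_offset;
        while (capacity < rows)
        {
            mark_rows.push_back(granularity);
            capacity += granularity;
        }
        index_offset = capacity - rows; /// rows the next block still owes to the last mark
    }
};

int main()
{
    GranularityModel m;
    m.writeBlock(/*rows=*/19, /*granularity=*/8);  /// marks {8, 8, 8}, 5 rows still owed to the last one
    assert(m.index_offset == 5);
    m.writeBlock(/*rows=*/10, /*granularity=*/2);  /// last mark closed at 3 rows, then marks of 2
    assert((m.mark_rows == std::vector<size_t>{8, 8, 3, 2, 2, 2, 2, 2}) && m.index_offset == 0);
}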

View File

@ -55,6 +55,17 @@ void MergeTreeIndexGranularity::addRowsToLastMark(size_t rows_count)
marks_rows_partial_sums.back() += rows_count;
}
void MergeTreeIndexGranularity::setLastMarkRows(size_t rows_count)
{
if (marks_rows_partial_sums.empty())
marks_rows_partial_sums.push_back(rows_count);
else
{
marks_rows_partial_sums.back() -= getLastMarkRows();
marks_rows_partial_sums.back() += rows_count;
}
}
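
For reference (not part of the patch): marks are stored as partial sums of their row counts, so the last mark's size is the difference between the last two sums and setLastMarkRows only has to rewrite the last element. A tiny self-contained sketch under that assumption; getLastMarkRows here is a simplified stand-in for the real accessor.

#include <cassert>
#include <cstddef>
#include <vector>

/// Last element minus the one before it (or the only element for a single mark).
size_t getLastMarkRows(const std::vector<size_t> & sums)
{
    return sums.size() == 1 ? sums.back() : sums.back() - sums[sums.size() - 2];
}

/// Same bookkeeping as setLastMarkRows above.
void setLastMarkRows(std::vector<size_t> & sums, size_t rows_count)
{
    if (sums.empty())
        sums.push_back(rows_count);
    else
        sums.back() = sums.back() - getLastMarkRows(sums) + rows_count;
}

int main()
{
    /// Marks of {8, 8, 3} rows are stored as partial sums {8, 16, 19}.
    std::vector<size_t> sums = {8, 16, 19};
    assert(getLastMarkRows(sums) == 3); /// 19 - 16
    setLastMarkRows(sums, 2);           /// shrink the last mark from 3 to 2 rows
    assert(sums.back() == 18);          /// partial sums become {8, 16, 18}
}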
void MergeTreeIndexGranularity::popMark()
{
if (!marks_rows_partial_sums.empty())

View File

@ -98,6 +98,10 @@ public:
/// Extends last mark by rows_count.
void addRowsToLastMark(size_t rows_count);
/// Set the number of rows in the last mark
/// (add a new mark if there are none yet)
void setLastMarkRows(size_t rows_count);
/// Drops last mark if any exists.
void popMark();

View File

@ -119,7 +119,7 @@ INSERT INTO mt_without_pk (d, x, y, z, `n.Age`, `n.Name`) VALUES (toDate('2018-1
SELECT COUNT(*) FROM mt_without_pk WHERE x > toDateTime('2018-10-01 23:57:57');
SELECT sum(marks) FROM system.parts WHERE table = 'mt_without_pk' AND active=1;
SELECT sum(marks) FROM system.parts WHERE table = 'mt_without_pk' AND active=1 AND database=currentDatabase();
INSERT INTO mt_without_pk (d, x, y, z, `n.Age`, `n.Name`) VALUES (toDate('2018-10-01'), toDateTime('2018-10-01 07:57:57'), [4, 4, 4], 14, [111, 222], ['Lui', 'Dave']), (toDate('2018-10-01'), toDateTime('2018-10-01 08:57:57'), [5, 5, 5], 15, [333, 444], ['John', 'Mike']), (toDate('2018-10-01'), toDateTime('2018-10-01 09:57:57'), [6, 6, 6], 16, [555, 666, 777], ['Alex', 'Jim', 'Tom']);
@ -127,7 +127,7 @@ OPTIMIZE TABLE mt_without_pk FINAL;
SELECT COUNT(*) FROM mt_without_pk WHERE x > toDateTime('2018-10-01 23:57:57');
SELECT sum(marks) FROM system.parts WHERE table = 'mt_without_pk' AND active=1;
SELECT sum(marks) FROM system.parts WHERE table = 'mt_without_pk' AND active=1 AND database=currentDatabase();
DROP TABLE IF EXISTS mt_without_pk;
@ -149,7 +149,7 @@ INSERT INTO mt_with_small_granularity (d, x, y, z, `n.Age`, `n.Name`) VALUES (to
SELECT COUNT(*) FROM mt_with_small_granularity WHERE x > toDateTime('2018-10-01 23:57:57');
SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_small_granularity' AND active=1;
SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_small_granularity' AND active=1 AND database=currentDatabase();
INSERT INTO mt_with_small_granularity (d, x, y, z, `n.Age`, `n.Name`) VALUES (toDate('2018-10-01'), toDateTime('2018-10-01 07:57:57'), [4, 4, 4], 14, [111, 222], ['Lui', 'Dave']), (toDate('2018-10-01'), toDateTime('2018-10-01 08:57:57'), [5, 5, 5], 15, [333, 444], ['John', 'Mike']), (toDate('2018-10-01'), toDateTime('2018-10-01 09:57:57'), [6, 6, 6], 16, [555, 666, 777], ['Alex', 'Jim', 'Tom']);
@ -157,6 +157,6 @@ OPTIMIZE TABLE mt_with_small_granularity FINAL;
SELECT COUNT(*) FROM mt_with_small_granularity WHERE x > toDateTime('2018-10-01 23:57:57');
SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_small_granularity' AND active=1;
SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_small_granularity' AND active=1 AND database=currentDatabase();
DROP TABLE IF EXISTS mt_with_small_granularity;

View File

@ -0,0 +1,2 @@
849
102400

View File

@ -0,0 +1,28 @@
DROP TABLE IF EXISTS adaptive_table;
-- If the granularity of consecutive blocks differs a lot, adaptive granularity
-- should still compute the number of marks correctly. The data for this test was
-- derived empirically; it's quite hard to get good parameters.
CREATE TABLE adaptive_table(
key UInt64,
value String
) ENGINE MergeTree()
ORDER BY key
SETTINGS index_granularity_bytes=1048576, min_bytes_for_wide_part = 0, enable_vertical_merge_algorithm = 0;
SET max_block_size=900;
-- There are about 900 marks for our settings.
INSERT INTO adaptive_table SELECT number, if(number > 700, randomPrintableASCII(102400), randomPrintableASCII(1)) FROM numbers(10000);
OPTIMIZE TABLE adaptive_table FINAL;
SELECT marks FROM system.parts WHERE table = 'adaptive_table' and database=currentDatabase() and active;
-- If we have computed the granularity incorrectly, then we will exceed this limit.
SET max_memory_usage='30M';
SELECT max(length(value)) FROM adaptive_table;
DROP TABLE IF EXISTS adaptive_table;