diff --git a/src/Storages/MergeTree/IMergeTreeDataPartWriter.cpp b/src/Storages/MergeTree/IMergeTreeDataPartWriter.cpp
index 76e84cb620a..9928583df80 100644
--- a/src/Storages/MergeTree/IMergeTreeDataPartWriter.cpp
+++ b/src/Storages/MergeTree/IMergeTreeDataPartWriter.cpp
@@ -71,8 +71,7 @@ IMergeTreeDataPartWriter::IMergeTreeDataPartWriter(
     const String & marks_file_extension_,
     const CompressionCodecPtr & default_codec_,
     const MergeTreeWriterSettings & settings_,
-    const MergeTreeIndexGranularity & index_granularity_,
-    bool need_finish_last_granule_)
+    const MergeTreeIndexGranularity & index_granularity_)
     : disk(std::move(disk_))
     , part_path(part_path_)
     , storage(storage_)
@@ -84,7 +83,6 @@ IMergeTreeDataPartWriter::IMergeTreeDataPartWriter(
     , settings(settings_)
     , compute_granularity(index_granularity.empty())
     , with_final_mark(storage.getSettings()->write_final_mark && settings.can_use_adaptive_granularity)
-    , need_finish_last_granule(need_finish_last_granule_)
 {
     if (settings.blocks_are_granules_size && !index_granularity.empty())
         throw Exception("Can't take information about index granularity from blocks, when non empty index_granularity array specified", ErrorCodes::LOGICAL_ERROR);
@@ -95,15 +93,15 @@ IMergeTreeDataPartWriter::IMergeTreeDataPartWriter(
 
 IMergeTreeDataPartWriter::~IMergeTreeDataPartWriter() = default;
 
-static void fillIndexGranularityImpl(
+/// The implementation is split into static functions so that unit tests
+/// can be written without creating an instance of IMergeTreeDataPartWriter,
+/// which requires a lot of dependencies and access to the filesystem.
+static size_t computeIndexGranularityImpl(
     const Block & block,
     size_t index_granularity_bytes,
    size_t fixed_index_granularity_rows,
     bool blocks_are_granules,
-    size_t index_offset,
-    MergeTreeIndexGranularity & index_granularity,
-    bool can_use_adaptive_index_granularity,
-    bool need_finish_last_granule = false)
+    bool can_use_adaptive_index_granularity)
 {
     size_t rows_in_block = block.rows();
     size_t index_granularity_for_block;
@@ -130,43 +128,37 @@ static void fillIndexGranularityImpl(
 
     /// We should be less or equal than fixed index granularity
     index_granularity_for_block = std::min(fixed_index_granularity_rows, index_granularity_for_block);
-
-    size_t current_row;
-    for (current_row = index_offset; current_row < rows_in_block; current_row += index_granularity_for_block)
-    {
-        size_t rows_left_in_block = rows_in_block - current_row;
-
-        /// Try to extend last granule if it's needed and block is large enough
-        /// or it shouldn't be first in granule (index_offset != 0).
-        if (need_finish_last_granule && rows_left_in_block < index_granularity_for_block
-            && (rows_in_block >= index_granularity_for_block || index_offset != 0))
-        {
-            // If enough rows are left, create a new granule. Otherwise, extend previous granule.
-            // So, real size of granule differs from index_granularity_for_block not more than 50%.
-            if (rows_left_in_block * 2 >= index_granularity_for_block)
-                index_granularity.appendMark(rows_left_in_block);
-            else
-                index_granularity.addRowsToLastMark(rows_left_in_block);
-        }
-        else
-        {
-            index_granularity.appendMark(index_granularity_for_block);
-        }
-    }
+    return index_granularity_for_block;
 }
 
-void IMergeTreeDataPartWriter::fillIndexGranularity(const Block & block)
+static void fillIndexGranularityImpl(
+    MergeTreeIndexGranularity & index_granularity,
+    size_t index_offset,
+    size_t index_granularity_for_block,
+    size_t rows_in_block)
+{
+    for (size_t current_row = index_offset; current_row < rows_in_block; current_row += index_granularity_for_block)
+        index_granularity.appendMark(index_granularity_for_block);
+}
+
+size_t IMergeTreeDataPartWriter::computeIndexGranularity(const Block & block)
 {
     const auto storage_settings = storage.getSettings();
+    return computeIndexGranularityImpl(
+        block,
+        storage_settings->index_granularity_bytes,
+        storage_settings->index_granularity,
+        settings.blocks_are_granules_size,
+        settings.can_use_adaptive_granularity);
+}
+
+void IMergeTreeDataPartWriter::fillIndexGranularity(size_t index_granularity_for_block, size_t rows_in_block)
+{
     fillIndexGranularityImpl(
-        block,
-        storage_settings->index_granularity_bytes,
-        storage_settings->index_granularity,
-        settings.blocks_are_granules_size,
-        index_offset,
         index_granularity,
-        settings.can_use_adaptive_granularity,
-        need_finish_last_granule);
+        index_offset,
+        index_granularity_for_block,
+        rows_in_block);
 }
 
 void IMergeTreeDataPartWriter::initPrimaryIndex()
@@ -225,21 +217,22 @@ void IMergeTreeDataPartWriter::calculateAndSerializePrimaryIndex(const Block & p
 
     /// Write index. The index contains Primary Key value for each `index_granularity` row.
-    for (size_t i = index_offset; i < rows;)
+    size_t current_row = index_offset;
+    size_t total_marks = index_granularity.getMarksCount();
+
+    while (index_mark < total_marks && current_row < rows)
     {
         if (storage.hasPrimaryKey())
         {
             for (size_t j = 0; j < primary_columns_num; ++j)
             {
                 const auto & primary_column = primary_index_block.getByPosition(j);
-                index_columns[j]->insertFrom(*primary_column.column, i);
-                primary_column.type->serializeBinary(*primary_column.column, i, *index_stream);
+                index_columns[j]->insertFrom(*primary_column.column, current_row);
+                primary_column.type->serializeBinary(*primary_column.column, current_row, *index_stream);
             }
         }
 
-        i += index_granularity.getMarkRows(current_mark++);
-        if (current_mark >= index_granularity.getMarksCount())
-            break;
+        current_row += index_granularity.getMarkRows(index_mark++);
     }
 
     /// store last index row to write final mark at the end of column
diff --git a/src/Storages/MergeTree/IMergeTreeDataPartWriter.h b/src/Storages/MergeTree/IMergeTreeDataPartWriter.h
index 9679de8297f..142ad6ca14e 100644
--- a/src/Storages/MergeTree/IMergeTreeDataPartWriter.h
+++ b/src/Storages/MergeTree/IMergeTreeDataPartWriter.h
@@ -69,8 +69,7 @@ public:
         const String & marks_file_extension,
         const CompressionCodecPtr & default_codec,
         const MergeTreeWriterSettings & settings,
-        const MergeTreeIndexGranularity & index_granularity,
-        bool need_finish_last_granule);
+        const MergeTreeIndexGranularity & index_granularity);
 
     virtual ~IMergeTreeDataPartWriter();
 
@@ -87,9 +86,6 @@ public:
     /// calling calculations of primary and skip indices.
     void next();
 
-    /// Count index_granularity for block and store in `index_granularity`
-    void fillIndexGranularity(const Block & block);
-
     const MergeTreeIndexGranularity & getIndexGranularity() const { return index_granularity; }
 
     Columns releaseIndexColumns()
@@ -112,6 +108,13 @@ public:
 
     void finishSkipIndicesSerialization(MergeTreeData::DataPart::Checksums & checksums);
 
 protected:
+    /// Compute index_granularity for the block and store it in `index_granularity`.
+    size_t computeIndexGranularity(const Block & block);
+    virtual void fillIndexGranularity(size_t index_granularity_for_block, size_t rows_in_block);
+
+    size_t getCurrentMark() const { return current_mark; }
+    size_t getIndexOffset() const { return index_offset; }
+
     using SerializationState = IDataType::SerializeBinaryBulkStatePtr;
     using SerializationStates = std::unordered_map<String, SerializationState>;
@@ -131,12 +134,6 @@ protected:
 
     bool compute_granularity;
     bool with_final_mark;
-    bool need_finish_last_granule;
-
-    size_t current_mark = 0;
-
-    /// The offset to the first row of the block for which you want to write the index.
-    size_t index_offset = 0;
 
     size_t next_mark = 0;
     size_t next_index_offset = 0;
@@ -163,6 +160,14 @@ protected:
 
     /// To correctly write Nested elements column-by-column.
     WrittenOffsetColumns * written_offset_columns = nullptr;
+
+private:
+    /// Data is already written up to this mark.
+    size_t current_mark = 0;
+    /// The offset to the first row of the block for which you want to write the index.
+    size_t index_offset = 0;
+    /// Index is already serialized up to this mark.
+    size_t index_mark = 0;
 };
 
 }
diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp
index e33d4a97cac..2f708ac6954 100644
--- a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp
+++ b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp
@@ -18,7 +18,7 @@ MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact(
     : IMergeTreeDataPartWriter(disk_, part_path_, storage_, columns_list_,
         indices_to_recalc_, marks_file_extension_,
-        default_codec_, settings_, index_granularity_, true)
+        default_codec_, settings_, index_granularity_)
 {
     using DataPart = MergeTreeDataPartCompact;
     String data_file_name = DataPart::DATA_FILE_NAME;
@@ -42,7 +42,10 @@ void MergeTreeDataPartWriterCompact::write(
     /// if it's unknown (in case of insert data or horizontal merge,
     /// but not in case of vertical merge)
     if (compute_granularity)
-        fillIndexGranularity(block);
+    {
+        size_t index_granularity_for_block = computeIndexGranularity(block);
+        fillIndexGranularity(index_granularity_for_block, block.rows());
+    }
 
     Block result_block;
 
@@ -88,7 +91,7 @@ void MergeTreeDataPartWriterCompact::write(
 void MergeTreeDataPartWriterCompact::writeBlock(const Block & block)
 {
     size_t total_rows = block.rows();
-    size_t from_mark = current_mark;
+    size_t from_mark = getCurrentMark();
     size_t current_row = 0;
 
     while (current_row < total_rows)
@@ -163,6 +166,44 @@ void MergeTreeDataPartWriterCompact::finishDataSerialization(IMergeTreeDataPart:
     stream.reset();
 }
 
+static void fillIndexGranularityImpl(
+    MergeTreeIndexGranularity & index_granularity,
+    size_t index_offset,
+    size_t index_granularity_for_block,
+    size_t rows_in_block)
+{
+    for (size_t current_row = index_offset; current_row < rows_in_block; current_row += index_granularity_for_block)
+    {
+        size_t rows_left_in_block = rows_in_block - current_row;
+
+        /// Try to extend the last granule if the block is large enough
+        /// or if the block does not start at a granule boundary (index_offset != 0).
+        if (rows_left_in_block < index_granularity_for_block &&
+            (rows_in_block >= index_granularity_for_block || index_offset != 0))
+        {
+            // If enough rows are left, create a new granule. Otherwise, extend the previous granule.
+            // So the real size of a granule differs from index_granularity_for_block by no more than 50%.
+            if (rows_left_in_block * 2 >= index_granularity_for_block)
+                index_granularity.appendMark(rows_left_in_block);
+            else
+                index_granularity.addRowsToLastMark(rows_left_in_block);
+        }
+        else
+        {
+            index_granularity.appendMark(index_granularity_for_block);
+        }
+    }
+}
+
+void MergeTreeDataPartWriterCompact::fillIndexGranularity(size_t index_granularity_for_block, size_t rows_in_block)
+{
+    fillIndexGranularityImpl(
+        index_granularity,
+        getIndexOffset(),
+        index_granularity_for_block,
+        rows_in_block);
+}
+
 void MergeTreeDataPartWriterCompact::ColumnsBuffer::add(MutableColumns && columns)
 {
     if (accumulated_columns.empty())
diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.h b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.h
index 0aff55588aa..45d72d90b1e 100644
--- a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.h
+++ b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.h
@@ -23,6 +23,9 @@ public:
 
     void finishDataSerialization(IMergeTreeDataPart::Checksums & checksums) override;
 
+protected:
+    void fillIndexGranularity(size_t index_granularity_for_block, size_t rows_in_block) override;
+
 private:
     /// Write single granule of one column (rows between 2 marks)
     void writeColumnSingleGranule(
diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp
index 1e5640b4e23..e5eececacfb 100644
--- a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp
+++ b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp
@@ -24,7 +24,7 @@ MergeTreeDataPartWriterWide::MergeTreeDataPartWriterWide(
     const MergeTreeIndexGranularity & index_granularity_)
     : IMergeTreeDataPartWriter(disk_, part_path_, storage_, columns_list_, indices_to_recalc_,
-        marks_file_extension_, default_codec_, settings_, index_granularity_, false)
+        marks_file_extension_, default_codec_, settings_, index_granularity_)
 {
     const auto & columns = storage.getColumns();
     for (const auto & it : columns_list)
@@ -85,7 +85,10 @@ void MergeTreeDataPartWriterWide::write(const Block & block,
     /// if it's unknown (in case of insert data or horizontal merge,
     /// but not in case of vertical merge)
     if (compute_granularity)
-        fillIndexGranularity(block);
+    {
+        size_t index_granularity_for_block = computeIndexGranularity(block);
+        fillIndexGranularity(index_granularity_for_block, block.rows());
+    }
 
     auto offset_columns = written_offset_columns ? *written_offset_columns : WrittenOffsetColumns{};
 
@@ -206,17 +209,18 @@ void MergeTreeDataPartWriterWide::writeColumn(
     size_t total_rows = column.size();
     size_t current_row = 0;
-    size_t current_column_mark = current_mark;
+    size_t current_column_mark = getCurrentMark();
+    size_t current_index_offset = getIndexOffset();
 
     while (current_row < total_rows)
     {
         size_t rows_to_write;
         bool write_marks = true;
 
         /// If there is `index_offset`, then the first mark goes not immediately, but after this number of rows.
-        if (current_row == 0 && index_offset != 0)
+        if (current_row == 0 && current_index_offset != 0)
         {
             write_marks = false;
-            rows_to_write = index_offset;
+            rows_to_write = current_index_offset;
         }
         else
         {
diff --git a/src/Storages/tests/gtest_aux_funcs_for_adaptive_granularity.cpp b/src/Storages/tests/gtest_aux_funcs_for_adaptive_granularity.cpp
index 0579fd05f5d..7488b6ea44a 100644
--- a/src/Storages/tests/gtest_aux_funcs_for_adaptive_granularity.cpp
+++ b/src/Storages/tests/gtest_aux_funcs_for_adaptive_granularity.cpp
@@ -25,14 +25,16 @@ TEST(AdaptiveIndexGranularity, FillGranularityToyTests)
     EXPECT_EQ(block1.bytes(), 80);
     { /// Granularity bytes are not set. Take default index_granularity.
         MergeTreeIndexGranularity index_granularity;
-        fillIndexGranularityImpl(block1, 0, 100, false, 0, index_granularity, false);
+        auto granularity = computeIndexGranularityImpl(block1, 0, 100, false, false);
+        fillIndexGranularityImpl(index_granularity, 0, granularity, block1.rows());
         EXPECT_EQ(index_granularity.getMarksCount(), 1);
         EXPECT_EQ(index_granularity.getMarkRows(0), 100);
     }
 
     { /// Granule size is less than block size. Block contains multiple granules.
         MergeTreeIndexGranularity index_granularity;
-        fillIndexGranularityImpl(block1, 16, 100, false, 0, index_granularity, true);
+        auto granularity = computeIndexGranularityImpl(block1, 16, 100, false, true);
+        fillIndexGranularityImpl(index_granularity, 0, granularity, block1.rows());
         EXPECT_EQ(index_granularity.getMarksCount(), 5); /// First granule with 8 rows, and second with 1 row
         for (size_t i = 0; i < index_granularity.getMarksCount(); ++i)
             EXPECT_EQ(index_granularity.getMarkRows(i), 2);
@@ -41,7 +43,8 @@
 
     { /// Granule size is more than block size. Whole block (and maybe more) can be placed in single granule.
         MergeTreeIndexGranularity index_granularity;
-        fillIndexGranularityImpl(block1, 512, 100, false, 0, index_granularity, true);
+        auto granularity = computeIndexGranularityImpl(block1, 512, 100, false, true);
+        fillIndexGranularityImpl(index_granularity, 0, granularity, block1.rows());
         EXPECT_EQ(index_granularity.getMarksCount(), 1);
         for (size_t i = 0; i < index_granularity.getMarksCount(); ++i)
             EXPECT_EQ(index_granularity.getMarkRows(i), 64);
@@ -50,7 +53,8 @@
 
     { /// Blocks with granule size
         MergeTreeIndexGranularity index_granularity;
-        fillIndexGranularityImpl(block1, 1, 100, true, 0, index_granularity, true);
+        auto granularity = computeIndexGranularityImpl(block1, 1, 100, true, true);
+        fillIndexGranularityImpl(index_granularity, 0, granularity, block1.rows());
         EXPECT_EQ(index_granularity.getMarksCount(), 1);
         for (size_t i = 0; i < index_granularity.getMarksCount(); ++i)
             EXPECT_EQ(index_granularity.getMarkRows(i), block1.rows());
@@ -58,7 +62,8 @@
 
     { /// Shift in index offset
         MergeTreeIndexGranularity index_granularity;
-        fillIndexGranularityImpl(block1, 16, 100, false, 6, index_granularity, true);
+        auto granularity = computeIndexGranularityImpl(block1, 16, 100, false, true);
+        fillIndexGranularityImpl(index_granularity, 6, granularity, block1.rows());
         EXPECT_EQ(index_granularity.getMarksCount(), 2);
         for (size_t i = 0; i < index_granularity.getMarksCount(); ++i)
             EXPECT_EQ(index_granularity.getMarkRows(i), 2);
@@ -74,7 +79,10 @@ TEST(AdaptiveIndexGranularity, FillGranularitySequenceOfBlocks)
         auto block3 = getBlockWithSize(65536, 8);
         MergeTreeIndexGranularity index_granularity;
         for (const auto & block : {block1, block2, block3})
-            fillIndexGranularityImpl(block, 1024, 8192, false, 0, index_granularity, true);
+        {
+            auto granularity = computeIndexGranularityImpl(block, 1024, 8192, false, true);
+            fillIndexGranularityImpl(index_granularity, 0, granularity, block.rows());
+        }
 
         EXPECT_EQ(index_granularity.getMarksCount(), 192); /// granules
         for (size_t i = 0; i < index_granularity.getMarksCount(); ++i)
@@ -87,7 +95,10 @@ TEST(AdaptiveIndexGranularity, FillGranularitySequenceOfBlocks)
         EXPECT_EQ(block1.rows() + block2.rows() + block3.rows(), 3136);
         MergeTreeIndexGranularity index_granularity;
         for (const auto & block : {block1, block2, block3})
-            fillIndexGranularityImpl(block, 1024, 8192, false, 0, index_granularity, true);
+        {
+            auto granularity = computeIndexGranularityImpl(block, 1024, 8192, false, true);
+            fillIndexGranularityImpl(index_granularity, 0, granularity, block.rows());
+        }
 
         EXPECT_EQ(index_granularity.getMarksCount(), 98); /// granules
         for (size_t i = 0; i < index_granularity.getMarksCount(); ++i)
@@ -105,7 +116,8 @@ TEST(AdaptiveIndexGranularity, FillGranularitySequenceOfBlocks)
         size_t index_offset = 0;
         for (const auto & block : {block1, block2, block3})
         {
-            fillIndexGranularityImpl(block, 16384, 8192, false, index_offset, index_granularity, true);
+            auto granularity = computeIndexGranularityImpl(block, 16384, 8192, false, true);
+            fillIndexGranularityImpl(index_granularity, index_offset, granularity, block.rows());
             index_offset = index_granularity.getLastMarkRows() - block.rows();
         }
         EXPECT_EQ(index_granularity.getMarksCount(), 1); /// granules
diff --git a/src/Storages/tests/gtest_aux_funcs_for_adaptive_granularity_compact_parts.cpp b/src/Storages/tests/gtest_aux_funcs_for_adaptive_granularity_compact_parts.cpp
new file mode 100644
index 00000000000..f87293dcd5d
--- /dev/null
+++ b/src/Storages/tests/gtest_aux_funcs_for_adaptive_granularity_compact_parts.cpp
@@ -0,0 +1,81 @@
+#include <gtest/gtest.h>
+#include <Core/Block.h>
+#include <Columns/ColumnVector.h>
+
+// I know that inclusion of .cpp is not good at all
+#include <Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp>
+
+using namespace DB;
+
+TEST(IndexGranularityCompactParts, FillGranularitySequenceOfBlocks)
+{
+    { /// Three blocks in one granule.
+        size_t rows = 8;
+        size_t granularity = 32;
+
+        MergeTreeIndexGranularity index_granularity;
+        size_t index_offset = 0;
+        size_t rows_written = 0;
+        for (size_t i = 0; i < 3; ++i)
+        {
+            fillIndexGranularityImpl(index_granularity, index_offset, granularity, rows);
+            rows_written += rows;
+            index_offset = granularity - rows_written;
+        }
+
+        EXPECT_EQ(index_granularity.getMarksCount(), 1); /// granules
+        /// It's ok that the granularity is higher than the actual number of rows.
+        /// It will be corrected in CompactWriter.
+        EXPECT_EQ(index_granularity.getMarkRows(0), granularity);
+    }
+
+    { /// Granule is extended with a small block.
+        size_t rows1 = 30;
+        size_t rows2 = 8;
+        size_t granularity = 32;
+
+        MergeTreeIndexGranularity index_granularity;
+        size_t index_offset = 0;
+
+        fillIndexGranularityImpl(index_granularity, index_offset, granularity, rows1);
+        index_offset = granularity - rows1;
+
+        fillIndexGranularityImpl(index_granularity, index_offset, granularity, rows2);
+
+        EXPECT_EQ(index_granularity.getMarksCount(), 1);
+        EXPECT_EQ(index_granularity.getMarkRows(0), rows1 + rows2);
+    }
+
+    { /// New granule is created with a large block.
+        size_t rows1 = 30;
+        size_t rows2 = 25;
+        size_t granularity = 32;
+
+        MergeTreeIndexGranularity index_granularity;
+        size_t index_offset = 0;
+
+        fillIndexGranularityImpl(index_granularity, index_offset, granularity, rows1);
+        index_offset = granularity - rows1;
+
+        fillIndexGranularityImpl(index_granularity, index_offset, granularity, rows2);
+
+        EXPECT_EQ(index_granularity.getMarksCount(), 2);
+        EXPECT_EQ(index_granularity.getMarkRows(0), granularity);
+        EXPECT_EQ(index_granularity.getMarkRows(1), rows1 + rows2 - granularity);
+    }
+
+    { /// Three large blocks
+        size_t rows = 40;
+        size_t granularity = 32;
+
+        MergeTreeIndexGranularity index_granularity;
+        size_t index_offset = 0;
+
+        for (size_t i = 0; i < 3; ++i)
+            fillIndexGranularityImpl(index_granularity, index_offset, granularity, rows);
+
+        EXPECT_EQ(index_granularity.getMarksCount(), 3);
+        for (size_t i = 0; i < 3; ++i)
+            EXPECT_EQ(index_granularity.getMarkRows(i), rows);
+    }
+}
diff --git a/tests/integration/test_polymorphic_parts/test.py b/tests/integration/test_polymorphic_parts/test.py
index 55f1237cd1d..6af9ffe4c8d 100644
--- a/tests/integration/test_polymorphic_parts/test.py
+++ b/tests/integration/test_polymorphic_parts/test.py
@@ -2,6 +2,8 @@ import time
 import pytest
 import random
 import string
+import os
+import struct
 
 from helpers.test_tools import TSV
 from helpers.test_tools import assert_eq_with_retry
@@ -260,3 +262,24 @@ def test_polymorphic_parts_non_adaptive(start_cluster):
         "WHERE table = 'non_adaptive_table' AND active GROUP BY part_type ORDER BY part_type")) == TSV("Wide\t2\n")
 
     assert node1.contains_in_log("<Error> default.non_adaptive_table: Table can't create parts with adaptive granularity")
+
+
+def test_polymorphic_parts_index(start_cluster):
+    node1.query('''
+        CREATE TABLE index_compact(a UInt32, s String)
+        ENGINE = MergeTree ORDER BY a
+        SETTINGS min_rows_for_wide_part = 1000, index_granularity = 128, merge_max_block_size = 100''')
+
+    node1.query("INSERT INTO index_compact SELECT number, toString(number) FROM numbers(100)")
+    node1.query("INSERT INTO index_compact SELECT number, toString(number) FROM numbers(30)")
+    node1.query("OPTIMIZE TABLE index_compact FINAL")
+
+    assert node1.query("SELECT part_type FROM system.parts WHERE table = 'index_compact' AND active") == "Compact\n"
+    assert node1.query("SELECT marks FROM system.parts WHERE table = 'index_compact' AND active") == "2\n"
+
+    index_path = os.path.join(node1.path, "database/data/default/index_compact/all_1_2_1/primary.idx")
+    f = open(index_path, 'rb')
+
+    assert os.path.getsize(index_path) == 8
+    assert struct.unpack('I', f.read(4))[0] == 0
+    assert struct.unpack('I', f.read(4))[0] == 99
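
The standalone sketch below is not part of the patch; it is a minimal model of how the split helpers compose for compact parts. MergeTreeIndexGranularity is replaced by a plain std::vector of per-mark row counts, the granularity value of 32 is an arbitrary example, and only the loop body mirrors the compact writer's fillIndexGranularityImpl shown above.

// Standalone model (assumption: a vector of row counts stands in for MergeTreeIndexGranularity).
#include <cassert>
#include <cstddef>
#include <vector>

using Marks = std::vector<size_t>;

/// Mirrors fillIndexGranularityImpl from MergeTreeDataPartWriterCompact.cpp:
/// append fixed-size marks, extending or splitting the last granule for a short block tail.
static void fillMarks(Marks & marks, size_t index_offset, size_t granularity, size_t rows_in_block)
{
    for (size_t row = index_offset; row < rows_in_block; row += granularity)
    {
        size_t rows_left = rows_in_block - row;
        if (rows_left < granularity && (rows_in_block >= granularity || index_offset != 0))
        {
            if (rows_left * 2 >= granularity)
                marks.push_back(rows_left);   /// large tail: start a new granule
            else
                marks.back() += rows_left;    /// small tail: extend the previous granule
        }
        else
            marks.push_back(granularity);
    }
}

int main()
{
    Marks marks;
    const size_t granularity = 32;   /// arbitrary example value
    size_t index_offset = 0;
    size_t rows_written = 0;

    /// Three 8-row blocks end up in a single 32-row granule, as in the new unit test:
    /// index_offset carries the unfilled remainder of the last granule between calls.
    for (int i = 0; i < 3; ++i)
    {
        fillMarks(marks, index_offset, granularity, /*rows_in_block=*/ 8);
        rows_written += 8;
        index_offset = granularity - rows_written;
    }

    assert(marks.size() == 1 && marks.front() == granularity);
    return 0;
}

This carry-over of index_offset between consecutive blocks is the behaviour exercised by the "Three blocks in one granule" case of the new gtest file and, indirectly, by the primary.idx assertions in the integration test.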