diff --git a/src/Storages/MergeTree/IMergeTreeDataPartWriter.cpp b/src/Storages/MergeTree/IMergeTreeDataPartWriter.cpp index 76291e09d58..597b5a94d8a 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPartWriter.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPartWriter.cpp @@ -217,21 +217,22 @@ void IMergeTreeDataPartWriter::calculateAndSerializePrimaryIndex(const Block & p /// Write index. The index contains Primary Key value for each `index_granularity` row. - for (size_t i = index_offset; i < rows;) + size_t current_row = index_offset; + size_t total_marks = index_granularity.getMarksCount(); + + while (index_mark < total_marks && current_row < rows) { if (storage.hasPrimaryKey()) { for (size_t j = 0; j < primary_columns_num; ++j) { const auto & primary_column = primary_index_block.getByPosition(j); - index_columns[j]->insertFrom(*primary_column.column, i); - primary_column.type->serializeBinary(*primary_column.column, i, *index_stream); + index_columns[j]->insertFrom(*primary_column.column, current_row); + primary_column.type->serializeBinary(*primary_column.column, current_row, *index_stream); } } - i += index_granularity.getMarkRows(current_mark++); - if (current_mark >= index_granularity.getMarksCount()) - break; + current_row += index_granularity.getMarkRows(index_mark++); } /// store last index row to write final mark at the end of column diff --git a/src/Storages/MergeTree/IMergeTreeDataPartWriter.h b/src/Storages/MergeTree/IMergeTreeDataPartWriter.h index c572eccea60..c1cf127d721 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPartWriter.h +++ b/src/Storages/MergeTree/IMergeTreeDataPartWriter.h @@ -112,6 +112,9 @@ protected: size_t computeIndexGranularity(const Block & block); virtual void fillIndexGranularity(size_t index_granularity_for_block, size_t rows_in_block); + size_t getCurrentMark() const { return current_mark; } + size_t getIndexOffset() const { return index_offset; } + using SerializationState = IDataType::SerializeBinaryBulkStatePtr; using SerializationStates = std::unordered_map; @@ -132,11 +135,6 @@ protected: bool compute_granularity; bool with_final_mark; - size_t current_mark = 0; - - /// The offset to the first row of the block for which you want to write the index. - size_t index_offset = 0; - size_t next_mark = 0; size_t next_index_offset = 0; @@ -162,6 +160,14 @@ protected: /// To correctly write Nested elements column-by-column. WrittenOffsetColumns * written_offset_columns = nullptr; + +private: + /// Data is already written up to this mark. + size_t current_mark = 0; + /// The offset to the first row of the block for which you want to write the index. + size_t index_offset = 0; + /// Index is already serialized up to this mark. + size_t index_mark = 0; }; } diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp index a9c223bf13b..2f708ac6954 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp @@ -91,7 +91,7 @@ void MergeTreeDataPartWriterCompact::write( void MergeTreeDataPartWriterCompact::writeBlock(const Block & block) { size_t total_rows = block.rows(); - size_t from_mark = current_mark; + size_t from_mark = getCurrentMark(); size_t current_row = 0; while (current_row < total_rows) @@ -166,13 +166,12 @@ void MergeTreeDataPartWriterCompact::finishDataSerialization(IMergeTreeDataPart: stream.reset(); } -void fillIndexGranularityImpl( +static void fillIndexGranularityImpl( MergeTreeIndexGranularity & index_granularity, - size_t & index_offset, + size_t index_offset, size_t index_granularity_for_block, size_t rows_in_block) { - bool initial_marks_count = index_granularity.getMarksCount(); for (size_t current_row = index_offset; current_row < rows_in_block; current_row += index_granularity_for_block) { size_t rows_left_in_block = rows_in_block - current_row; @@ -194,17 +193,13 @@ void fillIndexGranularityImpl( index_granularity.appendMark(index_granularity_for_block); } } - - /// Primary index shouldn't be written, if there was no granule added - if (initial_marks_count == index_granularity.getMarksCount()) - index_offset = std::max(index_offset, rows_in_block); } void MergeTreeDataPartWriterCompact::fillIndexGranularity(size_t index_granularity_for_block, size_t rows_in_block) { fillIndexGranularityImpl( index_granularity, - index_offset, + getIndexOffset(), index_granularity_for_block, rows_in_block); } diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp index 4357d91964b..e5eececacfb 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp @@ -209,17 +209,18 @@ void MergeTreeDataPartWriterWide::writeColumn( size_t total_rows = column.size(); size_t current_row = 0; - size_t current_column_mark = current_mark; + size_t current_column_mark = getCurrentMark(); + size_t current_index_offset = getIndexOffset(); while (current_row < total_rows) { size_t rows_to_write; bool write_marks = true; /// If there is `index_offset`, then the first mark goes not immediately, but after this number of rows. - if (current_row == 0 && index_offset != 0) + if (current_row == 0 && current_index_offset != 0) { write_marks = false; - rows_to_write = index_offset; + rows_to_write = current_index_offset; } else { diff --git a/src/Storages/tests/gtest_aux_funcs_for_adaptive_granularity_compact_parts.cpp b/src/Storages/tests/gtest_aux_funcs_for_adaptive_granularity_compact_parts.cpp index fc3c3146da0..f87293dcd5d 100644 --- a/src/Storages/tests/gtest_aux_funcs_for_adaptive_granularity_compact_parts.cpp +++ b/src/Storages/tests/gtest_aux_funcs_for_adaptive_granularity_compact_parts.cpp @@ -42,8 +42,6 @@ TEST(IndexGranularityCompactParts, FillGranularitySequenceOfBlocks) fillIndexGranularityImpl(index_granularity, index_offset, granularity, rows2); - /// index_offset is increased, because we didn't add new granule - EXPECT_EQ(index_offset, rows2); EXPECT_EQ(index_granularity.getMarksCount(), 1); EXPECT_EQ(index_granularity.getMarkRows(0), rows1 + rows2); } @@ -61,8 +59,6 @@ TEST(IndexGranularityCompactParts, FillGranularitySequenceOfBlocks) fillIndexGranularityImpl(index_granularity, index_offset, granularity, rows2); - /// index_offset is not increased, because new granule was added - EXPECT_EQ(index_offset, granularity - rows1); EXPECT_EQ(index_granularity.getMarksCount(), 2); EXPECT_EQ(index_granularity.getMarkRows(0), granularity); EXPECT_EQ(index_granularity.getMarkRows(1), rows1 + rows2 - granularity);