From 5524b77915b172ec809af750c7c7b36b6f9f465e Mon Sep 17 00:00:00 2001
From: alesapin
Date: Fri, 30 Nov 2018 18:36:10 +0300
Subject: [PATCH] Compilable code!

---
 .../MergingSortedBlockInputStream.cpp         |  34 ++-
 dbms/src/Storages/MergeTree/MergeTreeData.cpp |   9 +-
 .../MergeTree/MergeTreeDataMergerMutator.cpp  |  16 +-
 .../MergeTree/MergedBlockOutputStream.cpp     | 250 +++++++++++-------
 .../MergeTree/MergedBlockOutputStream.h       |  56 +++-
 5 files changed, 252 insertions(+), 113 deletions(-)

diff --git a/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp b/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp
index 8dd929759ca..91aecf91ea8 100644
--- a/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp
+++ b/dbms/src/DataStreams/MergingSortedBlockInputStream.cpp
@@ -114,7 +114,7 @@ Block MergingSortedBlockInputStream::readImpl()
 template <typename TSortCursor>
 void MergingSortedBlockInputStream::fetchNextBlock(const TSortCursor & current, std::priority_queue<TSortCursor> & queue)
 {
-    size_t order = current.impl->order;
+    size_t order = current->order;
     size_t size = cursors.size();
 
     if (order >= size || &cursors[order] != current.impl)
@@ -130,6 +130,16 @@ void MergingSortedBlockInputStream::fetchNextBlock(const TSortCursor & current,
     }
 }
 
+namespace {
+size_t getAvgGranularity(const std::unordered_map<size_t, size_t> & rows_granularity, size_t total_rows)
+{
+    size_t sum = 0;
+    for (const auto [granularity, rows_num] : rows_granularity)
+        sum += granularity * rows_num;
+    return sum / total_rows;
+}
+}
+
 template
 void MergingSortedBlockInputStream::fetchNextBlock<SortCursor>(const SortCursor & current, std::priority_queue<SortCursor> & queue);
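The helper above drives the new block-cutting rule in `merge()`: instead of cutting output blocks at a fixed `max_block_size`, the stream tracks how many rows it merged from blocks of each input granularity and cuts as soon as `merged_rows` reaches the row-weighted average of those granularities. A minimal standalone rehearsal of that arithmetic (the `main()` harness and its numbers are invented for illustration):

```cpp
#include <cstddef>
#include <iostream>
#include <unordered_map>

/// rows_granularity maps "granularity value" -> "rows merged from blocks with
/// that granularity"; the result is the row-weighted average granularity.
size_t getAvgGranularity(const std::unordered_map<size_t, size_t> & rows_granularity, size_t total_rows)
{
    size_t sum = 0;
    for (const auto & [granularity, rows_num] : rows_granularity)
        sum += granularity * rows_num;
    return sum / total_rows;
}

int main()
{
    /// 8192 rows merged from blocks of granularity 8192, 1000 rows from blocks of 1000.
    std::unordered_map<size_t, size_t> rows_granularity{{8192, 8192}, {1000, 1000}};

    /// (8192*8192 + 1000*1000) / 9192 == 7409: the current output block is cut
    /// once merged_rows reaches this running average.
    std::cout << getAvgGranularity(rows_granularity, 9192) << '\n';
}
```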
@@ -142,10 +152,11 @@ void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::
 {
     size_t merged_rows = 0;
 
+    std::unordered_map<size_t, size_t> rows_granularity;
     /** Increase row counters.
       * Return true if it's time to finish generating the current data block.
       */
-    auto count_row_and_check_limit = [&, this]()
+    auto count_row_and_check_limit = [&, this](size_t current_granularity)
     {
         ++total_merged_rows;
         if (limit && total_merged_rows == limit)
@@ -156,8 +167,9 @@ void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::
             return true;
         }
 
+        rows_granularity[current_granularity]++;
         ++merged_rows;
-        if (merged_rows == max_block_size)
+        if (merged_rows == getAvgGranularity(rows_granularity, merged_rows))
         {
             // std::cerr << "max_block_size reached\n";
             return true;
@@ -170,6 +182,7 @@ void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::
     while (!queue.empty())
     {
         TSortCursor current = queue.top();
+        size_t current_block_granularity = current->rows;
         queue.pop();
 
         while (true)
@@ -177,7 +190,7 @@ void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::
             /** And what if the block is totally less or equal than the rest for the current cursor?
               * Or is there only one data source left in the queue? Then you can take the entire block on current cursor.
               */
-            if (current.impl->isFirst() && (queue.empty() || current.totallyLessOrEquals(queue.top())))
+            if (current->isFirst() && (queue.empty() || current.totallyLessOrEquals(queue.top())))
             {
                 // std::cerr << "current block is totally less or equals\n";
 
@@ -189,8 +202,8 @@ void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::
                     return;
                 }
 
-                /// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl)
-                size_t source_num = current.impl->order;
+                /// Actually, current->order stores source number (i.e. cursors[current->order] == current)
+                size_t source_num = current->order;
                 if (source_num >= cursors.size())
                     throw Exception("Logical error in MergingSortedBlockInputStream", ErrorCodes::LOGICAL_ERROR);
 
@@ -202,6 +215,7 @@ void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::
 
                 merged_rows = merged_columns.at(0)->size();
 
+                /// Limit output
                 if (limit && total_merged_rows + merged_rows > limit)
                 {
                     merged_rows = limit - total_merged_rows;
@@ -215,6 +229,8 @@ void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::
                     finished = true;
                 }
 
+                /// Write order of rows for other columns;
+                /// this data will be used in the gather stream.
                 if (out_row_sources_buf)
                 {
                     RowSourcePart row_source(source_num);
@@ -237,7 +253,7 @@ void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::
             if (out_row_sources_buf)
             {
                 /// Actually, current.impl->order stores source number (i.e. cursors[current.impl->order] == current.impl)
-                RowSourcePart row_source(current.impl->order);
+                RowSourcePart row_source(current->order);
                 out_row_sources_buf->write(row_source.data);
             }
 
@@ -248,7 +264,7 @@ void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::
 
             if (queue.empty() || !(current.greater(queue.top())))
             {
-                if (count_row_and_check_limit())
+                if (count_row_and_check_limit(current_block_granularity))
                 {
                     // std::cerr << "pushing back to queue\n";
                     queue.push(current);
@@ -275,7 +291,7 @@ void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::
                 break;
             }
 
-            if (count_row_and_check_limit())
+            if (count_row_and_check_limit(current_block_granularity))
                 return;
         }
 
diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp
index 788ac6a4ad6..b99ac827d1f 100644
--- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp
+++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp
@@ -1248,7 +1248,14 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
          */
         IMergedBlockOutputStream::WrittenOffsetColumns unused_written_offsets;
         MergedColumnOnlyOutputStream out(
-            *this, in.getHeader(), full_path + part->name + '/', true /* sync */, compression_settings, true /* skip_offsets */, unused_written_offsets);
+            *this,
+            in.getHeader(),
+            full_path + part->name + '/',
+            true /* sync */,
+            compression_settings,
+            true /* skip_offsets */,
+            unused_written_offsets,
+            part->marks_index_granularity);
 
         in.readPrefix();
         out.writePrefix();
diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp
index c7f25e372ae..7c5c67de3e0 100644
--- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp
+++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp
@@ -700,8 +700,17 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
     if (deduplicate)
         merged_stream = std::make_shared<DistinctSortedBlockInputStream>(merged_stream, SizeLimits(), 0 /*limit_hint*/, Names());
 
+    /// If the merge is vertical, we cannot calculate index granularity from the merged blocks.
+    bool calculate_index_granularity = (merge_alg != MergeAlgorithm::Vertical);
+
     MergedBlockOutputStream to{
-        data, new_part_tmp_path, merging_columns, compression_settings, merged_column_to_size, data.settings.min_merge_bytes_to_use_direct_io};
+        data,
+        new_part_tmp_path,
+        merging_columns,
+        compression_settings,
+        merged_column_to_size,
+        data.settings.min_merge_bytes_to_use_direct_io,
+        calculate_index_granularity};
 
     merged_stream->readPrefix();
     to.writePrefix();
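A note on the flag above: `calculate_index_granularity` is passed into the constructor parameter `blocks_are_granules_size_` (see the header changes below). For a horizontal merge the merging stream already cuts its output at the average input granularity, so the writer can record every incoming block as exactly one granule. A toy illustration of the resulting per-mark vector (harness invented):

```cpp
#include <cstddef>
#include <iostream>
#include <vector>

int main()
{
    /// Row counts of blocks produced by MergingSortedBlockInputStream,
    /// already sized by getAvgGranularity.
    std::vector<size_t> merged_block_rows = {7409, 7409, 7410, 964};

    /// With blocks_are_granules_size set, each block becomes one granule.
    std::vector<size_t> index_granularity;
    for (size_t rows : merged_block_rows)
        index_granularity.push_back(rows);

    size_t total_rows = 0;
    for (size_t g : index_granularity)
        total_rows += g;
    std::cout << index_granularity.size() << " marks, " << total_rows << " rows\n";
}
```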
@@ -786,7 +795,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
             rows_sources_read_buf.seek(0, 0);
             ColumnGathererStream column_gathered_stream(column_name, column_part_streams, rows_sources_read_buf);
             MergedColumnOnlyOutputStream column_to(
-                data, column_gathered_stream.getHeader(), new_part_tmp_path, false, compression_settings, false, written_offset_columns);
+                data, column_gathered_stream.getHeader(), new_part_tmp_path, false, compression_settings, false, written_offset_columns, to.getIndexGranularity());
             size_t column_elems_written = 0;
 
             column_to.writePrefix();
@@ -963,7 +972,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
 
         IMergedBlockOutputStream::WrittenOffsetColumns unused_written_offsets;
         MergedColumnOnlyOutputStream out(
-            data, in_header, new_part_tmp_path, /* sync = */ false, compression_settings, /* skip_offsets = */ false, unused_written_offsets);
+            data, in_header, new_part_tmp_path, /* sync = */ false, compression_settings, /* skip_offsets = */ false, unused_written_offsets, source_part->marks_index_granularity);
 
         in->readPrefix();
         out.writePrefix();
@@ -1002,6 +1011,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
 
         new_data_part->rows_count = source_part->rows_count;
         new_data_part->marks_count = source_part->marks_count;
+        new_data_part->marks_index_granularity = source_part->marks_index_granularity;
         new_data_part->index = source_part->index;
         new_data_part->partition.assign(source_part->partition);
         new_data_part->minmax_idx = source_part->minmax_idx;
diff --git a/dbms/src/Storages/MergeTree/MergedBlockOutputStream.cpp b/dbms/src/Storages/MergeTree/MergedBlockOutputStream.cpp
index 8b7650cf18a..0185b78b7ca 100644
--- a/dbms/src/Storages/MergeTree/MergedBlockOutputStream.cpp
+++ b/dbms/src/Storages/MergeTree/MergedBlockOutputStream.cpp
@@ -32,16 +32,18 @@ IMergedBlockOutputStream::IMergedBlockOutputStream(
     size_t max_compress_block_size_,
     CompressionSettings compression_settings_,
     size_t aio_threshold_,
+    bool blocks_are_granules_size_,
     const std::vector<size_t> & index_granularity_)
-    : storage(storage_),
-    min_compress_block_size(min_compress_block_size_),
-    max_compress_block_size(max_compress_block_size_),
-    aio_threshold(aio_threshold_),
-    compression_settings(compression_settings_),
-    marks_file_extension(storage.index_granularity_bytes == 0 ? FIXED_MARKS_FILE_EXTENSION : ADAPTIVE_MARKS_FILE_EXTENSION),
-    mark_size_in_bytes(storage.index_granularity_bytes == 0 ? FIXED_MARK_BYTE_SIZE : ADAPTIVE_MARK_BYTE_SIZE),
-    index_granularity(index_granularity_),
-    compute_granularity_unknown(index_granularity.empty())
+    : storage(storage_)
+    , min_compress_block_size(min_compress_block_size_)
+    , max_compress_block_size(max_compress_block_size_)
+    , aio_threshold(aio_threshold_)
+    , compression_settings(compression_settings_)
+    , index_granularity(index_granularity_)
+    , marks_file_extension(storage.index_granularity_bytes == 0 ? FIXED_MARKS_FILE_EXTENSION : ADAPTIVE_MARKS_FILE_EXTENSION)
+    , mark_size_in_bytes(storage.index_granularity_bytes == 0 ? FIXED_MARK_BYTE_SIZE : ADAPTIVE_MARK_BYTE_SIZE)
+    , blocks_are_granules_size(blocks_are_granules_size_)
+    , compute_granularity(index_granularity.empty())
 {
 }
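The constructor above fixes the marks format once per part: parts of tables with `index_granularity_bytes == 0` keep the old fixed-size marks, adaptive parts get a wider mark that also stores the granule's row count. The concrete constants are not visible in this diff; the sketch below assumes the two-versus-three UInt64 layout (16 and 24 bytes, `.mrk` and `.mrk2`) that matches what ClickHouse eventually shipped:

```cpp
#include <cstdint>
#include <cstdio>

/// Assumed layouts, not shown in this diff.
struct FixedMark                           /// FIXED_MARK_BYTE_SIZE, presumably 16
{
    uint64_t offset_in_compressed_file;    /// where the compressed block starts
    uint64_t offset_in_decompressed_block; /// where the granule starts inside it
};

struct AdaptiveMark                        /// ADAPTIVE_MARK_BYTE_SIZE, presumably 24
{
    uint64_t offset_in_compressed_file;
    uint64_t offset_in_decompressed_block;
    uint64_t rows_in_granule;              /// extra value written when the marks
                                           /// file extension is the adaptive one
};

int main()
{
    std::printf("fixed: %zu bytes, adaptive: %zu bytes\n", sizeof(FixedMark), sizeof(AdaptiveMark));
}
```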
@@ -98,73 +100,43 @@ IDataType::OutputStreamGetter IMergedBlockOutputStream::createStreamGetter(
     };
 }
 
-size_t IMergedBlockOutputStream::getBlockIndexGranularity(const Block & block) const
+void IMergedBlockOutputStream::fillIndexGranularity(const Block & block)
 {
+    size_t rows = block.rows();
+    size_t index_granularity_for_block;
     if (storage.index_granularity_bytes == 0)
-        return storage.index_granularity;
-
-    size_t block_size_in_memory = block.allocatedBytes();
-    return std::max<size_t>(block_size_in_memory / storage.index_granularity_bytes, 1);
+        index_granularity_for_block = storage.index_granularity;
+    else
+    {
+        size_t block_size_in_memory = block.allocatedBytes();
+        if (blocks_are_granules_size)
+            index_granularity_for_block = rows;
+        else
+            index_granularity_for_block = std::max<size_t>(block_size_in_memory / storage.index_granularity_bytes, 1);
+    }
+
+    size_t current_row = 0;
+    while (current_row < rows)
+    {
+        index_granularity.push_back(index_granularity_for_block);
+        current_row += index_granularity_for_block;
+    }
 }
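`fillIndexGranularity` picks a single rows-per-mark value for the whole block from one of three sources: the table's fixed `index_granularity`, the block's own row count (when blocks are granule-sized), or the block's memory footprint divided by `index_granularity_bytes`, exactly as in the function above. A standalone sketch of the decision (names and numbers invented):

```cpp
#include <algorithm>
#include <cstddef>
#include <iostream>

size_t granuleRowsForBlock(size_t block_rows, size_t block_bytes,
                           size_t fixed_granularity, size_t granularity_bytes,
                           bool blocks_are_granules_size)
{
    if (granularity_bytes == 0)        /// classic table: fixed rows per granule
        return fixed_granularity;
    if (blocks_are_granules_size)      /// horizontal merge: one block, one granule
        return block_rows;
    /// adaptive mode, same formula as the patch
    return std::max<size_t>(block_bytes / granularity_bytes, 1);
}

int main()
{
    std::cout << granuleRowsForBlock(65536, 64 << 20, 8192, 0, false) << '\n';    /// 8192
    std::cout << granuleRowsForBlock(65536, 64 << 20, 8192, 8192, false) << '\n'; /// 8192 == 64 MiB / 8 KiB
    std::cout << granuleRowsForBlock(65536, 64 << 20, 8192, 8192, true) << '\n';  /// 65536
}
```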
-
-void IMergedBlockOutputStream::writeData(
-    const String & name,
-    const IDataType & type,
-    const IColumn & column,
-    WrittenOffsetColumns & offset_columns,
-    bool skip_offsets,
-    IDataType::SerializeBinaryBulkStatePtr & serialization_state,
-    size_t index_granularity)
-{
-    auto & settings = storage.context.getSettingsRef();
-    IDataType::SerializeBinaryBulkSettings serialize_settings;
-    serialize_settings.getter = createStreamGetter(name, offset_columns, skip_offsets);
-    serialize_settings.low_cardinality_max_dictionary_size = settings.low_cardinality_max_dictionary_size;
-    serialize_settings.low_cardinality_use_single_dictionary_for_part = settings.low_cardinality_use_single_dictionary_for_part != 0;
-
-    size_t size = column.size();
-    size_t prev_mark = 0;
-    while (prev_mark < size)
-    {
-        size_t limit = 0;
-
-        /// If there is `index_offset`, then the first mark goes not immediately, but after this number of rows.
-        if (prev_mark == 0 && index_offset != 0)
-            limit = index_offset;
-        else
-        {
-            limit = index_granularity;
-
-            /// Write marks.
-            type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path)
-            {
-                bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
-                if (is_offsets && skip_offsets)
-                    return;
-
-                String stream_name = IDataType::getFileNameForStream(name, substream_path);
-
-                /// Don't write offsets more than one time for Nested type.
-                if (is_offsets && offset_columns.count(stream_name))
-                    return;
-
-                ColumnStream & stream = *column_streams[stream_name];
-
-                /// There could already be enough data to compress into the new block.
-                if (stream.compressed.offset() >= min_compress_block_size)
-                    stream.compressed.next();
-
-                writeIntBinary(stream.plain_hashing.count(), stream.marks);
-                writeIntBinary(stream.compressed.offset(), stream.marks);
-                if (stream.marks_file_extension != FIXED_MARKS_FILE_EXTENSION)
-                    writeIntBinary(index_granularity, stream.marks);
-            }, serialize_settings.path);
-        }
-
-        type.serializeBinaryBulkWithMultipleStreams(column, prev_mark, limit, serialize_settings, serialization_state);
-
-        /// So that instead of the marks pointing to the end of the compressed block, there were marks pointing to the beginning of the next one.
-        type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path)
-        {
-            bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
-            if (is_offsets && skip_offsets)
-                return;
-
-            String stream_name = IDataType::getFileNameForStream(name, substream_path);
-
-            /// Don't write offsets more than one time for Nested type.
-            if (is_offsets && offset_columns.count(stream_name))
-                return;
-
-            column_streams[stream_name]->compressed.nextIfAtEnd();
-        }, serialize_settings.path);
-
-        prev_mark += limit;
-    }
+size_t IMergedBlockOutputStream::writeSingleGranule(
+    const String & name,
+    const IDataType & type,
+    const IColumn & column,
+    WrittenOffsetColumns & offset_columns,
+    bool skip_offsets,
+    IDataType::SerializeBinaryBulkStatePtr & serialization_state,
+    IDataType::SerializeBinaryBulkSettings & serialize_settings,
+    size_t from_row,
+    size_t number_of_rows,
+    bool write_marks)
+{
+    if (write_marks)
+    {
+        /// Write marks.
+        type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path)
+        {
+            bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
+            if (is_offsets && skip_offsets)
+                return;
+
+            String stream_name = IDataType::getFileNameForStream(name, substream_path);
+
+            /// Don't write offsets more than one time for Nested type.
+            if (is_offsets && offset_columns.count(stream_name))
+                return;
+
+            ColumnStream & stream = *column_streams[stream_name];
+
+            /// There could already be enough data to compress into the new block.
+            if (stream.compressed.offset() >= min_compress_block_size)
+                stream.compressed.next();
+
+            writeIntBinary(stream.plain_hashing.count(), stream.marks);
+            writeIntBinary(stream.compressed.offset(), stream.marks);
+            if (stream.marks_file_extension != FIXED_MARKS_FILE_EXTENSION)
+                writeIntBinary(number_of_rows, stream.marks);
+        }, serialize_settings.path);
+    }
+
+    type.serializeBinaryBulkWithMultipleStreams(column, from_row, number_of_rows, serialize_settings, serialization_state);
+
+    /// So that marks point to the beginning of the next compressed block rather than to the end of the current one.
+    type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path)
+    {
+        bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
+        if (is_offsets && skip_offsets)
+            return;
+
+        String stream_name = IDataType::getFileNameForStream(name, substream_path);
+
+        /// Don't write offsets more than one time for Nested type.
+        if (is_offsets && offset_columns.count(stream_name))
+            return;
+
+        column_streams[stream_name]->compressed.nextIfAtEnd();
+    }, serialize_settings.path);
+
+    return from_row + number_of_rows;
+}
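Two details of `writeSingleGranule` are worth spelling out: a mark is only written after the pending compressed block is cut if it already holds `min_compress_block_size` bytes, and after serializing the granule `nextIfAtEnd()` makes marks land at the beginning of the next compressed block. A toy model of the first rule (all names invented, compression simulated as 1:1):

```cpp
#include <cstddef>
#include <iostream>
#include <vector>

struct MarkModel
{
    size_t offset_in_compressed_file; /// first UInt64 written to the marks file
    size_t offset_in_block;           /// second UInt64
};

int main()
{
    const size_t min_compress_block_size = 65536;
    size_t file_offset = 0;   /// bytes of already finished compressed blocks
    size_t block_offset = 0;  /// bytes pending in the current block
    std::vector<MarkModel> marks;

    for (size_t granule_bytes : {40000, 40000, 40000})
    {
        if (block_offset >= min_compress_block_size) /// stream.compressed.next()
        {
            file_offset += block_offset;
            block_offset = 0;
        }
        marks.push_back({file_offset, block_offset});
        block_offset += granule_bytes;
    }

    for (const auto & m : marks)                     /// (0,0), (0,40000), (80000,0)
        std::cout << m.offset_in_compressed_file << ' ' << m.offset_in_block << '\n';
}
```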
+
+std::pair<size_t, size_t> IMergedBlockOutputStream::writeColumn(
+    const String & name,
+    const IDataType & type,
+    const IColumn & column,
+    WrittenOffsetColumns & offset_columns,
+    bool skip_offsets,
+    IDataType::SerializeBinaryBulkStatePtr & serialization_state,
+    size_t from_mark)
+{
+    auto & settings = storage.context.getSettingsRef();
+    IDataType::SerializeBinaryBulkSettings serialize_settings;
+    serialize_settings.getter = createStreamGetter(name, offset_columns, skip_offsets);
+    serialize_settings.low_cardinality_max_dictionary_size = settings.low_cardinality_max_dictionary_size;
+    serialize_settings.low_cardinality_use_single_dictionary_for_part = settings.low_cardinality_use_single_dictionary_for_part != 0;
+
+    size_t total_rows = column.size();
+    size_t current_row = 0;
+    size_t current_column_mark = from_mark;
+    while (current_row < total_rows)
+    {
+        size_t rows_to_write;
+        bool write_marks = true;
+
+        /// If there is an `index_offset`, the first mark is written only after that many rows.
+        if (current_row == 0 && index_offset != 0)
+        {
+            write_marks = false;
+            rows_to_write = index_offset;
+        }
+        else
+        {
+            if (index_granularity.size() <= current_column_mark)
+                throw Exception(
+                    "Incorrect size of index granularity: expected to find mark " + toString(current_column_mark)
+                        + ", but there are only " + toString(index_granularity.size()) + " marks in total",
+                    ErrorCodes::LOGICAL_ERROR);
+
+            rows_to_write = index_granularity[current_column_mark];
+        }
+
+        current_row = writeSingleGranule(
+            name,
+            type,
+            column,
+            offset_columns,
+            skip_offsets,
+            serialization_state,
+            serialize_settings,
+            current_row,
+            rows_to_write,
+            write_marks
+        );
+
+        current_column_mark++;
     }
 
     /// Memoize offsets for Nested types, that are already written. They will not be written again for next columns of Nested structure.
     type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path)
     {
         bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
         if (is_offsets)
         {
             String stream_name = IDataType::getFileNameForStream(name, substream_path);
             offset_columns.insert(stream_name);
         }
     }, serialize_settings.path);
+
+    return std::make_pair(current_column_mark, current_row - total_rows);
 }
 
@@ -251,11 +308,15 @@ MergedBlockOutputStream::MergedBlockOutputStream(
     MergeTreeData & storage_,
     String part_path_,
     const NamesAndTypesList & columns_list_,
-    CompressionSettings compression_settings)
+    CompressionSettings compression_settings,
+    bool blocks_are_granules_size_,
+    const std::vector<size_t> & index_granularity_)
     : IMergedBlockOutputStream(
         storage_, storage_.context.getSettings().min_compress_block_size,
         storage_.context.getSettings().max_compress_block_size, compression_settings,
-        storage_.context.getSettings().min_bytes_to_use_direct_io),
+        storage_.context.getSettings().min_bytes_to_use_direct_io,
+        blocks_are_granules_size_,
+        index_granularity_),
     columns_list(columns_list_), part_path(part_path_)
 {
     init();
@@ -269,11 +330,13 @@ MergedBlockOutputStream::MergedBlockOutputStream(
     const NamesAndTypesList & columns_list_,
     CompressionSettings compression_settings,
     const MergeTreeData::DataPart::ColumnToSize & merged_column_to_size_,
-    size_t aio_threshold_)
+    size_t aio_threshold_,
+    bool blocks_are_granules_size_,
+    const std::vector<size_t> & index_granularity_)
     : IMergedBlockOutputStream(
         storage_, storage_.context.getSettings().min_compress_block_size,
         storage_.context.getSettings().max_compress_block_size, compression_settings,
-        aio_threshold_),
+        aio_threshold_, blocks_are_granules_size_, index_granularity_),
     columns_list(columns_list_), part_path(part_path_)
 {
     init();
@@ -394,14 +457,14 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
     }
 
     new_part->rows_count = rows_count;
-    new_part->marks_count = marks_count;
+    new_part->marks_count = current_mark;
     new_part->modification_time = time(nullptr);
     new_part->columns = *total_column_list;
     new_part->index.assign(std::make_move_iterator(index_columns.begin()), std::make_move_iterator(index_columns.end()));
     new_part->checksums = checksums;
     new_part->bytes_on_disk = MergeTreeData::DataPart::calculateTotalSizeOnDisk(new_part->getFullPath());
     new_part->marks_file_extension = marks_file_extension;
-    new_part->marks_index_granularity.swap(marks_index_granularity);
+    new_part->marks_index_granularity.swap(index_granularity);
     new_part->mark_size_in_bytes = mark_size_in_bytes;
 }
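With the granularity vector and `index_offset` in play, `writeColumn` cuts a column into granules as follows: the first `index_offset` rows of a block finish the previous block's last granule (no mark), and every further granule takes `index_granularity[current_column_mark]` rows and writes a mark. A standalone rehearsal of that arithmetic, under the pair returned above (new current mark, rows of the last granule still owed):

```cpp
#include <cstddef>
#include <iostream>
#include <utility>
#include <vector>

/// Invented harness mirroring writeColumn's bookkeeping, not its I/O.
std::pair<size_t, size_t> writeColumnModel(
    const std::vector<size_t> & index_granularity,
    size_t from_mark, size_t index_offset, size_t total_rows)
{
    size_t current_row = 0;
    size_t current_mark = from_mark;
    while (current_row < total_rows)
    {
        if (current_row == 0 && index_offset != 0)
            current_row += index_offset;                    /// no mark written
        else
            current_row += index_granularity[current_mark]; /// one mark written
        ++current_mark;
    }
    return {current_mark, current_row - total_rows};
}

int main()
{
    std::vector<size_t> granularity(100, 8192);

    /// A 20000-row block: granules of 8192 + 8192 + (3616 of 8192) rows.
    /// The third granule still owes 24576 - 20000 == 4576 rows; that value
    /// becomes index_offset for the next block.
    auto [next_mark, rows_owed] = writeColumnModel(granularity, 0, 0, 20000);
    std::cout << next_mark << ' ' << rows_owed << '\n';     /// prints "3 4576"
}
```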
@@ -422,7 +485,12 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
 {
     block.checkNumberOfRows();
     size_t rows = block.rows();
-    size_t current_block_index_granularity = getBlockIndexGranularity(block);
+
+    /// Fill index granularity for this block
+    /// if it is not known yet (on INSERT or horizontal merge,
+    /// but not on vertical merge, where it is already computed).
+    if (compute_granularity)
+        fillIndexGranularity(block);
 
     /// The set of written offset columns so that you do not write shared offsets of nested structures columns several times
     WrittenOffsetColumns offset_columns;
@@ -468,6 +536,7 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
         }
     }
 
+    size_t new_index_offset = 0;
     /// Now write the data.
     auto it = columns_list.begin();
     for (size_t i = 0; i < columns_list.size(); ++i, ++it)
@@ -480,18 +549,18 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
             if (primary_columns_name_to_position.end() != primary_column_it)
             {
                 auto & primary_column = *primary_columns[primary_column_it->second].column;
-                writeData(column.name, *column.type, primary_column, offset_columns, false, serialization_states[i], current_block_index_granularity);
+                std::tie(std::ignore, new_index_offset) = writeColumn(column.name, *column.type, primary_column, offset_columns, false, serialization_states[i], current_mark);
             }
             else
             {
                 /// We rearrange the columns that are not included in the primary key here; Then the result is released - to save RAM.
                 ColumnPtr permuted_column = column.column->permute(*permutation, 0);
-                writeData(column.name, *column.type, *permuted_column, offset_columns, false, serialization_states[i], current_block_index_granularity);
+                std::tie(std::ignore, new_index_offset) = writeColumn(column.name, *column.type, *permuted_column, offset_columns, false, serialization_states[i], current_mark);
             }
         }
         else
         {
-            writeData(column.name, *column.type, *column.column, offset_columns, false, serialization_states[i], current_block_index_granularity);
+            std::tie(std::ignore, new_index_offset) = writeColumn(column.name, *column.type, *column.column, offset_columns, false, serialization_states[i], current_mark);
         }
     }
 
@@ -507,7 +576,7 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
         auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();
 
         /// Write index. The index contains Primary Key value for each `index_granularity` row.
-        for (size_t i = index_offset; i < rows; i += current_block_index_granularity)
+        for (size_t i = index_offset; i < rows; i += index_granularity[current_mark])
         {
             if (storage.hasPrimaryKey())
             {
@@ -519,13 +588,11 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
                 }
             }
 
-            marks_index_granularity.push_back(current_block_index_granularity);
-            ++marks_count;
+            ++current_mark;
         }
     }
 
-    size_t written_for_last_mark = (current_block_index_granularity - index_offset + rows) % current_block_index_granularity;
-    index_offset = (current_block_index_granularity - written_for_last_mark) % current_block_index_granularity;
+    index_offset = new_index_offset;
 }
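For the primary index the loop above keeps one key value per granule, taken at the first row of each granule inside the block. A toy picture of which row numbers feed `primary.idx` (invented harness):

```cpp
#include <cstddef>
#include <iostream>
#include <vector>

int main()
{
    std::vector<size_t> index_granularity(10, 8192);
    size_t rows = 20000, index_offset = 0, current_mark = 0;

    std::vector<size_t> index_rows; /// rows whose primary key values are kept
    for (size_t i = index_offset; i < rows; i += index_granularity[current_mark])
    {
        index_rows.push_back(i);
        ++current_mark;
    }

    for (size_t r : index_rows)     /// 0, 8192, 16384
        std::cout << r << '\n';
}
```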
- for (size_t i = index_offset; i < rows; i += current_block_index_granularity) + for (size_t i = index_offset; i < rows; i += index_granularity[current_mark]) { if (storage.hasPrimaryKey()) { @@ -519,13 +588,11 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm } } - marks_index_granularity.push_back(current_block_index_granularity); - ++marks_count; + ++current_mark; } } - size_t written_for_last_mark = (current_block_index_granularity - index_offset + rows) % current_block_index_granularity; - index_offset = (current_block_index_granularity - written_for_last_mark) % current_block_index_granularity; + index_offset = new_index_offset; } @@ -534,14 +601,19 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream( MergeTreeData & storage_, const Block & header_, String part_path_, bool sync_, CompressionSettings compression_settings, bool skip_offsets_, - WrittenOffsetColumns & already_written_offset_columns) + WrittenOffsetColumns & already_written_offset_columns, + const std::vector & index_granularity_) : IMergedBlockOutputStream( storage_, storage_.context.getSettings().min_compress_block_size, storage_.context.getSettings().max_compress_block_size, compression_settings, - storage_.context.getSettings().min_bytes_to_use_direct_io), + storage_.context.getSettings().min_bytes_to_use_direct_io, + false, + index_granularity_), header(header_), part_path(part_path_), sync(sync_), skip_offsets(skip_offsets_), already_written_offset_columns(already_written_offset_columns) { + if (index_granularity.empty()) + throw Exception("Can't write column without information about part index granularity", ErrorCodes::LOGICAL_ERROR); } void MergedColumnOnlyOutputStream::write(const Block & block) @@ -567,17 +639,17 @@ void MergedColumnOnlyOutputStream::write(const Block & block) initialized = true; } - size_t rows = block.rows(); - + size_t new_index_offset = 0; + size_t new_current_mark = 0; WrittenOffsetColumns offset_columns = already_written_offset_columns; for (size_t i = 0; i < block.columns(); ++i) { const ColumnWithTypeAndName & column = block.safeGetByPosition(i); - writeData(column.name, *column.type, *column.column, offset_columns, skip_offsets, serialization_states[i], storage.index_granularity); + std::tie(new_current_mark, new_index_offset) = writeColumn(column.name, *column.type, *column.column, offset_columns, skip_offsets, serialization_states[i], current_mark); } - // size_t written_for_last_mark = (current_block_index_granularity - index_offset + rows) % current_block_index_granularity; - // index_offset = (current_block_index_granularity - written_for_last_mark) % current_block_index_granularity; + index_offset = new_index_offset; + current_mark = new_current_mark; } void MergedColumnOnlyOutputStream::writeSuffix() diff --git a/dbms/src/Storages/MergeTree/MergedBlockOutputStream.h b/dbms/src/Storages/MergeTree/MergedBlockOutputStream.h index f82b1f597d9..c60bc464740 100644 --- a/dbms/src/Storages/MergeTree/MergedBlockOutputStream.h +++ b/dbms/src/Storages/MergeTree/MergedBlockOutputStream.h @@ -22,7 +22,8 @@ public: size_t max_compress_block_size_, CompressionSettings compression_settings_, size_t aio_threshold_, - const std::vector & index_granularity_ = {}); + bool blocks_are_granules_size_, + const std::vector & index_granularity_); using WrittenOffsetColumns = std::set; @@ -72,10 +73,31 @@ protected: IDataType::OutputStreamGetter createStreamGetter(const String & 
diff --git a/dbms/src/Storages/MergeTree/MergedBlockOutputStream.h b/dbms/src/Storages/MergeTree/MergedBlockOutputStream.h
index f82b1f597d9..c60bc464740 100644
--- a/dbms/src/Storages/MergeTree/MergedBlockOutputStream.h
+++ b/dbms/src/Storages/MergeTree/MergedBlockOutputStream.h
@@ -22,7 +22,8 @@ public:
         size_t max_compress_block_size_,
         CompressionSettings compression_settings_,
         size_t aio_threshold_,
-        const std::vector<size_t> & index_granularity_ = {});
+        bool blocks_are_granules_size_,
+        const std::vector<size_t> & index_granularity_);
 
     using WrittenOffsetColumns = std::set<std::string>;
 
@@ -72,10 +73,31 @@ protected:
     IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenOffsetColumns & offset_columns, bool skip_offsets);
 
     /// Write data of one column.
-    void writeData(const String & name, const IDataType & type, const IColumn & column, WrittenOffsetColumns & offset_columns,
-        bool skip_offsets, IDataType::SerializeBinaryBulkStatePtr & serialization_state, size_t index_granularity);
+    /// Returns the new current mark and the number of rows of the last granule
+    /// that remain to be written (they become the index offset for the next block).
+    std::pair<size_t, size_t> writeColumn(
+        const String & name,
+        const IDataType & type,
+        const IColumn & column,
+        WrittenOffsetColumns & offset_columns,
+        bool skip_offsets,
+        IDataType::SerializeBinaryBulkStatePtr & serialization_state,
+        size_t from_mark
+    );
 
-    size_t getBlockIndexGranularity(const Block & block) const;
+    size_t writeSingleGranule(
+        const String & name,
+        const IDataType & type,
+        const IColumn & column,
+        WrittenOffsetColumns & offset_columns,
+        bool skip_offsets,
+        IDataType::SerializeBinaryBulkStatePtr & serialization_state,
+        IDataType::SerializeBinaryBulkSettings & serialize_settings,
+        size_t from_row,
+        size_t number_of_rows,
+        bool write_marks);
+
+    void fillIndexGranularity(const Block & block);
 
     MergeTreeData & storage;
 
@@ -89,14 +111,17 @@ protected:
 
     size_t aio_threshold;
 
+    size_t current_mark = 0;
+
     CompressionSettings compression_settings;
 
+    const std::string marks_file_extension;
+    const size_t mark_size_in_bytes;
+    const bool blocks_are_granules_size;
+
+    std::vector<size_t> index_granularity;
 
-    std::string marks_file_extension;
-    size_t mark_size_in_bytes;
-    bool compute_granularity_unknown;
+    const bool compute_granularity;
 };
 
@@ -110,7 +135,9 @@ public:
         MergeTreeData & storage_,
         String part_path_,
         const NamesAndTypesList & columns_list_,
-        CompressionSettings compression_settings);
+        CompressionSettings compression_settings,
+        bool blocks_are_granules_size_ = false,
+        const std::vector<size_t> & index_granularity_ = {});
 
     MergedBlockOutputStream(
         MergeTreeData & storage_,
@@ -118,7 +145,9 @@ public:
         const NamesAndTypesList & columns_list_,
         CompressionSettings compression_settings,
         const MergeTreeData::DataPart::ColumnToSize & merged_column_to_size_,
-        size_t aio_threshold_);
+        size_t aio_threshold_,
+        bool blocks_are_granules_size_ = false,
+        const std::vector<size_t> & index_granularity_ = {});
 
     std::string getPartPath() const;
 
@@ -140,6 +169,11 @@ public:
         const NamesAndTypesList * total_columns_list = nullptr,
         MergeTreeData::DataPart::Checksums * additional_column_checksums = nullptr);
 
+    const std::vector<size_t> & getIndexGranularity() const
+    {
+        return index_granularity;
+    }
+
 private:
     void init();
 
@@ -154,7 +188,6 @@ private:
     String part_path;
 
     size_t rows_count = 0;
-    size_t marks_count = 0;
 
     std::unique_ptr<WriteBufferFromFile> index_file_stream;
     std::unique_ptr<HashingWriteBuffer> index_stream;
 
@@ -172,7 +205,8 @@ public:
     MergedColumnOnlyOutputStream(
         MergeTreeData & storage_, const Block & header_, String part_path_, bool sync_,
         CompressionSettings compression_settings, bool skip_offsets_,
-        WrittenOffsetColumns & already_written_offset_columns);
+        WrittenOffsetColumns & already_written_offset_columns,
+        const std::vector<size_t> & index_granularity_);
 
     Block getHeader() const override { return header; }
     void write(const Block & block) override;
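Taken together, the header now exposes the per-part granularity vector (`getIndexGranularity`, `marks_index_granularity`). A sketch of what that enables on the read side; the helper is invented and not part of this patch:

```cpp
#include <cstddef>
#include <iostream>
#include <utility>
#include <vector>

/// Map an absolute row number to (mark, offset inside the granule), which only
/// works for variable-size granules if per-mark row counts are stored.
std::pair<size_t, size_t> rowToMark(const std::vector<size_t> & marks_index_granularity, size_t row)
{
    size_t mark = 0;
    for (; mark < marks_index_granularity.size(); ++mark)
    {
        if (row < marks_index_granularity[mark])
            return {mark, row};
        row -= marks_index_granularity[mark];
    }
    return {mark, 0}; /// past the end of the part
}

int main()
{
    std::vector<size_t> granularity = {8192, 1000, 8192};
    auto [mark, offset] = rowToMark(granularity, 8500);
    std::cout << mark << ' ' << offset << '\n'; /// prints "1 308"
}
```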