From 1cb06bd97570403084c5d6958872a05abff8345c Mon Sep 17 00:00:00 2001 From: alesapin Date: Thu, 10 Dec 2020 19:29:10 +0300 Subject: [PATCH] Split one method --- .../MergeTree/IMergeTreeDataPartWriter.h | 9 +++ .../MergeTree/MergeTreeDataPartWriterWide.cpp | 55 ++++++++++++++----- .../MergeTree/MergeTreeDataPartWriterWide.h | 11 ++++ 3 files changed, 60 insertions(+), 15 deletions(-) diff --git a/src/Storages/MergeTree/IMergeTreeDataPartWriter.h b/src/Storages/MergeTree/IMergeTreeDataPartWriter.h index acf68143a9a..d31c5785bf6 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPartWriter.h +++ b/src/Storages/MergeTree/IMergeTreeDataPartWriter.h @@ -13,6 +13,15 @@ namespace DB { +struct StreamNameAndMark +{ + String stream_name; + MarkInCompressedFile mark; +}; + +using StreamsWithMarks = std::vector; +using ColumnNameToMark = std::unordered_map; + Block getBlockAndPermute(const Block & block, const Names & names, const IColumn::Permutation * permutation); /// Writes data part to disk in different formats. diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp index 25c97ab5dfa..5e03a42ec84 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp @@ -147,27 +147,52 @@ void MergeTreeDataPartWriterWide::writeSingleMark( size_t number_of_rows, DB::IDataType::SubstreamPath & path) { + StreamsWithMarks marks = getCurrentMarksForColumn(name, type, offset_columns, path); + for (const auto & mark : marks) + flushMarkToFile(mark, number_of_rows); +} + +void MergeTreeDataPartWriterWide::flushMarkToFile(const StreamNameAndMark & stream_with_mark, size_t rows_in_mark) +{ + Stream & stream = *column_streams[stream_with_mark.stream_name]; + writeIntBinary(stream_with_mark.mark.offset_in_compressed_file, stream.marks); + writeIntBinary(stream_with_mark.mark.offset_in_decompressed_block, stream.marks); + if (settings.can_use_adaptive_granularity) + writeIntBinary(rows_in_mark, stream.marks); +} + +StreamsWithMarks MergeTreeDataPartWriterWide::getCurrentMarksForColumn( + const String & name, + const IDataType & type, + WrittenOffsetColumns & offset_columns, + DB::IDataType::SubstreamPath & path) +{ + StreamsWithMarks result; type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */) - { - bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes; + { + bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes; - String stream_name = IDataType::getFileNameForStream(name, substream_path); + String stream_name = IDataType::getFileNameForStream(name, substream_path); - /// Don't write offsets more than one time for Nested type. - if (is_offsets && offset_columns.count(stream_name)) - return; + /// Don't write offsets more than one time for Nested type. + if (is_offsets && offset_columns.count(stream_name)) + return; - Stream & stream = *column_streams[stream_name]; + Stream & stream = *column_streams[stream_name]; - /// There could already be enough data to compress into the new block. - if (stream.compressed.offset() >= settings.min_compress_block_size) - stream.compressed.next(); + /// There could already be enough data to compress into the new block. + if (stream.compressed.offset() >= settings.min_compress_block_size) + stream.compressed.next(); - writeIntBinary(stream.plain_hashing.count(), stream.marks); - writeIntBinary(stream.compressed.offset(), stream.marks); - if (settings.can_use_adaptive_granularity) - writeIntBinary(number_of_rows, stream.marks); - }, path); + StreamNameAndMark stream_with_mark; + stream_with_mark.stream_name = stream_name; + stream_with_mark.mark.offset_in_compressed_file = stream.plain_hashing.count(); + stream_with_mark.mark.offset_in_decompressed_block = stream.compressed.offset(); + + result.push_back(stream_with_mark); + }, path); + + return result; } size_t MergeTreeDataPartWriterWide::writeSingleGranule( diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h index d58aac17d66..7f7cbf93aed 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h @@ -47,6 +47,17 @@ private: size_t number_of_rows, bool write_marks); + + void flushMarkToFile( + const StreamNameAndMark & stream_with_mark, + size_t rows_in_mark); + + StreamsWithMarks getCurrentMarksForColumn( + const String & name, + const IDataType & type, + WrittenOffsetColumns & offset_columns, + DB::IDataType::SubstreamPath & path); + /// Write mark for column void writeSingleMark( const String & name,