Split one method

alesapin 2020-12-10 19:29:10 +03:00
parent 90fa9a2073
commit 1cb06bd975
3 changed files with 60 additions and 15 deletions


@@ -13,6 +13,15 @@
namespace DB
{
struct StreamNameAndMark
{
    String stream_name;
    MarkInCompressedFile mark;
};

using StreamsWithMarks = std::vector<StreamNameAndMark>;
using ColumnNameToMark = std::unordered_map<String, StreamsWithMarks>;

Block getBlockAndPermute(const Block & block, const Names & names, const IColumn::Permutation * permutation);
/// Writes data part to disk in different formats.
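
For context on the types introduced above: a mark records where a granule begins, as an offset into the compressed .bin file plus a row offset inside the decompressed block, and StreamsWithMarks groups one such mark per substream of a column. A minimal self-contained sketch with simplified stand-in types (hypothetical, not the real ClickHouse headers):

#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

/// Simplified stand-ins for the types added above (hypothetical; the real
/// String and MarkInCompressedFile come from ClickHouse headers).
struct MarkInCompressedFile
{
    uint64_t offset_in_compressed_file = 0;    /// where the mark's compressed block starts in the file
    uint64_t offset_in_decompressed_block = 0; /// row offset inside that block once decompressed
};

struct StreamNameAndMark
{
    std::string stream_name;
    MarkInCompressedFile mark;
};

using StreamsWithMarks = std::vector<StreamNameAndMark>;
using ColumnNameToMark = std::unordered_map<std::string, StreamsWithMarks>;

int main()
{
    /// A column serialized as several substreams (e.g. a Nested column's
    /// sizes and data) yields one StreamNameAndMark per substream.
    ColumnNameToMark marks_by_column;
    marks_by_column["n"] = StreamsWithMarks{
        {"n.size0", {0, 0}},
        {"n.data", {0, 0}},
    };
    return 0;
}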


@@ -147,27 +147,52 @@ void MergeTreeDataPartWriterWide::writeSingleMark(
    size_t number_of_rows,
    DB::IDataType::SubstreamPath & path)
{
    StreamsWithMarks marks = getCurrentMarksForColumn(name, type, offset_columns, path);
    for (const auto & mark : marks)
        flushMarkToFile(mark, number_of_rows);
}

void MergeTreeDataPartWriterWide::flushMarkToFile(const StreamNameAndMark & stream_with_mark, size_t rows_in_mark)
{
    Stream & stream = *column_streams[stream_with_mark.stream_name];
    writeIntBinary(stream_with_mark.mark.offset_in_compressed_file, stream.marks);
    writeIntBinary(stream_with_mark.mark.offset_in_decompressed_block, stream.marks);
    if (settings.can_use_adaptive_granularity)
        writeIntBinary(rows_in_mark, stream.marks);
}

StreamsWithMarks MergeTreeDataPartWriterWide::getCurrentMarksForColumn(
    const String & name,
    const IDataType & type,
    WrittenOffsetColumns & offset_columns,
    DB::IDataType::SubstreamPath & path)
{
    StreamsWithMarks result;
    type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
    {
        bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
        String stream_name = IDataType::getFileNameForStream(name, substream_path);

        /// Don't write offsets more than one time for Nested type.
        if (is_offsets && offset_columns.count(stream_name))
            return;

        Stream & stream = *column_streams[stream_name];

        /// There could already be enough data to compress into the new block.
        if (stream.compressed.offset() >= settings.min_compress_block_size)
            stream.compressed.next();

        StreamNameAndMark stream_with_mark;
        stream_with_mark.stream_name = stream_name;
        stream_with_mark.mark.offset_in_compressed_file = stream.plain_hashing.count();
        stream_with_mark.mark.offset_in_decompressed_block = stream.compressed.offset();

        result.push_back(stream_with_mark);
    }, path);

    return result;
}

size_t MergeTreeDataPartWriterWide::writeSingleGranule(

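The point of the split is visible here: getCurrentMarksForColumn only snapshots the current position of each stream, while flushMarkToFile actually writes a mark, so a caller can capture marks for a granule and write them out at a later point. A rough self-contained sketch of that collect-then-flush pattern (hypothetical simplified types and functions, not the actual MergeTreeDataPartWriterWide API):

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

struct Mark { uint64_t compressed_offset = 0; uint64_t decompressed_offset = 0; };
struct StreamMark { std::string stream_name; Mark mark; };

/// Step 1: snapshot current positions for every stream of a column
/// (plays the role of getCurrentMarksForColumn).
std::vector<StreamMark> currentMarks(const std::vector<std::string> & streams, uint64_t pos)
{
    std::vector<StreamMark> result;
    for (const auto & s : streams)
        result.push_back({s, {pos, 0}});
    return result;
}

/// Step 2: write a previously captured mark
/// (plays the role of flushMarkToFile).
void flushMark(const StreamMark & m, size_t rows_in_mark)
{
    std::cout << m.stream_name << ": " << m.mark.compressed_offset
              << "/" << m.mark.decompressed_offset
              << " rows=" << rows_in_mark << '\n';
}

int main()
{
    /// Capture marks first...
    auto marks = currentMarks({"col.bin"}, 4096);
    /// ...write the granule's data in between...
    /// ...and flush the captured marks afterwards.
    for (const auto & m : marks)
        flushMark(m, 8192);
}
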

@@ -47,6 +47,17 @@ private:
        size_t number_of_rows,
        bool write_marks);

    void flushMarkToFile(
        const StreamNameAndMark & stream_with_mark,
        size_t rows_in_mark);

    StreamsWithMarks getCurrentMarksForColumn(
        const String & name,
        const IDataType & type,
        WrittenOffsetColumns & offset_columns,
        DB::IDataType::SubstreamPath & path);

    /// Write mark for column
    void writeSingleMark(
        const String & name,