Mirror of https://github.com/ClickHouse/ClickHouse.git, synced 2024-09-21 09:10:48 +00:00.
Split one method
This commit is contained in:
parent
90fa9a2073
commit
1cb06bd975
@ -13,6 +13,15 @@
|
|||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
|
||||||
|
struct StreamNameAndMark
|
||||||
|
{
|
||||||
|
String stream_name;
|
||||||
|
MarkInCompressedFile mark;
|
||||||
|
};
|
||||||
|
|
||||||
|
using StreamsWithMarks = std::vector<StreamNameAndMark>;
|
||||||
|
using ColumnNameToMark = std::unordered_map<String, StreamsWithMarks>;
|
||||||
|
|
||||||
Block getBlockAndPermute(const Block & block, const Names & names, const IColumn::Permutation * permutation);
|
Block getBlockAndPermute(const Block & block, const Names & names, const IColumn::Permutation * permutation);
|
||||||
|
|
||||||
/// Writes data part to disk in different formats.
|
/// Writes data part to disk in different formats.
|
||||||
|
@ -147,27 +147,52 @@ void MergeTreeDataPartWriterWide::writeSingleMark(
|
|||||||
size_t number_of_rows,
|
size_t number_of_rows,
|
||||||
DB::IDataType::SubstreamPath & path)
|
DB::IDataType::SubstreamPath & path)
|
||||||
{
|
{
|
||||||
|
StreamsWithMarks marks = getCurrentMarksForColumn(name, type, offset_columns, path);
|
||||||
|
for (const auto & mark : marks)
|
||||||
|
flushMarkToFile(mark, number_of_rows);
|
||||||
|
}
|
||||||
|
|
||||||
|
void MergeTreeDataPartWriterWide::flushMarkToFile(const StreamNameAndMark & stream_with_mark, size_t rows_in_mark)
|
||||||
|
{
|
||||||
|
Stream & stream = *column_streams[stream_with_mark.stream_name];
|
||||||
|
writeIntBinary(stream_with_mark.mark.offset_in_compressed_file, stream.marks);
|
||||||
|
writeIntBinary(stream_with_mark.mark.offset_in_decompressed_block, stream.marks);
|
||||||
|
if (settings.can_use_adaptive_granularity)
|
||||||
|
writeIntBinary(rows_in_mark, stream.marks);
|
||||||
|
}
|
||||||
|
|
||||||
|
StreamsWithMarks MergeTreeDataPartWriterWide::getCurrentMarksForColumn(
|
||||||
|
const String & name,
|
||||||
|
const IDataType & type,
|
||||||
|
WrittenOffsetColumns & offset_columns,
|
||||||
|
DB::IDataType::SubstreamPath & path)
|
||||||
|
{
|
||||||
|
StreamsWithMarks result;
|
||||||
type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
|
type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
|
||||||
{
|
{
|
||||||
bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
|
bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
|
||||||
|
|
||||||
String stream_name = IDataType::getFileNameForStream(name, substream_path);
|
String stream_name = IDataType::getFileNameForStream(name, substream_path);
|
||||||
|
|
||||||
/// Don't write offsets more than one time for Nested type.
|
/// Don't write offsets more than one time for Nested type.
|
||||||
if (is_offsets && offset_columns.count(stream_name))
|
if (is_offsets && offset_columns.count(stream_name))
|
||||||
return;
|
return;
|
||||||
|
|
||||||
Stream & stream = *column_streams[stream_name];
|
Stream & stream = *column_streams[stream_name];
|
||||||
|
|
||||||
/// There could already be enough data to compress into the new block.
|
/// There could already be enough data to compress into the new block.
|
||||||
if (stream.compressed.offset() >= settings.min_compress_block_size)
|
if (stream.compressed.offset() >= settings.min_compress_block_size)
|
||||||
stream.compressed.next();
|
stream.compressed.next();
|
||||||
|
|
||||||
writeIntBinary(stream.plain_hashing.count(), stream.marks);
|
StreamNameAndMark stream_with_mark;
|
||||||
writeIntBinary(stream.compressed.offset(), stream.marks);
|
stream_with_mark.stream_name = stream_name;
|
||||||
if (settings.can_use_adaptive_granularity)
|
stream_with_mark.mark.offset_in_compressed_file = stream.plain_hashing.count();
|
||||||
writeIntBinary(number_of_rows, stream.marks);
|
stream_with_mark.mark.offset_in_decompressed_block = stream.compressed.offset();
|
||||||
}, path);
|
|
||||||
|
result.push_back(stream_with_mark);
|
||||||
|
}, path);
|
||||||
|
|
||||||
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t MergeTreeDataPartWriterWide::writeSingleGranule(
|
size_t MergeTreeDataPartWriterWide::writeSingleGranule(
|
||||||
|
@ -47,6 +47,17 @@ private:
|
|||||||
size_t number_of_rows,
|
size_t number_of_rows,
|
||||||
bool write_marks);
|
bool write_marks);
|
||||||
|
|
||||||
|
|
||||||
|
void flushMarkToFile(
|
||||||
|
const StreamNameAndMark & stream_with_mark,
|
||||||
|
size_t rows_in_mark);
|
||||||
|
|
||||||
|
StreamsWithMarks getCurrentMarksForColumn(
|
||||||
|
const String & name,
|
||||||
|
const IDataType & type,
|
||||||
|
WrittenOffsetColumns & offset_columns,
|
||||||
|
DB::IDataType::SubstreamPath & path);
|
||||||
|
|
||||||
/// Write mark for column
|
/// Write mark for column
|
||||||
void writeSingleMark(
|
void writeSingleMark(
|
||||||
const String & name,
|
const String & name,
|
||||||
|
Loading…
Reference in New Issue
Block a user