write compressed blocks for every column in granule

Anton Popov 2020-07-23 19:39:56 +03:00
parent 61018c275d
commit d3fbeb296e
3 changed files with 7 additions and 17 deletions

MergeTreeDataPartWriterCompact.cpp

@@ -99,14 +99,13 @@ void MergeTreeDataPartWriterCompact::writeBlock(const Block & block)
         for (const auto & column : columns_list)
         {
-            /// There could already be enough data to compress into the new block.
-            if (stream->compressed.offset() >= settings.min_compress_block_size)
-                stream->compressed.next();
             writeIntBinary(stream->plain_hashing.count(), stream->marks);
             writeIntBinary(stream->compressed.offset(), stream->marks);
 
             writeColumnSingleGranule(block.getByName(column.name), current_row, rows_to_write);
 
+            /// Write one compressed block per column in granule for more optimal reading.
+            stream->compressed.next();
         }
 
         ++from_mark;
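Net effect of this hunk: instead of waiting for min_compress_block_size bytes to accumulate before sealing a compressed block, the writer now seals one block per column per granule, so every mark points at the start of its own compressed block. Below is a minimal, self-contained C++ sketch of that write pattern; Stream, Mark and writeGranule are illustrative stand-ins invented for this note, not the ClickHouse types.

#include <cstdint>
#include <vector>

struct Mark { uint64_t offset_in_file = 0; uint64_t offset_in_block = 0; };

// Hypothetical stream: tracks bytes written to the data file and to the current
// uncompressed block; next() seals the current block and starts a fresh one.
struct Stream
{
    uint64_t file_bytes = 0;
    uint64_t block_bytes = 0;
    void write(uint64_t n) { block_bytes += n; }
    void next() { file_bytes += block_bytes; block_bytes = 0; }
};

// Write one granule: one mark and one compressed block per column.
void writeGranule(Stream & stream, const std::vector<uint64_t> & column_sizes, std::vector<Mark> & marks)
{
    for (uint64_t size : column_sizes)
    {
        // block_bytes is always 0 here because the previous iteration sealed its block.
        marks.push_back({stream.file_bytes, stream.block_bytes});
        stream.write(size);   // stands in for serializing the column's rows of this granule
        stream.next();        // one compressed block per column in granule
    }
}

The trade-off: compressed blocks may be smaller, but a reader that needs a single column of a single granule never has to decompress data belonging to other columns.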

MergeTreeReaderCompact.cpp

@@ -66,7 +66,6 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
     /// Do not use max_read_buffer_size, but try to lower buffer size with maximal size of granule to avoid reading much data.
     auto buffer_size = getReadBufferSize(data_part, marks_loader, column_positions, all_mark_ranges);
-    std::cerr << "buffer_size: " << buffer_size << "\n";
 
     if (!buffer_size || settings.max_read_buffer_size < buffer_size)
         buffer_size = settings.max_read_buffer_size;
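The only change in this hunk is removing a leftover std::cerr debug print; the surrounding logic still caps the read buffer at max_read_buffer_size when the estimated granule size is unknown or too large. A small sketch of that clamp, with chooseBufferSize as a hypothetical helper name:

#include <cstddef>

// Use the estimated maximal granule size as the read buffer size only when it
// is known (non-zero) and does not exceed the configured maximum.
size_t chooseBufferSize(size_t max_granule_size, size_t max_read_buffer_size)
{
    if (max_granule_size == 0 || max_read_buffer_size < max_granule_size)
        return max_read_buffer_size;
    return max_granule_size;
}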
@@ -276,19 +275,11 @@ public:
            return *this;
        }

        Iterator operator++(int)
        {
            auto tmp = *this;
            ++*this;
            return tmp;
        }

        bool operator==(const Iterator & other) const { return row == other.row && column == other.column; }
        bool operator!=(const Iterator & other) const { return !(*this == other); }
    };

    Iterator get(size_t row, size_t column) { return Iterator(row, column, this); }
    Iterator begin() { return get(0, 0); }
    Iterator end() { return get(rows_num, 0); }

private:
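For context, the Iterator shown in this hunk walks the two-dimensional grid of marks (granule, column), with end() acting as a sentinel at row == rows_num. A stand-alone sketch of that iteration pattern follows; it assumes the prefix increment advances column-first and wraps to the next row (the real operator++ body is outside this hunk), and GridCursor is a made-up name, not the ClickHouse class.

#include <cstddef>
#include <iostream>

struct GridCursor
{
    size_t row = 0;
    size_t column = 0;
    size_t columns_num = 0;

    // Advance column-first; wrap to the next row after the last column.
    GridCursor & operator++()
    {
        if (++column == columns_num)
        {
            column = 0;
            ++row;
        }
        return *this;
    }

    bool operator==(const GridCursor & other) const { return row == other.row && column == other.column; }
    bool operator!=(const GridCursor & other) const { return !(*this == other); }
};

int main()
{
    size_t rows_num = 2, columns_num = 3;
    GridCursor it{0, 0, columns_num};
    GridCursor end{rows_num, 0, columns_num};   // same sentinel convention as end() above

    for (; it != end; ++it)
        std::cout << "mark (" << it.row << ", " << it.column << ")\n";
}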
@@ -299,16 +290,16 @@ private:
 }
 
 size_t MergeTreeReaderCompact::getReadBufferSize(
-    const DataPartPtr & data_part,
+    const DataPartPtr & part,
     MergeTreeMarksLoader & marks_loader,
     const ColumnPositions & column_positions,
     const MarkRanges & mark_ranges)
 {
     size_t buffer_size = 0;
     size_t columns_num = column_positions.size();
-    size_t file_size = data_part->getFileSizeOrZero(MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION);
+    size_t file_size = part->getFileSizeOrZero(MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION);
 
-    MarksCounter counter(data_part->getMarksCount(), data_part->getColumns().size());
+    MarksCounter counter(part->getMarksCount(), part->getColumns().size());
 
     for (const auto & mark_range : mark_ranges)
     {
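Per the doc comment in the header below, getReadBufferSize returns the maximal granule size in the compressed file over the requested mark ranges, which then becomes the read buffer size. A rough sketch of that idea follows, assuming mark offsets are nondecreasing in (mark, column) order; maxGranuleSize and the plain offsets array are hypothetical, and the real code goes through MarksCounter and the marks loader instead.

#include <algorithm>
#include <cstddef>
#include <vector>

struct MarkRange { size_t begin; size_t end; };

// Rough sketch: a column's granule occupies the span between its mark's offset
// in the compressed file and the next offset (or the end of the data file for
// the very last granule), and the buffer must fit the largest such span.
size_t maxGranuleSize(
    const std::vector<std::vector<size_t>> & offsets,   // offsets[mark][column], hypothetical
    const std::vector<MarkRange> & mark_ranges,
    size_t file_size)
{
    size_t buffer_size = 0;
    size_t marks_count = offsets.size();

    for (const auto & range : mark_ranges)
    {
        for (size_t mark = range.begin; mark < range.end && mark < marks_count; ++mark)
        {
            for (size_t column = 0; column < offsets[mark].size(); ++column)
            {
                size_t cur = offsets[mark][column];

                // Next offset in (mark, column) order, falling back to the file size.
                size_t next = file_size;
                if (column + 1 < offsets[mark].size())
                    next = offsets[mark][column + 1];
                else if (mark + 1 < marks_count && !offsets[mark + 1].empty())
                    next = offsets[mark + 1][0];

                if (next > cur)
                    buffer_size = std::max(buffer_size, next - cur);
            }
        }
    }
    return buffer_size;
}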

MergeTreeReaderCompact.h

@@ -61,7 +61,7 @@ private:
     /// Returns maximal value of granule size in compressed file from @mark_ranges.
     /// This value is used as size of read buffer.
     static size_t getReadBufferSize(
-        const DataPartPtr & data_part,
+        const DataPartPtr & part,
         MergeTreeMarksLoader & marks_loader,
         const ColumnPositions & column_positions,
         const MarkRanges & mark_ranges);