better code in ReaderCompact and perf test added

This commit is contained in:
Anton Popov 2020-07-23 18:37:21 +03:00
parent c6423b2b28
commit 1d0493cfcc
3 changed files with 107 additions and 29 deletions

View File

@ -64,7 +64,9 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
column_positions[i] = std::move(position);
}
auto buffer_size = getReadBufferSize();
/// Do not use max_read_buffer_size, but try to lower buffer size with maximal size of granule to avoid reading much data.
auto buffer_size = getReadBufferSize(data_part, marks_loader, column_positions, all_mark_ranges);
std::cerr << "buffer_size: " << buffer_size << "\n";
if (!buffer_size || settings.max_read_buffer_size < buffer_size)
buffer_size = settings.max_read_buffer_size;
@ -240,47 +242,90 @@ bool MergeTreeReaderCompact::isContinuousReading(size_t mark, size_t column_posi
|| (mark == last_mark + 1 && column_position == 0 && last_column == data_part->getColumns().size() - 1);
}
size_t MergeTreeReaderCompact::getReadBufferSize()
namespace
{
size_t buffer_size = 0;
size_t columns_num = columns.size();
size_t last_column_position = data_part->getColumns().size() - 1;
size_t file_size = data_part->getFileSizeOrZero(MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION);
size_t marks_count = data_part->getMarksCount();
auto next = [&](size_t & row_index, size_t & column_index)
/// A simple class that helps to iterate over 2-dim marks of compact parts.
class MarksCounter
{
public:
MarksCounter(size_t rows_num_, size_t columns_num_)
: rows_num(rows_num_), columns_num(columns_num_) {}
struct Iterator
{
if (column_index == last_column_position)
{
++row_index;
column_index = 0;
if (row_index == marks_count)
return false;
}
else
++column_index;
size_t row;
size_t column;
MarksCounter * counter;
return true;
Iterator(size_t row_, size_t column_, MarksCounter * counter_)
: row(row_), column(column_), counter(counter_) {}
Iterator operator++()
{
if (column + 1 == counter->columns_num)
{
++row;
column = 0;
}
else
{
++column;
}
return *this;
}
Iterator operator++(int)
{
auto tmp = *this;
++*this;
return tmp;
}
bool operator==(const Iterator & other) const { return row == other.row && column == other.column; }
bool operator!=(const Iterator & other) const { return !(*this == other); }
};
for (const auto & mark_range : all_mark_ranges)
Iterator get(size_t row, size_t column) { return Iterator(row, column, this); }
Iterator begin() { return get(0, 0); }
Iterator end() { return get(rows_num, 0); }
private:
size_t rows_num;
size_t columns_num;
};
}
size_t MergeTreeReaderCompact::getReadBufferSize(
const DataPartPtr & data_part,
MergeTreeMarksLoader & marks_loader,
const ColumnPositions & column_positions,
const MarkRanges & mark_ranges)
{
size_t buffer_size = 0;
size_t columns_num = column_positions.size();
size_t file_size = data_part->getFileSizeOrZero(MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION);
MarksCounter counter(data_part->getMarksCount(), data_part->getColumns().size());
for (const auto & mark_range : mark_ranges)
{
for (size_t mark = mark_range.begin; mark <= mark_range.end; ++mark)
for (size_t mark = mark_range.begin; mark < mark_range.end; ++mark)
{
for (size_t i = 0; i < columns_num; ++i)
{
if (!column_positions[i])
continue;
size_t row_ind = mark;
size_t col_ind = *column_positions[i];
size_t cur_offset = marks_loader.getMark(mark, col_ind).offset_in_compressed_file;
auto it = counter.get(mark, *column_positions[i]);
size_t cur_offset = marks_loader.getMark(it.row, it.column).offset_in_compressed_file;
while (next(row_ind, col_ind) && cur_offset == marks_loader.getMark(row_ind, col_ind).offset_in_compressed_file)
;
while (it != counter.end() && cur_offset == marks_loader.getMark(it.row, it.column).offset_in_compressed_file)
++it;
size_t next_offset = (row_ind == marks_count ? file_size : marks_loader.getMark(row_ind, col_ind).offset_in_compressed_file);
size_t next_offset = (it == counter.end() ? file_size : marks_loader.getMark(it.row, it.column).offset_in_compressed_file);
buffer_size = std::max(buffer_size, next_offset - cur_offset);
}
}

View File

@ -10,6 +10,9 @@ namespace DB
class MergeTreeDataPartCompact;
using DataPartCompactPtr = std::shared_ptr<const MergeTreeDataPartCompact>;
class IMergeTreeDataPart;
using DataPartPtr = std::shared_ptr<const IMergeTreeDataPart>;
/// Reader for compact parts
class MergeTreeReaderCompact : public IMergeTreeReader
{
@ -42,7 +45,8 @@ private:
MergeTreeMarksLoader marks_loader;
/// Positions of columns in part structure.
std::vector<ColumnPosition> column_positions;
using ColumnPositions = std::vector<ColumnPosition>;
ColumnPositions column_positions;
/// Should we read full column or only it's offsets
std::vector<bool> read_only_offsets;
@ -54,7 +58,13 @@ private:
void readData(const String & name, IColumn & column, const IDataType & type,
size_t from_mark, size_t column_position, size_t rows_to_read, bool only_offsets = false);
size_t getReadBufferSize();
/// Returns maximal value of granule size in compressed file from @mark_ranges.
/// This value is used as size of read buffer.
static size_t getReadBufferSize(
const DataPartPtr & data_part,
MergeTreeMarksLoader & marks_loader,
const ColumnPositions & column_positions,
const MarkRanges & mark_ranges);
};
}

View File

@ -0,0 +1,23 @@
<test>
<create_query>
CREATE TABLE mt_comp_parts
ENGINE = MergeTree
ORDER BY (c1, c2)
SETTINGS min_rows_for_wide_part = 1000000000 AS
SELECT *
FROM generateRandom('c1 UInt32, c2 UInt64, s1 String, arr1 Array(UInt32), c3 UInt64, s2 String', 0, 30, 30)
LIMIT 50000000
</create_query>
<settings>
<max_threads>8</max_threads>
</settings>
<query>SELECT count() FROM mt_comp_parts WHERE NOT ignore(c1)</query>
<query>SELECT count() FROM mt_comp_parts WHERE NOT ignore(c2, s1, arr1, s2)</query>
<query>SELECT count() FROM mt_comp_parts WHERE NOT ignore(c1, s1, c3)</query>
<query>SELECT count() FROM mt_comp_parts WHERE NOT ignore(c1, c2, c3)</query>
<query>SELECT count() FROM mt_comp_parts WHERE NOT ignore(*)</query>
<drop_query>DROP TABLE IF EXISTS mt_comp_parts</drop_query>
</test>