delay reading for granulas if prewhere column is false during partial granula reading

This commit is contained in:
Nikolai Kochetov 2017-06-20 13:12:20 +03:00 committed by alexey-milovidov
parent 86680f5513
commit 586dff9126
7 changed files with 98 additions and 52 deletions

View File

@ -103,11 +103,4 @@ void CachedCompressedReadBuffer::seek(size_t offset_in_compressed_file, size_t o
}
}
void CachedCompressedReadBuffer::position(size_t & offset_in_compressed_file, size_t & offset_in_decompressed_block) const
{
offset_in_compressed_file = file_pos - owned_cell->compressed_size;
offset_in_decompressed_block = pos - working_buffer.begin();
}
}

View File

@ -47,7 +47,6 @@ public:
void seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block);
void position(size_t & offset_in_compressed_file, size_t & offset_in_decompressed_block) const;
void setProfileCallback(const ReadBufferFromFileBase::ProfileCallback & profile_callback_, clockid_t clock_type_ = CLOCK_MONOTONIC_COARSE)
{

View File

@ -70,12 +70,29 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
if (task->size_predictor)
task->size_predictor->startBlock();
const auto preferred_block_size_bytes = this->preferred_block_size_bytes;
// read rows from reader and clean columns
auto skipRows = [& preferred_block_size_bytes](Block & block, MergeTreeRangeReader & reader, MergeTreeReadTask & task, size_t rows) {
size_t recommended_rows = std::max<size_t>(1, task.size_predictor->estimateNumRows(preferred_block_size_bytes));
while (rows)
{
size_t rows_to_skip = std::min(rows, recommended_rows);
rows -= rows_to_skip;
reader.read(block, rows_to_skip);
for (const auto i : ext::range(0, block.columns()))
{
auto & col = block.safeGetByPosition(i);
if (task.column_name_set.count(col.name))
col.column = col.column->cloneEmpty();
}
}
};
if (prewhere_actions)
{
// LOG_TRACE(log, "readFromPart prewhere");
do
{
// LOG_TRACE(log, "readFromPart prewhe do");
/// Let's read the full block of columns needed to calculate the expression in PREWHERE.
size_t space_left = std::max(1LU, max_block_size_rows);
MarkRanges ranges_to_read;
@ -92,35 +109,36 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
space_left = std::min(space_left, std::max(1LU, recommended_rows));
}
// LOG_TRACE(log, "readFromPart prewhere key read " << space_left << "rows" );
std::experimental::optional<MergeTreeRangeReader> range_reader;
//LOG_TRACE(log, "******************* prewhere");
std::experimental::optional<MergeTreeRangeReader> pre_range_reader;
if (task->current_range_reader)
{
range_reader = task->current_range_reader->copyForReader(*pre_reader);
pre_range_reader = task->current_range_reader->copyForReader(*pre_reader);
if (task->unread_rows_in_current_granule)
{
// LOG_TRACE(log, "skipping " << task->unread_rows_in_current_granule << " rows");
pre_range_reader = pre_range_reader->skipRows(task->unread_rows_in_current_granule);
pre_range_reader->disableNextSeek();
}
rows_was_read.push_back(0);
}
while ((range_reader || !task->mark_ranges.empty()) && space_left && !isCancelled())
while ((pre_range_reader || !task->mark_ranges.empty()) && space_left && !isCancelled())
{
if (!range_reader)
if (!pre_range_reader)
{
auto & range = task->mark_ranges.back();
range_reader = pre_reader->readRange(range.begin, range.end);
pre_range_reader = pre_reader->readRange(range.begin, range.end);
ranges_to_read.push_back(range);
rows_was_read.push_back(0);
task->mark_ranges.pop_back();
}
size_t rows_to_read = std::min(range_reader->unreadRows(), space_left);
size_t read_rows = range_reader->read(res, rows_to_read);
size_t rows_to_read = std::min(pre_range_reader->unreadRows(), space_left);
size_t read_rows = pre_range_reader->read(res, rows_to_read);
rows_was_read.back() += read_rows;
if (range_reader->isReadingFinished())
{
if (range_reader->unreadRows())
LOG_TRACE(log, "??????????????? was read " << read_rows << " expected " << rows_to_read);
range_reader = std::experimental::nullopt;
}
if (pre_range_reader->isReadingFinished())
pre_range_reader = std::experimental::nullopt;
space_left -= read_rows;
}
@ -128,13 +146,11 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
/// In case of isCancelled.
if (!res)
{
if (!range_reader)
if (!pre_range_reader)
task->current_range_reader = std::experimental::nullopt;
return res;
}
// LOG_TRACE(log, "readFromPart prewhe actions " );
progressImpl({ res.rows(), res.bytes() });
pre_reader->fillMissingColumns(res, task->ordered_names, task->should_reorder);
@ -161,15 +177,14 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
*/
if (const auto column_const = typeid_cast<const ColumnConstUInt8 *>(observed_column.get()))
{
// LOG_TRACE(log, "readFromPart prewhe column_const " );
if (!column_const->getData())
{
if (range_reader)
if (pre_range_reader)
{
/// have to read rows from last partly read granula
auto & range = ranges_to_read.back();
task->current_range_reader = reader->readRange(range.begin, range.end);
task->current_range_reader->read(res, rows_was_read.back());
task->unread_rows_in_current_granule = rows_was_read.back();
}
else
task->current_range_reader = std::experimental::nullopt;
@ -182,6 +197,11 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
if (task->current_range_reader)
{
if (task->unread_rows_in_current_granule)
{
skipRows(res, *task->current_range_reader, *task, task->unread_rows_in_current_granule);
task->current_range_reader->read(res, task->unread_rows_in_current_granule);
}
task->current_range_reader->read(res, rows_was_read[rows_was_read_idx++]);
}
@ -191,16 +211,19 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
task->current_range_reader->read(res, rows_was_read[rows_was_read_idx++]);
}
if (!range_reader)
if (!pre_range_reader)
task->current_range_reader = std::experimental::nullopt;
task->unread_rows_in_current_granule = 0;
progressImpl({ 0, res.bytes() - pre_bytes });
}
else if (const auto column_vec = typeid_cast<const ColumnUInt8 *>(observed_column.get()))
{
// LOG_TRACE(log, "readFromPart prewhe column_vec " );
//LOG_TRACE(log, ">>>>>>>>>>>>>>>>>>>>>>>>>>>>> where");
const auto & pre_filter = column_vec->getData();
auto & additional_rows_to_read = task->unread_rows_in_current_granule;
if (!task->current_range_reader)
additional_rows_to_read = 0;
IColumn::Filter post_filter(pre_filter.size());
/// Let's read the rest of the columns in the required segments and compose our own filter for them.
@ -225,17 +248,17 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
while (current_range_rows_read + (pre_filter_pos - pre_filter_begin_pos) < rows_was_read[rows_was_read_idx])
{
auto rows_should_be_copied = pre_filter_pos - pre_filter_begin_pos;
auto range_reader_with_skipped_rows = range_reader.skipRows(rows_should_be_copied);
auto unread_rows_in_current_part = range_reader_with_skipped_rows.unreadRowsInCurrentPart();
auto range_reader_with_skipped_rows = range_reader.skipRows(additional_rows_to_read + rows_should_be_copied);
auto unread_rows_in_current_granule = range_reader_with_skipped_rows.unreadRowsInCurrentGranule();
const size_t limit = std::min(pre_filter.size(), pre_filter_pos + unread_rows_in_current_granule);
bool will_read_until_mark = unread_rows_in_current_granule == limit - pre_filter_pos;
const size_t limit = std::min(pre_filter.size(), pre_filter_pos + unread_rows_in_current_part);
UInt8 nonzero = 0;
for (size_t row = pre_filter_pos; row < limit; ++row)
nonzero |= pre_filter[row];
bool will_read_until_mark = unread_rows_in_current_part == limit - pre_filter_pos;
/// can't skip empty rows if won't read until mark
if (!nonzero && will_read_until_mark)
if (!nonzero)
{
if (pre_filter_pos != pre_filter_begin_pos)
{
@ -243,11 +266,31 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
memcpy(&post_filter[post_filter_pos], &pre_filter[pre_filter_begin_pos], rows);
post_filter_pos += rows;
current_range_rows_read += rows;
if (additional_rows_to_read)
{
skipRows(res, range_reader, *task, additional_rows_to_read);
// LOG_TRACE(log, "additional " << additional_rows_to_read);
}
range_reader.read(res, rows);
additional_rows_to_read = 0;
}
if (will_read_until_mark)
{
//if (additional_rows_to_read)
// LOG_TRACE(log, "additional skiped " << additional_rows_to_read);
current_range_rows_read += range_reader.skipToNextMark() - additional_rows_to_read;
additional_rows_to_read = 0;
}
else
{
//LOG_TRACE(log, "additional " << limit - pre_filter_pos << " rows to " << additional_rows_to_read << ", current_range_rows_read = " << current_range_rows_read);
//LOG_TRACE(log, rows_was_read_idx << ' ' << rows_was_read.size() << ' ' << pre_filter_begin_pos << ' ' << pre_filter_pos << ' ' << limit);
additional_rows_to_read += limit - pre_filter_pos;
current_range_rows_read += limit - pre_filter_pos;
}
pre_filter_begin_pos = pre_filter_pos = limit;
current_range_rows_read += range_reader.skipToNextMark();
}
else
pre_filter_pos = limit;
@ -259,7 +302,14 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
memcpy(&post_filter[post_filter_pos], &pre_filter[pre_filter_begin_pos], rows);
post_filter_pos += rows;
current_range_rows_read += rows;
if (additional_rows_to_read)
{
skipRows(res, range_reader, *task, additional_rows_to_read);
//LOG_TRACE(log, "additional " << additional_rows_to_read);
}
range_reader.read(res, rows);
additional_rows_to_read = 0;
}
if (rows_was_read_idx + 1 < rows_was_read.size())
@ -267,11 +317,9 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
++rows_was_read_idx;
}
if (!range_reader)
if (!pre_range_reader)
task->current_range_reader = std::experimental::nullopt;
// LOG_TRACE(log, "readFromPart prewhe column_vec " << post_filter_pos << "rows" );
if (!post_filter_pos)
{
res.clear();
@ -333,7 +381,7 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
{
size_t recommended_rows = task->size_predictor->estimateNumRows(preferred_block_size_bytes);
/// TODO: stop reading if recommended_rows small enough
/// TODO: stop reading if recommended_rows small enough, same in prewhere
if (res && recommended_rows < 1)
break;
rows_to_read = std::min(rows_to_read, std::max(1LU, recommended_rows));

View File

@ -48,6 +48,9 @@ struct MergeTreeReadTask
MergeTreeBlockSizePredictorPtr size_predictor;
/// used to save current range processing status
std::experimental::optional<MergeTreeRangeReader> current_range_reader;
/// the number of rows wasn't read by range_reader if condition in prewhere was false
/// halps to skip graunule if all conditions will be aslo false
std::size_t unread_rows_in_current_granule;
bool isFinished() const { return mark_ranges.empty() && !current_range_reader; }

View File

@ -13,14 +13,14 @@ MergeTreeRangeReader::MergeTreeRangeReader(
size_t MergeTreeRangeReader::skipToNextMark()
{
auto unread_rows_in_current_part = unreadRowsInCurrentPart();
auto unread_rows_in_current_part = unreadRowsInCurrentGranule();
seek_to_from_mark = true;
++current_mark;
read_rows_after_current_mark = 0;
return unread_rows_in_current_part;
}
const MergeTreeRangeReader MergeTreeRangeReader::skipRows(size_t rows) const
MergeTreeRangeReader MergeTreeRangeReader::skipRows(size_t rows) const
{
MergeTreeRangeReader copy = *this;
copy.read_rows_after_current_mark += rows;
@ -38,6 +38,7 @@ size_t MergeTreeRangeReader::read(Block & res, size_t max_rows_to_read)
return false;
auto read_rows = merge_tree_reader.get().readRange(current_mark, seek_to_from_mark, rows_to_read, res);
//LOG_TRACE(logger, "to read: " << rows_to_read << " read: " << read_rows << " max: " << max_rows_to_read);
/// if no columns to read, consider all rows was read
if (!read_rows)
read_rows = rows_to_read;

View File

@ -14,15 +14,19 @@ public:
size_t unreadRows() const {
return (last_mark - current_mark) * index_granularity - read_rows_after_current_mark;
}
size_t unreadRowsInCurrentPart() const {
size_t unreadRowsInCurrentGranule() const {
return index_granularity - read_rows_after_current_mark;
}
size_t readRowsInCurrentGranule() const { return read_rows_after_current_mark; }
size_t skipToNextMark();
const MergeTreeRangeReader skipRows(size_t rows) const;
MergeTreeRangeReader skipRows(size_t rows) const;
size_t read(Block & res, size_t max_rows_to_read);
bool isReadingFinished() const { return is_reading_finished; }
void disableNextSeek() { seek_to_from_mark = false; }
~MergeTreeRangeReader() {
//if (last_mark != current_mark)
// LOG_ERROR(logger, "last_mark = " << last_mark << " current_mark = " << current_mark << " read_rows_after_current_mark = " << read_rows_after_current_mark);

View File

@ -41,8 +41,6 @@ public:
/// If columns are not present in the block, adds them. If they are present - appends the values that have been read.
/// Do not adds columns, if the files are not present for them (to add them, call fillMissingColumns).
/// Block should contain either no columns from the columns field, or all columns for which files are present.
size_t readRange(size_t from_mark, size_t to_mark, Block & res)
{ return readRange(from_mark, true, (to_mark - from_mark) * storage.index_granularity, res); }
MergeTreeRangeReader readRange(size_t from_mark, size_t to_mark);
/// Add columns from ordered_names that are not present in the block.
@ -105,7 +103,6 @@ private:
MergeTreeData::DataPartPtr data_part;
FileStreams streams;
// size_t cur_mark_idx = 0; /// Mark index corresponding to the current position for all streams.
/// Columns that are read.
NamesAndTypesList columns;
@ -131,6 +128,7 @@ private:
void fillMissingColumnsImpl(Block & res, const Names & ordered_names, bool always_reorder);
/// return the number of rows has been read or zero if ther is no columns to read
size_t readRange(size_t from_mark, bool seek_to_from_mark, size_t max_rows_to_read, Block & res);
friend class MergeTreeRangeReader;