fixed number of rows to read from first granula in mergetree

This commit is contained in:
Nikolai Kochetov 2017-08-01 16:04:48 +03:00 committed by alexey-milovidov
parent d6833a0d55
commit 0c15b2c6fc
2 changed files with 22 additions and 5 deletions

View File

@ -13,6 +13,7 @@ namespace DB
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER;
extern const int LOGICAL_ERROR;
}
@ -208,7 +209,9 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
if (!res)
{
if (!pre_range_reader)
{
task->current_range_reader = std::experimental::nullopt;
}
return res;
}
@ -297,7 +300,7 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
if (!task->current_range_reader)
{
if (next_range_idx == ranges_to_read.size())
throw Exception("Nothing to read");
throw Exception("Not enough ranges to read after prewhere.", ErrorCodes::LOGICAL_ERROR);
const auto & range = ranges_to_read[next_range_idx++];
task->current_range_reader = reader->readRange(range.begin, range.end);
}
@ -307,7 +310,7 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
/// Now we need to read the same number of rows as in prewhere.
size_t rows_to_read = next_range_idx == ranges_to_read.size()
? rows_was_read_in_last_range : task->current_range_reader->unreadRows();
? rows_was_read_in_last_range : (task->current_range_reader->unreadRows() - number_of_rows_to_skip);
auto readRows = [&]()
{
@ -352,8 +355,18 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
if (will_read_until_mark)
{
/// Can skip the rest of granule with false prewhere conditon right now.
current_range_rows_read += range_reader.skipToNextMark() - number_of_rows_to_skip;
number_of_rows_to_skip = 0;
do
{
size_t rows_was_skipped = range_reader.skipToNextMark();
if (number_of_rows_to_skip < rows_was_skipped)
{
current_range_rows_read += rows_was_skipped - number_of_rows_to_skip;
number_of_rows_to_skip = 0;
}
else
number_of_rows_to_skip -= rows_was_skipped;
}
while (number_of_rows_to_skip);
}
else
{
@ -364,7 +377,6 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
pre_filter_begin_pos = limit;
}
pre_filter_pos = limit;
}
@ -405,6 +417,7 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
/// Replace column with condition value from PREWHERE to a constant.
if (!task->remove_prewhere_column)
res.getByName(prewhere_column).column = DataTypeUInt8().createConstColumn(rows, UInt64(1));
}
else
throw Exception{
@ -445,7 +458,9 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
task->current_range_reader = std::experimental::nullopt;
if (res && task->size_predictor)
{
task->size_predictor->update(res);
}
space_left -= rows_was_read;
}

View File

@ -15,6 +15,8 @@ size_t MergeTreeRangeReader::skipToNextMark()
auto unread_rows_in_current_part = unreadRowsInCurrentGranule();
continue_reading = false;
++current_mark;
if (current_mark == last_mark)
is_reading_finished = true;
read_rows_after_current_mark = 0;
return unread_rows_in_current_part;
}