updated MergeTreePrewhereRangeReader; renamed MergeTreePrewhereRangeReader to MergeTreeRangeReader

This commit is contained in:
Nikolai Kochetov 2018-02-20 14:45:58 +03:00
parent af9ac7b48b
commit ce70d4faa9
6 changed files with 235 additions and 314 deletions

View File

@ -14,7 +14,6 @@ namespace DB
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER; extern const int ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER;
extern const int LOGICAL_ERROR;
} }
@ -84,7 +83,7 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
auto estimateNumRows = [preferred_block_size_bytes, max_block_size_rows, auto estimateNumRows = [preferred_block_size_bytes, max_block_size_rows,
index_granularity, preferred_max_column_in_block_size_bytes, min_filtration_ratio]( index_granularity, preferred_max_column_in_block_size_bytes, min_filtration_ratio](
MergeTreeReadTask & task, MergeTreePrewhereRangeReader & reader) MergeTreeReadTask & task, MergeTreeRangeReader & reader)
{ {
if (!task.size_predictor) if (!task.size_predictor)
return max_block_size_rows; return max_block_size_rows;
@ -102,7 +101,7 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
size_t rows_to_read_for_max_size_column size_t rows_to_read_for_max_size_column
= task.size_predictor->estimateNumRowsForMaxSizeColumn(preferred_max_column_in_block_size_bytes); = task.size_predictor->estimateNumRowsForMaxSizeColumn(preferred_max_column_in_block_size_bytes);
double filtration_ratio = std::max(min_filtration_ratio, 1.0 - task.size_predictor->filtered_rows_ratio); double filtration_ratio = std::max(min_filtration_ratio, 1.0 - task.size_predictor->filtered_rows_ratio);
size_t rows_to_read_for_max_size_column_with_filtration auto rows_to_read_for_max_size_column_with_filtration
= static_cast<size_t>(rows_to_read_for_max_size_column / filtration_ratio); = static_cast<size_t>(rows_to_read_for_max_size_column / filtration_ratio);
/// If preferred_max_column_in_block_size_bytes is used, number of rows to read can be less than index_granularity. /// If preferred_max_column_in_block_size_bytes is used, number of rows to read can be less than index_granularity.
@ -117,127 +116,49 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
return index_granularity * granule_to_read - reader.numReadRowsInCurrentGranule(); return index_granularity * granule_to_read - reader.numReadRowsInCurrentGranule();
}; };
if (!task->range_reader.isInitialized())
{
if (prewhere_actions) if (prewhere_actions)
{ {
do task->pre_range_reader = MergeTreeRangeReader(
{ pre_reader.get(), index_granularity, nullptr, prewhere_actions,
auto processNextRange = [this]()
{
const auto & range = task->mark_ranges.back();
task->pre_range_reader = pre_reader->readRange(
range.begin, range.end, nullptr, prewhere_actions,
&prewhere_column_name, &task->ordered_names, task->should_reorder); &prewhere_column_name, &task->ordered_names, task->should_reorder);
task->range_reader = reader->readRange( task->range_reader = MergeTreeRangeReader(
range.begin, range.end, &task->pre_range_reader, reader.get(), index_granularity, &task->pre_range_reader, nullptr, nullptr, nullptr, true);
nullptr, nullptr, &task->ordered_names, true); }
else
task->mark_ranges.pop_back();
};
auto resetRangeReaders = [this]()
{ {
task->range_reader.reset(); task->range_reader = MergeTreeRangeReader(
task->pre_range_reader.reset(); reader.get(), index_granularity, nullptr, nullptr,
}; nullptr, &task->ordered_names, task->should_reorder);
}
}
if (!task->range_reader)
processNextRange();
/// FIXME: size prediction model is updated by filtered rows, but it predicts size of unfiltered rows also
size_t recommended_rows = estimateNumRows(*task, task->range_reader); size_t recommended_rows = estimateNumRows(*task, task->range_reader);
size_t rows_to_read = std::max(static_cast<decltype(max_block_size_rows)>(1),
std::min(max_block_size_rows, recommended_rows));
if (res && recommended_rows < 1) auto read_result = task->range_reader.read(rows_to_read, task->mark_ranges);
break;
size_t space_left = std::max(static_cast<decltype(max_block_size_rows)>(1), std::min(max_block_size_rows, recommended_rows)); progressImpl({ read_result.block.rows(), read_result.block.bytes() });
size_t total_filtered_rows = 0;
while (!task->isFinished() && space_left && !isCancelled())
{
if (!task->range_reader)
processNextRange();
size_t rows_to_read = std::min(task->range_reader.numPendingRows(), space_left);
size_t filtered_rows = 0;
auto read_result = task->range_reader.read(res, rows_to_read);
if (task->size_predictor) if (task->size_predictor)
{ {
task->size_predictor->updateFilteredRowsRation( task->size_predictor->updateFilteredRowsRation(
read_result.getNumAddedRows() + read_result.getNumFilteredRows(), read_result.getNumAddedRows() + read_result.getNumFilteredRows(),
read_result.getNumFilteredRows()); read_result.getNumFilteredRows());
if (read_result.block)
task->size_predictor->update(read_result.block);
} }
total_filtered_rows += filtered_rows;
if (task->range_reader.isReadingFinished())
resetRangeReaders();
space_left -= rows_to_read;
}
if (res.rows() == 0)
{
res.clear();
return res;
}
progressImpl({ res.rows(), res.bytes() });
if (task->remove_prewhere_column && res.has(prewhere_column_name)) if (task->remove_prewhere_column && res.has(prewhere_column_name))
res.erase(prewhere_column_name); res.erase(prewhere_column_name);
if (task->size_predictor && res)
task->size_predictor->update(res);
res.checkNumberOfRows(); res.checkNumberOfRows();
} return read_result.block;
while (!task->isFinished() && !res && !isCancelled());
}
else
{
size_t space_left = std::max(static_cast<decltype(max_block_size_rows)>(1), max_block_size_rows);
while (!task->isFinished() && space_left && !isCancelled())
{
if (!task->range_reader)
{
auto & range = task->mark_ranges.back();
task->range_reader = reader->readRange(range.begin, range.end, nullptr,
nullptr, nullptr, &task->ordered_names, task->should_reorder);
task->mark_ranges.pop_back();
}
size_t rows_to_read = std::min(task->range_reader.numPendingRows(), space_left);
size_t recommended_rows = estimateNumRows(*task, task->range_reader);
if (res && recommended_rows < 1)
break;
rows_to_read = std::min(rows_to_read, std::max(static_cast<decltype(recommended_rows)>(1), recommended_rows));
auto read_result = task->range_reader.read(res, rows_to_read);
if (task->size_predictor)
{
task->size_predictor->updateFilteredRowsRation(
read_result.getNumAddedRows() + read_result.getNumFilteredRows(),
read_result.getNumFilteredRows());
}
if (task->range_reader.isReadingFinished())
task->range_reader.reset();
if (task->size_predictor && res)
task->size_predictor->update(res);
space_left -= rows_to_read;
}
progressImpl({ res.rows(), res.bytes() });
}
return res;
} }

View File

@ -46,11 +46,11 @@ struct MergeTreeReadTask
const bool should_reorder; const bool should_reorder;
/// Used to satistfy preferred_block_size_bytes limitation /// Used to satistfy preferred_block_size_bytes limitation
MergeTreeBlockSizePredictorPtr size_predictor; MergeTreeBlockSizePredictorPtr size_predictor;
/// used to save current range processing status /// Used to save current range processing status
MergeTreePrewhereRangeReader range_reader; MergeTreeRangeReader range_reader;
MergeTreePrewhereRangeReader pre_range_reader; MergeTreeRangeReader pre_range_reader;
bool isFinished() const { return mark_ranges.empty() && !range_reader; } bool isFinished() const { return mark_ranges.empty() && range_reader.isCurrentRangeFinished(); }
MergeTreeReadTask( MergeTreeReadTask(
const MergeTreeData::DataPartPtr & data_part, const MarkRanges & mark_ranges, const size_t part_index_in_query, const MergeTreeData::DataPartPtr & data_part, const MarkRanges & mark_ranges, const size_t part_index_in_query,

View File

@ -10,7 +10,7 @@
namespace DB namespace DB
{ {
MergeTreePrewhereRangeReader::DelayedStream::DelayedStream( MergeTreeRangeReader::DelayedStream::DelayedStream(
size_t from_mark, size_t index_granularity, MergeTreeReader * merge_tree_reader) size_t from_mark, size_t index_granularity, MergeTreeReader * merge_tree_reader)
: current_mark(from_mark), current_offset(0), num_delayed_rows(0) : current_mark(from_mark), current_offset(0), num_delayed_rows(0)
, index_granularity(index_granularity), merge_tree_reader(merge_tree_reader) , index_granularity(index_granularity), merge_tree_reader(merge_tree_reader)
@ -18,13 +18,12 @@ MergeTreePrewhereRangeReader::DelayedStream::DelayedStream(
{ {
} }
size_t MergeTreePrewhereRangeReader::DelayedStream::position() const size_t MergeTreeRangeReader::DelayedStream::position() const
{ {
return current_mark * index_granularity + current_offset + num_delayed_rows; return current_mark * index_granularity + current_offset + num_delayed_rows;
} }
size_t MergeTreeRangeReader::DelayedStream::readRows(Block & block, size_t num_rows)
size_t MergeTreePrewhereRangeReader::DelayedStream::readRows(Block & block, size_t num_rows)
{ {
if (num_rows) if (num_rows)
{ {
@ -44,7 +43,7 @@ size_t MergeTreePrewhereRangeReader::DelayedStream::readRows(Block & block, size
return 0; return 0;
} }
size_t MergeTreePrewhereRangeReader::DelayedStream::read(Block & block, size_t from_mark, size_t offset, size_t num_rows) size_t MergeTreeRangeReader::DelayedStream::read(Block & block, size_t from_mark, size_t offset, size_t num_rows)
{ {
if (position() == from_mark * index_granularity + offset) if (position() == from_mark * index_granularity + offset)
{ {
@ -64,7 +63,7 @@ size_t MergeTreePrewhereRangeReader::DelayedStream::read(Block & block, size_t f
} }
} }
size_t MergeTreePrewhereRangeReader::DelayedStream::finalize(Block & block) size_t MergeTreeRangeReader::DelayedStream::finalize(Block & block)
{ {
if (current_offset && !continue_reading) if (current_offset && !continue_reading)
{ {
@ -86,27 +85,28 @@ size_t MergeTreePrewhereRangeReader::DelayedStream::finalize(Block & block)
return readRows(block, rows_to_read); return readRows(block, rows_to_read);
} }
MergeTreePrewhereRangeReader::Stream::Stream(size_t from_mark, size_t to_mark, size_t index_granularity,
MergeTreeReader * merge_tree_reader) MergeTreeRangeReader::Stream::Stream(
size_t from_mark, size_t to_mark, size_t index_granularity, MergeTreeReader * merge_tree_reader)
: current_mark(from_mark), offset_after_current_mark(0) : current_mark(from_mark), offset_after_current_mark(0)
, index_granularity(index_granularity), last_mark(to_mark) , index_granularity(index_granularity), last_mark(to_mark)
, stream(from_mark, index_granularity, merge_tree_reader) , stream(from_mark, index_granularity, merge_tree_reader)
{ {
} }
void MergeTreePrewhereRangeReader::Stream::checkNotFinished() const void MergeTreeRangeReader::Stream::checkNotFinished() const
{ {
if (isFinished()) if (isFinished())
throw Exception("Cannot read out of marks range.", ErrorCodes::LOGICAL_ERROR); throw Exception("Cannot read out of marks range.", ErrorCodes::LOGICAL_ERROR);
} }
void MergeTreePrewhereRangeReader::Stream::checkEnoughSpaceInCurrentGranula(size_t num_rows) const void MergeTreeRangeReader::Stream::checkEnoughSpaceInCurrentGranule(size_t num_rows) const
{ {
if (num_rows + offset_after_current_mark > index_granularity) if (num_rows + offset_after_current_mark > index_granularity)
throw Exception("Cannot read from granule more than index_granularity.", ErrorCodes::LOGICAL_ERROR); throw Exception("Cannot read from granule more than index_granularity.", ErrorCodes::LOGICAL_ERROR);
} }
size_t MergeTreePrewhereRangeReader::Stream::readRows(Block & block, size_t num_rows) size_t MergeTreeRangeReader::Stream::readRows(Block & block, size_t num_rows)
{ {
size_t rows_read = stream.read(block, current_mark, offset_after_current_mark, num_rows); size_t rows_read = stream.read(block, current_mark, offset_after_current_mark, num_rows);
@ -116,10 +116,9 @@ size_t MergeTreePrewhereRangeReader::Stream::readRows(Block & block, size_t num_
return rows_read; return rows_read;
} }
size_t MergeTreePrewhereRangeReader::Stream::read(Block & block, size_t num_rows, size_t MergeTreeRangeReader::Stream::read(Block & block, size_t num_rows, bool skip_remaining_rows_in_current_granule)
bool skip_remaining_rows_in_current_granule)
{ {
checkEnoughSpaceInCurrentGranula(num_rows); checkEnoughSpaceInCurrentGranule(num_rows);
if (num_rows) if (num_rows)
{ {
@ -153,12 +152,12 @@ size_t MergeTreePrewhereRangeReader::Stream::read(Block & block, size_t num_rows
} }
} }
void MergeTreePrewhereRangeReader::Stream::skip(size_t num_rows) void MergeTreeRangeReader::Stream::skip(size_t num_rows)
{ {
if (num_rows) if (num_rows)
{ {
checkNotFinished(); checkNotFinished();
checkEnoughSpaceInCurrentGranula(num_rows); checkEnoughSpaceInCurrentGranule(num_rows);
offset_after_current_mark += num_rows; offset_after_current_mark += num_rows;
@ -171,7 +170,7 @@ void MergeTreePrewhereRangeReader::Stream::skip(size_t num_rows)
} }
} }
size_t MergeTreePrewhereRangeReader::Stream::finalize(Block & block) size_t MergeTreeRangeReader::Stream::finalize(Block & block)
{ {
size_t read_rows = stream.finalize(block); size_t read_rows = stream.finalize(block);
@ -182,13 +181,13 @@ size_t MergeTreePrewhereRangeReader::Stream::finalize(Block & block)
} }
void MergeTreePrewhereRangeReader::ReadResult::addGranule(size_t num_rows) void MergeTreeRangeReader::ReadResult::addGranule(size_t num_rows)
{ {
rows_per_granule.push_back(num_rows); rows_per_granule.push_back(num_rows);
num_read_rows += num_rows; num_read_rows += num_rows;
} }
void MergeTreePrewhereRangeReader::ReadResult::adjustLastGranule(size_t num_rows_to_subtract) void MergeTreeRangeReader::ReadResult::adjustLastGranule(size_t num_rows_to_subtract)
{ {
if (rows_per_granule.empty()) if (rows_per_granule.empty())
throw Exception("Can't adjust last granule because no granules were added.", ErrorCodes::LOGICAL_ERROR); throw Exception("Can't adjust last granule because no granules were added.", ErrorCodes::LOGICAL_ERROR);
@ -202,7 +201,7 @@ void MergeTreePrewhereRangeReader::ReadResult::adjustLastGranule(size_t num_rows
num_read_rows -= num_rows_to_subtract; num_read_rows -= num_rows_to_subtract;
} }
void MergeTreePrewhereRangeReader::ReadResult::clear() void MergeTreeRangeReader::ReadResult::clear()
{ {
/// Need to save information about the number of granules. /// Need to save information about the number of granules.
rows_per_granule.assign(rows_per_granule.size(), 0); rows_per_granule.assign(rows_per_granule.size(), 0);
@ -211,9 +210,10 @@ void MergeTreePrewhereRangeReader::ReadResult::clear()
num_added_rows = 0; num_added_rows = 0;
num_zeros_in_filter = 0; num_zeros_in_filter = 0;
filter = nullptr; filter = nullptr;
block.clear();
} }
void MergeTreePrewhereRangeReader::ReadResult::optimize() void MergeTreeRangeReader::ReadResult::optimize()
{ {
if (num_read_rows == 0 || !filter) if (num_read_rows == 0 || !filter)
return; return;
@ -243,8 +243,7 @@ void MergeTreePrewhereRangeReader::ReadResult::optimize()
} }
} }
void MergeTreePrewhereRangeReader::ReadResult::collapseZeroTails(const IColumn::Filter & filter, void MergeTreeRangeReader::ReadResult::collapseZeroTails(const IColumn::Filter & filter, IColumn::Filter & new_filter)
IColumn::Filter & new_filter)
{ {
auto filter_data = filter.data(); auto filter_data = filter.data();
auto new_filter_data = new_filter.data(); auto new_filter_data = new_filter.data();
@ -273,8 +272,7 @@ void MergeTreePrewhereRangeReader::ReadResult::collapseZeroTails(const IColumn::
new_filter.resize(new_filter_data - new_filter.data()); new_filter.resize(new_filter_data - new_filter.data());
} }
size_t MergeTreeRangeReader::ReadResult::numZerosInTail(const UInt8 * begin, const UInt8 * end)
size_t MergeTreePrewhereRangeReader::ReadResult::numZerosInTail(const UInt8 * begin, const UInt8 * end)
{ {
size_t count = 0; size_t count = 0;
@ -314,7 +312,7 @@ size_t MergeTreePrewhereRangeReader::ReadResult::numZerosInTail(const UInt8 * be
return count; return count;
} }
size_t MergeTreePrewhereRangeReader::ReadResult::numZerosInFilter() const size_t MergeTreeRangeReader::ReadResult::numZerosInFilter() const
{ {
if (!filter) if (!filter)
return 0; return 0;
@ -335,8 +333,7 @@ size_t MergeTreePrewhereRangeReader::ReadResult::numZerosInFilter() const
return size - countBytesInFilter(*data); return size - countBytesInFilter(*data);
} }
void MergeTreeRangeReader::ReadResult::setFilter(ColumnPtr filter_)
void MergeTreePrewhereRangeReader::ReadResult::setFilter(ColumnPtr filter_)
{ {
if (!filter_ && filter) if (!filter_ && filter)
throw Exception("Can't remove exising filter with empty.", ErrorCodes::LOGICAL_ERROR); throw Exception("Can't remove exising filter with empty.", ErrorCodes::LOGICAL_ERROR);
@ -364,45 +361,111 @@ void MergeTreePrewhereRangeReader::ReadResult::setFilter(ColumnPtr filter_)
num_zeros_in_filter = num_zeros; num_zeros_in_filter = num_zeros;
} }
MergeTreePrewhereRangeReader::MergeTreePrewhereRangeReader(
MergeTreePrewhereRangeReader * prev_reader, MergeTreeReader * merge_tree_reader, MergeTreeRangeReader::MergeTreeRangeReader(
size_t from_mark, size_t to_mark, size_t index_granularity, MergeTreeReader * merge_tree_reader, size_t index_granularity,
ExpressionActionsPtr prewhere_actions, const String * prewhere_column_name, MergeTreeRangeReader * prev_reader, ExpressionActionsPtr prewhere_actions,
const Names * ordered_names, bool always_reorder) const String * prewhere_column_name, const Names * ordered_names, bool always_reorder)
: stream(from_mark, to_mark, index_granularity, merge_tree_reader) : index_granularity(index_granularity), merge_tree_reader(merge_tree_reader)
, prev_reader(prev_reader), prewhere_actions(std::move(prewhere_actions)) , prev_reader(prev_reader), prewhere_actions(std::move(prewhere_actions))
, prewhere_column_name(prewhere_column_name), ordered_names(ordered_names), always_reorder(always_reorder) , prewhere_column_name(prewhere_column_name), ordered_names(ordered_names)
, always_reorder(always_reorder), is_initialized(true)
{ {
} }
bool MergeTreeRangeReader::isReadingFinished() const
{
return prev_reader ? prev_reader->isReadingFinished() : stream.isFinished();
}
MergeTreePrewhereRangeReader::ReadResult MergeTreePrewhereRangeReader::read( size_t MergeTreeRangeReader::numReadRowsInCurrentGranule() const
Block & res, size_t max_rows) {
return prev_reader ? prev_reader->numReadRowsInCurrentGranule() : stream.numReadRowsInCurrentGranule();
}
size_t MergeTreeRangeReader::numPendingRowsInCurrentGranule() const
{
return prev_reader ? prev_reader->numPendingRowsInCurrentGranule() : stream.numPendingRowsInCurrentGranule();
}
size_t MergeTreeRangeReader::numPendingRows() const
{
return prev_reader ? prev_reader->numPendingRows() : stream.numPendingRows();
}
bool isCurrentRangeFinished() const
{
return prev_reader ? prev_reader->isCurrentRangeFinished() : stream.isFinished();
}
MergeTreeRangeReader::ReadResult MergeTreeRangeReader::read(size_t max_rows, MarkRanges & ranges)
{ {
if (max_rows == 0) if (max_rows == 0)
throw Exception("Expected at least 1 row to read, got 0.", ErrorCodes::LOGICAL_ERROR); throw Exception("Expected at least 1 row to read, got 0.", ErrorCodes::LOGICAL_ERROR);
if (max_rows > numPendingRows())
throw Exception("Want to read " + toString(max_rows) + " rows, but has only "
+ toString(numPendingRows()) + " pending rows.", ErrorCodes::LOGICAL_ERROR);
ReadResult read_result; ReadResult read_result;
if (prev_reader) if (prev_reader)
read_result = prev_reader->read(res, max_rows); {
read_result = prev_reader->read(max_rows, ranges);
continueReadingChain(read_result);
}
else
read_result = startReadingChain(max_rows, ranges);
readRows(res, max_rows, read_result); if (!read_result.block)
if (!res)
return read_result; return read_result;
executePrewhereActionsAndFilterColumns(res, read_result); executePrewhereActionsAndFilterColumns(read_result);
return read_result; return read_result;
} }
void MergeTreePrewhereRangeReader::readRows(Block & block, size_t max_rows, ReadResult & result) MergeTreeRangeReader::ReadResult MergeTreeRangeReader::startReadingChain(size_t max_rows, MarkRanges & ranges)
{ {
if (prev_reader && result.numReadRows() == 0) ReadResult result;
/// Stream is lazy. result.num_added_rows is the number of rows added to block which is not equal to
/// result.num_rows_read until call to stream.finalize(). Also result.num_added_rows may be less than
/// result.num_rows_read if the last granule in range also the last in part (so we have to adjust last granule).
{
size_t space_left = max_rows;
while (space_left && !stream.isFinished() && !ranges.empty())
{
if (stream.isFinished())
{
result.addRows(stream.finalize(result.block));
ranges.pop_back();
stream = Stream(ranges.back().begin, ranges.back().end, index_granularity, merge_tree_reader);
result.addRange(ranges.back());
ranges.pop_back();
}
auto rows_to_read = std::min(space_left, stream.numPendingRowsInCurrentGranule());
bool last = rows_to_read == space_left;
result.addRows(stream.read(result.block, rows_to_read, !last));
result.addGranule(rows_to_read);
space_left -= rows_to_read;
}
}
result.addRows(stream.finalize(result.block));
auto last_granule = result.rowsPerGranule().back();
auto added_rows =result.getNumAddedRows();
if (max_rows - last_granule > added_rows)
throw Exception("RangeReader expected reading of at least " + toString(max_rows - last_granule) +
" rows, but only " + toString(added_rows) + " was read.", ErrorCodes::LOGICAL_ERROR);
/// Last granule may be incomplete.
size_t adjustment = max_rows - added_rows;
result.adjustLastGranule(adjustment);
return result;
}
void MergeTreeRangeReader::continueReadingChain(ReadResult & result)
{
if (result.numReadRows() == 0)
{ {
/// If zero rows were read on prev step, than there is no more rows to read. /// If zero rows were read on prev step, than there is no more rows to read.
/// Last granule may have less rows than index_granularity, so finish reading manually. /// Last granule may have less rows than index_granularity, so finish reading manually.
@ -412,7 +475,6 @@ void MergeTreePrewhereRangeReader::readRows(Block & block, size_t max_rows, Read
size_t rows_to_skip_in_last_granule = 0; size_t rows_to_skip_in_last_granule = 0;
if (!result.rowsPerGranule().empty())
{ {
size_t rows_in_last_granule = result.rowsPerGranule().back(); size_t rows_in_last_granule = result.rowsPerGranule().back();
result.optimize(); result.optimize();
@ -425,83 +487,58 @@ void MergeTreePrewhereRangeReader::readRows(Block & block, size_t max_rows, Read
} }
} }
if (result.rowsPerGranule().empty())
{
/// Stream is lazy. result.num_added_rows is the number of rows added to block which is not equal to
/// result.num_rows_read until call to stream.finalize(). Also result.num_added_rows may be less than
/// result.num_rows_read if the last granule in range also the last in part (so we have to adjust last granule).
{
size_t space_left = max_rows;
while (space_left && !stream.isFinished())
{
auto rows_to_read = std::min(space_left, stream.numPendingRowsInCurrentGranule());
bool last = rows_to_read == space_left;
result.addRows(stream.read(block, rows_to_read, !last));
result.addGranule(rows_to_read);
space_left -= rows_to_read;
}
}
stream.skip(rows_to_skip_in_last_granule);
result.addRows(stream.finalize(block));
auto last_granule = result.rowsPerGranule().back();
auto added_rows =result.getNumAddedRows();
if (max_rows - last_granule > added_rows)
throw Exception("RangeReader expected reading of at least " + toString(max_rows - last_granule) +
" rows, but only " + toString(added_rows) + " was read.", ErrorCodes::LOGICAL_ERROR);
/// Last granule may be incomplete.
size_t adjustment = max_rows - added_rows;
result.adjustLastGranule(adjustment);
}
else
{
size_t added_rows = 0;
auto & rows_per_granule = result.rowsPerGranule(); auto & rows_per_granule = result.rowsPerGranule();
auto & started_ranges = result.startedRanges();
size_t added_rows = 0;
size_t next_range_to_start = 0;
auto size = rows_per_granule.size(); auto size = rows_per_granule.size();
for (auto i : ext::range(0, size)) for (auto i : ext::range(0, size))
{ {
if (next_range_to_start < started_ranges.size()
&& i == started_ranges[next_range_to_start].num_granules_read_before_start)
{
added_rows += stream.finalize(result.block);
auto & range = started_ranges[next_range_to_start].range;
stream = Stream(range.begin, range.end, index_granularity, merge_tree_reader);
}
bool last = i + 1 == size; bool last = i + 1 == size;
added_rows += stream.read(block, rows_per_granule[i], !last); added_rows += stream.read(result.block, rows_per_granule[i], !last);
} }
stream.skip(rows_to_skip_in_last_granule); stream.skip(rows_to_skip_in_last_granule);
added_rows += stream.finalize(block); added_rows += stream.finalize(result.block);
/// added_rows may be zero if all columns were read in prewhere and it's ok. /// added_rows may be zero if all columns were read in prewhere and it's ok.
if (added_rows && added_rows != result.numReadRows()) if (added_rows && added_rows != result.numReadRows())
throw Exception("RangeReader read " + toString(added_rows) + " rows, but " throw Exception("RangeReader read " + toString(added_rows) + " rows, but "
+ toString(result.numReadRows()) + " expected.", ErrorCodes::LOGICAL_ERROR); + toString(result.numReadRows()) + " expected.", ErrorCodes::LOGICAL_ERROR);
} }
}
void MergeTreePrewhereRangeReader::executePrewhereActionsAndFilterColumns(Block & block, ReadResult & result) void MergeTreeRangeReader::executePrewhereActionsAndFilterColumns(ReadResult & result)
{ {
const auto & columns = stream.reader()->getColumns(); const auto & columns = stream.reader()->getColumns();
auto filterColumns = [&block, &columns](const IColumn::Filter & filter) auto filterColumns = [&result, &columns](const IColumn::Filter & filter)
{ {
for (const auto & column : columns) for (const auto & column : columns)
{ {
if (block.has(column.name)) if (result.block.has(column.name))
{ {
auto & column_with_type_and_name = block.getByName(column.name); auto & column_with_type_and_name = result.block.getByName(column.name);
column_with_type_and_name.column = std::move(column_with_type_and_name.column)->filter(filter, -1); column_with_type_and_name.column = std::move(column_with_type_and_name.column)->filter(filter, -1);
} }
} }
}; };
auto filterBlock = [&block](const IColumn::Filter & filter) auto filterBlock = [&result](const IColumn::Filter & filter)
{ {
for (const auto i : ext::range(0, block.columns())) for (const auto i : ext::range(0, result.block.columns()))
{ {
auto & col = block.safeGetByPosition(i); auto & col = result.block.safeGetByPosition(i);
if (col.column && col.column->size() == filter.size()) if (col.column && col.column->size() == filter.size())
col.column = std::move(col.column)->filter(filter, -1); col.column = std::move(col.column)->filter(filter, -1);
@ -521,56 +558,18 @@ void MergeTreePrewhereRangeReader::executePrewhereActionsAndFilterColumns(Block
} }
} }
if (!columns.empty()) stream.reader()->fillMissingColumns(result.block, *ordered_names, always_reorder);
{
if (columns.size() == block.columns())
{
stream.reader()->fillMissingColumns(block, *ordered_names, always_reorder);
if (prewhere_actions)
prewhere_actions->execute(block);
}
else
{
/// Columns in block may have different size here. Create temporary block which has only read columns.
Block tmp_block;
for (const auto & column : columns)
{
if (block.has(column.name))
{
auto & column_with_type_and_name = block.getByName(column.name);
tmp_block.insert(column_with_type_and_name);
column_with_type_and_name.column = nullptr;
}
}
if (tmp_block)
stream.reader()->fillMissingColumns(tmp_block, *ordered_names, always_reorder);
if (prewhere_actions)
prewhere_actions->execute(tmp_block);
for (auto col_num : ext::range(0, block.columns()))
{
auto & column = block.getByPosition(col_num);
if (!tmp_block.has(column.name))
tmp_block.insert(std::move(column));
}
std::swap(block, tmp_block);
}
}
ColumnPtr filter; ColumnPtr filter;
if (prewhere_actions) if (prewhere_actions)
{ {
auto & prewhere_column = block.getByName(*prewhere_column_name); prewhere_actions->execute(result.block);
auto & prewhere_column = result.block.getByName(*prewhere_column_name);
ConstantFilterDescription constant_filter_description(*prewhere_column.column); ConstantFilterDescription constant_filter_description(*prewhere_column.column);
if (constant_filter_description.always_false) if (constant_filter_description.always_false)
{ {
result.clear(); result.clear();
block.clear();
return; return;
} }
else if (!constant_filter_description.always_true) else if (!constant_filter_description.always_true)
@ -580,14 +579,14 @@ void MergeTreePrewhereRangeReader::executePrewhereActionsAndFilterColumns(Block
filterBlock(*filter_and_holder.data); filterBlock(*filter_and_holder.data);
} }
prewhere_column.column = prewhere_column.type->createColumnConst(block.rows(), UInt64(1)); prewhere_column.column = prewhere_column.type->createColumnConst(result.block.rows(), UInt64(1));
} }
if (filter && result.getFilter()) if (filter && result.getFilter())
{ {
/// TODO: implement for prewhere chain. /// TODO: implement for prewhere chain.
/// In order to do it we need combine filter and result.filter, where filter filters only '1' in result.filter. /// In order to do it we need combine filter and result.filter, where filter filters only '1' in result.filter.
throw Exception("MergeTreePrewhereRangeReader chain with several prewhere actions in not implemented.", throw Exception("MergeTreeRangeReader chain with several prewhere actions in not implemented.",
ErrorCodes::LOGICAL_ERROR); ErrorCodes::LOGICAL_ERROR);
} }

View File

@ -10,15 +10,14 @@ class MergeTreeReader;
/// MergeTreeReader iterator which allows sequential reading for arbitrary number of rows between pairs of marks in the same part. /// MergeTreeReader iterator which allows sequential reading for arbitrary number of rows between pairs of marks in the same part.
/// Stores reading state, which can be inside granule. Can skip rows in current granule and start reading from next mark. /// Stores reading state, which can be inside granule. Can skip rows in current granule and start reading from next mark.
/// Used generally for reading number of rows less than index granularity to decrease cache misses for fat blocks. /// Used generally for reading number of rows less than index granularity to decrease cache misses for fat blocks.
class MergeTreePrewhereRangeReader class MergeTreeRangeReader
{ {
public: public:
MergeTreePrewhereRangeReader(MergeTreePrewhereRangeReader * prev_reader, MergeTreeReader * merge_tree_reader, MergeTreeRangeReader(MergeTreeReader * merge_tree_reader, size_t index_granularity,
size_t from_mark, size_t to_mark, size_t index_granularity, MergeTreeRangeReader * prev_reader, ExpressionActionsPtr prewhere_actions,
ExpressionActionsPtr prewhere_actions, const String * prewhere_column_name, const String * prewhere_column_name, const Names * ordered_names, bool always_reorder);
const Names * ordered_names, bool always_reorder);
MergeTreePrewhereRangeReader() : is_initialized(false) { } MergeTreeRangeReader() = default;
bool isReadingFinished() const { return prev_reader ? prev_reader->isReadingFinished() : stream.isFinished(); } bool isReadingFinished() const { return prev_reader ? prev_reader->isReadingFinished() : stream.isFinished(); }
@ -26,13 +25,14 @@ public:
size_t numPendingRowsInCurrentGranule() const { return prev_reader ? prev_reader->numPendingRowsInCurrentGranule() : stream.numPendingRowsInCurrentGranule(); } size_t numPendingRowsInCurrentGranule() const { return prev_reader ? prev_reader->numPendingRowsInCurrentGranule() : stream.numPendingRowsInCurrentGranule(); }
size_t numPendingRows() const { return prev_reader ? prev_reader->numPendingRows() : stream.numPendingRows(); } size_t numPendingRows() const { return prev_reader ? prev_reader->numPendingRows() : stream.numPendingRows(); }
operator bool() const { return is_initialized; } bool isCurrentRangeFinished() const { return prev_reader ? prev_reader->isCurrentRangeFinished() : stream.isFinished(); }
void reset() { is_initialized = false; }
bool isInitialized() const { return is_initialized; }
class DelayedStream class DelayedStream
{ {
public: public:
DelayedStream() {} DelayedStream() = default;
DelayedStream(size_t from_mark, size_t index_granularity, MergeTreeReader * merge_tree_reader); DelayedStream(size_t from_mark, size_t index_granularity, MergeTreeReader * merge_tree_reader);
/// Returns the number of rows added to block. /// Returns the number of rows added to block.
@ -46,14 +46,14 @@ public:
MergeTreeReader * reader() const { return merge_tree_reader; } MergeTreeReader * reader() const { return merge_tree_reader; }
private: private:
size_t current_mark; size_t current_mark = 0;
size_t current_offset; size_t current_offset = 0;
size_t num_delayed_rows; size_t num_delayed_rows = 0;
size_t index_granularity; size_t index_granularity = 0;
MergeTreeReader * merge_tree_reader; MergeTreeReader * merge_tree_reader = nullptr;
bool continue_reading; bool continue_reading = false;
bool is_finished; bool is_finished = true;
size_t position() const; size_t position() const;
size_t readRows(Block & block, size_t num_rows); size_t readRows(Block & block, size_t num_rows);
@ -63,7 +63,7 @@ public:
{ {
public: public:
Stream() {} Stream() = default;
Stream(size_t from_mark, size_t to_mark, size_t index_granularity, MergeTreeReader * merge_tree_reader); Stream(size_t from_mark, size_t to_mark, size_t index_granularity, MergeTreeReader * merge_tree_reader);
/// Returns the n /// Returns the n
@ -82,17 +82,17 @@ public:
MergeTreeReader * reader() const { return stream.reader(); } MergeTreeReader * reader() const { return stream.reader(); }
private: private:
size_t current_mark; size_t current_mark = 0;
/// Invariant: offset_after_current_mark + skipped_rows_after_offset < index_granularity /// Invariant: offset_after_current_mark + skipped_rows_after_offset < index_granularity
size_t offset_after_current_mark; size_t offset_after_current_mark = 0;
size_t index_granularity; size_t index_granularity = 0;
size_t last_mark; size_t last_mark = 0;
DelayedStream stream; DelayedStream stream;
void checkNotFinished() const; void checkNotFinished() const;
void checkEnoughSpaceInCurrentGranula(size_t num_rows) const; void checkEnoughSpaceInCurrentGranule(size_t num_rows) const;
size_t readRows(Block & block, size_t num_rows); size_t readRows(Block & block, size_t num_rows);
}; };
@ -100,7 +100,15 @@ public:
class ReadResult class ReadResult
{ {
public: public:
struct RangeInfo
{
size_t num_granules_read_before_start;
MarkRange range;
};
const std::vector<RangeInfo> & startedRanges() const { return started_ranges; }
const std::vector<size_t> & rowsPerGranule() const { return rows_per_granule; } const std::vector<size_t> & rowsPerGranule() const { return rows_per_granule; }
/// The number of rows were read at LAST iteration in chain. <= num_added_rows + num_filtered_rows. /// The number of rows were read at LAST iteration in chain. <= num_added_rows + num_filtered_rows.
size_t numReadRows() const { return num_read_rows; } size_t numReadRows() const { return num_read_rows; }
/// The number of rows were added to block as a result of reading chain. /// The number of rows were added to block as a result of reading chain.
@ -113,6 +121,7 @@ public:
void addGranule(size_t num_rows); void addGranule(size_t num_rows);
void adjustLastGranule(size_t num_rows_to_subtract); void adjustLastGranule(size_t num_rows_to_subtract);
void addRows(size_t rows) { num_added_rows += rows; } void addRows(size_t rows) { num_added_rows += rows; }
void addRange(const MarkRange & range) { started_ranges.emplace_back(rows_per_granule.size(), range); }
/// Set filter or replace old one. Filter must have more zeroes than previous. /// Set filter or replace old one. Filter must have more zeroes than previous.
void setFilter(ColumnPtr filter_); void setFilter(ColumnPtr filter_);
@ -121,7 +130,10 @@ public:
/// Remove all rows from granules. /// Remove all rows from granules.
void clear(); void clear();
Block block;
private: private:
std::vector<RangeInfo> started_ranges;
/// The number of rows read from each granule. /// The number of rows read from each granule.
std::vector<size_t> rows_per_granule; std::vector<size_t> rows_per_granule;
/// Sum(rows_per_granule) /// Sum(rows_per_granule)
@ -140,22 +152,27 @@ public:
static size_t numZerosInTail(const UInt8 * begin, const UInt8 * end); static size_t numZerosInTail(const UInt8 * begin, const UInt8 * end);
}; };
ReadResult read(Block & res, size_t max_rows); ReadResult read(size_t max_rows, MarkRanges & ranges);
private: private:
void readRows(Block & block, size_t max_rows, ReadResult & result); ReadResult startReadingChain(size_t max_rows, MarkRanges & ranges);
void executePrewhereActionsAndFilterColumns(Block & block, ReadResult & result); void continueReadingChain(ReadResult & result);
void executePrewhereActionsAndFilterColumns(ReadResult & result);
size_t index_granularity = 0;
MergeTreeReader * merge_tree_reader = nullptr;
MergeTreeRangeReader * prev_reader = nullptr; /// If not nullptr, read from prev_reader firstly.
ExpressionActionsPtr prewhere_actions = nullptr; /// If not nullptr, calculate filter.
const String * prewhere_column_name = nullptr;
const Names * ordered_names = nullptr;
Stream stream; Stream stream;
MergeTreePrewhereRangeReader * prev_reader; /// If not nullptr, read from prev_reader firstly.
ExpressionActionsPtr prewhere_actions; /// If not nullptr, calculate filter. bool always_reorder = true;
const String * prewhere_column_name; bool is_initialized = false;
const Names * ordered_names;
bool always_reorder;
bool is_initialized = true;
}; };
} }

View File

@ -66,17 +66,6 @@ const MergeTreeReader::ValueSizeMap & MergeTreeReader::getAvgValueSizeHints() co
} }
MergeTreePrewhereRangeReader MergeTreeReader::readRange(
size_t from_mark, size_t to_mark, MergeTreePrewhereRangeReader * prev_reader,
ExpressionActionsPtr prewhere_actions, const String * prewhere_column_name,
const Names * ordered_names, bool always_reorder)
{
return MergeTreePrewhereRangeReader(
prev_reader, this, from_mark, to_mark, storage.index_granularity,
prewhere_actions, prewhere_column_name, ordered_names, always_reorder);
}
size_t MergeTreeReader::readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Block & res) size_t MergeTreeReader::readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Block & res)
{ {
size_t read_rows = 0; size_t read_rows = 0;

View File

@ -38,11 +38,6 @@ public:
const ValueSizeMap & getAvgValueSizeHints() const; const ValueSizeMap & getAvgValueSizeHints() const;
/// Create MergeTreeRangeReader iterator, which allows reading arbitrary number of rows from range.
MergeTreePrewhereRangeReader readRange(size_t from_mark, size_t to_mark, MergeTreePrewhereRangeReader * prev_reader,
ExpressionActionsPtr prewhere_actions, const String * prewhere_column_name,
const Names * ordered_names, bool always_reorder);
/// Add columns from ordered_names that are not present in the block. /// Add columns from ordered_names that are not present in the block.
/// Missing columns are added in the order specified by ordered_names. /// Missing columns are added in the order specified by ordered_names.
/// If at least one column was added, reorders all columns in the block according to ordered_names. /// If at least one column was added, reorders all columns in the block according to ordered_names.
@ -121,7 +116,7 @@ private:
/// If continue_reading is true, continue reading from last state, otherwise seek to from_mark /// If continue_reading is true, continue reading from last state, otherwise seek to from_mark
size_t readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Block & res); size_t readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Block & res);
friend class MergeTreePrewhereRangeReader::DelayedStream; friend class MergeTreeRangeReader::DelayedStream;
}; };
} }