added MergeTreePrewhereRangeReader

This commit is contained in:
Nikolai Kochetov 2018-02-13 22:34:15 +03:00
parent 271afb0301
commit 82c76d8467
6 changed files with 788 additions and 373 deletions

View File

@ -1,4 +1,5 @@
#include <Storages/MergeTree/MergeTreeBaseBlockInputStream.h> #include <Storages/MergeTree/MergeTreeBaseBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeRangeReader.h>
#include <Storages/MergeTree/MergeTreeReader.h> #include <Storages/MergeTree/MergeTreeReader.h>
#include <Storages/MergeTree/MergeTreeBlockReadUtils.h> #include <Storages/MergeTree/MergeTreeBlockReadUtils.h>
#include <Columns/FilterDescription.h> #include <Columns/FilterDescription.h>
@ -83,7 +84,7 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
auto estimateNumRows = [preferred_block_size_bytes, max_block_size_rows, auto estimateNumRows = [preferred_block_size_bytes, max_block_size_rows,
index_granularity, preferred_max_column_in_block_size_bytes, min_filtration_ratio]( index_granularity, preferred_max_column_in_block_size_bytes, min_filtration_ratio](
MergeTreeReadTask & task, MergeTreeRangeReader & reader) MergeTreeReadTask & task, MergeTreePrewhereRangeReader & reader)
{ {
if (!task.size_predictor) if (!task.size_predictor)
return max_block_size_rows; return max_block_size_rows;
@ -116,315 +117,84 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
return index_granularity * granule_to_read - reader.numReadRowsInCurrentGranule(); return index_granularity * granule_to_read - reader.numReadRowsInCurrentGranule();
}; };
// read rows from reader and clear columns
auto skipRows = [&estimateNumRows](
Block & block, MergeTreeRangeReader & reader, MergeTreeReadTask & task, size_t rows)
{
size_t recommended_rows = rows;
if (task.size_predictor)
recommended_rows = std::max<size_t>(1, estimateNumRows(task, reader));
while (rows)
{
size_t rows_to_skip = std::min(rows, recommended_rows);
rows -= rows_to_skip;
reader.read(block, rows_to_skip);
for (const auto i : ext::range(0, block.columns()))
{
auto & col = block.getByPosition(i);
if (task.column_name_set.count(col.name))
{
if (const ColumnArray * column_array = typeid_cast<const ColumnArray *>(col.column.get()))
{
/// ColumnArray columns in block could have common offset column, which is used while reading.
/// This is in case of nested data structures.
/// TODO Very dangerous and unclear. Get rid of this after implemented full-featured Nested data type.
/// Have to call resize(0) instead of cloneEmpty to save structure.
/// (To keep offsets possibly shared between different arrays.)
static_cast<ColumnArray &>(*column_array->assumeMutable()).getOffsets().resize(0);
/// It's ok until multidimensional arrays are not stored in MergeTree.
static_cast<ColumnArray &>(*column_array->assumeMutable()).getDataPtr() = column_array->getDataPtr()->cloneEmpty();
}
else
col.column = col.column->cloneEmpty();
}
}
}
};
if (prewhere_actions) if (prewhere_actions)
{ {
do do
{ {
/// Let's read the full block of columns needed to calculate the expression in PREWHERE. auto processNextRange = [this]()
MarkRanges ranges_to_read;
/// Last range may be partly read. The same number of rows we need to read after prewhere
size_t rows_was_read_in_last_range = 0;
std::optional<MergeTreeRangeReader> pre_range_reader;
auto processNextRange = [& ranges_to_read, & rows_was_read_in_last_range, & pre_range_reader](
MergeTreeReadTask & task, MergeTreeReader & pre_reader)
{ {
auto & range = task.mark_ranges.back(); const auto & range = task->mark_ranges.back();
pre_range_reader = pre_reader.readRange(range.begin, range.end); task->pre_range_reader = pre_reader->readRange(
ranges_to_read.push_back(range); range.begin, range.end, nullptr, prewhere_actions,
rows_was_read_in_last_range = 0; &prewhere_column_name, &task->ordered_names, task->should_reorder);
task.mark_ranges.pop_back();
task->range_reader = reader->readRange(
range.begin, range.end, &task->pre_range_reader,
nullptr, nullptr, &task->ordered_names, true);
task->mark_ranges.pop_back();
}; };
if (task->current_range_reader) auto resetRangeReaders = [this]()
{ {
/// Havn't finihsed reading at last step. Copy state for prewhere columns task->range_reader.reset();
pre_range_reader = task->current_range_reader->copyForReader(*pre_reader); task->pre_range_reader.reset();
if (task->number_of_rows_to_skip) };
{
/// number_of_rows_to_skip already was read for prewhere columns. skip them. if (!task->range_reader)
pre_range_reader = pre_range_reader->getFutureState(task->number_of_rows_to_skip); processNextRange();
pre_range_reader->disableNextSeek();
}
}
else
processNextRange(*task, *pre_reader);
/// FIXME: size prediction model is updated by filtered rows, but it predicts size of unfiltered rows also /// FIXME: size prediction model is updated by filtered rows, but it predicts size of unfiltered rows also
size_t recommended_rows = estimateNumRows(*task, *pre_range_reader); size_t recommended_rows = estimateNumRows(*task, task->range_reader);
if (res && recommended_rows < 1) if (res && recommended_rows < 1)
break; break;
size_t space_left = std::max(static_cast<decltype(max_block_size_rows)>(1), std::min(max_block_size_rows, recommended_rows)); size_t space_left = std::max(static_cast<decltype(max_block_size_rows)>(1), std::min(max_block_size_rows, recommended_rows));
while ((pre_range_reader || !task->mark_ranges.empty()) && space_left && !isCancelled()) size_t total_filtered_rows = 0;
while (!task->isFinished() && space_left && !isCancelled())
{ {
if (!pre_range_reader) if (!task->range_reader)
processNextRange(*task, *pre_reader); processNextRange();
size_t rows_to_read = std::min(pre_range_reader->numPendingRows(), space_left); size_t rows_to_read = std::min(task->range_reader.numPendingRows(), space_left);
size_t read_rows = pre_range_reader->read(res, rows_to_read); size_t filtered_rows = 0;
rows_was_read_in_last_range += read_rows;
if (pre_range_reader->isReadingFinished())
pre_range_reader.reset();
space_left -= read_rows; auto read_result = task->range_reader.read(res, rows_to_read);
if (task->size_predictor)
{
task->size_predictor->updateFilteredRowsRation(
read_result.getNumAddedRows() + read_result.getNumFilteredRows(),
read_result.getNumFilteredRows());
}
total_filtered_rows += filtered_rows;
if (task->range_reader.isReadingFinished())
resetRangeReaders();
space_left -= rows_to_read;
} }
/// In case of isCancelled. if (res.rows() == 0)
if (!res)
{ {
task->current_range_reader.reset(); res.clear();
return res; return res;
} }
progressImpl({ res.rows(), res.bytes() }); progressImpl({ res.rows(), res.bytes() });
pre_reader->fillMissingColumns(res, task->ordered_names, task->should_reorder);
/// Compute the expression in PREWHERE. if (task->remove_prewhere_column && res.has(prewhere_column_name))
prewhere_actions->execute(res);
ColumnPtr prewhere_column = res.getByName(prewhere_column_name).column;
if (task->remove_prewhere_column)
res.erase(prewhere_column_name); res.erase(prewhere_column_name);
const auto pre_bytes = res.bytes(); if (task->size_predictor && res)
task->size_predictor->update(res);
ConstantFilterDescription constant_filter_description(*prewhere_column);
/** If the filter is a constant (for example, it says PREWHERE 0), res.checkNumberOfRows();
* then either return an empty block, or return the block unchanged.
*/
if (constant_filter_description.always_false)
{
/*
If this filter is PREWHERE 0, MergeTree Stream can be marked as done,
and this task can be clear.
If we don't mark this task finished here, readImpl could
jump into endless loop.
Error scenario:
select * from table where isNull(NOT_NULLABLE_COLUMN) AND OTHER PRED;
and isNull pred is promoted to PREWHERE.
(though it is difficult to reproduce)
*/
task->current_range_reader.reset();
task->mark_ranges.clear();
res.clear();
return res;
}
else if (constant_filter_description.always_true)
{
if (task->current_range_reader)
{
if (task->number_of_rows_to_skip)
skipRows(res, *task->current_range_reader, *task, task->number_of_rows_to_skip);
size_t rows_to_read = ranges_to_read.empty()
? rows_was_read_in_last_range : task->current_range_reader->numPendingRows();
task->current_range_reader->read(res, rows_to_read);
}
for (auto range_idx : ext::range(0, ranges_to_read.size()))
{
const auto & range = ranges_to_read[range_idx];
task->current_range_reader = reader->readRange(range.begin, range.end);
size_t rows_to_read = range_idx + 1 == ranges_to_read.size()
? rows_was_read_in_last_range : task->current_range_reader->numPendingRows();
task->current_range_reader->read(res, rows_to_read);
}
if (!pre_range_reader)
task->current_range_reader.reset();
task->number_of_rows_to_skip = 0;
progressImpl({ 0, res.bytes() - pre_bytes });
}
else
{
FilterDescription filter_and_holder(*prewhere_column);
const auto & pre_filter = *filter_and_holder.data;
auto & number_of_rows_to_skip = task->number_of_rows_to_skip;
if (!task->current_range_reader)
number_of_rows_to_skip = 0;
IColumn::Filter post_filter(pre_filter.size());
/// Let's read the rest of the columns in the required segments and compose our own filter for them.
size_t pre_filter_pos = 0;
size_t post_filter_pos = 0;
size_t next_range_idx = 0;
while (pre_filter_pos < pre_filter.size())
{
if (!task->current_range_reader)
{
if (next_range_idx == ranges_to_read.size())
throw Exception("Not enough ranges to read after prewhere.", ErrorCodes::LOGICAL_ERROR);
const auto & range = ranges_to_read[next_range_idx++];
task->current_range_reader = reader->readRange(range.begin, range.end);
}
MergeTreeRangeReader & range_reader = *task->current_range_reader;
size_t current_range_rows_read = 0;
auto pre_filter_begin_pos = pre_filter_pos;
/// Now we need to read the same number of rows as in prewhere.
size_t rows_to_read = next_range_idx == ranges_to_read.size()
? rows_was_read_in_last_range : (task->current_range_reader->numPendingRows() - number_of_rows_to_skip);
auto readRows = [&]()
{
if (pre_filter_pos != pre_filter_begin_pos)
{
/// Fulfilling the promise to read (pre_filter_pos - pre_filter_begin_pos) rows
auto rows = pre_filter_pos - pre_filter_begin_pos;
memcpy(&post_filter[post_filter_pos], &pre_filter[pre_filter_begin_pos], rows);
post_filter_pos += rows;
current_range_rows_read += rows;
if (number_of_rows_to_skip)
{
/** Wasn't able to skip 'number_of_rows_to_skip' with false prewhere conditon
* Just read them and throw away. */
skipRows(res, range_reader, *task, number_of_rows_to_skip);
number_of_rows_to_skip = 0;
}
range_reader.read(res, rows);
}
};
/** (pre_filter_pos - pre_filter_begin_pos) here is the number of rows we promies to read, but
haven't read yet to merge consecutive nonempy granulas. */
while (current_range_rows_read + (pre_filter_pos - pre_filter_begin_pos) < rows_to_read)
{
auto rows_should_be_copied = pre_filter_pos - pre_filter_begin_pos;
auto range_reader_with_skipped_rows = range_reader.getFutureState(number_of_rows_to_skip + rows_should_be_copied);
auto unread_rows_in_current_granule = range_reader_with_skipped_rows.numPendingRowsInCurrentGranule();
const size_t limit = std::min(pre_filter.size(), pre_filter_pos + unread_rows_in_current_granule);
bool will_read_until_mark = unread_rows_in_current_granule == limit - pre_filter_pos;
UInt8 nonzero = 0;
for (size_t row = pre_filter_pos; row < limit; ++row)
nonzero |= pre_filter[row];
if (!nonzero)
{
/// Zero! Prewhere condition is false for all (limit - pre_filter_pos) rows.
readRows();
if (will_read_until_mark)
{
/// Can skip the rest of granule with false prewhere conditon right now.
do
{
size_t rows_was_skipped = range_reader.skipToNextMark();
if (number_of_rows_to_skip < rows_was_skipped)
{
current_range_rows_read += rows_was_skipped - number_of_rows_to_skip;
number_of_rows_to_skip = 0;
}
else
number_of_rows_to_skip -= rows_was_skipped;
}
while (number_of_rows_to_skip);
}
else
{
/// Here reading seems to be done. It's still possible to skip rows during next reading.
number_of_rows_to_skip += limit - pre_filter_pos;
current_range_rows_read += limit - pre_filter_pos;
}
pre_filter_begin_pos = limit;
}
pre_filter_pos = limit;
}
readRows();
if (next_range_idx != ranges_to_read.size())
task->current_range_reader.reset();
}
if (!pre_range_reader)
task->current_range_reader.reset();
if (!post_filter_pos)
{
if (task->size_predictor)
task->size_predictor->updateFilteredRowsRation(pre_filter.size(), pre_filter.size());
res.clear();
continue;
}
progressImpl({ 0, res.bytes() - pre_bytes });
post_filter.resize(post_filter_pos);
/// Filter the columns related to PREWHERE using pre_filter,
/// other columns - using post_filter.
size_t rows = 0;
for (const auto i : ext::range(0, res.columns()))
{
auto & col = res.safeGetByPosition(i);
if (col.name == prewhere_column_name && res.columns() > 1)
continue;
col.column =
col.column->filter(task->column_name_set.count(col.name) ? post_filter : pre_filter, -1);
rows = col.column->size();
}
if (task->size_predictor)
task->size_predictor->updateFilteredRowsRation(pre_filter.size(), pre_filter.size() - rows);
/// Replace column with condition value from PREWHERE to a constant.
if (!task->remove_prewhere_column)
res.getByName(prewhere_column_name).column = DataTypeUInt8().createColumnConst(rows, UInt64(1));
}
if (res)
{
if (task->size_predictor)
task->size_predictor->update(res);
reader->fillMissingColumns(res, task->ordered_names, true);
res.checkNumberOfRows();
}
} }
while (!task->isFinished() && !res && !isCancelled()); while (!task->isFinished() && !res && !isCancelled());
} }
@ -433,37 +203,38 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
size_t space_left = std::max(static_cast<decltype(max_block_size_rows)>(1), max_block_size_rows); size_t space_left = std::max(static_cast<decltype(max_block_size_rows)>(1), max_block_size_rows);
while (!task->isFinished() && space_left && !isCancelled()) while (!task->isFinished() && space_left && !isCancelled())
{ {
if (!task->current_range_reader) if (!task->range_reader)
{ {
auto & range = task->mark_ranges.back(); auto & range = task->mark_ranges.back();
task->current_range_reader = reader->readRange(range.begin, range.end); task->range_reader = reader->readRange(range.begin, range.end, nullptr,
nullptr, nullptr, &task->ordered_names, task->should_reorder);
task->mark_ranges.pop_back(); task->mark_ranges.pop_back();
} }
size_t rows_to_read = space_left; size_t rows_to_read = std::min(task->range_reader.numPendingRows(), space_left);
size_t recommended_rows = estimateNumRows(*task, *task->current_range_reader); size_t recommended_rows = estimateNumRows(*task, task->range_reader);
if (res && recommended_rows < 1) if (res && recommended_rows < 1)
break; break;
rows_to_read = std::min(rows_to_read, std::max(static_cast<decltype(recommended_rows)>(1), recommended_rows)); rows_to_read = std::min(rows_to_read, std::max(static_cast<decltype(recommended_rows)>(1), recommended_rows));
size_t rows_was_read = task->current_range_reader->read(res, rows_to_read); auto read_result = task->range_reader.read(res, rows_to_read);
if (task->current_range_reader->isReadingFinished()) if (task->size_predictor)
task->current_range_reader.reset();
if (res && task->size_predictor)
{ {
task->size_predictor->update(res); task->size_predictor->updateFilteredRowsRation(
read_result.getNumAddedRows() + read_result.getNumFilteredRows(),
read_result.getNumFilteredRows());
} }
space_left -= rows_was_read; if (task->range_reader.isReadingFinished())
task->range_reader.reset();
if (task->size_predictor && res)
task->size_predictor->update(res);
space_left -= rows_to_read;
} }
/// In the case of isCancelled.
if (!res)
return res;
progressImpl({ res.rows(), res.bytes() }); progressImpl({ res.rows(), res.bytes() });
reader->fillMissingColumns(res, task->ordered_names, task->should_reorder);
} }
return res; return res;

View File

@ -47,12 +47,10 @@ struct MergeTreeReadTask
/// Used to satistfy preferred_block_size_bytes limitation /// Used to satistfy preferred_block_size_bytes limitation
MergeTreeBlockSizePredictorPtr size_predictor; MergeTreeBlockSizePredictorPtr size_predictor;
/// used to save current range processing status /// used to save current range processing status
std::optional<MergeTreeRangeReader> current_range_reader; MergeTreePrewhereRangeReader range_reader;
/// the number of rows wasn't read by range_reader if condition in prewhere was false MergeTreePrewhereRangeReader pre_range_reader;
/// helps to skip graunule if all conditions will be aslo false
size_t number_of_rows_to_skip;
bool isFinished() const { return mark_ranges.empty() && !current_range_reader; } bool isFinished() const { return mark_ranges.empty() && !range_reader; }
MergeTreeReadTask( MergeTreeReadTask(
const MergeTreeData::DataPartPtr & data_part, const MarkRanges & mark_ranges, const size_t part_index_in_query, const MergeTreeData::DataPartPtr & data_part, const MarkRanges & mark_ranges, const size_t part_index_in_query,

View File

@ -1,70 +1,598 @@
#include <Storages/MergeTree/MergeTreeReader.h> #include <Storages/MergeTree/MergeTreeReader.h>
#include <Columns/FilterDescription.h>
#include <ext/range.h>
#include <Columns/ColumnsCommon.h>
#if __SSE2__
#include <emmintrin.h>
#endif
namespace DB namespace DB
{ {
MergeTreeRangeReader::MergeTreeRangeReader( MergeTreePrewhereRangeReader::DelayedStream::DelayedStream(
MergeTreeReader & merge_tree_reader, size_t from_mark, size_t to_mark, size_t index_granularity) size_t from_mark, size_t index_granularity, MergeTreeReader * merge_tree_reader)
: merge_tree_reader(merge_tree_reader), current_mark(from_mark), last_mark(to_mark) : current_mark(from_mark), current_offset(0), num_delayed_rows(0)
, index_granularity(index_granularity) , index_granularity(index_granularity), merge_tree_reader(merge_tree_reader)
, continue_reading(false), is_finished(false)
{ {
} }
size_t MergeTreeRangeReader::skipToNextMark() size_t MergeTreePrewhereRangeReader::DelayedStream::position() const
{ {
auto unread_rows_in_current_part = numPendingRowsInCurrentGranule(); return current_mark * index_granularity + current_offset + num_delayed_rows;
continue_reading = false;
++current_mark;
if (current_mark == last_mark)
is_reading_finished = true;
read_rows_after_current_mark = 0;
return unread_rows_in_current_part;
} }
MergeTreeRangeReader MergeTreeRangeReader::getFutureState(size_t rows_to_read) const
size_t MergeTreePrewhereRangeReader::DelayedStream::readRows(Block & block, size_t num_rows)
{ {
MergeTreeRangeReader copy = *this; if (num_rows)
copy.read_rows_after_current_mark += rows_to_read; {
size_t read_parts = copy.read_rows_after_current_mark / index_granularity; size_t rows_read = merge_tree_reader->readRows(current_mark, continue_reading, num_rows, block);
copy.current_mark += read_parts; continue_reading = true;
copy.read_rows_after_current_mark -= index_granularity * read_parts;
return copy; /// Zero rows_read my be either because reading has finished
/// or because there is no columns we can read in current part (for example, all columns are default).
/// In the last case we can't finish reading, but it's also ok for the first case
/// because we can finish reading by calculation the number of pending rows.
if (0 < rows_read && rows_read < num_rows)
is_finished = true;
return rows_read;
}
return 0;
} }
size_t MergeTreeRangeReader::read(Block & res, size_t max_rows_to_read) size_t MergeTreePrewhereRangeReader::DelayedStream::read(Block & block, size_t from_mark, size_t offset, size_t num_rows)
{ {
size_t rows_to_read = numPendingRows(); if (position() == from_mark * index_granularity + offset)
rows_to_read = std::min(rows_to_read, max_rows_to_read); {
if (rows_to_read == 0) num_delayed_rows += num_rows;
throw Exception("Logical error: 0 rows to read.", ErrorCodes::LOGICAL_ERROR); return 0;
}
else
{
size_t read_rows = finalize(block);
auto read_rows = merge_tree_reader.get().readRows(current_mark, continue_reading, rows_to_read, res); continue_reading = false;
current_mark = from_mark;
current_offset = offset;
num_delayed_rows = num_rows;
if (read_rows && read_rows < rows_to_read) return read_rows;
is_reading_finished = true; }
}
if (!read_rows) size_t MergeTreePrewhereRangeReader::DelayedStream::finalize(Block & block)
read_rows = rows_to_read; {
if (current_offset && !continue_reading)
{
size_t granules_to_skip = current_offset / index_granularity;
current_mark += granules_to_skip;
current_offset -= granules_to_skip * index_granularity;
continue_reading = true; if (current_offset)
{
Block temp_block;
readRows(temp_block, current_offset);
}
}
read_rows_after_current_mark += read_rows; size_t rows_to_read = num_delayed_rows;
size_t read_parts = read_rows_after_current_mark / index_granularity; current_offset += num_delayed_rows;
current_mark += read_parts; num_delayed_rows = 0;
read_rows_after_current_mark -= index_granularity * read_parts;
if (current_mark == last_mark) return readRows(block, rows_to_read);
is_reading_finished = true; }
/// Reads the mark range [from_mark, to_mark). Tracks the current position as
/// (current_mark, offset_after_current_mark); actual reading is delegated to
/// the lazy DelayedStream `stream`.
MergeTreePrewhereRangeReader::Stream::Stream(size_t from_mark, size_t to_mark, size_t index_granularity,
        MergeTreeReader * merge_tree_reader)
        : current_mark(from_mark), offset_after_current_mark(0)
        , index_granularity(index_granularity), last_mark(to_mark)
        , stream(from_mark, index_granularity, merge_tree_reader)
{
}
/// Guard: it is a logical error to read or skip once the stream is finished.
void MergeTreePrewhereRangeReader::Stream::checkNotFinished() const
{
    if (isFinished())
        throw Exception("Cannot read out of marks range.", ErrorCodes::LOGICAL_ERROR);
}
/// Guard: a single read()/skip() call must not cross a granule boundary.
/// (NOTE: "Granula" in the name is a pre-existing typo; kept for interface compatibility.)
void MergeTreePrewhereRangeReader::Stream::checkEnoughSpaceInCurrentGranula(size_t num_rows) const
{
    if (num_rows + offset_after_current_mark > index_granularity)
        throw Exception("Cannot read from granule more than index_granularity.", ErrorCodes::LOGICAL_ERROR);
}
/// Read num_rows at the current (mark, offset) position via the delayed stream.
/// If the underlying stream ran out of data, mark this Stream finished too.
/// Returns the number of rows actually materialized into `block` (may be 0 —
/// the delayed stream defers reads until positions become non-contiguous).
size_t MergeTreePrewhereRangeReader::Stream::readRows(Block & block, size_t num_rows)
{
    size_t rows_read = stream.read(block, current_mark, offset_after_current_mark, num_rows);

    if (stream.isFinished())
        finish();

    return rows_read;
}
/// Read num_rows rows from the current granule, then advance the position.
/// When skip_remaining_rows_in_current_granule is set (or the granule is
/// exhausted), jump to the start of the next granule.
/// Returns the number of rows materialized into `block` by this call.
size_t MergeTreePrewhereRangeReader::Stream::read(Block & block, size_t num_rows,
                                                  bool skip_remaining_rows_in_current_granule)
{
    checkEnoughSpaceInCurrentGranula(num_rows);

    if (!num_rows)
    {
        /// Nothing to read; possibly just skip the rest of the granule.
        if (skip_remaining_rows_in_current_granule)
        {
            checkNotFinished();
            ++current_mark;
            offset_after_current_mark = 0;
        }

        return 0;
    }

    checkNotFinished();

    size_t rows_added = readRows(block, num_rows);
    offset_after_current_mark += num_rows;

    bool start_next_granule = (offset_after_current_mark == index_granularity)
                              || skip_remaining_rows_in_current_granule;
    if (start_next_granule)
    {
        /// Start new granule; skipped_rows_after_offset is already zero.
        ++current_mark;
        offset_after_current_mark = 0;
    }

    return rows_added;
}
/// Advance the position by num_rows without reading them; must stay within
/// the current granule. Advancing exactly to the boundary starts a new granule.
void MergeTreePrewhereRangeReader::Stream::skip(size_t num_rows)
{
    if (!num_rows)
        return;

    checkNotFinished();
    checkEnoughSpaceInCurrentGranula(num_rows);

    offset_after_current_mark += num_rows;

    if (offset_after_current_mark == index_granularity)
    {
        /// Start new granule; skipped_rows_after_offset is already zero.
        ++current_mark;
        offset_after_current_mark = 0;
    }
}
/// Flush any delayed reads into `block`; mark this Stream finished if the
/// underlying stream ran out of rows.
/// (Fix: the extracted text duplicated the trailing `return read_rows;` and
/// closing brace — diff-rendering residue removed.)
size_t MergeTreePrewhereRangeReader::Stream::finalize(Block & block)
{
    size_t read_rows = stream.finalize(block);

    if (stream.isFinished())
        finish();

    return read_rows;
}
MergeTreeRangeReader MergeTreeRangeReader::copyForReader(MergeTreeReader & reader)
void MergeTreePrewhereRangeReader::ReadResult::addGranule(size_t num_rows)
{ {
MergeTreeRangeReader copy(reader, current_mark, last_mark, index_granularity); rows_per_granule.push_back(num_rows);
copy.continue_reading = continue_reading; num_read_rows += num_rows;
copy.read_rows_after_current_mark = read_rows_after_current_mark; }
return copy;
/// Shrink the last added granule by num_rows_to_subtract rows (the last
/// granule of a part may hold fewer rows than index_granularity).
/// Throws LOGICAL_ERROR if no granule was added or the subtraction underflows.
/// (Fix: error message was missing a space — rendered as e.g. "has 5rows".)
void MergeTreePrewhereRangeReader::ReadResult::adjustLastGranule(size_t num_rows_to_subtract)
{
    if (rows_per_granule.empty())
        throw Exception("Can't adjust last granule because no granules were added.", ErrorCodes::LOGICAL_ERROR);

    if (num_rows_to_subtract > rows_per_granule.back())
        throw Exception("Can't adjust last granule because it has " + toString(rows_per_granule.back())
                        + " rows, but try to subtract " + toString(num_rows_to_subtract) + " rows.",
                        ErrorCodes::LOGICAL_ERROR);

    rows_per_granule.back() -= num_rows_to_subtract;
    num_read_rows -= num_rows_to_subtract;
}
}
/// Discard all rows read on this step (e.g. the filter turned out to be
/// constant-false), keeping only the granule layout. Order matters: the
/// filtered-rows counter is updated before the counters it reads are reset.
void MergeTreePrewhereRangeReader::ReadResult::clear()
{
    /// Need to save information about the number of granules.
    rows_per_granule.assign(rows_per_granule.size(), 0);
    /// Dropped rows count as filtered; zeros already in the filter were
    /// accounted for in setFilter(), so avoid double-counting them.
    num_filtered_rows += num_read_rows - num_zeros_in_filter;
    num_read_rows = 0;
    num_added_rows = 0;
    num_zeros_in_filter = 0;
    filter = nullptr;
}
}
/// Simplify the filter after a read step: drop it when it is constant, or
/// collapse per-granule zero tails so those rows can be skipped instead of
/// read-and-filtered on the next step.
void MergeTreePrewhereRangeReader::ReadResult::optimize()
{
    if (num_read_rows == 0 || !filter)
        return;

    ConstantFilterDescription constant_filter_description(*filter);

    if (constant_filter_description.always_false)
        clear();            /// Nothing passes: discard all rows read on this step.
    else if (constant_filter_description.always_true)
        filter = nullptr;   /// Filter is a no-op: drop it.
    else
    {
        /// Rebuild the filter with each granule's trailing zeros removed.
        ColumnPtr prev_filter = std::move(filter);
        FilterDescription prev_description(*prev_filter);

        MutableColumnPtr new_filter_ptr = ColumnUInt8::create(prev_description.data->size());
        auto & new_filter = static_cast<ColumnUInt8 &>(*new_filter_ptr);
        IColumn::Filter & new_data = new_filter.getData();

        collapseZeroTails(*prev_description.data, new_data);

        /// collapseZeroTails() resized new_filter; re-derive the bookkeeping.
        /// NOTE(review): assumes new_filter.size() >= num_read_rows here —
        /// otherwise num_removed_zeroes underflows; confirm invariant.
        size_t num_removed_zeroes = new_filter.size() - num_read_rows;
        num_read_rows = new_filter.size();
        num_zeros_in_filter -= num_removed_zeroes;

        filter = std::move(new_filter_ptr);
    }
}
/// Copy `filter` into `new_filter`, dropping for every granule the run of
/// zeros at the end of that granule's slice. rows_per_granule entries are
/// shrunk by the number of dropped rows; new_filter is resized to the number
/// of bytes actually written.
void MergeTreePrewhereRangeReader::ReadResult::collapseZeroTails(const IColumn::Filter & filter,
                                                                 IColumn::Filter & new_filter)
{
    auto filter_data = filter.data();
    auto new_filter_data = new_filter.data();

    /// The filter may begin with rows carried over from a previous iteration
    /// (filter longer than num_read_rows); copy that prefix untouched.
    size_t rows_in_filter_from_prev_iteration = filter.size() - num_read_rows;
    if (rows_in_filter_from_prev_iteration)
    {
        memcpySmallAllowReadWriteOverflow15(new_filter_data, filter_data, rows_in_filter_from_prev_iteration);
        filter_data += rows_in_filter_from_prev_iteration;
        new_filter_data += rows_in_filter_from_prev_iteration;
    }

    for (auto & rows_to_read : rows_per_granule)
    {
        /// Count the number of zeros at the end of filter for rows were read from current granule.
        size_t filtered_rows_num_at_granule_end = numZerosInTail(filter_data, filter_data + rows_to_read);

        /// Keep only the non-tail part of this granule's slice.
        rows_to_read -= filtered_rows_num_at_granule_end;

        memcpySmallAllowReadWriteOverflow15(new_filter_data, filter_data, rows_to_read);
        filter_data += rows_to_read;
        new_filter_data += rows_to_read;

        /// Skip the dropped tail in the source filter.
        filter_data += filtered_rows_num_at_granule_end;
    }

    new_filter.resize(new_filter_data - new_filter.data());
}
/// Count zero bytes at the end of [begin, end), scanning backwards.
/// SSE2 fast path: process 64 bytes per iteration from the tail; movemask
/// yields one bit per byte that compared greater-than-zero, so leading zero
/// bits of `val` correspond to trailing zero bytes of the chunk.
/// NOTE(review): _mm_cmpgt_epi8 is a signed compare — bytes >= 0x80 would
/// count as "zero"; assumes filter bytes are only 0 or 1.
size_t MergeTreePrewhereRangeReader::ReadResult::numZerosInTail(const UInt8 * begin, const UInt8 * end)
{
    size_t count = 0;

#if __SSE2__ && __POPCNT__
    const __m128i zero16 = _mm_setzero_si128();
    while (end - begin >= 64)
    {
        end -= 64;
        auto pos = end;
        UInt64 val =
                static_cast<UInt64>(_mm_movemask_epi8(_mm_cmpgt_epi8(
                        _mm_loadu_si128(reinterpret_cast<const __m128i *>(pos)),
                        zero16)))
                | (static_cast<UInt64>(_mm_movemask_epi8(_mm_cmpgt_epi8(
                        _mm_loadu_si128(reinterpret_cast<const __m128i *>(pos + 16)),
                        zero16))) << 16)
                | (static_cast<UInt64>(_mm_movemask_epi8(_mm_cmpgt_epi8(
                        _mm_loadu_si128(reinterpret_cast<const __m128i *>(pos + 32)),
                        zero16))) << 32)
                | (static_cast<UInt64>(_mm_movemask_epi8(_mm_cmpgt_epi8(
                        _mm_loadu_si128(reinterpret_cast<const __m128i *>(pos + 48)),
                        zero16))) << 48);
        if (val == 0)
            count += 64;    /// Whole 64-byte chunk is zero; keep scanning backwards.
        else
        {
            /// First nonzero byte found: leading zeros of val = trailing zero bytes.
            count += __builtin_clzll(val);
            return count;
        }
    }
#endif

    /// Scalar tail (and fallback when SSE2/POPCNT are unavailable).
    while (end > begin && *(--end) == 0)
    {
        ++count;
    }

    return count;
}
/// Number of zero entries in the current filter (0 when no filter is set).
/// Constant filters are answered without materializing a FilterDescription.
size_t MergeTreePrewhereRangeReader::ReadResult::numZerosInFilter() const
{
    if (!filter)
        return 0;

    {
        ConstantFilterDescription const_description(*filter);
        if (const_description.always_false)
            return filter->size();
        if (const_description.always_true)
            return 0;
    }

    /// Non-constant filter: zeros = total size minus set bytes.
    FilterDescription description(*filter);
    const auto & data = *description.data;
    return data.size() - countBytesInFilter(data);
}
/// Attach (or tighten) the row filter for this read step and move the newly
/// zeroed rows from the "added" counter to the "filtered" counter.
/// An existing filter can never be removed, and a replacement must have the
/// same size and at least as many zeros.
/// Throws LOGICAL_ERROR on any of these violations.
/// (Fix: typos in error messages — "exising" -> "existing", "it's size" -> "its size".)
void MergeTreePrewhereRangeReader::ReadResult::setFilter(ColumnPtr filter_)
{
    if (!filter_ && filter)
        throw Exception("Can't remove existing filter with empty.", ErrorCodes::LOGICAL_ERROR);

    if (!filter_)
        return;

    if (filter_->size() < num_read_rows)
        throw Exception("Can't set filter because its size is " + toString(filter_->size()) + " but "
                        + toString(num_read_rows) + " rows was read.", ErrorCodes::LOGICAL_ERROR);

    if (filter && filter_->size() != filter->size())
        throw Exception("Can't set filter because its size is " + toString(filter_->size()) + " but previous filter"
                        + " has size " + toString(filter->size()) + ".", ErrorCodes::LOGICAL_ERROR);

    filter = std::move(filter_);

    size_t num_zeros = numZerosInFilter();

    if (num_zeros < num_zeros_in_filter)
        throw Exception("New filter has less zeros than previous.", ErrorCodes::LOGICAL_ERROR);

    /// Rows newly zeroed by this filter are no longer "added" — reclassify them.
    size_t added_zeros = num_zeros - num_zeros_in_filter;
    num_added_rows -= added_zeros;
    num_filtered_rows += added_zeros;
    num_zeros_in_filter = num_zeros;
}
/// Range reader over marks [from_mark, to_mark). Readers form a chain:
/// prev_reader (may be null — read() checks it) reads its columns first and
/// its result dictates this reader's granule layout. prewhere_actions may be
/// null for the non-prewhere stage (callers pass nullptr in that case).
MergeTreePrewhereRangeReader::MergeTreePrewhereRangeReader(
        MergeTreePrewhereRangeReader * prev_reader, MergeTreeReader * merge_tree_reader,
        size_t from_mark, size_t to_mark, size_t index_granularity,
        ExpressionActionsPtr prewhere_actions, const String * prewhere_column_name,
        const Names * ordered_names, bool always_reorder)
        : stream(from_mark, to_mark, index_granularity, merge_tree_reader)
        , prev_reader(prev_reader), prewhere_actions(std::move(prewhere_actions))
        , prewhere_column_name(prewhere_column_name), ordered_names(ordered_names), always_reorder(always_reorder)
{
}
/// Read up to max_rows rows into `res` and return the bookkeeping for this
/// step. max_rows must be in (0, numPendingRows()].
MergeTreePrewhereRangeReader::ReadResult MergeTreePrewhereRangeReader::read(
        Block & res, size_t max_rows)
{
    if (max_rows == 0)
        throw Exception("Expected at least 1 row to read, got 0.", ErrorCodes::LOGICAL_ERROR);

    if (max_rows > numPendingRows())
        throw Exception("Want to read " + toString(max_rows) + " rows, but has only "
                        + toString(numPendingRows()) + " pending rows.", ErrorCodes::LOGICAL_ERROR);

    ReadResult read_result;

    /// Chained readers: the previous reader fills `res` first; its result
    /// (granule layout, filter) drives what this reader reads in readRows().
    if (prev_reader)
        read_result = prev_reader->read(res, max_rows);

    readRows(res, max_rows, read_result);

    /// Nothing was materialized (e.g. everything filtered out) — skip prewhere.
    if (!res)
        return read_result;

    executePrewhereActionsAndFilterColumns(res, read_result);

    return read_result;
}
/// Read rows from `stream` into `block`, recording the per-granule layout in
/// `result`. First reader in a chain reads greedily up to max_rows; later
/// readers repeat the granule layout produced by the previous reader.
void MergeTreePrewhereRangeReader::readRows(Block & block, size_t max_rows, ReadResult & result)
{
    if (prev_reader && result.numReadRows() == 0)
    {
        /// If zero rows were read on prev step, than there is no more rows to read.
        /// Last granule may have less rows than index_granularity, so finish reading manually.
        stream.finish();
        return;
    }

    size_t rows_to_skip_in_last_granule = 0;

    if (!result.rowsPerGranule().empty())
    {
        size_t rows_in_last_granule = result.rowsPerGranule().back();
        result.optimize();
        /// optimize() may have collapsed the zero tail of the last granule;
        /// those rows must be skipped, not read.
        rows_to_skip_in_last_granule = rows_in_last_granule - result.rowsPerGranule().back();

        if (auto & filter = result.getFilter())
        {
            /// optimize() would have cleared the result on a constant-false filter.
            if (ConstantFilterDescription(*filter).always_false)
                throw Exception("Shouldn't read rows with constant zero prewhere result.", ErrorCodes::LOGICAL_ERROR);
        }
    }

    if (result.rowsPerGranule().empty())
    {
        /// First reader in the chain: no layout to repeat, read greedily.
        /// (rows_to_skip_in_last_granule is necessarily 0 on this path.)
        /// Stream is lazy. result.num_added_rows is the number of rows added to block which is not equal to
        /// result.num_rows_read until call to stream.finalize(). Also result.num_added_rows may be less than
        /// result.num_rows_read if the last granule in range also the last in part (so we have to adjust last granule).
        {
            size_t space_left = max_rows;
            while (space_left && !stream.isFinished())
            {
                auto rows_to_read = std::min(space_left, stream.numPendingRowsInCurrentGranule());
                /// Only the last chunk may leave the granule unfinished.
                bool last = rows_to_read == space_left;
                result.addRows(stream.read(block, rows_to_read, !last));
                result.addGranule(rows_to_read);
                space_left -= rows_to_read;
            }
        }

        stream.skip(rows_to_skip_in_last_granule);
        result.addRows(stream.finalize(block));

        auto last_granule = result.rowsPerGranule().back();
        auto added_rows = result.getNumAddedRows();

        if (max_rows - last_granule > added_rows)
            throw Exception("RangeReader expected reading of at least " + toString(max_rows - last_granule) +
                            " rows, but only " + toString(added_rows) + " was read.", ErrorCodes::LOGICAL_ERROR);

        /// Last granule may be incomplete.
        size_t adjustment = max_rows - added_rows;
        result.adjustLastGranule(adjustment);
    }
    else
    {
        /// Later reader in the chain: repeat the previous reader's layout.
        size_t added_rows = 0;
        auto & rows_per_granule = result.rowsPerGranule();
        auto size = rows_per_granule.size();
        for (auto i : ext::range(0, size))
        {
            bool last = i + 1 == size;
            added_rows += stream.read(block, rows_per_granule[i], !last);
        }

        stream.skip(rows_to_skip_in_last_granule);
        added_rows += stream.finalize(block);

        /// added_rows may be zero if all columns were read in prewhere and it's ok.
        if (added_rows && added_rows != result.numReadRows())
            throw Exception("RangeReader read " + toString(added_rows) + " rows, but "
                            + toString(result.numReadRows()) + " expected.", ErrorCodes::LOGICAL_ERROR);
    }
}
/// Apply the filter inherited from the previous reader to the columns read at this step,
/// run this step's prewhere actions (if any), filter the block by the resulting prewhere
/// column, and store the new filter into `result`.
/// NOTE(review): chaining two filtering prewhere steps is not implemented — see the
/// explicit throw near the end.
void MergeTreePrewhereRangeReader::executePrewhereActionsAndFilterColumns(Block & block, ReadResult & result)
{
    const auto & columns = stream.reader()->getColumns();

    /// Filter only the columns that were read at this step; other columns in the block
    /// were already filtered by a previous step.
    auto filterColumns = [&block, &columns](const IColumn::Filter & filter)
    {
        for (const auto & column : columns)
        {
            if (block.has(column.name))
            {
                auto & column_with_type_and_name = block.getByName(column.name);
                column_with_type_and_name.column = std::move(column_with_type_and_name.column)->filter(filter, -1);
            }
        }
    };

    /// Filter every column in the block whose size matches the filter
    /// (columns of a different size were filtered earlier and are skipped).
    auto filterBlock = [&block](const IColumn::Filter & filter)
    {
        for (const auto i : ext::range(0, block.columns()))
        {
            auto & col = block.safeGetByPosition(i);

            if (col.column && col.column->size() == filter.size())
                col.column = std::move(col.column)->filter(filter, -1);
        }
    };

    /// Step 1: apply the filter produced by the previous reader (if any) to our fresh columns.
    if (auto & filter = result.getFilter())
    {
        ConstantFilterDescription constant_filter_description(*filter);
        if (constant_filter_description.always_false)
            throw Exception("RangeReader mustn't execute prewhere actions with const zero prewhere result.",
                ErrorCodes::LOGICAL_ERROR);
        if (!constant_filter_description.always_true)
        {
            FilterDescription filter_and_holder(*filter);
            filterColumns(*filter_and_holder.data);
        }
    }

    /// Step 2: fill defaults for missing columns and execute prewhere actions.
    if (!columns.empty())
    {
        if (columns.size() == block.columns())
        {
            stream.reader()->fillMissingColumns(block, *ordered_names, always_reorder);

            if (prewhere_actions)
                prewhere_actions->execute(block);
        }
        else
        {
            /// Columns in block may have different size here. Create temporary block which has only read columns.
            Block tmp_block;
            for (const auto & column : columns)
            {
                if (block.has(column.name))
                {
                    auto & column_with_type_and_name = block.getByName(column.name);
                    tmp_block.insert(column_with_type_and_name);
                    column_with_type_and_name.column = nullptr;
                }
            }

            if (tmp_block)
                stream.reader()->fillMissingColumns(tmp_block, *ordered_names, always_reorder);

            if (prewhere_actions)
                prewhere_actions->execute(tmp_block);

            /// Move the columns that did not belong to this step back into the result block.
            for (auto col_num : ext::range(0, block.columns()))
            {
                auto & column = block.getByPosition(col_num);
                if (!tmp_block.has(column.name))
                    tmp_block.insert(std::move(column));
            }

            std::swap(block, tmp_block);
        }
    }

    /// Step 3: filter the block by the prewhere column computed at this step.
    ColumnPtr filter;
    if (prewhere_actions)
    {
        auto & prewhere_column = block.getByName(*prewhere_column_name);

        ConstantFilterDescription constant_filter_description(*prewhere_column.column);
        if (constant_filter_description.always_false)
        {
            /// Prewhere is constant false: discard everything read at this step.
            result.clear();
            block.clear();
            return;
        }
        else if (!constant_filter_description.always_true)
        {
            filter = std::move(prewhere_column.column);
            FilterDescription filter_and_holder(*filter);
            filterBlock(*filter_and_holder.data);
        }

        /// Replace the prewhere column with a constant 1 so later steps see it as already applied.
        prewhere_column.column = prewhere_column.type->createColumnConst(block.rows(), UInt64(1));
    }

    if (filter && result.getFilter())
    {
        /// TODO: implement for prewhere chain.
        /// In order to do it we need combine filter and result.filter, where filter filters only '1' in result.filter.
        throw Exception("MergeTreePrewhereRangeReader chain with several prewhere actions in not implemented.",
            ErrorCodes::LOGICAL_ERROR);
    }

    if (filter)
        result.setFilter(filter);
}
} }

View File

@ -10,43 +10,152 @@ class MergeTreeReader;
/// MergeTreeReader iterator which allows sequential reading for arbitrary number of rows between pairs of marks in the same part. /// MergeTreeReader iterator which allows sequential reading for arbitrary number of rows between pairs of marks in the same part.
/// Stores reading state, which can be inside granule. Can skip rows in current granule and start reading from next mark. /// Stores reading state, which can be inside granule. Can skip rows in current granule and start reading from next mark.
/// Used generally for reading number of rows less than index granularity to decrease cache misses for fat blocks. /// Used generally for reading number of rows less than index granularity to decrease cache misses for fat blocks.
class MergeTreeRangeReader class MergeTreePrewhereRangeReader
{ {
public: public:
size_t numPendingRows() const { return (last_mark - current_mark) * index_granularity - read_rows_after_current_mark; } MergeTreePrewhereRangeReader(MergeTreePrewhereRangeReader * prev_reader, MergeTreeReader * merge_tree_reader,
size_t numPendingRowsInCurrentGranule() const { return index_granularity - read_rows_after_current_mark; } size_t from_mark, size_t to_mark, size_t index_granularity,
ExpressionActionsPtr prewhere_actions, const String * prewhere_column_name,
const Names * ordered_names, bool always_reorder);
size_t numReadRowsInCurrentGranule() const { return read_rows_after_current_mark; } MergeTreePrewhereRangeReader() : is_initialized(false) { }
/// Seek to next mark before next reading. bool isReadingFinished() const { return prev_reader ? prev_reader->isReadingFinished() : stream.isFinished(); }
size_t skipToNextMark();
/// Return state will be as it is after reading rows_to_read; no actual reading happens.
MergeTreeRangeReader getFutureState(size_t rows_to_read) const;
/// If columns are not present in the block, adds them. If they are present - appends the values that have been read. size_t numReadRowsInCurrentGranule() const { return prev_reader ? prev_reader->numReadRowsInCurrentGranule() : stream.numReadRowsInCurrentGranule(); }
/// Do not add columns, if the files are not present for them. size_t numPendingRowsInCurrentGranule() const { return prev_reader ? prev_reader->numPendingRowsInCurrentGranule() : stream.numPendingRowsInCurrentGranule(); }
/// Block should contain either no columns from the columns field, or all columns for which files are present. size_t numPendingRows() const { return prev_reader ? prev_reader->numPendingRows() : stream.numPendingRows(); }
/// Returns the number of rows was read.
size_t read(Block & res, size_t max_rows_to_read);
bool isReadingFinished() const { return is_reading_finished; } operator bool() const { return is_initialized; }
void reset() { is_initialized = false; }
void disableNextSeek() { continue_reading = true; } class DelayedStream
/// Return the same state for other MergeTreeReader. {
MergeTreeRangeReader copyForReader(MergeTreeReader & reader); public:
DelayedStream() {}
DelayedStream(size_t from_mark, size_t index_granularity, MergeTreeReader * merge_tree_reader);
/// Returns the number of rows added to block.
/// NOTE: have to return number of rows because block has broken invariant:
/// some columns may have different size (for example, default columns may be zero size).
size_t read(Block & block, size_t from_mark, size_t offset, size_t num_rows);
size_t finalize(Block & block);
bool isFinished() const { return is_finished; }
MergeTreeReader * reader() const { return merge_tree_reader; }
private:
size_t current_mark;
size_t current_offset;
size_t num_delayed_rows;
size_t index_granularity;
MergeTreeReader * merge_tree_reader;
bool continue_reading;
bool is_finished;
size_t position() const;
size_t readRows(Block & block, size_t num_rows);
};
class Stream
{
public:
Stream() {}
Stream(size_t from_mark, size_t to_mark, size_t index_granularity, MergeTreeReader * merge_tree_reader);
/// Returns the n
size_t read(Block & block, size_t num_rows, bool skip_remaining_rows_in_current_granule);
size_t finalize(Block & block);
void skip(size_t num_rows);
void finish() { current_mark = last_mark; }
bool isFinished() const { return current_mark >= last_mark; }
size_t numReadRowsInCurrentGranule() const { return offset_after_current_mark; }
size_t numPendingRowsInCurrentGranule() const { return index_granularity - numReadRowsInCurrentGranule(); }
size_t numRendingGranules() const { return last_mark - current_mark; }
size_t numPendingRows() const { return numRendingGranules() * index_granularity - offset_after_current_mark; }
MergeTreeReader * reader() const { return stream.reader(); }
private:
size_t current_mark;
/// Invariant: offset_after_current_mark + skipped_rows_after_offset < index_granularity
size_t offset_after_current_mark;
size_t index_granularity;
size_t last_mark;
DelayedStream stream;
void checkNotFinished() const;
void checkEnoughSpaceInCurrentGranula(size_t num_rows) const;
size_t readRows(Block & block, size_t num_rows);
};
/// Statistics after next reading step.
class ReadResult
{
public:
const std::vector<size_t> & rowsPerGranule() const { return rows_per_granule; }
/// The number of rows were read at LAST iteration in chain. <= num_added_rows + num_filtered_rows.
size_t numReadRows() const { return num_read_rows; }
/// The number of rows were added to block as a result of reading chain.
size_t getNumAddedRows() const { return num_added_rows; }
/// The number of filtered rows at all steps in reading chain.
size_t getNumFilteredRows() const { return num_filtered_rows; }
/// Filter you need to apply to newly-read columns in order to add them to block.
const ColumnPtr & getFilter() const { return filter; }
void addGranule(size_t num_rows);
void adjustLastGranule(size_t num_rows_to_subtract);
void addRows(size_t rows) { num_added_rows += rows; }
/// Set filter or replace old one. Filter must have more zeroes than previous.
void setFilter(ColumnPtr filter_);
/// For each granule calculate the number of filtered rows at the end. Remove them and update filter.
void optimize();
/// Remove all rows from granules.
void clear();
private:
/// The number of rows read from each granule.
std::vector<size_t> rows_per_granule;
/// Sum(rows_per_granule)
size_t num_read_rows = 0;
/// The number of rows was added to block while reading columns. May be zero if no read columns present in part.
size_t num_added_rows = 0;
/// num_zeros_in_filter + the number of rows removed after optimizes.
size_t num_filtered_rows = 0;
/// Zero if filter is nullptr.
size_t num_zeros_in_filter = 0;
/// nullptr if prev reader hasn't prewhere_actions. Otherwise filter.size() >= total_rows_read.
ColumnPtr filter;
void collapseZeroTails(const IColumn::Filter & filter, IColumn::Filter & new_filter);
size_t numZerosInFilter() const;
static size_t numZerosInTail(const UInt8 * begin, const UInt8 * end);
};
ReadResult read(Block & res, size_t max_rows);
private: private:
MergeTreeRangeReader(MergeTreeReader & merge_tree_reader, size_t from_mark, size_t to_mark, size_t index_granularity);
std::reference_wrapper<MergeTreeReader> merge_tree_reader; void readRows(Block & block, size_t max_rows, ReadResult & result);
size_t current_mark; void executePrewhereActionsAndFilterColumns(Block & block, ReadResult & result);
size_t last_mark;
size_t read_rows_after_current_mark = 0;
size_t index_granularity;
bool continue_reading = false;
bool is_reading_finished = false;
friend class MergeTreeReader; Stream stream;
MergeTreePrewhereRangeReader * prev_reader; /// If not nullptr, read from prev_reader firstly.
ExpressionActionsPtr prewhere_actions; /// If not nullptr, calculate filter.
const String * prewhere_column_name;
const Names * ordered_names;
bool always_reorder;
bool is_initialized = true;
}; };
} }

View File

@ -66,9 +66,14 @@ const MergeTreeReader::ValueSizeMap & MergeTreeReader::getAvgValueSizeHints() co
} }
MergeTreeRangeReader MergeTreeReader::readRange(size_t from_mark, size_t to_mark) MergeTreePrewhereRangeReader MergeTreeReader::readRange(
size_t from_mark, size_t to_mark, MergeTreePrewhereRangeReader * prev_reader,
ExpressionActionsPtr prewhere_actions, const String * prewhere_column_name,
const Names * ordered_names, bool always_reorder)
{ {
return MergeTreeRangeReader(*this, from_mark, to_mark, storage.index_granularity); return MergeTreePrewhereRangeReader(
prev_reader, this, from_mark, to_mark, storage.index_granularity,
prewhere_actions, prewhere_column_name, ordered_names, always_reorder);
} }

View File

@ -39,13 +39,17 @@ public:
const ValueSizeMap & getAvgValueSizeHints() const; const ValueSizeMap & getAvgValueSizeHints() const;
/// Create MergeTreeRangeReader iterator, which allows reading arbitrary number of rows from range. /// Create MergeTreeRangeReader iterator, which allows reading arbitrary number of rows from range.
MergeTreeRangeReader readRange(size_t from_mark, size_t to_mark); MergeTreePrewhereRangeReader readRange(size_t from_mark, size_t to_mark, MergeTreePrewhereRangeReader * prev_reader,
ExpressionActionsPtr prewhere_actions, const String * prewhere_column_name,
const Names * ordered_names, bool always_reorder);
/// Add columns from ordered_names that are not present in the block. /// Add columns from ordered_names that are not present in the block.
/// Missing columns are added in the order specified by ordered_names. /// Missing columns are added in the order specified by ordered_names.
/// If at least one column was added, reorders all columns in the block according to ordered_names. /// If at least one column was added, reorders all columns in the block according to ordered_names.
void fillMissingColumns(Block & res, const Names & ordered_names, const bool always_reorder = false); void fillMissingColumns(Block & res, const Names & ordered_names, const bool always_reorder = false);
const NamesAndTypesList & getColumns() const { return columns; }
private: private:
class Stream class Stream
{ {
@ -117,7 +121,7 @@ private:
/// If continue_reading is true, continue reading from last state, otherwise seek to from_mark /// If continue_reading is true, continue reading from last state, otherwise seek to from_mark
size_t readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Block & res); size_t readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Block & res);
friend class MergeTreeRangeReader; friend class MergeTreePrewhereRangeReader::DelayedStream;
}; };
} }