partial part reading with prewhere

This commit is contained in:
Nikolai Kochetov 2017-06-15 20:01:13 +03:00 committed by alexey-milovidov
parent c6d5ef6d30
commit 609711b20c
6 changed files with 159 additions and 64 deletions

View File

@ -55,7 +55,7 @@ Block MergeTreeBaseBlockInputStream::readImpl()
if (res)
injectVirtualColumns(res);
if (task->mark_ranges.empty())
if (task->isFinished())
task.reset();
}
@ -75,32 +75,39 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
do
{
/// Let's read the full block of columns needed to calculate the expression in PREWHERE.
size_t space_left = std::max(1LU, max_block_size_marks);
size_t space_left = std::max(1LU, max_block_size_rows);
MarkRanges ranges_to_read;
if (task->size_predictor)
{
/// FIXME: size prediction model is updated by filtered rows, but it predicts size of unfiltered rows also
size_t recommended_marks = task->size_predictor->estimateNumMarks(preferred_block_size_bytes, storage.index_granularity);
if (res && recommended_marks < 1)
size_t recommended_rows = task->size_predictor->estimateNumRows(preferred_block_size_bytes);
if (res && recommended_rows < 1)
break;
space_left = std::min(space_left, std::max(1LU, recommended_marks));
space_left = std::min(space_left, std::max(1LU, recommended_rows));
}
while (!task->mark_ranges.empty() && space_left && !isCancelled())
std::experimental::optional<MergeTreeRangeReader> range_reader;
if (task->current_range_reader)
range_reader = task->current_range_reader->copyForReader(*pre_reader);
while ((range_reader || !task->mark_ranges.empty()) && space_left && !isCancelled())
{
auto & range = task->mark_ranges.back();
size_t marks_to_read = std::min(range.end - range.begin, space_left);
pre_reader->readRange(range.begin, range.begin + marks_to_read, res);
ranges_to_read.emplace_back(range.begin, range.begin + marks_to_read);
space_left -= marks_to_read;
range.begin += marks_to_read;
if (range.begin == range.end)
if (!range_reader)
{
auto & range = task->mark_ranges.back();
task->current_range_reader = pre_reader->readRange(range.begin, range.end);
ranges_to_read.push_back(range);
task->mark_ranges.pop_back();
}
size_t rows_to_read = std::min(range_reader->unreadRows(), space_left);
if (!range_reader->read(res, rows_to_read))
range_reader = std::experimental::nullopt;
space_left -= rows_to_read;
}
/// In case of isCancelled.
@ -139,15 +146,35 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
return res;
}
for (const auto & range : ranges_to_read)
reader->readRange(range.begin, range.end, res);
auto rows_to_read = column_const->size();
if (task->current_range_reader && rows_to_read < task->current_range_reader->unreadRows())
task->current_range_reader->read(res, rows_to_read);
else
{
if (task->current_range_reader)
{
rows_to_read -= task->current_range_reader->unreadRows();
task->current_range_reader->read(res, task->current_range_reader->unreadRows());
task->current_range_reader = std::experimental::nullopt;
}
for (const auto & range : ranges_to_read)
{
if ((range.end - range.begin) * storage.index_granularity <= rows_to_read)
reader->readRange(range.begin, range.end, res);
else
{
task->current_range_reader = reader->readRange(range.begin, range.end);
task->current_range_reader->read(res, rows_to_read);
}
}
}
progressImpl({ 0, res.bytes() - pre_bytes });
}
else if (const auto column_vec = typeid_cast<const ColumnUInt8 *>(observed_column.get()))
{
size_t index_granularity = storage.index_granularity;
throw Exception("column_vec");
const auto & pre_filter = column_vec->getData();
IColumn::Filter post_filter(pre_filter.size());
@ -155,40 +182,62 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
size_t pre_filter_pos = 0;
size_t post_filter_pos = 0;
for (const auto & range : ranges_to_read)
size_t next_range_idx = 0;
while (pre_filter_pos < pre_filter.size())
{
auto begin = range.begin;
if (!task->current_range_reader)
{
const auto & range = ranges_to_read[next_range_idx++];
task->current_range_reader = reader->readRange(range.begin, range.end);
}
MergeTreeRangeReader & range_reader = task->current_range_reader.value();
auto pre_filter_begin_pos = pre_filter_pos;
for (auto mark = range.begin; mark <= range.end; ++mark)
while (range_reader.unreadRows() > 0 && pre_filter_pos < pre_filter.size())
{
auto rows_should_be_copied = pre_filter_pos - pre_filter_begin_pos;
auto unread_rows_in_current_part = range_reader.skipRows(rows_should_be_copied).unreadRowsInCurrentPart();
const size_t limit = std::min(pre_filter.size(), pre_filter_pos + unread_rows_in_current_part);
UInt8 nonzero = 0;
for (size_t row = pre_filter_pos; row < limit; ++row)
nonzero |= pre_filter[row];
if (mark != range.end)
bool will_read_until_mark = unread_rows_in_current_part == limit - pre_filter_pos;
/// can't skip empty rows if won't read until mark
if (!nonzero && will_read_until_mark)
{
const size_t limit = std::min(pre_filter.size(), pre_filter_pos + index_granularity);
for (size_t row = pre_filter_pos; row < limit; ++row)
nonzero |= pre_filter[row];
}
if (!nonzero)
{
if (mark > begin)
if (pre_filter_pos != pre_filter_begin_pos)
{
memcpy(
&post_filter[post_filter_pos],
&pre_filter[pre_filter_begin_pos],
pre_filter_pos - pre_filter_begin_pos);
post_filter_pos += pre_filter_pos - pre_filter_begin_pos;
reader->readRange(begin, mark, res);
auto rows = pre_filter_pos - pre_filter_begin_pos;
memcpy(&post_filter[post_filter_pos], &pre_filter[pre_filter_begin_pos], rows);
post_filter_pos += rows;
size_t cur_rows = res.rows();
range_reader.read(res, rows);
if (cur_rows + rows != res.rows())
throw Exception("read " + std::to_string(res.rows() - cur_rows) + " expected " + std::to_string(rows) + " was rows " + std::to_string(cur_rows));
}
begin = mark + 1;
pre_filter_begin_pos = std::min(pre_filter_pos + index_granularity, pre_filter.size());
}
if (mark < range.end)
pre_filter_pos = std::min(pre_filter_pos + index_granularity, pre_filter.size());
pre_filter_begin_pos = pre_filter_pos = limit;
range_reader.skipToNextMark();
}
else
pre_filter_pos = limit;
}
if (pre_filter_pos != pre_filter_begin_pos)
{
auto rows = pre_filter_pos - pre_filter_begin_pos;
memcpy(&post_filter[post_filter_pos], &pre_filter[pre_filter_begin_pos], rows);
post_filter_pos += rows;
size_t cur_rows = res.rows();
range_reader.read(res, rows);
if (cur_rows + rows != res.rows())
throw Exception("read " + std::to_string(res.rows() - cur_rows) + " expected " + std::to_string(rows) + " was rows " + std::to_string(cur_rows));
}
if (range_reader.unreadRows() == 0)
task->current_range_reader = std::experimental::nullopt;
}
if (!post_filter_pos)
@ -232,12 +281,12 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
reader->fillMissingColumnsAndReorder(res, task->ordered_names);
}
}
while (!task->mark_ranges.empty() && !res && !isCancelled());
while (!task->isFinished() && !res && !isCancelled());
}
else
{
size_t space_left = std::max(1LU, max_block_size_rows);
while (!task->mark_ranges.empty() && space_left && !isCancelled())
while (!task->isFinished() && space_left && !isCancelled())
{
if (!task->current_range_reader)
{
@ -246,13 +295,14 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
task->mark_ranges.pop_back();
}
size_t rows_to_read = max_block_size_rows;
size_t rows_to_read = space_left;
// size_t marks_to_read = std::min(range.end - range.begin, space_left);
if (task->size_predictor)
{
size_t recommended_rows = task->size_predictor->estimateNumRows(preferred_block_size_bytes);
if (res && rows_to_read < 1)
/// TODO: stop reading if recommended_rows small enough
if (res && recommended_rows < 1)
break;
rows_to_read = std::min(rows_to_read, std::max(1LU, recommended_rows));
}

View File

@ -49,6 +49,8 @@ struct MergeTreeReadTask
/// used to save current range processing status
std::experimental::optional<MergeTreeRangeReader> current_range_reader;
bool isFinished() const { return mark_ranges.empty() && !current_range_reader; }
MergeTreeReadTask(
const MergeTreeData::DataPartPtr & data_part, const MarkRanges & mark_ranges, const std::size_t part_index_in_query,
const Names & ordered_names, const NameSet & column_name_set, const NamesAndTypesList & columns,

View File

@ -5,20 +5,38 @@ namespace DB
MergeTreeRangeReader::MergeTreeRangeReader(
MergeTreeReader & merge_tree_reader, size_t from_mark, size_t to_mark, size_t index_granularity)
: merge_tree_reader(merge_tree_reader), current_mark(from_mark), last_mark(to_mark)
, read_rows_after_current_mark(0), index_granularity(index_granularity), is_reading_started(false)
: logger(&Poco::Logger::get("MergeTreeRangeReader"))
, merge_tree_reader(merge_tree_reader), current_mark(from_mark), last_mark(to_mark)
, read_rows_after_current_mark(0), index_granularity(index_granularity), seek_to_from_mark(true)
{
}
void MergeTreeRangeReader::skipToNextMark()
{
seek_to_from_mark = true;
++current_mark;
read_rows_after_current_mark = 0;
}
const MergeTreeRangeReader MergeTreeRangeReader::skipRows(size_t rows) const
{
MergeTreeRangeReader copy = *this;
copy.read_rows_after_current_mark += rows;
size_t read_parts = copy.read_rows_after_current_mark / index_granularity;
copy.current_mark += read_parts;
copy.read_rows_after_current_mark -= index_granularity * read_parts;
return copy;
}
bool MergeTreeRangeReader::read(Block & res, size_t max_rows_to_read)
{
size_t rows_to_read = (last_mark - current_mark) * index_granularity - read_rows_after_current_mark;
size_t rows_to_read = unreadRows();
rows_to_read = std::min(rows_to_read, max_rows_to_read);
if (rows_to_read == 0)
return false;
merge_tree_reader.get().readRange(current_mark, !is_reading_started, rows_to_read, res);
merge_tree_reader.get().readRange(current_mark, seek_to_from_mark, rows_to_read, res);
seek_to_from_mark = false;
read_rows_after_current_mark += rows_to_read;
size_t read_parts = read_rows_after_current_mark / index_granularity;
@ -28,4 +46,12 @@ bool MergeTreeRangeReader::read(Block & res, size_t max_rows_to_read)
return current_mark != last_mark;
}
MergeTreeRangeReader MergeTreeRangeReader::copyForReader(MergeTreeReader & reader)
{
MergeTreeRangeReader copy(reader, current_mark, last_mark, index_granularity);
copy.seek_to_from_mark = seek_to_from_mark;
copy.read_rows_after_current_mark = read_rows_after_current_mark;
return copy;
}
}

View File

@ -1,6 +1,6 @@
#pragma once
#include <Core/Block.h>
#include <common/logger_useful.h>
namespace DB
{
@ -11,18 +11,35 @@ class MergeTreeReader;
class MergeTreeRangeReader
{
public:
size_t unreadRows() const {
return (last_mark - current_mark) * index_granularity - read_rows_after_current_mark;
}
size_t unreadRowsInCurrentPart() const {
return index_granularity - read_rows_after_current_mark;
}
void skipToNextMark();
const MergeTreeRangeReader skipRows(size_t rows) const;
bool read(Block & res, size_t max_rows_to_read);
// MergeTreeRangeReader & operator=(MergeTreeRangeReader && other) = default;
~MergeTreeRangeReader() {
if (last_mark != current_mark)
LOG_ERROR(logger, "last_mark = " << last_mark << " current_mark = " << current_mark << " read_rows_after_current_mark = " << read_rows_after_current_mark);
}
MergeTreeRangeReader copyForReader(MergeTreeReader & reader);
private:
MergeTreeRangeReader(MergeTreeReader & merge_tree_reader, size_t from_mark, size_t to_mark, size_t index_granularity);
Poco::Logger * logger;
std::reference_wrapper<MergeTreeReader> merge_tree_reader;
size_t current_mark;
size_t last_mark;
size_t read_rows_after_current_mark;
size_t index_granularity;
bool is_reading_started;
bool seek_to_from_mark;
friend class MergeTreeReader;
};

View File

@ -79,7 +79,7 @@ MergeTreeRangeReader MergeTreeReader::readRange(size_t from_mark, size_t to_mark
}
void MergeTreeReader::readRange(size_t from_mark, bool is_first_mark_in_range, size_t max_rows_to_read, Block & res)
void MergeTreeReader::readRange(size_t from_mark, bool seek_to_from_mark, size_t max_rows_to_read, Block & res)
{
try
{
@ -142,7 +142,7 @@ void MergeTreeReader::readRange(size_t from_mark, bool is_first_mark_in_range, s
try
{
readData(column.name, *column.type, *column.column, from_mark, is_first_mark_in_range, max_rows_to_read, 0, read_offsets);
readData(column.name, *column.type, *column.column, from_mark, seek_to_from_mark, max_rows_to_read, 0, read_offsets);
}
catch (Exception & e)
{
@ -439,7 +439,7 @@ void MergeTreeReader::addStream(const String & name, const IDataType & type, con
void MergeTreeReader::readData(
const String & name, const IDataType & type, IColumn & column,
size_t from_mark, bool is_first_mark_in_range, size_t max_rows_to_read,
size_t from_mark, bool seek_to_from_mark, size_t max_rows_to_read,
size_t level, bool read_offsets)
{
if (type.isNullable())
@ -454,13 +454,13 @@ void MergeTreeReader::readData(
std::string filename = name + NULL_MAP_EXTENSION;
Stream & stream = *(streams.at(filename));
if (is_first_mark_in_range)
if (seek_to_from_mark)
stream.seekToMark(from_mark);
IColumn & col8 = nullable_col.getNullMapConcreteColumn();
DataTypeUInt8{}.deserializeBinaryBulk(col8, *stream.data_buffer, max_rows_to_read, 0);
/// Then read data.
readData(name, nested_type, nested_col, from_mark, max_rows_to_read, level, read_offsets);
readData(name, nested_type, nested_col, from_mark, seek_to_from_mark, max_rows_to_read, level, read_offsets);
}
else if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
{
@ -468,7 +468,7 @@ void MergeTreeReader::readData(
if (read_offsets)
{
Stream & stream = *streams[DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level)];
if (is_first_mark_in_range)
if (seek_to_from_mark)
stream.seekToMark(from_mark);
type_arr->deserializeOffsets(
column,
@ -483,7 +483,7 @@ void MergeTreeReader::readData(
name,
*type_arr->getNestedType(),
array.getData(),
from_mark, required_internal_size - array.getData().size(),
from_mark, seek_to_from_mark, required_internal_size - array.getData().size(),
level + 1);
size_t read_internal_size = array.getData().size();
@ -514,7 +514,7 @@ void MergeTreeReader::readData(
return;
double & avg_value_size_hint = avg_value_size_hints[name];
if (is_first_mark_in_range)
if (seek_to_from_mark)
stream.seekToMark(from_mark);
type.deserializeBinaryBulk(column, *stream.data_buffer, max_rows_to_read, avg_value_size_hint);

View File

@ -126,12 +126,12 @@ private:
void readData(
const String & name, const IDataType & type, IColumn & column,
size_t from_mark, bool is_first_mark_in_range, size_t max_rows_to_read,
size_t from_mark, bool seek_to_from_mark, size_t max_rows_to_read,
size_t level = 0, bool read_offsets = true);
void fillMissingColumnsImpl(Block & res, const Names & ordered_names, bool always_reorder);
void readRange(size_t from_mark, bool is_first_mark_in_range, size_t max_rows_to_read, Block & res);
void readRange(size_t from_mark, bool seek_to_from_mark, size_t max_rows_to_read, Block & res);
friend class MergeTreeRangeReader;
};