refactoring and more comments in MergeTreeBaseBlockInputStream [#CLICKHOUSE-3065]

This commit is contained in:
Nikolai Kochetov 2017-07-11 12:32:39 +03:00 committed by alexey-milovidov
parent 89386394aa
commit 30ed774bf7
6 changed files with 46 additions and 43 deletions

View File

@ -72,30 +72,30 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
if (task->size_predictor)
task->size_predictor->startBlock();
const auto max_block_size_rows = this->max_block_size_rows;
const auto preferred_block_size_bytes = this->preferred_block_size_bytes;
const auto preferred_max_column_in_block_size_bytes =
this->preferred_max_column_in_block_size_bytes ? this->preferred_max_column_in_block_size_bytes : max_block_size_rows;
const auto index_granularity = storage.index_granularity;
const auto default_block_size = std::max(1LU, max_block_size_rows);
const double min_filtration_ratio = 0.00001;
auto estimateNumRows = [preferred_block_size_bytes, default_block_size,
auto estimateNumRows = [preferred_block_size_bytes, max_block_size_rows,
index_granularity, preferred_max_column_in_block_size_bytes, min_filtration_ratio](
MergeTreeReadTask & task, MergeTreeRangeReader & reader)
{
if (!task.size_predictor)
return default_block_size;
return max_block_size_rows;
size_t rows_to_read_for_block = task.size_predictor->estimateNumRows(preferred_block_size_bytes);
size_t rows_to_read_for_max_size_column
= task.size_predictor->estimateNumRowsForMaxSizeColumn(preferred_max_column_in_block_size_bytes);
double filtration_ratio = std::max(min_filtration_ratio, 1.0 - task.size_predictor->filtered_rows_ration);
double filtration_ratio = std::max(min_filtration_ratio, 1.0 - task.size_predictor->filtered_rows_ratio);
size_t rows_to_read_for_max_size_column_with_filtration
= static_cast<size_t>( rows_to_read_for_max_size_column / filtration_ratio);
= static_cast<size_t>(rows_to_read_for_max_size_column / filtration_ratio);
return std::min(rows_to_read_for_block, rows_to_read_for_max_size_column_with_filtration);
};
// read rows from reader and clean columns
// read rows from reader and clear columns
auto skipRows = [& preferred_block_size_bytes, & estimateNumRows](
Block & block, MergeTreeRangeReader & reader, MergeTreeReadTask & task, size_t rows)
{
@ -109,7 +109,7 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
reader.read(block, rows_to_skip);
for (const auto i : ext::range(0, block.columns()))
{
auto & col = block.safeGetByPosition(i);
auto & col = block.getByPosition(i);
if (task.column_name_set.count(col.name))
col.column = col.column->cloneEmpty();
}
@ -122,7 +122,6 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
{
/// Let's read the full block of columns needed to calculate the expression in PREWHERE.
MarkRanges ranges_to_read;
//std::vector<size_t> rows_was_read;
/// Last range may be partl read. The same number of rows we need to read after prewhere
size_t rows_was_read_in_last_range = 0;
std::experimental::optional<MergeTreeRangeReader> pre_range_reader;
@ -201,8 +200,8 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
observed_column = column;
/** If the filter is a constant (for example, it says PREWHERE 1),
* then either return an empty block, or return the block unchanged.
*/
* then either return an empty block, or return the block unchanged.
*/
if (const auto column_const = typeid_cast<const ColumnConstUInt8 *>(observed_column.get()))
{
if (!column_const->getData())

View File

@ -86,16 +86,16 @@ struct MergeTreeBlockSizePredictor
{
double max_size_per_row = std::max<double>(std::max<size_t>(max_size_per_row_fixed, 1), max_size_per_row_dynamic);
return (bytes_quota > block_size_rows * max_size_per_row)
? static_cast<size_t>(bytes_quota / max_size_per_row) - block_size_rows
: 0;
? static_cast<size_t>(bytes_quota / max_size_per_row) - block_size_rows
: 0;
}
/// Predicts what number of rows should be read to exhaust byte quota per block
inline size_t estimateNumRows(size_t bytes_quota) const
{
return (bytes_quota > block_size_bytes)
? static_cast<size_t>((bytes_quota - block_size_bytes) / bytes_per_row_current)
: 0;
? static_cast<size_t>((bytes_quota - block_size_bytes) / bytes_per_row_current)
: 0;
}
/// Predicts what number of marks should be read to exhaust byte quota
@ -108,8 +108,9 @@ struct MergeTreeBlockSizePredictor
{
double alpha = std::pow(1. - decay, rows_was_read);
double current_ration = rows_was_filtered / std::max<double>(1, rows_was_read);
filtered_rows_ration = current_ration < filtered_rows_ration ? current_ration
: alpha * filtered_rows_ration + (1.0 - alpha) * current_ration;
filtered_rows_ratio = current_ration < filtered_rows_ratio
? current_ration
: alpha * filtered_rows_ratio + (1.0 - alpha) * current_ration;
}
/// Aggressiveness of bytes_per_row updates. See update() implementation.
@ -145,7 +146,7 @@ public:
double bytes_per_row_current = 0;
double bytes_per_row_global = 0;
double filtered_rows_ration = 0;
double filtered_rows_ratio = 0;
};
}

View File

@ -7,14 +7,14 @@ MergeTreeRangeReader::MergeTreeRangeReader(
MergeTreeReader & merge_tree_reader, size_t from_mark, size_t to_mark, size_t index_granularity)
: logger(&Poco::Logger::get("MergeTreeRangeReader"))
, merge_tree_reader(merge_tree_reader), current_mark(from_mark), last_mark(to_mark)
, read_rows_after_current_mark(0), index_granularity(index_granularity), seek_to_from_mark(true), is_reading_finished(false)
, read_rows_after_current_mark(0), index_granularity(index_granularity), continue_reading(false), is_reading_finished(false)
{
}
size_t MergeTreeRangeReader::skipToNextMark()
{
auto unread_rows_in_current_part = unreadRowsInCurrentGranule();
seek_to_from_mark = true;
continue_reading = false;
++current_mark;
read_rows_after_current_mark = 0;
return unread_rows_in_current_part;
@ -37,10 +37,10 @@ size_t MergeTreeRangeReader::read(Block & res, size_t max_rows_to_read)
if (rows_to_read == 0)
return false;
auto read_rows = merge_tree_reader.get().readRange(current_mark, seek_to_from_mark, rows_to_read, res);
auto read_rows = merge_tree_reader.get().readRows(current_mark, continue_reading, rows_to_read, res);
if (!read_rows)
read_rows = rows_to_read;
seek_to_from_mark = false;
continue_reading = true;
read_rows_after_current_mark += read_rows;
size_t read_parts = read_rows_after_current_mark / index_granularity;
@ -56,7 +56,7 @@ size_t MergeTreeRangeReader::read(Block & res, size_t max_rows_to_read)
MergeTreeRangeReader MergeTreeRangeReader::copyForReader(MergeTreeReader & reader)
{
MergeTreeRangeReader copy(reader, current_mark, last_mark, index_granularity);
copy.seek_to_from_mark = seek_to_from_mark;
copy.continue_reading = continue_reading;
copy.read_rows_after_current_mark = read_rows_after_current_mark;
return copy;
}

View File

@ -7,7 +7,7 @@ namespace DB
class MergeTreeReader;
// Used in MergeTreeReader to allow sequential reading for any number of rows between pairs of marks in the same part
/// MergeTreeReader iterator which allows sequential reading for arbitrary number of rows between pairs of marks in the same part.
class MergeTreeRangeReader
{
public:
@ -16,17 +16,21 @@ public:
size_t readRowsInCurrentGranule() const { return read_rows_after_current_mark; }
// seek to next mark before next reading
/// Seek to next mark before next reading.
size_t skipToNextMark();
// returns state will be afrer reading rows_to_read, no reading happens
/// Seturn state will be afrer reading rows_to_read, no reading happens.
MergeTreeRangeReader getFutureState(size_t rows_to_read) const;
// returns the number of rows was read
/// If columns are not present in the block, adds them. If they are present - appends the values that have been read.
/// Do not add columns, if the files are not present for them.
/// Block should contain either no columns from the columns field, or all columns for which files are present.
/// Returns the number of rows was read.
size_t read(Block & res, size_t max_rows_to_read);
bool isReadingFinished() const { return is_reading_finished; }
void disableNextSeek() { seek_to_from_mark = false; }
// return the same state for other MergeTreeReader
void disableNextSeek() { continue_reading = true; }
/// Return the same state for other MergeTreeReader.
MergeTreeRangeReader copyForReader(MergeTreeReader & reader);
private:
@ -39,7 +43,7 @@ private:
size_t last_mark;
size_t read_rows_after_current_mark;
size_t index_granularity;
bool seek_to_from_mark;
bool continue_reading;
bool is_reading_finished;
friend class MergeTreeReader;

View File

@ -79,7 +79,7 @@ MergeTreeRangeReader MergeTreeReader::readRange(size_t from_mark, size_t to_mark
}
size_t MergeTreeReader::readRange(size_t from_mark, bool seek_to_from_mark, size_t max_rows_to_read, Block & res)
size_t MergeTreeReader::readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Block & res)
{
size_t read_rows = 0;
try
@ -144,7 +144,7 @@ size_t MergeTreeReader::readRange(size_t from_mark, bool seek_to_from_mark, size
try
{
size_t column_size_before_reading = column.column->size();
readData(column.name, *column.type, *column.column, from_mark, seek_to_from_mark, max_rows_to_read, 0, read_offsets);
readData(column.name, *column.type, *column.column, from_mark, continue_reading, max_rows_to_read, 0, read_offsets);
read_rows = column.column->size() - column_size_before_reading;
}
catch (Exception & e)
@ -444,7 +444,7 @@ void MergeTreeReader::addStream(const String & name, const IDataType & type, con
void MergeTreeReader::readData(
const String & name, const IDataType & type, IColumn & column,
size_t from_mark, bool seek_to_from_mark, size_t max_rows_to_read,
size_t from_mark, bool continue_reading, size_t max_rows_to_read,
size_t level, bool read_offsets)
{
if (type.isNullable())
@ -459,13 +459,13 @@ void MergeTreeReader::readData(
std::string filename = name + NULL_MAP_EXTENSION;
Stream & stream = *(streams.at(filename));
if (seek_to_from_mark)
if (!continue_reading)
stream.seekToMark(from_mark);
IColumn & col8 = nullable_col.getNullMapConcreteColumn();
DataTypeUInt8{}.deserializeBinaryBulk(col8, *stream.data_buffer, max_rows_to_read, 0);
/// Then read data.
readData(name, nested_type, nested_col, from_mark, seek_to_from_mark, max_rows_to_read, level, read_offsets);
readData(name, nested_type, nested_col, from_mark, continue_reading, max_rows_to_read, level, read_offsets);
}
else if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
{
@ -473,7 +473,7 @@ void MergeTreeReader::readData(
if (read_offsets)
{
Stream & stream = *streams[DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level)];
if (seek_to_from_mark)
if (!continue_reading)
stream.seekToMark(from_mark);
type_arr->deserializeOffsets(
column,
@ -488,7 +488,7 @@ void MergeTreeReader::readData(
name,
*type_arr->getNestedType(),
array.getData(),
from_mark, seek_to_from_mark, required_internal_size - array.getData().size(),
from_mark, continue_reading, required_internal_size - array.getData().size(),
level + 1);
size_t read_internal_size = array.getData().size();
@ -519,7 +519,7 @@ void MergeTreeReader::readData(
return;
double & avg_value_size_hint = avg_value_size_hints[name];
if (seek_to_from_mark)
if (!continue_reading)
stream.seekToMark(from_mark);
type.deserializeBinaryBulk(column, *stream.data_buffer, max_rows_to_read, avg_value_size_hint);

View File

@ -38,9 +38,7 @@ public:
const ValueSizeMap & getAvgValueSizeHints() const;
/// If columns are not present in the block, adds them. If they are present - appends the values that have been read.
/// Do not adds columns, if the files are not present for them (to add them, call fillMissingColumns).
/// Block should contain either no columns from the columns field, or all columns for which files are present.
/// Create MergeTreeRangeReader iterator, which allows reading arbitrary number of rows from range.
MergeTreeRangeReader readRange(size_t from_mark, size_t to_mark);
/// Add columns from ordered_names that are not present in the block.
@ -123,13 +121,14 @@ private:
void readData(
const String & name, const IDataType & type, IColumn & column,
size_t from_mark, bool seek_to_from_mark, size_t max_rows_to_read,
size_t from_mark, bool continue_reading, size_t max_rows_to_read,
size_t level = 0, bool read_offsets = true);
void fillMissingColumnsImpl(Block & res, const Names & ordered_names, bool always_reorder);
/// return the number of rows has been read or zero if ther is no columns to read
size_t readRange(size_t from_mark, bool seek_to_from_mark, size_t max_rows_to_read, Block & res);
/// Return the number of rows has been read or zero if there is no columns to read.
/// If continue_reading is true, continue reading from last state, otherwise seek to from_mark
size_t readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Block & res);
friend class MergeTreeRangeReader;
};