diff --git a/dbms/src/Storages/MergeTree/MergeTreeBaseBlockInputStream.cpp b/dbms/src/Storages/MergeTree/MergeTreeBaseBlockInputStream.cpp index 622d12d0c26..72865c25076 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeBaseBlockInputStream.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeBaseBlockInputStream.cpp @@ -72,30 +72,30 @@ Block MergeTreeBaseBlockInputStream::readFromPart() if (task->size_predictor) task->size_predictor->startBlock(); + const auto max_block_size_rows = this->max_block_size_rows; const auto preferred_block_size_bytes = this->preferred_block_size_bytes; const auto preferred_max_column_in_block_size_bytes = this->preferred_max_column_in_block_size_bytes ? this->preferred_max_column_in_block_size_bytes : max_block_size_rows; const auto index_granularity = storage.index_granularity; - const auto default_block_size = std::max(1LU, max_block_size_rows); const double min_filtration_ratio = 0.00001; - auto estimateNumRows = [preferred_block_size_bytes, default_block_size, + auto estimateNumRows = [preferred_block_size_bytes, max_block_size_rows, index_granularity, preferred_max_column_in_block_size_bytes, min_filtration_ratio]( MergeTreeReadTask & task, MergeTreeRangeReader & reader) { if (!task.size_predictor) - return default_block_size; + return max_block_size_rows; size_t rows_to_read_for_block = task.size_predictor->estimateNumRows(preferred_block_size_bytes); size_t rows_to_read_for_max_size_column = task.size_predictor->estimateNumRowsForMaxSizeColumn(preferred_max_column_in_block_size_bytes); - double filtration_ratio = std::max(min_filtration_ratio, 1.0 - task.size_predictor->filtered_rows_ration); + double filtration_ratio = std::max(min_filtration_ratio, 1.0 - task.size_predictor->filtered_rows_ratio); size_t rows_to_read_for_max_size_column_with_filtration - = static_cast( rows_to_read_for_max_size_column / filtration_ratio); + = static_cast(rows_to_read_for_max_size_column / filtration_ratio); return std::min(rows_to_read_for_block, rows_to_read_for_max_size_column_with_filtration); }; - // read rows from reader and clean columns + // read rows from reader and clear columns auto skipRows = [& preferred_block_size_bytes, & estimateNumRows]( Block & block, MergeTreeRangeReader & reader, MergeTreeReadTask & task, size_t rows) { @@ -109,7 +109,7 @@ Block MergeTreeBaseBlockInputStream::readFromPart() reader.read(block, rows_to_skip); for (const auto i : ext::range(0, block.columns())) { - auto & col = block.safeGetByPosition(i); + auto & col = block.getByPosition(i); if (task.column_name_set.count(col.name)) col.column = col.column->cloneEmpty(); } @@ -122,7 +122,6 @@ Block MergeTreeBaseBlockInputStream::readFromPart() { /// Let's read the full block of columns needed to calculate the expression in PREWHERE. MarkRanges ranges_to_read; - //std::vector rows_was_read; /// Last range may be partl read. The same number of rows we need to read after prewhere size_t rows_was_read_in_last_range = 0; std::experimental::optional pre_range_reader; @@ -201,8 +200,8 @@ Block MergeTreeBaseBlockInputStream::readFromPart() observed_column = column; /** If the filter is a constant (for example, it says PREWHERE 1), - * then either return an empty block, or return the block unchanged. - */ + * then either return an empty block, or return the block unchanged. + */ if (const auto column_const = typeid_cast(observed_column.get())) { if (!column_const->getData()) diff --git a/dbms/src/Storages/MergeTree/MergeTreeBlockReadUtils.h b/dbms/src/Storages/MergeTree/MergeTreeBlockReadUtils.h index 86028620370..89d03de8160 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeBlockReadUtils.h +++ b/dbms/src/Storages/MergeTree/MergeTreeBlockReadUtils.h @@ -86,16 +86,16 @@ struct MergeTreeBlockSizePredictor { double max_size_per_row = std::max(std::max(max_size_per_row_fixed, 1), max_size_per_row_dynamic); return (bytes_quota > block_size_rows * max_size_per_row) - ? static_cast(bytes_quota / max_size_per_row) - block_size_rows - : 0; + ? static_cast(bytes_quota / max_size_per_row) - block_size_rows + : 0; } /// Predicts what number of rows should be read to exhaust byte quota per block inline size_t estimateNumRows(size_t bytes_quota) const { return (bytes_quota > block_size_bytes) - ? static_cast((bytes_quota - block_size_bytes) / bytes_per_row_current) - : 0; + ? static_cast((bytes_quota - block_size_bytes) / bytes_per_row_current) + : 0; } /// Predicts what number of marks should be read to exhaust byte quota @@ -108,8 +108,9 @@ struct MergeTreeBlockSizePredictor { double alpha = std::pow(1. - decay, rows_was_read); double current_ration = rows_was_filtered / std::max(1, rows_was_read); - filtered_rows_ration = current_ration < filtered_rows_ration ? current_ration - : alpha * filtered_rows_ration + (1.0 - alpha) * current_ration; + filtered_rows_ratio = current_ration < filtered_rows_ratio + ? current_ration + : alpha * filtered_rows_ratio + (1.0 - alpha) * current_ration; } /// Aggressiveness of bytes_per_row updates. See update() implementation. @@ -145,7 +146,7 @@ public: double bytes_per_row_current = 0; double bytes_per_row_global = 0; - double filtered_rows_ration = 0; + double filtered_rows_ratio = 0; }; } diff --git a/dbms/src/Storages/MergeTree/MergeTreeRangeReader.cpp b/dbms/src/Storages/MergeTree/MergeTreeRangeReader.cpp index e5ffe73b138..fccb815a0c7 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeRangeReader.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeRangeReader.cpp @@ -7,14 +7,14 @@ MergeTreeRangeReader::MergeTreeRangeReader( MergeTreeReader & merge_tree_reader, size_t from_mark, size_t to_mark, size_t index_granularity) : logger(&Poco::Logger::get("MergeTreeRangeReader")) , merge_tree_reader(merge_tree_reader), current_mark(from_mark), last_mark(to_mark) - , read_rows_after_current_mark(0), index_granularity(index_granularity), seek_to_from_mark(true), is_reading_finished(false) + , read_rows_after_current_mark(0), index_granularity(index_granularity), continue_reading(false), is_reading_finished(false) { } size_t MergeTreeRangeReader::skipToNextMark() { auto unread_rows_in_current_part = unreadRowsInCurrentGranule(); - seek_to_from_mark = true; + continue_reading = false; ++current_mark; read_rows_after_current_mark = 0; return unread_rows_in_current_part; @@ -37,10 +37,10 @@ size_t MergeTreeRangeReader::read(Block & res, size_t max_rows_to_read) if (rows_to_read == 0) return false; - auto read_rows = merge_tree_reader.get().readRange(current_mark, seek_to_from_mark, rows_to_read, res); + auto read_rows = merge_tree_reader.get().readRows(current_mark, continue_reading, rows_to_read, res); if (!read_rows) read_rows = rows_to_read; - seek_to_from_mark = false; + continue_reading = true; read_rows_after_current_mark += read_rows; size_t read_parts = read_rows_after_current_mark / index_granularity; @@ -56,7 +56,7 @@ size_t MergeTreeRangeReader::read(Block & res, size_t max_rows_to_read) MergeTreeRangeReader MergeTreeRangeReader::copyForReader(MergeTreeReader & reader) { MergeTreeRangeReader copy(reader, current_mark, last_mark, index_granularity); - copy.seek_to_from_mark = seek_to_from_mark; + copy.continue_reading = continue_reading; copy.read_rows_after_current_mark = read_rows_after_current_mark; return copy; } diff --git a/dbms/src/Storages/MergeTree/MergeTreeRangeReader.h b/dbms/src/Storages/MergeTree/MergeTreeRangeReader.h index d410bd773d3..d3670803914 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeRangeReader.h +++ b/dbms/src/Storages/MergeTree/MergeTreeRangeReader.h @@ -7,7 +7,7 @@ namespace DB class MergeTreeReader; -// Used in MergeTreeReader to allow sequential reading for any number of rows between pairs of marks in the same part +/// MergeTreeReader iterator which allows sequential reading for arbitrary number of rows between pairs of marks in the same part. class MergeTreeRangeReader { public: @@ -16,17 +16,21 @@ public: size_t readRowsInCurrentGranule() const { return read_rows_after_current_mark; } - // seek to next mark before next reading + /// Seek to next mark before next reading. size_t skipToNextMark(); - // returns state will be afrer reading rows_to_read, no reading happens + /// Seturn state will be afrer reading rows_to_read, no reading happens. MergeTreeRangeReader getFutureState(size_t rows_to_read) const; - // returns the number of rows was read + + /// If columns are not present in the block, adds them. If they are present - appends the values that have been read. + /// Do not add columns, if the files are not present for them. + /// Block should contain either no columns from the columns field, or all columns for which files are present. + /// Returns the number of rows was read. size_t read(Block & res, size_t max_rows_to_read); bool isReadingFinished() const { return is_reading_finished; } - void disableNextSeek() { seek_to_from_mark = false; } - // return the same state for other MergeTreeReader + void disableNextSeek() { continue_reading = true; } + /// Return the same state for other MergeTreeReader. MergeTreeRangeReader copyForReader(MergeTreeReader & reader); private: @@ -39,7 +43,7 @@ private: size_t last_mark; size_t read_rows_after_current_mark; size_t index_granularity; - bool seek_to_from_mark; + bool continue_reading; bool is_reading_finished; friend class MergeTreeReader; diff --git a/dbms/src/Storages/MergeTree/MergeTreeReader.cpp b/dbms/src/Storages/MergeTree/MergeTreeReader.cpp index ae8e8fad94f..267ce02dc01 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeReader.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeReader.cpp @@ -79,7 +79,7 @@ MergeTreeRangeReader MergeTreeReader::readRange(size_t from_mark, size_t to_mark } -size_t MergeTreeReader::readRange(size_t from_mark, bool seek_to_from_mark, size_t max_rows_to_read, Block & res) +size_t MergeTreeReader::readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Block & res) { size_t read_rows = 0; try @@ -144,7 +144,7 @@ size_t MergeTreeReader::readRange(size_t from_mark, bool seek_to_from_mark, size try { size_t column_size_before_reading = column.column->size(); - readData(column.name, *column.type, *column.column, from_mark, seek_to_from_mark, max_rows_to_read, 0, read_offsets); + readData(column.name, *column.type, *column.column, from_mark, continue_reading, max_rows_to_read, 0, read_offsets); read_rows = column.column->size() - column_size_before_reading; } catch (Exception & e) @@ -444,7 +444,7 @@ void MergeTreeReader::addStream(const String & name, const IDataType & type, con void MergeTreeReader::readData( const String & name, const IDataType & type, IColumn & column, - size_t from_mark, bool seek_to_from_mark, size_t max_rows_to_read, + size_t from_mark, bool continue_reading, size_t max_rows_to_read, size_t level, bool read_offsets) { if (type.isNullable()) @@ -459,13 +459,13 @@ void MergeTreeReader::readData( std::string filename = name + NULL_MAP_EXTENSION; Stream & stream = *(streams.at(filename)); - if (seek_to_from_mark) + if (!continue_reading) stream.seekToMark(from_mark); IColumn & col8 = nullable_col.getNullMapConcreteColumn(); DataTypeUInt8{}.deserializeBinaryBulk(col8, *stream.data_buffer, max_rows_to_read, 0); /// Then read data. - readData(name, nested_type, nested_col, from_mark, seek_to_from_mark, max_rows_to_read, level, read_offsets); + readData(name, nested_type, nested_col, from_mark, continue_reading, max_rows_to_read, level, read_offsets); } else if (const DataTypeArray * type_arr = typeid_cast(&type)) { @@ -473,7 +473,7 @@ void MergeTreeReader::readData( if (read_offsets) { Stream & stream = *streams[DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level)]; - if (seek_to_from_mark) + if (!continue_reading) stream.seekToMark(from_mark); type_arr->deserializeOffsets( column, @@ -488,7 +488,7 @@ void MergeTreeReader::readData( name, *type_arr->getNestedType(), array.getData(), - from_mark, seek_to_from_mark, required_internal_size - array.getData().size(), + from_mark, continue_reading, required_internal_size - array.getData().size(), level + 1); size_t read_internal_size = array.getData().size(); @@ -519,7 +519,7 @@ void MergeTreeReader::readData( return; double & avg_value_size_hint = avg_value_size_hints[name]; - if (seek_to_from_mark) + if (!continue_reading) stream.seekToMark(from_mark); type.deserializeBinaryBulk(column, *stream.data_buffer, max_rows_to_read, avg_value_size_hint); diff --git a/dbms/src/Storages/MergeTree/MergeTreeReader.h b/dbms/src/Storages/MergeTree/MergeTreeReader.h index 983c344f77d..945607182a7 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeReader.h +++ b/dbms/src/Storages/MergeTree/MergeTreeReader.h @@ -38,9 +38,7 @@ public: const ValueSizeMap & getAvgValueSizeHints() const; - /// If columns are not present in the block, adds them. If they are present - appends the values that have been read. - /// Do not adds columns, if the files are not present for them (to add them, call fillMissingColumns). - /// Block should contain either no columns from the columns field, or all columns for which files are present. + /// Create MergeTreeRangeReader iterator, which allows reading arbitrary number of rows from range. MergeTreeRangeReader readRange(size_t from_mark, size_t to_mark); /// Add columns from ordered_names that are not present in the block. @@ -123,13 +121,14 @@ private: void readData( const String & name, const IDataType & type, IColumn & column, - size_t from_mark, bool seek_to_from_mark, size_t max_rows_to_read, + size_t from_mark, bool continue_reading, size_t max_rows_to_read, size_t level = 0, bool read_offsets = true); void fillMissingColumnsImpl(Block & res, const Names & ordered_names, bool always_reorder); - /// return the number of rows has been read or zero if ther is no columns to read - size_t readRange(size_t from_mark, bool seek_to_from_mark, size_t max_rows_to_read, Block & res); + /// Return the number of rows has been read or zero if there is no columns to read. + /// If continue_reading is true, continue reading from last state, otherwise seek to from_mark + size_t readRows(size_t from_mark, bool continue_reading, size_t max_rows_to_read, Block & res); friend class MergeTreeRangeReader; };