diff --git a/dbms/src/Storages/MergeTree/MergeTreeBaseSelectBlockInputStream.cpp b/dbms/src/Storages/MergeTree/MergeTreeBaseSelectBlockInputStream.cpp index 731624d1997..1f899c6b592 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeBaseSelectBlockInputStream.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeBaseSelectBlockInputStream.cpp @@ -161,6 +161,14 @@ Chunk MergeTreeBaseSelectBlockInputProcessor::readFromPartImpl() if (read_result.num_rows == 0) read_result.columns.clear(); + auto & sample_block = getPort().getHeader(); + if (read_result.num_rows != 0 && sample_block.columns() != read_result.columns.size()) + throw Exception("Inconsistent number of columns got from MergeTreeRangeReader. " + "Have " + toString(sample_block.columns()) + " in sample block " + "and " + toString(read_result.columns.size()) + " columns in list", ErrorCodes::LOGICAL_ERROR); + + /// TODO: check columns have the same types as in header. + UInt64 num_filtered_rows = read_result.numReadRows() - read_result.num_rows; /// TODO @@ -171,7 +179,7 @@ Chunk MergeTreeBaseSelectBlockInputProcessor::readFromPartImpl() task->size_predictor->updateFilteredRowsRation(read_result.numReadRows(), num_filtered_rows); if (!read_result.columns.empty()) - task->size_predictor->update(read_result.columns); + task->size_predictor->update(sample_block, read_result.columns, read_result.num_rows); } return Chunk(std::move(read_result.columns), read_result.num_rows); diff --git a/dbms/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp b/dbms/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp index 7dc9a40e89a..920697f3c32 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeBlockReadUtils.cpp @@ -84,22 +84,25 @@ MergeTreeBlockSizePredictor::MergeTreeBlockSizePredictor( { number_of_rows_in_part = data_part->rows_count; /// Initialize with sample block until update won't called. - initialize(sample_block, columns); + initialize(sample_block, {}, columns); } -void MergeTreeBlockSizePredictor::initialize(const Block & sample_block, const Names & columns, bool from_update) +void MergeTreeBlockSizePredictor::initialize(const Block & sample_block, const Columns & columns, const Names & names, bool from_update) { fixed_columns_bytes_per_row = 0; dynamic_columns_infos.clear(); std::unordered_set names_set; if (!from_update) - names_set.insert(columns.begin(), columns.end()); + names_set.insert(names.begin(), names.end()); - for (const auto & column_with_type_and_name : sample_block) + size_t num_columns = sample_block.columns(); + for (size_t pos = 0; pos < num_columns; ++pos) { + const auto & column_with_type_and_name = sample_block.getByPosition(pos); const String & column_name = column_with_type_and_name.name; - const ColumnPtr & column_data = column_with_type_and_name.column; + const ColumnPtr & column_data = from_update ? columns[pos] + : column_with_type_and_name.column; if (!from_update && !names_set.count(column_name)) continue; @@ -151,25 +154,30 @@ void MergeTreeBlockSizePredictor::startBlock() /// TODO: add last_read_row_in_part parameter to take into account gaps between adjacent ranges -void MergeTreeBlockSizePredictor::update(const Block & block, double decay) +void MergeTreeBlockSizePredictor::update(const Block & sample_block, const Columns & columns, size_t num_rows, double decay) { + if (columns.size() != sample_block.columns()) + throw Exception("Inconsistent number of columns passed to MergeTreeBlockSizePredictor. " + "Have " + toString(sample_block.columns()) + " in sample block " + "and " + toString(columns.size()) + " columns in list", ErrorCodes::LOGICAL_ERROR); + if (!is_initialized_in_update) { /// Reinitialize with read block to update estimation for DEFAULT and MATERIALIZED columns without data. - initialize(block, {}, true); + initialize(sample_block, columns, {}, true); is_initialized_in_update = true; } - size_t new_rows = block.rows(); - if (new_rows < block_size_rows) + + if (num_rows < block_size_rows) { - throw Exception("Updated block has less rows (" + toString(new_rows) + ") than previous one (" + toString(block_size_rows) + ")", + throw Exception("Updated block has less rows (" + toString(num_rows) + ") than previous one (" + toString(block_size_rows) + ")", ErrorCodes::LOGICAL_ERROR); } - size_t diff_rows = new_rows - block_size_rows; - block_size_bytes = new_rows * fixed_columns_bytes_per_row; + size_t diff_rows = num_rows - block_size_rows; + block_size_bytes = num_rows * fixed_columns_bytes_per_row; bytes_per_row_current = fixed_columns_bytes_per_row; - block_size_rows = new_rows; + block_size_rows = num_rows; /// Make recursive updates for each read row: v_{i+1} = (1 - decay) v_{i} + decay v_{target} /// Use sum of geometric sequence formula to update multiple rows: v{n} = (1 - decay)^n v_{0} + (1 - (1 - decay)^n) v_{target} @@ -179,7 +187,7 @@ void MergeTreeBlockSizePredictor::update(const Block & block, double decay) max_size_per_row_dynamic = 0; for (auto & info : dynamic_columns_infos) { - size_t new_size = block.getByName(info.name).column->byteSize(); + size_t new_size = columns[sample_block.getPositionByName(info.name)]->byteSize(); size_t diff_size = new_size - info.size_bytes; double local_bytes_per_row = static_cast(diff_size) / diff_rows; diff --git a/dbms/src/Storages/MergeTree/MergeTreeBlockReadUtils.h b/dbms/src/Storages/MergeTree/MergeTreeBlockReadUtils.h index a031255b3ab..19c6adbd9c7 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeBlockReadUtils.h +++ b/dbms/src/Storages/MergeTree/MergeTreeBlockReadUtils.h @@ -85,7 +85,7 @@ struct MergeTreeBlockSizePredictor void startBlock(); /// Updates statistic for more accurate prediction - void update(const Block & block, double decay = DECAY()); + void update(const Block & sample_block, const Columns & columns, size_t num_rows, double decay = DECAY()); /// Return current block size (after update()) inline size_t getBlockSize() const @@ -148,7 +148,7 @@ protected: bool is_initialized_in_update = false; - void initialize(const Block & sample_block, const Names & columns, bool from_update = false); + void initialize(const Block & sample_block, const Columns & columns, const Names & names, bool from_update = false); public: diff --git a/dbms/src/Storages/MergeTree/MergeTreeRangeReader.cpp b/dbms/src/Storages/MergeTree/MergeTreeRangeReader.cpp index 99d83789f45..8cac9fcfad8 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeRangeReader.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeRangeReader.cpp @@ -1,7 +1,6 @@ #include #include #include -#include #include #include @@ -409,14 +408,27 @@ void MergeTreeRangeReader::ReadResult::setFilter(const ColumnPtr & new_filter) MergeTreeRangeReader::MergeTreeRangeReader( MergeTreeReader * merge_tree_reader_, MergeTreeRangeReader * prev_reader_, ExpressionActionsPtr alias_actions_, ExpressionActionsPtr prewhere_actions_, - const String * prewhere_column_name_, const Names * ordered_names_, - bool always_reorder_, bool remove_prewhere_column_, bool last_reader_in_chain_) + const String * prewhere_column_name_, bool remove_prewhere_column_, bool last_reader_in_chain_) : merge_tree_reader(merge_tree_reader_), index_granularity(&(merge_tree_reader->data_part->index_granularity)) , prev_reader(prev_reader_), prewhere_column_name(prewhere_column_name_) - , ordered_names(ordered_names_), alias_actions(std::move(alias_actions_)), prewhere_actions(std::move(prewhere_actions_)) - , always_reorder(always_reorder_), remove_prewhere_column(remove_prewhere_column_) + , alias_actions(std::move(alias_actions_)), prewhere_actions(std::move(prewhere_actions_)) + , remove_prewhere_column(remove_prewhere_column_) , last_reader_in_chain(last_reader_in_chain_), is_initialized(true) { + if (prev_reader) + sample_block = prev_reader->getSampleBlock(); + + for (auto & name_and_type : merge_tree_reader->getColumns()) + sample_block.insert({name_and_type.type->createColumn(), name_and_type.type, name_and_type.name}); + + if (alias_actions) + alias_actions->execute(sample_block, true); + + if (prewhere_actions) + prewhere_actions->execute(sample_block, true); + + if (remove_prewhere_column) + sample_block.erase(*prewhere_column_name); } bool MergeTreeRangeReader::isReadingFinished() const diff --git a/dbms/src/Storages/MergeTree/MergeTreeRangeReader.h b/dbms/src/Storages/MergeTree/MergeTreeRangeReader.h index d3f1333289b..67d5cbc3908 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeRangeReader.h +++ b/dbms/src/Storages/MergeTree/MergeTreeRangeReader.h @@ -22,8 +22,7 @@ class MergeTreeRangeReader public: MergeTreeRangeReader(MergeTreeReader * merge_tree_reader_, MergeTreeRangeReader * prev_reader_, ExpressionActionsPtr alias_actions_, ExpressionActionsPtr prewhere_actions_, - const String * prewhere_column_name_, const Names * ordered_names_, - bool always_reorder_, bool remove_prewhere_column_, bool last_reader_in_chain_); + const String * prewhere_column_name_, bool remove_prewhere_column_, bool last_reader_in_chain_); MergeTreeRangeReader() = default; @@ -185,6 +184,8 @@ public: ReadResult read(size_t max_rows, MarkRanges & ranges); + const Block & getSampleBlock() const { return sample_block; } + private: ReadResult startReadingChain(size_t max_rows, MarkRanges & ranges); @@ -197,13 +198,13 @@ private: MergeTreeRangeReader * prev_reader = nullptr; /// If not nullptr, read from prev_reader firstly. const String * prewhere_column_name = nullptr; - const Names * ordered_names = nullptr; ExpressionActionsPtr alias_actions = nullptr; /// If not nullptr, calculate aliases. ExpressionActionsPtr prewhere_actions = nullptr; /// If not nullptr, calculate filter. Stream stream; - bool always_reorder = true; + Block sample_block; + bool remove_prewhere_column = false; bool last_reader_in_chain = false; bool is_initialized = false;