Update MergeTreeRangeReader.

Nikolai Kochetov 2019-09-26 20:29:41 +03:00
parent 5108ebeece
commit b65fe57319
5 changed files with 55 additions and 26 deletions


@@ -161,6 +161,14 @@ Chunk MergeTreeBaseSelectBlockInputProcessor::readFromPartImpl()
if (read_result.num_rows == 0)
read_result.columns.clear();
auto & sample_block = getPort().getHeader();
if (read_result.num_rows != 0 && sample_block.columns() != read_result.columns.size())
throw Exception("Inconsistent number of columns got from MergeTreeRangeReader. "
"Have " + toString(sample_block.columns()) + " in sample block "
"and " + toString(read_result.columns.size()) + " columns in list", ErrorCodes::LOGICAL_ERROR);
/// TODO: check columns have the same types as in header.
UInt64 num_filtered_rows = read_result.numReadRows() - read_result.num_rows;
/// TODO
@@ -171,7 +179,7 @@ Chunk MergeTreeBaseSelectBlockInputProcessor::readFromPartImpl()
task->size_predictor->updateFilteredRowsRation(read_result.numReadRows(), num_filtered_rows);
if (!read_result.columns.empty())
task->size_predictor->update(read_result.columns);
task->size_predictor->update(sample_block, read_result.columns, read_result.num_rows);
}
return Chunk(std::move(read_result.columns), read_result.num_rows);
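
A worked example of the size-predictor feedback above, with illustrative numbers that are not from the commit: if read_result.numReadRows() is 8192 and read_result.num_rows is 1000 after PREWHERE filtering, then num_filtered_rows = 8192 - 1000 = 7192, and updateFilteredRowsRation(8192, 7192) records a filtered share of roughly 7192 / 8192 ≈ 0.88, which the predictor can use to anticipate how much filtering the following reads will do.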


@@ -84,22 +84,25 @@ MergeTreeBlockSizePredictor::MergeTreeBlockSizePredictor(
{
number_of_rows_in_part = data_part->rows_count;
/// Initialize with the sample block until update() is called.
initialize(sample_block, columns);
initialize(sample_block, {}, columns);
}
void MergeTreeBlockSizePredictor::initialize(const Block & sample_block, const Names & columns, bool from_update)
void MergeTreeBlockSizePredictor::initialize(const Block & sample_block, const Columns & columns, const Names & names, bool from_update)
{
fixed_columns_bytes_per_row = 0;
dynamic_columns_infos.clear();
std::unordered_set<String> names_set;
if (!from_update)
names_set.insert(columns.begin(), columns.end());
names_set.insert(names.begin(), names.end());
for (const auto & column_with_type_and_name : sample_block)
size_t num_columns = sample_block.columns();
for (size_t pos = 0; pos < num_columns; ++pos)
{
const auto & column_with_type_and_name = sample_block.getByPosition(pos);
const String & column_name = column_with_type_and_name.name;
const ColumnPtr & column_data = column_with_type_and_name.column;
const ColumnPtr & column_data = from_update ? columns[pos]
: column_with_type_and_name.column;
if (!from_update && !names_set.count(column_name))
continue;
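
For reference, the reworked initialize() is now invoked in two modes; the call shapes below mirror the call sites elsewhere in this diff (the names required_column_names and read_columns are illustrative, both actual call sites simply use `columns`):

/// From the constructor: only column names are known, so per-column sizes are derived
/// from the (empty) columns of the sample block itself.
initialize(sample_block, {}, required_column_names);

/// From the first update(): real data columns are passed by position; the name filter
/// is skipped because from_update == true.
initialize(sample_block, read_columns, {}, /* from_update = */ true);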
@@ -151,25 +154,30 @@ void MergeTreeBlockSizePredictor::startBlock()
/// TODO: add last_read_row_in_part parameter to take into account gaps between adjacent ranges
void MergeTreeBlockSizePredictor::update(const Block & block, double decay)
void MergeTreeBlockSizePredictor::update(const Block & sample_block, const Columns & columns, size_t num_rows, double decay)
{
if (columns.size() != sample_block.columns())
throw Exception("Inconsistent number of columns passed to MergeTreeBlockSizePredictor. "
"Have " + toString(sample_block.columns()) + " in sample block "
"and " + toString(columns.size()) + " columns in list", ErrorCodes::LOGICAL_ERROR);
if (!is_initialized_in_update)
{
/// Reinitialize with read block to update estimation for DEFAULT and MATERIALIZED columns without data.
initialize(block, {}, true);
initialize(sample_block, columns, {}, true);
is_initialized_in_update = true;
}
size_t new_rows = block.rows();
if (new_rows < block_size_rows)
if (num_rows < block_size_rows)
{
throw Exception("Updated block has less rows (" + toString(new_rows) + ") than previous one (" + toString(block_size_rows) + ")",
throw Exception("Updated block has less rows (" + toString(num_rows) + ") than previous one (" + toString(block_size_rows) + ")",
ErrorCodes::LOGICAL_ERROR);
}
size_t diff_rows = new_rows - block_size_rows;
block_size_bytes = new_rows * fixed_columns_bytes_per_row;
size_t diff_rows = num_rows - block_size_rows;
block_size_bytes = num_rows * fixed_columns_bytes_per_row;
bytes_per_row_current = fixed_columns_bytes_per_row;
block_size_rows = new_rows;
block_size_rows = num_rows;
/// Make recursive updates for each read row: v_{i+1} = (1 - decay) v_{i} + decay v_{target}
/// Use sum of geometric sequence formula to update multiple rows: v{n} = (1 - decay)^n v_{0} + (1 - (1 - decay)^n) v_{target}
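
The two comments above compress n per-row updates into a single closed form; spelling the step out (d is the decay, v_target the per-row target value):

$$v_n = (1-d)^n v_0 + d\, v_{\text{target}} \sum_{k=0}^{n-1} (1-d)^k = (1-d)^n v_0 + \bigl(1 - (1-d)^n\bigr) v_{\text{target}}$$

so one evaluation of (1 - d)^n replaces n individual recursive updates.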
@@ -179,7 +187,7 @@ void MergeTreeBlockSizePredictor::update(const Block & block, double decay)
max_size_per_row_dynamic = 0;
for (auto & info : dynamic_columns_infos)
{
size_t new_size = block.getByName(info.name).column->byteSize();
size_t new_size = columns[sample_block.getPositionByName(info.name)]->byteSize();
size_t diff_size = new_size - info.size_bytes;
double local_bytes_per_row = static_cast<double>(diff_size) / diff_rows;
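
For intuition, with illustrative numbers that are not from the commit: if a dynamic column's byteSize() was 1000 at the previous update and is 1800 now, while the block grew by diff_rows = 100, then diff_size = 800 and local_bytes_per_row = 8.0, and this is the value folded into the decayed per-row estimate for that column.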


@@ -85,7 +85,7 @@ struct MergeTreeBlockSizePredictor
void startBlock();
/// Updates statistic for more accurate prediction
void update(const Block & block, double decay = DECAY());
void update(const Block & sample_block, const Columns & columns, size_t num_rows, double decay = DECAY());
/// Return current block size (after update())
inline size_t getBlockSize() const
@@ -148,7 +148,7 @@ protected:
bool is_initialized_in_update = false;
void initialize(const Block & sample_block, const Names & columns, bool from_update = false);
void initialize(const Block & sample_block, const Columns & columns, const Names & names, bool from_update = false);
public:


@@ -1,7 +1,6 @@
#include <Storages/MergeTree/MergeTreeReader.h>
#include <Columns/FilterDescription.h>
#include <Columns/ColumnsCommon.h>
#include <Columns/ColumnNothing.h>
#include <ext/range.h>
#include <DataTypes/DataTypeNothing.h>
@@ -409,14 +408,27 @@ void MergeTreeRangeReader::ReadResult::setFilter(const ColumnPtr & new_filter)
MergeTreeRangeReader::MergeTreeRangeReader(
MergeTreeReader * merge_tree_reader_, MergeTreeRangeReader * prev_reader_,
ExpressionActionsPtr alias_actions_, ExpressionActionsPtr prewhere_actions_,
const String * prewhere_column_name_, const Names * ordered_names_,
bool always_reorder_, bool remove_prewhere_column_, bool last_reader_in_chain_)
const String * prewhere_column_name_, bool remove_prewhere_column_, bool last_reader_in_chain_)
: merge_tree_reader(merge_tree_reader_), index_granularity(&(merge_tree_reader->data_part->index_granularity))
, prev_reader(prev_reader_), prewhere_column_name(prewhere_column_name_)
, ordered_names(ordered_names_), alias_actions(std::move(alias_actions_)), prewhere_actions(std::move(prewhere_actions_))
, always_reorder(always_reorder_), remove_prewhere_column(remove_prewhere_column_)
, alias_actions(std::move(alias_actions_)), prewhere_actions(std::move(prewhere_actions_))
, remove_prewhere_column(remove_prewhere_column_)
, last_reader_in_chain(last_reader_in_chain_), is_initialized(true)
{
if (prev_reader)
sample_block = prev_reader->getSampleBlock();
for (auto & name_and_type : merge_tree_reader->getColumns())
sample_block.insert({name_and_type.type->createColumn(), name_and_type.type, name_and_type.name});
if (alias_actions)
alias_actions->execute(sample_block, true);
if (prewhere_actions)
prewhere_actions->execute(sample_block, true);
if (remove_prewhere_column)
sample_block.erase(*prewhere_column_name);
}
bool MergeTreeRangeReader::isReadingFinished() const
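
A minimal sketch of how two chained readers could be constructed with the new signature (variable names and the exact split of work between the readers are assumptions, not taken from the commit):

MergeTreeRangeReader prewhere_reader(
    pre_reader.get(), nullptr, alias_actions, prewhere_actions,
    &prewhere_column_name, /* remove_prewhere_column = */ false, /* last_reader_in_chain = */ false);

MergeTreeRangeReader main_reader(
    reader.get(), &prewhere_reader, nullptr, nullptr,
    &prewhere_column_name, /* remove_prewhere_column = */ true, /* last_reader_in_chain = */ true);

/// main_reader.getSampleBlock() starts from prewhere_reader's sample block, adds the
/// main reader's columns and drops the PREWHERE column; it therefore describes the
/// structure of the chunks the chain returns, the same structure the new check in
/// readFromPartImpl() compares against the port header.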


@@ -22,8 +22,7 @@ class MergeTreeRangeReader
public:
MergeTreeRangeReader(MergeTreeReader * merge_tree_reader_, MergeTreeRangeReader * prev_reader_,
ExpressionActionsPtr alias_actions_, ExpressionActionsPtr prewhere_actions_,
const String * prewhere_column_name_, const Names * ordered_names_,
bool always_reorder_, bool remove_prewhere_column_, bool last_reader_in_chain_);
const String * prewhere_column_name_, bool remove_prewhere_column_, bool last_reader_in_chain_);
MergeTreeRangeReader() = default;
@@ -185,6 +184,8 @@ public:
ReadResult read(size_t max_rows, MarkRanges & ranges);
const Block & getSampleBlock() const { return sample_block; }
private:
ReadResult startReadingChain(size_t max_rows, MarkRanges & ranges);
@@ -197,13 +198,13 @@ private:
MergeTreeRangeReader * prev_reader = nullptr; /// If not nullptr, read from prev_reader firstly.
const String * prewhere_column_name = nullptr;
const Names * ordered_names = nullptr;
ExpressionActionsPtr alias_actions = nullptr; /// If not nullptr, calculate aliases.
ExpressionActionsPtr prewhere_actions = nullptr; /// If not nullptr, calculate filter.
Stream stream;
bool always_reorder = true;
Block sample_block;
bool remove_prewhere_column = false;
bool last_reader_in_chain = false;
bool is_initialized = false;