added preferred_max_column_in_block_size_bytes setting and filtration ratio prediction [#CLICKHOUSE-3065]

Nikolai Kochetov 2017-06-30 19:28:27 +03:00 committed by alexey-milovidov
parent 4470afc9a9
commit 8c65d6013e
11 changed files with 60 additions and 30 deletions

View File

@@ -283,7 +283,10 @@ struct Settings
*/ \
M(SettingBool, fallback_to_stale_replicas_for_distributed_queries, 1) \
/** Still for development and testing purposes only. */ \
M(SettingBool, distributed_ddl_allow_replicated_alter, 0) \
/** Limit on the max column size in a block while reading. Helps to decrease the number of cache misses. \
* Should be close to the L2 cache size. */ \
M(SettingUInt64, preferred_max_column_in_block_size_bytes, 250000)
/// Possible limits for query execution.
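
A minimal illustration of the new setting's effect (only the setting name, its 250000-byte default and the L2-cache guidance come from this commit; the 64-bytes-per-row figure is an assumption):

    #include <cstddef>
    #include <iostream>

    int main()
    {
        const std::size_t quota = 250000;                 // preferred_max_column_in_block_size_bytes default, ~L2 cache size
        const double widest_column_bytes_per_row = 64.0;  // assumed widest column; not from the commit
        // Rows per read step that keep the widest column within the quota:
        std::cout << static_cast<std::size_t>(quota / widest_column_bytes_per_row) << " rows\n";  // prints 3906
    }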

View File

@@ -20,6 +20,7 @@ MergeTreeBaseBlockInputStream::MergeTreeBaseBlockInputStream(
const String & prewhere_column,
size_t max_block_size_rows,
size_t preferred_block_size_bytes,
size_t preferred_max_column_in_block_size_bytes,
size_t min_bytes_to_use_direct_io,
size_t max_read_buffer_size,
bool use_uncompressed_cache,
@@ -31,6 +32,7 @@ MergeTreeBaseBlockInputStream::MergeTreeBaseBlockInputStream(
prewhere_column(prewhere_column),
max_block_size_rows(max_block_size_rows),
preferred_block_size_bytes(preferred_block_size_bytes),
preferred_max_column_in_block_size_bytes(preferred_max_column_in_block_size_bytes),
min_bytes_to_use_direct_io(min_bytes_to_use_direct_io),
max_read_buffer_size(max_read_buffer_size),
use_uncompressed_cache(use_uncompressed_cache),
@@ -71,26 +73,26 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
task->size_predictor->startBlock();
const auto preferred_block_size_bytes = this->preferred_block_size_bytes;
const auto preferred_max_column_in_block_size_bytes =
this->preferred_max_column_in_block_size_bytes ? this->preferred_max_column_in_block_size_bytes : max_block_size_rows;
const auto index_granularity = storage.index_granularity;
const auto default_block_size = std::max(1LU, max_block_size_rows);
const double min_filtration_ratio = 0.00001;
auto estimateNumRows = [preferred_block_size_bytes, default_block_size,
index_granularity, preferred_max_column_in_block_size_bytes, min_filtration_ratio](
MergeTreeReadTask & task, MergeTreeRangeReader & reader)
{
if (!task.size_predictor)
return default_block_size;
size_t rows_to_read_for_block = task.size_predictor->estimateNumRows(preferred_block_size_bytes);
size_t rows_to_read_for_max_size_column
= task.size_predictor->estimateNumRowsForMaxSizeColumn(preferred_max_column_in_block_size_bytes);
double filtration_ratio = std::max(min_filtration_ratio, 1.0 - task.size_predictor->filtered_rows_ration);
size_t rows_to_read_for_max_size_column_with_filtration
= static_cast<size_t>(rows_to_read_for_max_size_column / filtration_ratio);
return std::min(rows_to_read_for_block, rows_to_read_for_max_size_column_with_filtration);
};
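// Worked example with illustrative numbers (not from this commit): suppose
// estimateNumRows(preferred_block_size_bytes) returns 10000 rows and
// estimateNumRowsForMaxSizeColumn(preferred_max_column_in_block_size_bytes)
// returns 5000 rows. If filtered_rows_ration is 0.9 (PREWHERE drops ~90% of
// rows), then filtration_ratio = max(0.00001, 1 - 0.9) = 0.1, the per-column
// bound grows to 5000 / 0.1 = 50000, and the estimate is min(10000, 50000) = 10000.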
// read rows from reader and clean columns
@@ -147,11 +149,10 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
processNextRange(*task, *pre_reader);
/// FIXME: size prediction model is updated by filtered rows, but it predicts size of unfiltered rows also
size_t recommended_rows = estimateNumRows(*task, *pre_range_reader);
if (res && recommended_rows < 1)
break;
size_t space_left = std::max(1LU, std::min(max_block_size_rows, recommended_rows));
while ((pre_range_reader || !task->mark_ranges.empty()) && space_left && !isCancelled())
{
@@ -335,6 +336,7 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
if (!post_filter_pos)
{
task->size_predictor->updateFilteredRowsRation(pre_filter.size(), pre_filter.size());
res.clear();
continue;
}
@@ -355,6 +357,7 @@ Block MergeTreeBaseBlockInputStream::readFromPart()
col.column->filter(task->column_name_set.count(col.name) ? post_filter : pre_filter, -1);
rows = col.column->size();
}
task->size_predictor->updateFilteredRowsRation(pre_filter.size(), pre_filter.size() - rows);
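// Note: the predictor is fed from two sites. Above, when PREWHERE leaves no
// rows, all pre_filter.size() rows are reported as filtered; here the filtered
// count is pre_filter.size() - rows, i.e. rows read minus rows surviving, so
// filtered_rows_ration tracks the fraction of rows removed by the condition.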
/// Replace column with condition value from PREWHERE to a constant.
if (!task->remove_prewhere_column)

View File

@@ -21,6 +21,7 @@ public:
const String & prewhere_column,
size_t max_block_size_rows,
size_t preferred_block_size_bytes,
size_t preferred_max_column_in_block_size_bytes,
size_t min_bytes_to_use_direct_io,
size_t max_read_buffer_size,
bool use_uncompressed_cache,
@@ -52,6 +53,7 @@ protected:
size_t max_block_size_rows;
size_t preferred_block_size_bytes;
size_t preferred_max_column_in_block_size_bytes;
size_t min_bytes_to_use_direct_io;
size_t max_read_buffer_size;

View File

@@ -20,6 +20,7 @@ MergeTreeBlockInputStream::MergeTreeBlockInputStream(
const MergeTreeData::DataPartPtr & owned_data_part_,
size_t max_block_size_rows_,
size_t preferred_block_size_bytes_,
size_t preferred_max_column_in_block_size_bytes_,
Names column_names,
const MarkRanges & mark_ranges_,
bool use_uncompressed_cache_,
@@ -33,8 +34,9 @@ MergeTreeBlockInputStream::MergeTreeBlockInputStream(
size_t part_index_in_query_,
bool quiet)
:
MergeTreeBaseBlockInputStream{storage_, prewhere_actions_, prewhere_column_, max_block_size_rows_,
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_, min_bytes_to_use_direct_io_,
max_read_buffer_size_, use_uncompressed_cache_, save_marks_in_cache_, virt_column_names},
ordered_names{column_names},
data_part{owned_data_part_},
part_columns_lock{new Poco::ScopedReadRWLock(data_part->columns_lock)},

View File

@@ -20,6 +20,7 @@ public:
const MergeTreeData::DataPartPtr & owned_data_part,
size_t max_block_size_rows,
size_t preferred_block_size_bytes,
size_t preferred_max_column_in_block_size_bytes,
Names column_names,
const MarkRanges & mark_ranges,
bool use_uncompressed_cache,

View File

@@ -80,15 +80,17 @@ struct MergeTreeBlockSizePredictor
return block_size_bytes;
}
/// Predicts what number of rows should be read to exhaust byte quota per column
inline size_t estimateNumRowsForMaxSizeColumn(size_t bytes_quota) const
{
double max_size_per_row = std::max<double>(std::max<size_t>(max_size_per_row_fixed, 1), max_size_per_row_dynamic);
return (bytes_quota > block_size_rows * max_size_per_row)
? static_cast<size_t>(bytes_quota / max_size_per_row) - block_size_rows
: 0;
}
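// Illustrative reading (hypothetical numbers): with bytes_quota = 250000,
// max_size_per_row = 50 and block_size_rows = 1000 rows already accumulated in
// the current block, the result is 250000 / 50 - 1000 = 4000 further rows; if
// the quota is already exhausted by the accumulated block, the result is 0.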
/// Predicts what number of rows should be read to exhaust byte quota per block
inline size_t estimateNumRows(size_t bytes_quota) const
{
return (bytes_quota > block_size_bytes)
@@ -102,6 +104,14 @@ struct MergeTreeBlockSizePredictor
return (estimateNumRows(bytes_quota) + index_granularity / 2) / index_granularity;
}
inline void updateFilteredRowsRation(size_t rows_was_read, size_t rows_was_filtered, double decay = DECAY())
{
double alpha = std::pow(1. - decay, rows_was_read);
double current_ration = rows_was_filtered / std::max<double>(1, rows_was_read);
filtered_rows_ration = current_ration < filtered_rows_ration ? current_ration
: alpha * filtered_rows_ration + (1.0 - alpha) * current_ration;
}
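// Illustrative update (hypothetical numbers): reading rows_was_read = 8192 rows
// of which rows_was_filtered = 4096 gives current_ration = 0.5. A ratio lower
// than the stored one replaces it immediately, so overestimated filtration is
// corrected at once; a higher ratio is blended in smoothly as an exponential
// moving average with weight alpha = pow(1 - decay, 8192) on the old value.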
/// Aggressiveness of bytes_per_row updates. See update() implementation.
/// After n=NUM_UPDATES_TO_TARGET_WEIGHT updates v_{n} = (1 - TARGET_WEIGHT) * v_{0} + TARGET_WEIGHT * v_{target}
static constexpr double TARGET_WEIGHT = 0.5;
@@ -125,6 +135,7 @@ protected:
size_t max_size_per_row_fixed = 0;
double max_size_per_row_dynamic = 0;
public:
size_t block_size_bytes = 0;
@@ -133,6 +144,8 @@ public:
/// Total statistics
double bytes_per_row_current = 0;
double bytes_per_row_global = 0;
double filtered_rows_ration = 0;
};
}
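
Putting the pieces together, the read loop drives the predictor roughly like this (a minimal sketch: the quota values and the rows_read/rows_passed variables are assumptions, only the predictor calls and the filtered_rows_ration member come from this commit):

    predictor->startBlock();
    size_t for_block = predictor->estimateNumRows(preferred_block_size_bytes);
    size_t for_column = predictor->estimateNumRowsForMaxSizeColumn(preferred_max_column_in_block_size_bytes);
    double filtration = std::max(0.00001, 1.0 - predictor->filtered_rows_ration);
    size_t budget = std::min(for_block, static_cast<size_t>(for_column / filtration));
    // ... read up to `budget` rows and apply the PREWHERE filter ...
    predictor->updateFilteredRowsRation(rows_read, rows_read - rows_passed);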

View File

@@ -1026,7 +1026,7 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
{
MarkRanges ranges{MarkRange(0, part->size)};
BlockInputStreamPtr part_in = std::make_shared<MergeTreeBlockInputStream>(
*this, part, DEFAULT_MERGE_BLOCK_SIZE, 0, 0, expression->getRequiredColumns(), ranges,
false, nullptr, "", false, 0, DBMS_DEFAULT_BUFFER_SIZE, false);
ExpressionBlockInputStream in(part_in, expression);

View File

@@ -552,7 +552,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
for (size_t i = 0; i < parts.size(); ++i)
{
auto input = std::make_unique<MergeTreeBlockInputStream>(
data, parts[i], DEFAULT_MERGE_BLOCK_SIZE, 0, 0, merging_column_names, MarkRanges(1, MarkRange(0, parts[i]->size)),
false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE, false);
input->setProgressCallback(
@@ -686,7 +686,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
for (size_t part_num = 0; part_num < parts.size(); ++part_num)
{
auto column_part_stream = std::make_shared<MergeTreeBlockInputStream>(
data, parts[part_num], DEFAULT_MERGE_BLOCK_SIZE, 0, 0, column_name_, MarkRanges{MarkRange(0, parts[part_num]->size)},
false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE, false, Names{}, 0, true);
column_part_stream->setProgressCallback(
@@ -894,7 +894,7 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition(
MarkRanges ranges(1, MarkRange(0, parts[i]->size));
auto input = std::make_unique<MergeTreeBlockInputStream>(
data, parts[i], DEFAULT_MERGE_BLOCK_SIZE, 0, 0, column_names,
ranges, false, nullptr, "", true, aio_threshold, DBMS_DEFAULT_BUFFER_SIZE, false);
input->setProgressCallback([&merge_entry, rows_total] (const Progress & value)

View File

@@ -636,7 +636,8 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
for (std::size_t i = 0; i < num_streams; ++i)
{
res.emplace_back(std::make_shared<MergeTreeThreadBlockInputStream>(
i, pool, min_marks_for_concurrent_read, max_block_size, settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes, data, use_uncompressed_cache,
prewhere_actions, prewhere_column, settings, virt_columns));
if (i == 0)
@@ -707,7 +708,8 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
}
BlockInputStreamPtr source_stream = std::make_shared<MergeTreeBlockInputStream>(
data, part.data_part, max_block_size, settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes, column_names, ranges_to_get_from_part,
use_uncompressed_cache, prewhere_actions, prewhere_column, true, settings.min_bytes_to_use_direct_io,
settings.max_read_buffer_size, true, virt_columns, part.part_index_in_query);
@@ -753,7 +755,8 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal
RangesInDataPart & part = parts[part_index];
BlockInputStreamPtr source_stream = std::make_shared<MergeTreeBlockInputStream>(
data, part.data_part, max_block_size, settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes, column_names, part.ranges, use_uncompressed_cache,
prewhere_actions, prewhere_column, true, settings.min_bytes_to_use_direct_io, settings.max_read_buffer_size, true,
virt_columns, part.part_index_in_query);

View File

@@ -14,6 +14,7 @@ MergeTreeThreadBlockInputStream::MergeTreeThreadBlockInputStream(
const size_t min_marks_to_read_,
const size_t max_block_size_rows,
size_t preferred_block_size_bytes,
size_t preferred_max_column_in_block_size_bytes,
MergeTreeData & storage,
const bool use_uncompressed_cache,
const ExpressionActionsPtr & prewhere_actions,
@@ -21,8 +22,9 @@ MergeTreeThreadBlockInputStream::MergeTreeThreadBlockInputStream(
const Settings & settings,
const Names & virt_column_names)
:
MergeTreeBaseBlockInputStream{storage, prewhere_actions, prewhere_column, max_block_size_rows,
preferred_block_size_bytes, preferred_max_column_in_block_size_bytes, settings.min_bytes_to_use_direct_io,
settings.max_read_buffer_size, use_uncompressed_cache, true, virt_column_names},
thread{thread},
pool{pool}
{

View File

@@ -20,6 +20,7 @@ public:
const size_t min_marks_to_read,
const size_t max_block_size,
size_t preferred_block_size_bytes,
size_t preferred_max_column_in_block_size_bytes,
MergeTreeData & storage,
const bool use_uncompressed_cache,
const ExpressionActionsPtr & prewhere_actions,