This commit is contained in:
kssenii 2023-04-20 23:24:04 +02:00
parent 6edb61015d
commit 88784568dc

View File

@ -81,6 +81,9 @@ struct MergeTreePrefetchedReadPool::PartInfo
size_t approx_size_of_mark = 0;
size_t prefetch_step_marks = 0;
size_t estimated_memory_usage_for_single_prefetch = 0;
size_t required_readers_num = 0;
};
std::future<MergeTreeReaderPtr> MergeTreePrefetchedReadPool::createPrefetchedReader(
@ -301,6 +304,7 @@ MergeTreePrefetchedReadPool::PartsInfos MergeTreePrefetchedReadPool::getPartsInf
{
PartsInfos result;
Block sample_block = storage_snapshot->metadata->getSampleBlock();
const auto & settings = getContext()->getSettingsRef();
const bool predict_block_size_bytes = preferred_block_size_bytes > 0;
for (const auto & part : parts)
@ -339,6 +343,37 @@ MergeTreePrefetchedReadPool::PartsInfos MergeTreePrefetchedReadPool::getPartsInf
part_info->column_name_set = {required_column_names.begin(), required_column_names.end()};
part_info->task_columns = task_columns;
/// adjustBufferSize(), which is done in MergeTreeReaderStream and MergeTreeReaderCompact,
/// lowers buffer size if file size (or required read range) is less. So we know that the
/// settings.prefetch_buffer_size will be lowered there, therefore we account it here as well.
/// But here we make a more approximate lowering (because we do not have loaded marks yet),
/// while in adjustBufferSize it will be presize.
for (const auto & col : task_columns.columns)
{
const auto col_size = part.data_part->getColumnSize(col.name).data_uncompressed;
part_info->estimated_memory_usage_for_single_prefetch += std::min<size_t>(col_size, settings.prefetch_buffer_size);
++part_info->required_readers_num;
}
if (reader_settings.apply_deleted_mask && part.data_part->hasLightweightDelete())
{
const auto col_size = part.data_part->getColumnSize(
LightweightDeleteDescription::FILTER_COLUMN.name).data_compressed;
part_info->estimated_memory_usage_for_single_prefetch += std::min<size_t>(col_size, settings.prefetch_buffer_size);
++part_info->required_readers_num;
}
if (prewhere_info)
{
for (const auto & columns : task_columns.pre_columns)
{
for (const auto & col : columns)
{
const size_t col_size = part.data_part->getColumnSize(col.name).data_compressed;
part_info->estimated_memory_usage_for_single_prefetch += std::min<size_t>(col_size, settings.prefetch_buffer_size);
++part_info->required_readers_num;
}
}
}
result.push_back(std::move(part_info));
}
@ -523,50 +558,16 @@ MergeTreePrefetchedReadPool::ThreadsTasks MergeTreePrefetchedReadPool::createThr
if (allowed_memory_usage
&& (allowed_prefetches_num.has_value() == false || allowed_prefetches_num.value() > 0))
{
/// adjustBufferSize(), which is done in MergeTreeReaderStream and MergeTreeReaderCompact,
/// lowers buffer size if file size (or required read range) is less. So we know that the
/// settings.prefetch_buffer_size will be lowered there, therefore we account it here as well.
/// But here we make a more approximate lowering, while in adjustBufferSize it will be presize.
size_t estimated_memory_usage = 0;
size_t estimated_prefetches_num = 0;
for (const auto & col : part.task_columns.columns)
{
const auto col_size = part.data_part->getColumnSize(col.name).data_uncompressed;
estimated_memory_usage += std::min<size_t>(col_size, settings.prefetch_buffer_size);
++estimated_prefetches_num;
}
if (reader_settings.apply_deleted_mask && part.data_part->hasLightweightDelete())
{
const auto col_size = part.data_part->getColumnSize(
LightweightDeleteDescription::FILTER_COLUMN.name).data_uncompressed;
estimated_memory_usage += std::min<size_t>(col_size, settings.prefetch_buffer_size);
++estimated_prefetches_num;
}
if (prewhere_info)
{
for (const auto & columns : part.task_columns.pre_columns)
{
for (const auto & col : columns)
{
const size_t col_size = part.data_part->getColumnSize(col.name).data_uncompressed;
estimated_memory_usage += std::min<size_t>(col_size, settings.prefetch_buffer_size);
++estimated_prefetches_num;
}
}
}
allow_prefetch = estimated_memory_usage <= allowed_memory_usage
allow_prefetch = part.estimated_memory_usage_for_single_prefetch <= allowed_memory_usage
&& (allowed_prefetches_num.has_value() == false
|| estimated_prefetches_num <= allowed_prefetches_num.value());
|| part.required_readers_num <= allowed_prefetches_num.value());
if (allow_prefetch)
{
allowed_memory_usage -= estimated_memory_usage;
allowed_memory_usage -= part.estimated_memory_usage_for_single_prefetch;
if (allowed_prefetches_num.has_value())
*allowed_prefetches_num -= estimated_prefetches_num;
*allowed_prefetches_num -= part.required_readers_num;
}
}
if (allow_prefetch)