More correct accounting of limits

This commit is contained in:
kssenii 2023-04-20 15:42:26 +02:00
parent ec1370b2ed
commit 59211c490a

View File

@ -424,11 +424,16 @@ MergeTreePrefetchedReadPool::ThreadsTasks MergeTreePrefetchedReadPool::createThr
"Sum marks: {}, threads: {}, min_marks_per_thread: {}, result prefetch step marks: {}, prefetches limit: {}, total_size_approx: {}",
sum_marks, threads, min_marks_per_thread, settings.filesystem_prefetch_step_bytes, settings.filesystem_prefetches_limit, total_size_approx);
size_t current_prefetches_count = 0;
size_t allowed_memory_usage = settings.filesystem_prefetch_max_memory_usage;
if (!allowed_memory_usage)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Setting `filesystem_prefetch_max_memory_usage` must be non-zero");
std::optional<size_t> allowed_prefetches_num = settings.filesystem_prefetches_limit
? std::optional<size_t>(settings.filesystem_prefetches_limit)
: std::nullopt;
prefetch_queue.reserve(total_prefetches_approx);
ThreadsTasks result_threads_tasks;
size_t memory_usage_approx = 0;
for (size_t i = 0, part_idx = 0; i < threads && part_idx < parts_infos.size(); ++i)
{
auto need_marks = min_marks_per_thread;
@ -516,22 +521,55 @@ MergeTreePrefetchedReadPool::ThreadsTasks MergeTreePrefetchedReadPool::createThr
read_task->priority = priority;
bool allow_prefetch = !settings.filesystem_prefetches_limit || current_prefetches_count + 1 <= settings.filesystem_prefetches_limit;
if (allow_prefetch && settings.filesystem_prefetch_max_memory_usage)
bool allow_prefetch = false;
if (allowed_memory_usage
&& (allowed_prefetches_num.has_value() == false || allowed_prefetches_num.value() > 0))
{
size_t num_readers = part.task_columns.columns.size();
if (reader_settings.apply_deleted_mask && part.data_part->hasLightweightDelete())
++num_readers;
if (prewhere_info)
num_readers += part.task_columns.pre_columns.size();
memory_usage_approx += settings.max_read_buffer_size * num_readers;
size_t estimated_memory_usage = 0;
size_t estimated_prefetches_num = 0;
for (const auto & col : part.task_columns.columns)
{
const auto col_size = part.data_part->getColumnSize(col.name).data_uncompressed;
estimated_memory_usage += std::min<size_t>(col_size, settings.max_read_buffer_size);
++estimated_prefetches_num;
}
if (reader_settings.apply_deleted_mask && part.data_part->hasLightweightDelete())
{
const auto col_size = part.data_part->getColumnSize(
LightweightDeleteDescription::FILTER_COLUMN.name).data_uncompressed;
estimated_memory_usage += std::min<size_t>(col_size, settings.max_read_buffer_size);
++estimated_prefetches_num;
}
if (prewhere_info)
{
for (const auto & columns : part.task_columns.pre_columns)
{
for (const auto & col : columns)
{
const size_t col_size = part.data_part->getColumnSize(col.name).data_uncompressed;
estimated_memory_usage += std::min<size_t>(col_size, settings.max_read_buffer_size);
++estimated_prefetches_num;
}
}
}
allow_prefetch = estimated_memory_usage <= allowed_memory_usage
&& (allowed_prefetches_num.has_value() == false
|| estimated_prefetches_num <= allowed_prefetches_num.value());
if (allow_prefetch)
{
allowed_memory_usage -= estimated_memory_usage;
if (allowed_prefetches_num.has_value())
*allowed_prefetches_num -= estimated_prefetches_num;
}
allow_prefetch = memory_usage_approx <= settings.filesystem_prefetch_max_memory_usage;
}
if (allow_prefetch)
{
prefetch_queue.emplace(TaskHolder(read_task.get()));
++current_prefetches_count;
}
++priority;