diff --git a/src/Storages/MergeTree/MergeTreePrefetchedReadPool.cpp b/src/Storages/MergeTree/MergeTreePrefetchedReadPool.cpp index 3c069795345..96bfc8bc33b 100644 --- a/src/Storages/MergeTree/MergeTreePrefetchedReadPool.cpp +++ b/src/Storages/MergeTree/MergeTreePrefetchedReadPool.cpp @@ -424,11 +424,16 @@ MergeTreePrefetchedReadPool::ThreadsTasks MergeTreePrefetchedReadPool::createThr "Sum marks: {}, threads: {}, min_marks_per_thread: {}, result prefetch step marks: {}, prefetches limit: {}, total_size_approx: {}", sum_marks, threads, min_marks_per_thread, settings.filesystem_prefetch_step_bytes, settings.filesystem_prefetches_limit, total_size_approx); - size_t current_prefetches_count = 0; + size_t allowed_memory_usage = settings.filesystem_prefetch_max_memory_usage; + if (!allowed_memory_usage) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Setting `filesystem_prefetch_max_memory_usage` must be non-zero"); + std::optional allowed_prefetches_num = settings.filesystem_prefetches_limit + ? std::optional(settings.filesystem_prefetches_limit) + : std::nullopt; + prefetch_queue.reserve(total_prefetches_approx); ThreadsTasks result_threads_tasks; - size_t memory_usage_approx = 0; for (size_t i = 0, part_idx = 0; i < threads && part_idx < parts_infos.size(); ++i) { auto need_marks = min_marks_per_thread; @@ -516,22 +521,55 @@ MergeTreePrefetchedReadPool::ThreadsTasks MergeTreePrefetchedReadPool::createThr read_task->priority = priority; - bool allow_prefetch = !settings.filesystem_prefetches_limit || current_prefetches_count + 1 <= settings.filesystem_prefetches_limit; - if (allow_prefetch && settings.filesystem_prefetch_max_memory_usage) + bool allow_prefetch = false; + if (allowed_memory_usage + && (allowed_prefetches_num.has_value() == false || allowed_prefetches_num.value() > 0)) { - size_t num_readers = part.task_columns.columns.size(); - if (reader_settings.apply_deleted_mask && part.data_part->hasLightweightDelete()) - ++num_readers; - if (prewhere_info) - num_readers += part.task_columns.pre_columns.size(); - memory_usage_approx += settings.max_read_buffer_size * num_readers; + size_t estimated_memory_usage = 0; + size_t estimated_prefetches_num = 0; + + for (const auto & col : part.task_columns.columns) + { + const auto col_size = part.data_part->getColumnSize(col.name).data_uncompressed; + estimated_memory_usage += std::min(col_size, settings.max_read_buffer_size); + ++estimated_prefetches_num; + } + if (reader_settings.apply_deleted_mask && part.data_part->hasLightweightDelete()) + { + const auto col_size = part.data_part->getColumnSize( + LightweightDeleteDescription::FILTER_COLUMN.name).data_uncompressed; + estimated_memory_usage += std::min(col_size, settings.max_read_buffer_size); + ++estimated_prefetches_num; + } + if (prewhere_info) + { + for (const auto & columns : part.task_columns.pre_columns) + { + for (const auto & col : columns) + { + const size_t col_size = part.data_part->getColumnSize(col.name).data_uncompressed; + estimated_memory_usage += std::min(col_size, settings.max_read_buffer_size); + ++estimated_prefetches_num; + } + } + } + + allow_prefetch = estimated_memory_usage <= allowed_memory_usage + && (allowed_prefetches_num.has_value() == false + || estimated_prefetches_num <= allowed_prefetches_num.value()); + + if (allow_prefetch) + { + allowed_memory_usage -= estimated_memory_usage; + if (allowed_prefetches_num.has_value()) + *allowed_prefetches_num -= estimated_prefetches_num; + } - allow_prefetch = memory_usage_approx <= settings.filesystem_prefetch_max_memory_usage; } + if (allow_prefetch) { prefetch_queue.emplace(TaskHolder(read_task.get())); - ++current_prefetches_count; } ++priority;