mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-14 18:32:29 +00:00
fix
This commit is contained in:
parent
67bd861bc3
commit
30d216f863
@ -1,10 +1,10 @@
|
||||
#include <Storages/MergeTree/MergeTreeReadPool.h>
|
||||
#include <Storages/MergeTree/MergeTreeBaseSelectProcessor.h>
|
||||
#include <Storages/MergeTree/LoadedMergeTreeDataPartInfoForReader.h>
|
||||
#include <Storages/MergeTree/MergeTreeBaseSelectProcessor.h>
|
||||
#include <Storages/MergeTree/MergeTreeReadPool.h>
|
||||
#include <base/range.h>
|
||||
#include <Common/Stopwatch.h>
|
||||
#include <Common/formatReadable.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <base/range.h>
|
||||
|
||||
|
||||
namespace ProfileEvents
|
||||
@ -56,26 +56,29 @@ MergeTreeReadPool::MergeTreeReadPool(
|
||||
, backoff_settings{context_->getSettingsRef()}
|
||||
, backoff_state{threads_}
|
||||
{
|
||||
const auto & settings = context_->getSettingsRef();
|
||||
|
||||
size_t total_compressed_bytes = 0;
|
||||
size_t total_marks = 0;
|
||||
for (const auto & part : parts_ranges)
|
||||
if (std::ranges::count(is_part_on_remote_disk, true))
|
||||
{
|
||||
total_compressed_bytes += getApproxSizeOfPart(
|
||||
*part.data_part, prewhere_info ? prewhere_info->prewhere_actions->getRequiredColumnsNames() : column_names_);
|
||||
total_marks += part.getMarksCount();
|
||||
}
|
||||
const auto & settings = context_->getSettingsRef();
|
||||
|
||||
if (total_marks)
|
||||
{
|
||||
const auto min_bytes_per_task = settings.merge_tree_min_bytes_per_task_for_remote_reading;
|
||||
const auto avg_mark_bytes = std::max<size_t>(total_compressed_bytes / total_marks, 1);
|
||||
/// We're taking min here because number of tasks shouldn't be too low - it will make task stealing impossible.
|
||||
const auto heuristic_min_marks = std::min(total_marks / threads_ / 8, min_bytes_per_task / avg_mark_bytes);
|
||||
if (heuristic_min_marks > min_marks_for_concurrent_read)
|
||||
size_t total_compressed_bytes = 0;
|
||||
size_t total_marks = 0;
|
||||
for (const auto & part : parts_ranges)
|
||||
{
|
||||
min_marks_for_concurrent_read = heuristic_min_marks;
|
||||
total_compressed_bytes += getApproxSizeOfPart(
|
||||
*part.data_part, prewhere_info ? prewhere_info->prewhere_actions->getRequiredColumnsNames() : column_names_);
|
||||
total_marks += part.getMarksCount();
|
||||
}
|
||||
|
||||
if (total_marks)
|
||||
{
|
||||
const auto min_bytes_per_task = settings.merge_tree_min_bytes_per_task_for_remote_reading;
|
||||
const auto avg_mark_bytes = std::max<size_t>(total_compressed_bytes / total_marks, 1);
|
||||
/// We're taking min here because number of tasks shouldn't be too low - it will make task stealing impossible.
|
||||
const auto heuristic_min_marks = std::min(total_marks / threads_, min_bytes_per_task / avg_mark_bytes);
|
||||
if (heuristic_min_marks > min_marks_for_concurrent_read)
|
||||
{
|
||||
min_marks_for_concurrent_read = heuristic_min_marks;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user