Check boundaries for some settings in parallel replicas (#61455)

This commit is contained in:
Nikita Mikhaylov 2024-03-18 12:06:45 +01:00 committed by GitHub
parent e8a08baf23
commit 30b757b40d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 35 additions and 5 deletions

View File

@ -25,6 +25,18 @@ inline bool isFinite(T x)
return true;
}
/// Tells whether the Float64 value `x` lies in the range of `T`, so that a following
/// `static_cast<T>(x)` does not overflow the target type.
///
/// - Floating-point `T`: always returns true.
/// - Any other `T`: returns false when `isFinite(x)` is false, or when `x` is outside
///   the range described by `std::numeric_limits<T>`.
template <typename T>
bool canConvertTo(Float64 x)
{
    if constexpr (std::is_floating_point_v<T>)
        return true;

    if (!isFinite(x))
        return false;

    const Float64 upper = Float64(std::numeric_limits<T>::max());
    const Float64 lower = Float64(std::numeric_limits<T>::lowest());

    if (x > upper || x < lower)
        return false;

    /// When T has more value bits than the Float64 mantissa (e.g. 64-bit integers),
    /// max() == 2^digits - 1 is not representable as Float64, and under the default
    /// round-to-nearest mode `upper` becomes 2^digits, i.e. one past the real maximum.
    /// A value equal to `upper` would pass the `>` check above, yet converting it to T
    /// is undefined behaviour. Treat the bound as exclusive for such types.
    if constexpr (std::numeric_limits<T>::digits > std::numeric_limits<Float64>::digits)
    {
        if (x >= upper)
            return false;
    }

    return true;
}
template <typename T>
T NaNOrZero()

View File

@ -96,6 +96,7 @@
#include <Common/scope_guard_safe.h>
#include <Common/typeid_cast.h>
#include <Common/ProfileEvents.h>
#include <Common/NaNUtils.h>
namespace ProfileEvents
@ -2553,10 +2554,13 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
/// If necessary, we request more sources than the number of threads - to distribute the work evenly over the threads.
if (max_streams > 1 && !is_sync_remote)
{
if (auto streams_with_ratio = max_streams * settings.max_streams_to_max_threads_ratio; streams_with_ratio < SIZE_MAX)
if (auto streams_with_ratio = max_streams * settings.max_streams_to_max_threads_ratio; canConvertTo<size_t>(streams_with_ratio))
max_streams = static_cast<size_t>(streams_with_ratio);
else
throw Exception(ErrorCodes::PARAMETER_OUT_OF_BOUND, "Exceeded limit for `max_streams` with `max_streams_to_max_threads_ratio`. Make sure that `max_streams * max_streams_to_max_threads_ratio` not exceeds {}, current value: {}", SIZE_MAX, streams_with_ratio);
throw Exception(ErrorCodes::PARAMETER_OUT_OF_BOUND,
"Exceeded limit for `max_streams` with `max_streams_to_max_threads_ratio`. "
"Make sure that `max_streams * max_streams_to_max_threads_ratio` is in some reasonable boundaries, current value: {}",
streams_with_ratio);
}
auto & prewhere_info = analysis_result.prewhere_info;

View File

@ -131,6 +131,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
extern const int TOO_MANY_ROWS;
extern const int CANNOT_PARSE_TEXT;
extern const int PARAMETER_OUT_OF_BOUND;
}
static MergeTreeReaderSettings getMergeTreeReaderSettings(
@ -348,7 +349,14 @@ Pipe ReadFromMergeTree::readFromPoolParallelReplicas(
/// We have a special logic for local replica. It has to read less data, because in some cases it should
/// merge states of aggregate functions or do some other important stuff other than reading from Disk.
pool_settings.min_marks_for_concurrent_read = static_cast<size_t>(pool_settings.min_marks_for_concurrent_read * context->getSettingsRef().parallel_replicas_single_task_marks_count_multiplier);
const auto multiplier = context->getSettingsRef().parallel_replicas_single_task_marks_count_multiplier;
if (auto result = pool_settings.min_marks_for_concurrent_read * multiplier; canConvertTo<size_t>(result))
pool_settings.min_marks_for_concurrent_read = static_cast<size_t>(result);
else
throw Exception(ErrorCodes::PARAMETER_OUT_OF_BOUND,
"Exceeded limit for the number of marks per a single task for parallel replicas. "
"Make sure that `parallel_replicas_single_task_marks_count_multiplier` is in some reasonable boundaries, current value is: {}",
multiplier);
auto pool = std::make_shared<MergeTreeReadPoolParallelReplicas>(
std::move(extension),
@ -512,8 +520,14 @@ Pipe ReadFromMergeTree::readInOrder(
.columns_to_read = required_columns,
};
pool_settings.min_marks_for_concurrent_read = static_cast<size_t>(
pool_settings.min_marks_for_concurrent_read * context->getSettingsRef().parallel_replicas_single_task_marks_count_multiplier);
const auto multiplier = context->getSettingsRef().parallel_replicas_single_task_marks_count_multiplier;
if (auto result = pool_settings.min_marks_for_concurrent_read * multiplier; canConvertTo<size_t>(result))
pool_settings.min_marks_for_concurrent_read = static_cast<size_t>(result);
else
throw Exception(ErrorCodes::PARAMETER_OUT_OF_BOUND,
"Exceeded limit for the number of marks per a single task for parallel replicas. "
"Make sure that `parallel_replicas_single_task_marks_count_multiplier` is in some reasonable boundaries, current value is: {}",
multiplier);
CoordinationMode mode = read_type == ReadType::InOrder
? CoordinationMode::WithOrder