This commit is contained in:
Nikita Taranov 2024-08-18 17:44:16 +01:00
parent 8a0f41da7a
commit 30229a3bfd
5 changed files with 25 additions and 19 deletions

View File

@ -94,6 +94,4 @@ static constexpr auto DBMS_MIN_REVISION_WITH_ADAPTIVE_MARK_SEGMENT_FOR_PARALLEL_
/// NOTE: DBMS_TCP_PROTOCOL_VERSION has nothing common with VERSION_REVISION,
/// later is just a number for server version (one number instead of commit SHA)
/// for simplicity (sometimes it may be more convenient in some use cases).
static constexpr auto DBMS_TCP_PROTOCOL_VERSION = 54470;
}
static constexpr auto DBMS_TCP_PROTOCOL_VERSION = 54471;

View File

@ -1,6 +1,8 @@
#include <Processors/QueryPlan/ReadFromMergeTree.h>
#include <Core/Settings.h>
#include <IO/Operators.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/InterpreterSelectQuery.h>
@ -8,6 +10,8 @@
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/parseIdentifierOrStringLiteral.h>
#include <Processors/ConcatProcessor.h>
#include <Processors/Merges/AggregatingSortedTransform.h>
#include <Processors/Merges/CollapsingSortedTransform.h>
@ -16,6 +20,7 @@
#include <Processors/Merges/ReplacingSortedTransform.h>
#include <Processors/Merges/SummingSortedTransform.h>
#include <Processors/Merges/VersionedCollapsingTransform.h>
#include <Processors/QueryPlan/IQueryPlanStep.h>
#include <Processors/QueryPlan/PartsSplitter.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Transforms/ExpressionTransform.h>
@ -24,10 +29,11 @@
#include <Processors/Transforms/SelectByIndicesTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <Storages/MergeTree/MergeTreeIndexVectorSimilarity.h>
#include <Storages/MergeTree/MergeTreeIndexLegacyVectorSimilarity.h>
#include <Storages/MergeTree/MergeTreeReadPool.h>
#include <Storages/MergeTree/MergeTreeIndexMinMax.h>
#include <Storages/MergeTree/MergeTreeIndexVectorSimilarity.h>
#include <Storages/MergeTree/MergeTreePrefetchedReadPool.h>
#include <Storages/MergeTree/MergeTreeReadPool.h>
#include <Storages/MergeTree/MergeTreeReadPoolInOrder.h>
#include <Storages/MergeTree/MergeTreeReadPoolParallelReplicas.h>
#include <Storages/MergeTree/MergeTreeReadPoolParallelReplicasInOrder.h>
@ -41,18 +47,12 @@
#include <Common/JSONBuilder.h>
#include <Common/isLocalAddress.h>
#include <Common/logger_useful.h>
#include <Core/Settings.h>
#include <Processors/QueryPlan/IQueryPlanStep.h>
#include <Parsers/parseIdentifierOrStringLiteral.h>
#include <Parsers/ExpressionListParsers.h>
#include <Storages/MergeTree/MergeTreeIndexMinMax.h>
#include <algorithm>
#include <iterator>
#include <memory>
#include <unordered_map>
#include "Interpreters/Cluster.h"
#include "config.h"
using namespace DB;
@ -344,11 +344,12 @@ Pipe ReadFromMergeTree::readFromPoolParallelReplicas(
{
const auto & client_info = context->getClientInfo();
auto extension = ParallelReadingExtension{
auto extension = ParallelReadingExtension
{
.all_callback = all_ranges_callback.value(),
.callback = read_task_callback.value(),
.number_of_current_replica = client_info.number_of_current_replica,
.total_nodes_count = context->getClusterForParallelReplicas()->getShardsInfo().begin()->getAllNodeCount(),
.total_nodes_count = context->getClusterForParallelReplicas()->getShardsInfo().at(0).getAllNodeCount(),
};
/// We have a special logic for local replica. It has to read less data, because in some cases it should
@ -523,11 +524,12 @@ Pipe ReadFromMergeTree::readInOrder(
if (is_parallel_reading_from_replicas)
{
const auto & client_info = context->getClientInfo();
ParallelReadingExtension extension{
ParallelReadingExtension extension
{
.all_callback = all_ranges_callback.value(),
.callback = read_task_callback.value(),
.number_of_current_replica = client_info.number_of_current_replica,
.total_nodes_count = context->getClusterForParallelReplicas()->getShardsInfo().begin()->getAllNodeCount(),
.total_nodes_count = context->getClusterForParallelReplicas()->getShardsInfo().at(0).getAllNodeCount(),
};
auto multiplier = context->getSettingsRef().parallel_replicas_single_task_marks_count_multiplier;

View File

@ -44,14 +44,16 @@ size_t chooseSegmentSize(
min_marks_per_task * threads,
sum_marks / number_of_replicas / number_of_replicas);
/// Here we take max of three numbers:
/// * user provided setting (0 by default)
/// Here we take max of two numbers:
/// * (min_marks_per_task * threads) = the number of marks we request from the coordinator each time - there is no point to have segments smaller than one unit of work for a replica
/// * (sum_marks / number_of_replicas^2) - we use consistent hashing for work distribution (including work stealing). If we have a really slow replica
/// everything up to (1/number_of_replicas) portion of its work will be stolen by other replicas. And it owns (1/number_of_replicas) share of total number of marks.
/// Also important to note here that sum_marks is calculated after PK analysis, it means in particular that different segment sizes might be used for the
/// same table for different queries (it is intentional).
mark_segment_size = std::max({mark_segment_size, min_marks_per_task * threads, sum_marks / number_of_replicas / number_of_replicas});
///
/// Positive `mark_segment_size` means it is a user provided value, we have to preserve it.
if (mark_segment_size == 0)
mark_segment_size = std::max(min_marks_per_task * threads, sum_marks / number_of_replicas / number_of_replicas);
/// Squeeze the value to the borders.
mark_segment_size = std::clamp(mark_segment_size, borders.front(), borders.back());

View File

@ -20,6 +20,10 @@ namespace ErrorCodes
namespace
{
/// Previously we had a separate protocol version number for parallel replicas.
/// But we didn't maintain backward compatibility and every protocol change was breaking.
/// Now we have to support at least minimal tail of the previous versions and the implementation
/// is based on the common tcp protocol version as in all other places.
constexpr UInt64 DEPRECATED_FIELD_PARALLEL_REPLICAS_PROTOCOL_VERSION = 3;
CoordinationMode validateAndGet(uint8_t candidate)

View File

@ -94,7 +94,7 @@ struct InitialAllRangesAnnouncement
InitialAllRangesAnnouncement(
CoordinationMode mode_, RangesInDataPartsDescription description_, size_t replica_num_, size_t mark_segment_size_)
: mode(mode_), description(description_), replica_num(replica_num_), mark_segment_size(mark_segment_size_)
: mode(mode_), description(std::move(description_)), replica_num(replica_num_), mark_segment_size(mark_segment_size_)
{}
CoordinationMode mode;