diff --git a/src/Core/ParallelReplicasProtocolDefines.h b/src/Core/ParallelReplicasProtocolDefines.h new file mode 100644 index 00000000000..9330d7b43f4 --- /dev/null +++ b/src/Core/ParallelReplicasProtocolDefines.h @@ -0,0 +1,8 @@ +#pragma once + +namespace DB +{ +static constexpr auto DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION = 3; +static constexpr auto DBMS_PARALLEL_REPLICAS_MIN_VERSION_WITH_MARK_SEGMENT_SIZE_FIELD = 4; +static constexpr auto DBMS_PARALLEL_REPLICAS_VERSION_WITH_RIGHT_JOINS_FIX_71162 = 5; +} diff --git a/src/Core/ProtocolDefines.h b/src/Core/ProtocolDefines.h index b68eff0aa5a..ebb4cf574bb 100644 --- a/src/Core/ProtocolDefines.h +++ b/src/Core/ProtocolDefines.h @@ -33,9 +33,7 @@ static constexpr auto DBMS_MIN_REVISION_WITH_AGGREGATE_FUNCTIONS_VERSIONING = 54 static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = 1; -static constexpr auto DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION = 3; -static constexpr auto DBMS_PARALLEL_REPLICAS_MIN_VERSION_WITH_MARK_SEGMENT_SIZE_FIELD = 4; -static constexpr auto DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION = 4; +static constexpr auto DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION = 5; static constexpr auto DBMS_MIN_REVISION_WITH_PARALLEL_REPLICAS = 54453; static constexpr auto DBMS_MERGE_TREE_PART_INFO_VERSION = 1; diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 4c8761e503e..277ded72ef8 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -5628,6 +5628,9 @@ Parts virtually divided into segments to be distributed between replicas for par )", BETA) \ DECLARE(Bool, parallel_replicas_local_plan, true, R"( Build local plan for local replica +)", BETA) \ + DECLARE(Bool, parallel_replicas_strict_versioning, true, R"( +Build local plan for local replica )", BETA) \ \ DECLARE(Bool, allow_experimental_analyzer, true, R"( diff --git a/src/QueryPipeline/RemoteQueryExecutor.cpp b/src/QueryPipeline/RemoteQueryExecutor.cpp index 5faae03bc8f..5ff1c74008d 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.cpp +++ b/src/QueryPipeline/RemoteQueryExecutor.cpp @@ -43,6 +43,7 @@ namespace Setting extern const SettingsBool skip_unavailable_shards; extern const SettingsOverflowMode timeout_overflow_mode; extern const SettingsBool use_hedged_requests; + extern const SettingsBool parallel_replicas_strict_versioning; } namespace ErrorCodes @@ -702,6 +703,15 @@ void RemoteQueryExecutor::processMergeTreeReadTaskRequest(ParallelReadRequest re throw Exception(ErrorCodes::LOGICAL_ERROR, "Coordinator for parallel reading from replicas is not initialized"); ProfileEvents::increment(ProfileEvents::MergeTreeReadTaskRequestsReceived); + + const auto & settings = context->getSettingsRef(); + if (settings[Setting::parallel_replicas_strict_versioning] && request.protocol_version != DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION) + { + ParallelReadResponse response{.finish = true, .description = {}}; + connections->sendMergeTreeReadTaskResponse(response); + return; + } + auto response = extension->parallel_reading_coordinator->handleRequest(std::move(request)); connections->sendMergeTreeReadTaskResponse(response); } @@ -711,6 +721,10 @@ void RemoteQueryExecutor::processMergeTreeInitialReadAnnouncement(InitialAllRang if (!extension || !extension->parallel_reading_coordinator) throw Exception(ErrorCodes::LOGICAL_ERROR, "Coordinator for parallel reading from replicas is not initialized"); + const auto & settings = context->getSettingsRef(); + if (settings[Setting::parallel_replicas_strict_versioning] && announcement.protocol_version != DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION) + return; + extension->parallel_reading_coordinator->handleInitialAllRangesAnnouncement(std::move(announcement)); } diff --git a/src/Storages/MergeTree/RequestResponse.cpp b/src/Storages/MergeTree/RequestResponse.cpp index f6859b26908..657a7011ee0 100644 --- a/src/Storages/MergeTree/RequestResponse.cpp +++ b/src/Storages/MergeTree/RequestResponse.cpp @@ -1,7 +1,7 @@ -#include #include #include +#include #include #include #include @@ -76,7 +76,7 @@ ParallelReadRequest ParallelReadRequest::deserialize(ReadBuffer & in) readIntBinary(min_number_of_marks, in); description.deserialize(in); - return ParallelReadRequest(mode, replica_num, min_number_of_marks, std::move(description)); + return ParallelReadRequest(mode, replica_num, min_number_of_marks, std::move(description), version); } void ParallelReadRequest::merge(ParallelReadRequest & other) @@ -171,7 +171,7 @@ InitialAllRangesAnnouncement InitialAllRangesAnnouncement::deserialize(ReadBuffe if (replica_protocol_version >= DBMS_PARALLEL_REPLICAS_MIN_VERSION_WITH_MARK_SEGMENT_SIZE_FIELD) readIntBinary(mark_segment_size, in); - return InitialAllRangesAnnouncement{mode, description, replica_num, mark_segment_size}; + return InitialAllRangesAnnouncement{mode, description, replica_num, mark_segment_size, version}; } } diff --git a/src/Storages/MergeTree/RequestResponse.h b/src/Storages/MergeTree/RequestResponse.h index 96b65c45bfa..74963516ee9 100644 --- a/src/Storages/MergeTree/RequestResponse.h +++ b/src/Storages/MergeTree/RequestResponse.h @@ -49,11 +49,13 @@ struct ParallelReadRequest CoordinationMode mode_, size_t replica_num_, size_t min_number_of_marks_, - RangesInDataPartsDescription description_) + RangesInDataPartsDescription description_, + UInt64 protocol_version_ = 0) : mode(mode_) , replica_num(replica_num_) , min_number_of_marks(min_number_of_marks_) , description(std::move(description_)) + , protocol_version(protocol_version_) {} CoordinationMode mode; @@ -62,6 +64,7 @@ struct ParallelReadRequest /// Extension for Ordered (InOrder or ReverseOrder) mode /// Contains only data part names without mark ranges. RangesInDataPartsDescription description; + UInt64 protocol_version; void serialize(WriteBuffer & out, UInt64 initiator_protocol_version) const; String describe() const; @@ -93,14 +96,23 @@ struct InitialAllRangesAnnouncement /// No default constructor, you must initialize all fields at once. InitialAllRangesAnnouncement( - CoordinationMode mode_, RangesInDataPartsDescription description_, size_t replica_num_, size_t mark_segment_size_) - : mode(mode_), description(std::move(description_)), replica_num(replica_num_), mark_segment_size(mark_segment_size_) + CoordinationMode mode_, + RangesInDataPartsDescription description_, + size_t replica_num_, + size_t mark_segment_size_, + UInt64 protocol_version_ = 0) + : mode(mode_) + , description(std::move(description_)) + , replica_num(replica_num_) + , mark_segment_size(mark_segment_size_) + , protocol_version(protocol_version_) {} CoordinationMode mode; RangesInDataPartsDescription description; size_t replica_num; size_t mark_segment_size; + UInt64 protocol_version; void serialize(WriteBuffer & out, UInt64 initiator_protocol_version) const; String describe();