PR strict versioning

This commit is contained in:
Igor Nikonov 2024-11-12 11:40:55 +00:00
parent 39e01d47b1
commit ee498e5f48
6 changed files with 44 additions and 9 deletions

View File

@ -0,0 +1,8 @@
#pragma once
namespace DB
{
static constexpr auto DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION = 3;
static constexpr auto DBMS_PARALLEL_REPLICAS_MIN_VERSION_WITH_MARK_SEGMENT_SIZE_FIELD = 4;
static constexpr auto DBMS_PARALLEL_REPLICAS_VERSION_WITH_RIGHT_JOINS_FIX_71162 = 5;
}

View File

@ -33,9 +33,7 @@ static constexpr auto DBMS_MIN_REVISION_WITH_AGGREGATE_FUNCTIONS_VERSIONING = 54
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = 1; static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = 1;
static constexpr auto DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION = 3; static constexpr auto DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION = 5;
static constexpr auto DBMS_PARALLEL_REPLICAS_MIN_VERSION_WITH_MARK_SEGMENT_SIZE_FIELD = 4;
static constexpr auto DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION = 4;
static constexpr auto DBMS_MIN_REVISION_WITH_PARALLEL_REPLICAS = 54453; static constexpr auto DBMS_MIN_REVISION_WITH_PARALLEL_REPLICAS = 54453;
static constexpr auto DBMS_MERGE_TREE_PART_INFO_VERSION = 1; static constexpr auto DBMS_MERGE_TREE_PART_INFO_VERSION = 1;

View File

@ -5628,6 +5628,9 @@ Parts virtually divided into segments to be distributed between replicas for par
)", BETA) \ )", BETA) \
DECLARE(Bool, parallel_replicas_local_plan, true, R"( DECLARE(Bool, parallel_replicas_local_plan, true, R"(
Build local plan for local replica Build local plan for local replica
)", BETA) \
DECLARE(Bool, parallel_replicas_strict_versioning, true, R"(
Build local plan for local replica
)", BETA) \ )", BETA) \
\ \
DECLARE(Bool, allow_experimental_analyzer, true, R"( DECLARE(Bool, allow_experimental_analyzer, true, R"(

View File

@ -43,6 +43,7 @@ namespace Setting
extern const SettingsBool skip_unavailable_shards; extern const SettingsBool skip_unavailable_shards;
extern const SettingsOverflowMode timeout_overflow_mode; extern const SettingsOverflowMode timeout_overflow_mode;
extern const SettingsBool use_hedged_requests; extern const SettingsBool use_hedged_requests;
extern const SettingsBool parallel_replicas_strict_versioning;
} }
namespace ErrorCodes namespace ErrorCodes
@ -702,6 +703,15 @@ void RemoteQueryExecutor::processMergeTreeReadTaskRequest(ParallelReadRequest re
throw Exception(ErrorCodes::LOGICAL_ERROR, "Coordinator for parallel reading from replicas is not initialized"); throw Exception(ErrorCodes::LOGICAL_ERROR, "Coordinator for parallel reading from replicas is not initialized");
ProfileEvents::increment(ProfileEvents::MergeTreeReadTaskRequestsReceived); ProfileEvents::increment(ProfileEvents::MergeTreeReadTaskRequestsReceived);
const auto & settings = context->getSettingsRef();
if (settings[Setting::parallel_replicas_strict_versioning] && request.protocol_version != DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION)
{
ParallelReadResponse response{.finish = true, .description = {}};
connections->sendMergeTreeReadTaskResponse(response);
return;
}
auto response = extension->parallel_reading_coordinator->handleRequest(std::move(request)); auto response = extension->parallel_reading_coordinator->handleRequest(std::move(request));
connections->sendMergeTreeReadTaskResponse(response); connections->sendMergeTreeReadTaskResponse(response);
} }
@ -711,6 +721,10 @@ void RemoteQueryExecutor::processMergeTreeInitialReadAnnouncement(InitialAllRang
if (!extension || !extension->parallel_reading_coordinator) if (!extension || !extension->parallel_reading_coordinator)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Coordinator for parallel reading from replicas is not initialized"); throw Exception(ErrorCodes::LOGICAL_ERROR, "Coordinator for parallel reading from replicas is not initialized");
const auto & settings = context->getSettingsRef();
if (settings[Setting::parallel_replicas_strict_versioning] && announcement.protocol_version != DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION)
return;
extension->parallel_reading_coordinator->handleInitialAllRangesAnnouncement(std::move(announcement)); extension->parallel_reading_coordinator->handleInitialAllRangesAnnouncement(std::move(announcement));
} }

View File

@ -1,7 +1,7 @@
#include <chrono>
#include <Storages/MergeTree/RequestResponse.h> #include <Storages/MergeTree/RequestResponse.h>
#include <Core/ProtocolDefines.h> #include <Core/ProtocolDefines.h>
#include <Core/ParallelReplicasProtocolDefines.h>
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>
#include <IO/VarInt.h> #include <IO/VarInt.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
@ -76,7 +76,7 @@ ParallelReadRequest ParallelReadRequest::deserialize(ReadBuffer & in)
readIntBinary(min_number_of_marks, in); readIntBinary(min_number_of_marks, in);
description.deserialize(in); description.deserialize(in);
return ParallelReadRequest(mode, replica_num, min_number_of_marks, std::move(description)); return ParallelReadRequest(mode, replica_num, min_number_of_marks, std::move(description), version);
} }
void ParallelReadRequest::merge(ParallelReadRequest & other) void ParallelReadRequest::merge(ParallelReadRequest & other)
@ -171,7 +171,7 @@ InitialAllRangesAnnouncement InitialAllRangesAnnouncement::deserialize(ReadBuffe
if (replica_protocol_version >= DBMS_PARALLEL_REPLICAS_MIN_VERSION_WITH_MARK_SEGMENT_SIZE_FIELD) if (replica_protocol_version >= DBMS_PARALLEL_REPLICAS_MIN_VERSION_WITH_MARK_SEGMENT_SIZE_FIELD)
readIntBinary(mark_segment_size, in); readIntBinary(mark_segment_size, in);
return InitialAllRangesAnnouncement{mode, description, replica_num, mark_segment_size}; return InitialAllRangesAnnouncement{mode, description, replica_num, mark_segment_size, version};
} }
} }

View File

@ -49,11 +49,13 @@ struct ParallelReadRequest
CoordinationMode mode_, CoordinationMode mode_,
size_t replica_num_, size_t replica_num_,
size_t min_number_of_marks_, size_t min_number_of_marks_,
RangesInDataPartsDescription description_) RangesInDataPartsDescription description_,
UInt64 protocol_version_ = 0)
: mode(mode_) : mode(mode_)
, replica_num(replica_num_) , replica_num(replica_num_)
, min_number_of_marks(min_number_of_marks_) , min_number_of_marks(min_number_of_marks_)
, description(std::move(description_)) , description(std::move(description_))
, protocol_version(protocol_version_)
{} {}
CoordinationMode mode; CoordinationMode mode;
@ -62,6 +64,7 @@ struct ParallelReadRequest
/// Extension for Ordered (InOrder or ReverseOrder) mode /// Extension for Ordered (InOrder or ReverseOrder) mode
/// Contains only data part names without mark ranges. /// Contains only data part names without mark ranges.
RangesInDataPartsDescription description; RangesInDataPartsDescription description;
UInt64 protocol_version;
void serialize(WriteBuffer & out, UInt64 initiator_protocol_version) const; void serialize(WriteBuffer & out, UInt64 initiator_protocol_version) const;
String describe() const; String describe() const;
@ -93,14 +96,23 @@ struct InitialAllRangesAnnouncement
/// No default constructor, you must initialize all fields at once. /// No default constructor, you must initialize all fields at once.
InitialAllRangesAnnouncement( InitialAllRangesAnnouncement(
CoordinationMode mode_, RangesInDataPartsDescription description_, size_t replica_num_, size_t mark_segment_size_) CoordinationMode mode_,
: mode(mode_), description(std::move(description_)), replica_num(replica_num_), mark_segment_size(mark_segment_size_) RangesInDataPartsDescription description_,
size_t replica_num_,
size_t mark_segment_size_,
UInt64 protocol_version_ = 0)
: mode(mode_)
, description(std::move(description_))
, replica_num(replica_num_)
, mark_segment_size(mark_segment_size_)
, protocol_version(protocol_version_)
{} {}
CoordinationMode mode; CoordinationMode mode;
RangesInDataPartsDescription description; RangesInDataPartsDescription description;
size_t replica_num; size_t replica_num;
size_t mark_segment_size; size_t mark_segment_size;
UInt64 protocol_version;
void serialize(WriteBuffer & out, UInt64 initiator_protocol_version) const; void serialize(WriteBuffer & out, UInt64 initiator_protocol_version) const;
String describe(); String describe();